ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
repeat.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
14#include <rpp/operators/lift.hpp> // required due to operator uses lift
15#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
16#include <rpp/operators/fwd/repeat.hpp> // own forwarding
17#include <rpp/sources/create.hpp> // create observable
18#include <rpp/subscribers/constraints.hpp> // constraint::subscriber
19#include <rpp/utils/functors.hpp> // forwarding_on_next
20
21IMPLEMENTATION_FILE(repeat_tag);
22
23namespace rpp::details
24{
25template<constraint::decayed_type Type, typename SpecificObservable, typename Predicate>
27{
28 struct state_t
29 {
30 state_t(const SpecificObservable& observable, Predicate&& predicate)
31 : observable{observable}
32 , predicate{std::move(predicate)} {}
33
34 RPP_NO_UNIQUE_ADDRESS SpecificObservable observable;
35 RPP_NO_UNIQUE_ADDRESS Predicate predicate;
36 };
37
38public:
39 repeat_on_completed(const SpecificObservable& observable, Predicate&& predicate)
40 : m_state{std::make_shared<state_t>(observable, std::move(predicate))} {}
41
42
43 void operator()(const auto& sub) const
44 {
45 if (sub.is_subscribed())
46 {
47 if (m_state->predicate())
48 subscribe_subscriber_for_repeat(sub);
49 else
50 sub.on_completed();
51 }
52 }
53
54private:
55 void subscribe_subscriber_for_repeat(const constraint::subscriber auto& subscriber) const
56 {
57 m_state->observable.subscribe(create_subscriber_with_state<Type>(subscriber.get_subscription().make_child(),
58 utils::forwarding_on_next{},
59 utils::forwarding_on_error{},
60 *this,
61 subscriber));
62 }
63
64 std::shared_ptr<state_t> m_state;
65};
66
// Predicate that answers "true" exactly `count` times and "false" afterwards.
struct counted_repeat_predicate
{
    // `count` is the number of times operator() will return true.
    counted_repeat_predicate(size_t count)
        : m_count{count} {}

    // Returns true and consumes one remaining repetition while any are left;
    // returns false (without modifying the counter) once it has reached zero.
    bool operator()()
    {
        if (m_count == 0)
            return false;

        --m_count;
        return true;
    }

private:
    size_t m_count{};
};
76
77template<constraint::decayed_type Type, constraint::observable_of_type<Type> TObs, typename CreatePredicateFn>
79{
80 repeat_on_subscribe(TObs&& observable, CreatePredicateFn&& create_predicate)
81 : m_observable{std::move(observable)}
82 , m_create_predicate{std::move(create_predicate)} {}
83
84 repeat_on_subscribe(const TObs& observable, CreatePredicateFn&& create_predicate)
85 : m_observable{observable}
86 , m_create_predicate{std::move(create_predicate)} {}
87
88 template<constraint::subscriber_of_type<Type> TSub>
89 void operator()(const TSub& subscriber) const
90 {
91 repeat_on_completed<Type, std::decay_t<TObs>, std::invoke_result_t<CreatePredicateFn>>{m_observable, m_create_predicate()}(subscriber);
92 }
93
94private:
95 RPP_NO_UNIQUE_ADDRESS TObs m_observable;
96 RPP_NO_UNIQUE_ADDRESS CreatePredicateFn m_create_predicate;
97};
98
99template<constraint::decayed_type Type, constraint::observable_of_type<Type> TObs, typename CreatePredicateFn>
100auto create_repeat_on_subscribe(TObs&& observable, CreatePredicateFn&& create_predicate)
101{
102 return source::create<Type>(repeat_on_subscribe<Type, std::decay_t<TObs>, std::decay_t<CreatePredicateFn>>(std::forward<TObs>(observable),
103 std::forward<CreatePredicateFn>(create_predicate)));
104}
105
106template<constraint::decayed_type Type, constraint::observable_of_type<Type> TObs>
107auto repeat_impl(TObs&& observable, size_t count)
108{
109 return create_repeat_on_subscribe<Type>(std::forward<TObs>(observable), [count] { return counted_repeat_predicate{count}; });
110}
111
112template<constraint::decayed_type Type, constraint::observable_of_type<Type> TObs>
113auto repeat_impl(TObs&& observable)
114{
115 return create_repeat_on_subscribe<Type>(std::forward<TObs>(observable), [] { return [] { return true; }; });
116}
117} // namespace rpp::details
Definition: repeat.hpp:27
Definition: constraints.hpp:19
Definition: repeat.hpp:79