13#include <rpp/defs.hpp>
14#include <rpp/operators/lift.hpp>
15#include <rpp/operators/details/subscriber_with_state.hpp>
16#include <rpp/operators/fwd/repeat.hpp>
17#include <rpp/sources/create.hpp>
18#include <rpp/subscribers/constraints.hpp>
19#include <rpp/utils/functors.hpp>
21IMPLEMENTATION_FILE(repeat_tag);
25template<constra
int::decayed_type Type,
typename SpecificObservable,
typename Predicate>
30 state_t(
const SpecificObservable& observable, Predicate&& predicate)
31 : observable{observable}
32 , predicate{std::move(predicate)} {}
34 RPP_NO_UNIQUE_ADDRESS SpecificObservable observable;
35 RPP_NO_UNIQUE_ADDRESS Predicate predicate;
40 : m_state{std::make_shared<state_t>(observable, std::move(predicate))} {}
43 void operator()(
const auto& sub)
const
45 if (sub.is_subscribed())
47 if (m_state->predicate())
48 subscribe_subscriber_for_repeat(sub);
57 m_state->observable.subscribe(create_subscriber_with_state<Type>(subscriber.get_subscription().make_child(),
58 utils::forwarding_on_next{},
59 utils::forwarding_on_error{},
64 std::shared_ptr<state_t> m_state;
72 bool operator()() {
return m_count && m_count--; }
77template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> TObs,
typename CreatePredicateFn>
81 : m_observable{std::move(observable)}
82 , m_create_predicate{std::move(create_predicate)} {}
85 : m_observable{observable}
86 , m_create_predicate{std::move(create_predicate)} {}
88 template<constra
int::subscriber_of_type<Type> TSub>
89 void operator()(
const TSub& subscriber)
const
95 RPP_NO_UNIQUE_ADDRESS TObs m_observable;
96 RPP_NO_UNIQUE_ADDRESS CreatePredicateFn m_create_predicate;
99template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> TObs,
typename CreatePredicateFn>
100auto create_repeat_on_subscribe(TObs&& observable, CreatePredicateFn&& create_predicate)
102 return source::create<Type>(
repeat_on_subscribe<Type, std::decay_t<TObs>, std::decay_t<CreatePredicateFn>>(std::forward<TObs>(observable),
103 std::forward<CreatePredicateFn>(create_predicate)));
106template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> TObs>
107auto repeat_impl(TObs&& observable,
size_t count)
109 return create_repeat_on_subscribe<Type>(std::forward<TObs>(observable), [count] {
return counted_repeat_predicate{count}; });
112template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> TObs>
113auto repeat_impl(TObs&& observable)
115 return create_repeat_on_subscribe<Type>(std::forward<TObs>(observable), [] {
return [] {
return true; }; });
Definition: repeat.hpp:27
Definition: constraints.hpp:19
Definition: repeat.hpp:68
Definition: repeat.hpp:79