12#include <rpp/observables/fwd.hpp>
13#include <rpp/observers/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/schedulers/current_thread.hpp>
18namespace rpp::details::observables
20 template<
typename TStrategy,
typename... TStrategies>
23 using base = chain<TStrategies...>;
25 using operator_traits =
typename TStrategy::template operator_traits<typename base::value_type>;
30 using optimal_disposables_strategy =
typename TStrategy::template updated_optimal_disposables_strategy<typename base::optimal_disposables_strategy>;
31 using value_type =
typename operator_traits::result_type;
33 chain(
const TStrategy& strategy,
const TStrategies&... strategies)
34 : m_strategy(strategy)
35 , m_strategies(strategies...)
39 chain(
const TStrategy& strategy,
const chain<TStrategies...>& strategies)
40 : m_strategy(strategy)
41 , m_strategies(strategies)
45 template<rpp::constra
int::observer_of_type<value_type> Observer>
46 void subscribe(Observer&& observer)
const
48 [[maybe_unused]]
const auto drain_on_exit = own_current_thread_if_needed();
51 m_strategies.subscribe(m_strategy.template lift_with_disposables_strategy<typename base::value_type, typename base::optimal_disposables_strategy>(std::forward<Observer>(observer)));
53 m_strategies.subscribe(m_strategy.template lift<typename base::value_type>(std::forward<Observer>(observer)));
57 m_strategy.subscribe(std::forward<Observer>(observer), m_strategies);
62 static auto own_current_thread_if_needed()
64 if constexpr (
requires {
requires operator_traits::own_current_queue; })
65 return rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
71 RPP_NO_UNIQUE_ADDRESS TStrategy m_strategy;
72 RPP_NO_UNIQUE_ADDRESS chain<TStrategies...> m_strategies;
75 template<
typename TStrategy>
79 using optimal_disposables_strategy =
typename TStrategy::optimal_disposables_strategy;
80 using value_type =
typename TStrategy::value_type;
82 chain(
const TStrategy& strategy)
83 : m_strategy(strategy)
87 template<rpp::constra
int::observer Observer>
88 void subscribe(Observer&&
observer)
const
90 m_strategy.subscribe(std::forward<Observer>(
observer));
94 RPP_NO_UNIQUE_ADDRESS TStrategy m_strategy;
97 template<
typename New,
typename Old>
103 template<
typename New,
typename... Args>
109 template<
typename New,
typename Old>
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Same as rpp::constraint::operator_lift but with custom disposables logic. For example,...
Definition fwd.hpp:134
Accept downstream observer and return new upstream (of type Type) observer.
Definition fwd.hpp:121
Simple operator defining logic how to subscribe passed observer to passed observable....
Definition fwd.hpp:109
Definition chain_strategy.hpp:99