// Compile-time aliases describing this chain link.
// NOTE(review): the leading numerals on the lines below look like line-number
// residue from a listing rather than C++ tokens - confirm against the original file.
//
// `base` is the chain formed by the remaining strategies (TStrategies...).
23 using base = chain<TStrategies...>;
// Traits that TStrategy exposes for the value type reported by `base`.
25 using operator_traits =
typename TStrategy::template operator_traits<typename base::value_type>;
// Disposables strategy of this link: TStrategy's updated_optimal_disposables_strategy
// applied to the disposables strategy reported by `base`.
30 using optimal_disposables_strategy =
typename TStrategy::template updated_optimal_disposables_strategy<typename base::optimal_disposables_strategy>;
// Value type of this link: the result_type declared by operator_traits.
31 using value_type =
typename operator_traits::result_type;
// Constructs a chain link from this link's strategy plus the individual
// strategies of the remaining links.
//   strategy   - copied into m_strategy.
//   strategies - parameter pack expanded into the initializer of m_strategies.
// NOTE(review): the constructor body is not visible in this view.
33 chain(
const TStrategy& strategy,
const TStrategies&... strategies)
34 : m_strategy(strategy)
35 , m_strategies(strategies...)
// Constructs a chain link from this link's strategy and an already-built
// chain of the remaining links.
//   strategy   - copied into m_strategy.
//   strategies - existing chain<TStrategies...> used to initialize m_strategies.
// NOTE(review): the constructor body is not visible in this view.
39 chain(
const TStrategy& strategy,
const chain<TStrategies...>& strategies)
40 : m_strategy(strategy)
41 , m_strategies(strategies)
// Subscribes `observer` to this chain link.
// Observer is constrained by observer_of_type<value_type> and taken as a
// forwarding reference; the member function is const.
// NOTE(review): the constraint's namespace token is split across the next two
// lines in this view - confirm it reads as intended in the original file.
45 template<rpp::constra
int::observer_of_type<value_type> Observer>
46 void subscribe(Observer&&
observer)
const
// Keeps the value returned by own_current_thread_if_needed() alive until the
// end of the scope; it is never read afterwards, hence [[maybe_unused]].
// Presumably a scope guard (the name suggests draining on exit) - confirm.
48 [[maybe_unused]]
const auto drain_on_exit = own_current_thread_if_needed();
// NOTE(review): each of the three calls below forwards `observer`, so they are
// presumably mutually exclusive alternatives; the conditions selecting between
// them are not visible in this view - confirm against the original file.
//
// Alternative: the strategy's lift_with_disposables_strategy (instantiated with
// base's value type and disposables strategy) is applied to the observer, and
// the result is subscribed to the remaining chain.
51 m_strategies.subscribe(m_strategy.template lift_with_disposables_strategy<typename base::value_type, typename base::optimal_disposables_strategy>(std::forward<Observer>(
observer)));
// Alternative: the strategy's lift (instantiated with base's value type) is
// applied to the observer, and the result is subscribed to the remaining chain.
53 m_strategies.subscribe(m_strategy.template lift<typename base::value_type>(std::forward<Observer>(
observer)));
// Alternative: the observer and the remaining chain are both handed to the
// strategy's own subscribe.
57 m_strategy.subscribe(std::forward<Observer>(
observer), m_strategies);
// Static helper with a deduced return type, called at the start of subscribe().
// When operator_traits declares `own_current_queue` and it is a constant
// expression evaluating to true (the nested `requires` checks both), this
// returns the result of
// rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned().
// NOTE(review): the return for the case where that requirement is not met is
// not visible in this view.
62 static auto own_current_thread_if_needed()
64 if constexpr (
requires {
requires operator_traits::own_current_queue; })
65 return rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
// Strategy owned by this chain link.
// RPP_NO_UNIQUE_ADDRESS is a macro defined elsewhere; presumably it expands to
// a no_unique_address attribute where supported - confirm.
71 RPP_NO_UNIQUE_ADDRESS TStrategy m_strategy;
// Chain holding the remaining strategies (the same type as the `base` alias).
72 RPP_NO_UNIQUE_ADDRESS chain<TStrategies...> m_strategies;