ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
chain_strategy.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/observables/fwd.hpp>
13#include <rpp/observers/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/schedulers/current_thread.hpp>
17
18namespace rpp::details::observables
19{
20 template<typename TStrategy, typename... TStrategies>
21 class chain
22 {
23 using base = chain<TStrategies...>;
24
25 using operator_traits = typename TStrategy::template operator_traits<typename base::value_type>;
26
28
29 public:
30 using optimal_disposables_strategy = typename TStrategy::template updated_optimal_disposables_strategy<typename base::optimal_disposables_strategy>;
31 using value_type = typename operator_traits::result_type;
32
33 chain(const TStrategy& strategy, const TStrategies&... strategies)
34 : m_strategy(strategy)
35 , m_strategies(strategies...)
36 {
37 }
38
39 chain(const TStrategy& strategy, const chain<TStrategies...>& strategies)
40 : m_strategy(strategy)
41 , m_strategies(strategies)
42 {
43 }
44
45 template<rpp::constraint::observer_of_type<value_type> Observer>
46 void subscribe(Observer&& observer) const
47 {
48 [[maybe_unused]] const auto drain_on_exit = own_current_thread_if_needed();
49
51 m_strategies.subscribe(m_strategy.template lift_with_disposables_strategy<typename base::value_type, typename base::optimal_disposables_strategy>(std::forward<Observer>(observer)));
53 m_strategies.subscribe(m_strategy.template lift<typename base::value_type>(std::forward<Observer>(observer)));
54 else
55 {
57 m_strategy.subscribe(std::forward<Observer>(observer), m_strategies);
58 }
59 }
60
61 private:
62 static auto own_current_thread_if_needed()
63 {
64 if constexpr (requires { requires operator_traits::own_current_queue; })
65 return rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
66 else
67 return rpp::utils::none{};
68 }
69
70 private:
71 RPP_NO_UNIQUE_ADDRESS TStrategy m_strategy;
72 RPP_NO_UNIQUE_ADDRESS chain<TStrategies...> m_strategies;
73 };
74
75 template<typename TStrategy>
76 class chain<TStrategy>
77 {
78 public:
79 using optimal_disposables_strategy = typename TStrategy::optimal_disposables_strategy;
80 using value_type = typename TStrategy::value_type;
81
82 chain(const TStrategy& strategy)
83 : m_strategy(strategy)
84 {
85 }
86
87 template<rpp::constraint::observer Observer>
88 void subscribe(Observer&& observer) const
89 {
90 m_strategy.subscribe(std::forward<Observer>(observer));
91 }
92
93 private:
94 RPP_NO_UNIQUE_ADDRESS TStrategy m_strategy;
95 };
96
97 template<typename New, typename Old>
99 {
100 using type = chain<New, Old>;
101 };
102
103 template<typename New, typename... Args>
104 struct make_chain<New, chain<Args...>>
105 {
106 using type = chain<New, Args...>;
107 };
108
109 template<typename New, typename Old>
110 using make_chain_t = typename make_chain<New, Old>::type;
111} // namespace rpp::details::observables
Definition chain_strategy.hpp:22
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition fwd.hpp:163
Same as rpp::constraint::operator_lift but with custom disposables logic. For example,...
Definition fwd.hpp:134
Accept downstream observer and return new upstream (of type Type) observer.
Definition fwd.hpp:121
Simple operator defining logic how to subscribe passed observer to passed observable....
Definition fwd.hpp:109
Definition chain_strategy.hpp:99
Definition utils.hpp:25