ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
combining_strategy.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/schedulers/current_thread.hpp>
19#include <rpp/utils/utils.hpp>
20
21#include <memory>
22
23namespace rpp::operators::details
24{
25 template<rpp::constraint::observer Observer>
26 class combining_disposable : public composite_disposable
27 {
28 public:
29 explicit combining_disposable(Observer&& observer, size_t on_completed_needed)
30 : m_observer_with_mutex{std::move(observer)}
31 , m_on_completed_needed{on_completed_needed}
32 {
33 }
34
35 rpp::utils::pointer_under_lock<Observer> get_observer_under_lock() { return m_observer_with_mutex; }
36
37 bool decrement_on_completed()
38 {
39 // just need atomicity, not guarding anything
40 return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1;
41 }
42
43 private:
44 rpp::utils::value_with_mutex<Observer> m_observer_with_mutex{};
45
46 std::atomic_size_t m_on_completed_needed;
47 };
48
// Observer strategy whose callbacks are forwarded to a state object of type
// TDisposable held through a std::shared_ptr.
// TDisposable has to provide add(), is_disposed(), get_observer_under_lock()
// and decrement_on_completed(), since all of them are called below.
// No on_next is defined in this block.
49 template<typename TDisposable>
// NOTE(review): the listing gutter jumps from 49 to 51 - the declaration line
// that names this template (listing line 50) is missing from this extract.
// Likely `struct combining_observer_strategy` - confirm against the upstream header.
51 {
52 // `Auto` because the disposables have to be disposed during on_completed anyway
53 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
54
// State object the callbacks operate on. Default-initialised to empty here;
// every member function dereferences it without a null check.
55 std::shared_ptr<TDisposable> disposable{};
56
// Adds the upstream disposable `d` to the state object.
57 void set_upstream(const rpp::disposable_wrapper& d) const
58 {
59 disposable->add(d);
60 }
61
// Reports whatever the state object's is_disposed() returns.
62 bool is_disposed() const
63 {
64 return disposable->is_disposed();
65 }
66
// Forwards the error to the stored observer right away; the completion
// counter is not consulted on this path.
67 void on_error(const std::exception_ptr& err) const
68 {
69 disposable->get_observer_under_lock()->on_error(err);
70 }
71
// Forwards on_completed to the stored observer only when
// decrement_on_completed() returns true; otherwise does nothing.
72 void on_completed() const
73 {
74 if (disposable->decrement_on_completed())
75 disposable->get_observer_under_lock()->on_completed();
76 }
77 };
78
// Operator building block that wires one downstream observer to several observables.
// Template parameters, as used below:
//   TDisposable  - template of the state type; instantiated as
//                  TDisposable<Observer, TSelector, Type, value types of TObservables...>.
//   TStrategy    - template of the observer strategy; its first argument is an index
//                  (0 for the observer returned from subscribe_impl, I + 1 for the I-th
//                  entry of TObservables).
//   TSelector    - callable invoked with one value per combined observable.
//   TObservables - the additional observables stored in this object.
79 template<template<typename...> typename TDisposable, template<auto, typename...> typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables>
// NOTE(review): the listing gutter jumps from 79 to 81 - the declaration line
// that names this template (listing line 80) is missing from this extract.
// Likely `struct combining_operator` - confirm against the upstream header.
81 {
// Additional observables; each one is subscribed to in subscribe().
82 RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple<TObservables...> observables;
// Selector handed to the state object created in subscribe_impl().
83 RPP_NO_UNIQUE_ADDRESS TSelector selector;
84
// Traits for an input value type T.
// NOTE(review): the gutter jumps from 85 to 87 - the declaration line of this
// nested template (listing line 86) is missing from this extract.
// Likely `struct operator_traits` - confirm against the upstream header.
85 template<rpp::constraint::decayed_type T>
87 {
// The selector must be invocable with T followed by the value type of every entry of TObservables.
88 static_assert(std::invocable<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>, "Selector is not callable with passed T type");
89
// Type produced by that selector invocation.
90 using result_type = std::invoke_result_t<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>;
91
// Flag read elsewhere in the library; its meaning is not visible in this file.
92 constexpr static bool own_current_queue = true;
93 };
94
// Always fixed_disposables_strategy<1>; the previous strategy `Prev` is ignored.
95 template<rpp::details::observables::constraint::disposables_strategy Prev>
96 using updated_optimal_disposables_strategy = ::rpp::details::observables::fixed_disposables_strategy<1>;
97
// Builds the observer for the upstream from the downstream `observer`.
// Delegates to subscribe_impl through observables.apply, passing the forwarded
// observer and the selector.
// NOTE(review): presumably apply() appends the stored tuple elements after the
// given arguments, which matches subscribe_impl's parameter order - confirm in tuple.hpp.
98 template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
99 auto lift(Observer&& observer) const
100 {
101 return observables.apply(&subscribe_impl<Type, Observer>, std::forward<Observer>(observer), selector);
102 }
103
104 private:
// Creates the state object, registers it as upstream of the downstream observer,
// subscribes to every entry of `observables` and returns the index-0 observer.
// NOTE(review): Disposable is instantiated with `Observer` as given, while the
// returned strategy uses std::decay_t<Observer>; the two agree only when Observer
// is a non-reference type - confirm how lift() is instantiated by callers.
105 template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
106 static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables)
107 {
108 using Disposable = TDisposable<Observer, TSelector, Type, rpp::utils::extract_observable_type_t<TObservables>...>;
109
// State object constructed from the forwarded observer and the selector.
110 const auto disposable = disposable_wrapper_impl<Disposable>::make(std::forward<Observer>(observer), selector);
111 auto locked = disposable.lock();
// The downstream observer receives only a weak wrapper of the state object.
112 locked->get_observer_under_lock()->set_upstream(disposable.as_weak());
113
// Subscribe the additional observables before returning the index-0 observer.
114 subscribe<std::decay_t<Type>>(locked, std::index_sequence_for<TObservables...>{}, observables...);
115
116 return rpp::observer<Type, TStrategy<0, std::decay_t<Observer>, TSelector, Type, rpp::utils::extract_observable_type_t<TObservables>...>>{std::move(locked)};
117 }
118
// Subscribes to each entry of `observables` with an observer whose strategy index
// is I + 1 and which is constructed from the shared `disposable`.
119 template<typename ExpectedValue, rpp::constraint::observer Observer, size_t... I>
120 static void subscribe(const std::shared_ptr<TDisposable<Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>& disposable, std::index_sequence<I...>, const TObservables&... observables)
121 {
// Left fold over the comma operator: subscriptions happen in pack order, first to last.
122 (..., observables.subscribe(rpp::observer<rpp::utils::extract_observable_type_t<TObservables>, TStrategy<I + 1, Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>{disposable}));
123 }
124 };
125} // namespace rpp::operators::details
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition tuple.hpp:105
Definition utils.hpp:260
Definition fwd.hpp:80
Definition fwd.hpp:250
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
Definition disposables_strategy.hpp:29
Definition combining_strategy.hpp:51
Definition combining_strategy.hpp:81