ReactivePlusPlus
One more implementation of the ReactiveX approach in C++, with performance and templates in mind
 
Loading...
Searching...
No Matches
merge.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/schedulers/immediate_scheduler.hpp>
14#include <rpp/operators/lift.hpp> // required due to operator uses lift
15#include <rpp/operators/details/early_unsubscribe.hpp> // early_unsubscribe
16#include <rpp/operators/details/serialized_subscriber.hpp> // make_serialized_subscriber
17#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
18#include <rpp/operators/fwd/merge.hpp> // own forwarding
19#include <rpp/sources/just.hpp> // just
20#include <rpp/subscribers/constraints.hpp> // constraint::subscriber
21#include <rpp/utils/functors.hpp> // forwarding_on_next
22
23#include <array>
24#include <atomic>
25#include <memory>
26
27IMPLEMENTATION_FILE(merge_tag);
28
29namespace rpp::details
30{
// State shared by all subscribers taking part in a single merge.
// NOTE(review): the declaration line (listing line 31) is missing from this extract; the struct is
// presumably `merge_state`, derived from `early_unsubscribe_state` - confirm against the real header.
32{
// Inherit the constructors of early_unsubscribe_state.
33 using early_unsubscribe_state::early_unsubscribe_state;
34
// Number of on_completed notifications still awaited. It starts at zero, is incremented before each
// subscription (fetch_add) and decremented by the completion handler (fetch_sub).
35 std::atomic_size_t count_of_on_completed_needed{};
36};
37
// Alias for utils::forwarding_on_next; an instance is passed as the on_next handler when subscribing
// to an inner observable (see the create_subscriber_with_state call further down).
// NOTE(review): listing line 39 is missing from this extract, so a sibling alias may be absent here.
38using merge_forwarding_on_next = utils::forwarding_on_next;
40
42{
// Completion handler: decrements the number of awaited completions and forwards on_completed to
// `sub` only when this call was the last one outstanding.
//
// sub   - downstream subscriber that receives the final on_completed.
// state - shared merge state holding the completion counter.
43 void operator()(const constraint::subscriber auto& sub,
44 const std::shared_ptr<merge_state>& state) const
45 {
// fetch_sub returns the value held before the decrement, so `== 1` means the counter just reached zero.
46 if (state->count_of_on_completed_needed.fetch_sub(1, std::memory_order::acq_rel) == 1)
47 sub.on_completed();
48 }
49};
50
52{
// Handler invoked with a newly received inner observable: registers one more awaited completion and
// subscribes to that observable with a subscriber that shares `state` and targets `sub`.
//
// new_observable - inner observable to subscribe to.
// sub            - downstream subscriber handed on to the new inner subscriber.
// state          - shared merge state (completion counter and children subscriptions).
53 template<constraint::observable TObs>
54 void operator()(const TObs& new_observable,
55 const constraint::subscriber auto& sub,
56 const std::shared_ptr<merge_state>& state) const
57 {
// Type of the values emitted by the inner observable.
58 using ValueType = utils::extract_observable_type_t<TObs>;
59
// The counter is incremented before subscribing; presumably so that a completion arriving during
// subscribe() is already accounted for.
60 state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
61
// The inner subscriber gets its own child subscription obtained from state->children_subscriptions.
// NOTE(review): listing lines 64-65 (two further arguments of this call) are missing from this extract.
62 new_observable.subscribe(create_subscriber_with_state<ValueType>(state->children_subscriptions.make_child(),
63 merge_forwarding_on_next{},
66 sub,
67 state));
68 }
69};
70
// Merge state extended with a mutex.
// NOTE(review): the declaration line (listing line 71) is missing from this extract; the struct is
// presumably `merge_state_with_serialized_mutex`, derived from `merge_state` - confirm against the real header.
72{
// Inherit the constructors of merge_state.
73 using merge_state::merge_state;
74
// Mutex handed to make_serialized_subscriber through a shared_ptr that shares ownership with this state.
75 std::mutex mutex{};
76};
77
78template<constraint::decayed_type Type>
80{
81 using ValueType = utils::extract_observable_type_t<Type>;
82
// Builds the subscriber used for the upstream observable: creates the shared merge state, wraps the
// incoming subscriber in a serialized one, and returns a new subscriber that carries both.
//
// in_subscriber - downstream subscriber; its subscription is used to construct the shared state.
// Returns the result of create_subscriber_with_state<Type>(...).
83 template<constraint::subscriber_of_type<ValueType> TSub>
84 auto operator()(TSub&& in_subscriber) const
85 {
// get_subscription() is read here, before in_subscriber is forwarded on the line below.
86 auto state = std::make_shared<merge_state_with_serialized_mutex>(in_subscriber.get_subscription());
87 // wrap the subscriber in a serialized one so that the mutex does not have to be locked manually
// The aliasing shared_ptr constructor is used: the pointer refers to state->mutex while sharing
// ownership of `state`, so the mutex stays alive as long as the serialized subscriber holds it.
88 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<std::mutex>{state, &state->mutex});
89
// One awaited completion is registered up front; presumably it stands for the upstream observable itself.
90 state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
91
// The child subscription is taken before `state` is moved into the call below.
// NOTE(review): listing lines 94-96 (further arguments of the call) are missing from this extract.
92 auto subscription = state->children_subscriptions.make_child();
93 return create_subscriber_with_state<Type>(std::move(subscription),
97 std::move(subscriber),
98 std::move(state));
99 }
100};
101
// Merges the given observables of `Type`: each one is converted with as_dynamic(), all of them are
// passed to source::just together with the immediate scheduler, and merge() is called on the result.
//
// observables - observables to merge; each is perfectly forwarded before the as_dynamic() call.
// Returns whatever merge() returns for the observable created by source::just.
102template<constraint::decayed_type Type, constraint::observable_of_type<Type> ... TObservables>
103auto merge_with_impl(TObservables&&... observables)
104{
105 return source::just(rpp::schedulers::immediate{}, std::forward<TObservables>(observables).as_dynamic()...).merge();
106}
107} // namespace rpp::details
Immediately calls the provided schedulable or waits for its time_point (in the caller's thread)
Definition: immediate_scheduler.hpp:28
Definition: constraints.hpp:19
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19
Definition: merge.hpp:80
Definition: merge.hpp:42
Definition: merge.hpp:52
Definition: merge.hpp:32