ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
take_until.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// TC Wang 2022 - present.
5// Distributed under the Boost Software License, Version 1.0.
6// (See accompanying file LICENSE_1_0.txt or copy at
7// https://www.boost.org/LICENSE_1_0.txt)
8//
9// Project home: https://github.com/victimsnino/ReactivePlusPlus
10//
11
12#pragma once
13
14#include <rpp/operators/lift.hpp> // required due to operator uses lift
15#include <rpp/operators/merge.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
17#include <rpp/operators/fwd/take_until.hpp>
18#include <rpp/subscribers/constraints.hpp>
19#include <rpp/utils/functors.hpp>
20#include <rpp/utils/spinlock.hpp>
21
22IMPLEMENTATION_FILE(take_until_tag);
23
24namespace rpp::details
25{
26using take_until_state = early_unsubscribe_state;
27
28using take_until_on_next = merge_forwarding_on_next;
29using take_until_on_error = merge_on_error;
30using take_until_on_completed = early_unsubscribe_on_completed;
31
32
37{
38 void operator()(auto&&, const auto& subscriber, const std::shared_ptr<take_until_state>& state) const
39 {
40 // Unsubscribe all sources due to we obtained "stop event"
41 state->children_subscriptions.unsubscribe();
42 subscriber.on_completed();
43 }
44};
45
48
50{
51 using take_until_state::take_until_state;
52
53 // we can use spinlock there because 99.9% of time only one ever thread would send values from on_next (main observable), but we have small probability to get error from "until observable" immediately
54 utils::spinlock spinlock{};
55};
59template<constraint::decayed_type Type, constraint::observable TTriggerObservable>
61{
62 using TriggerType = utils::extract_observable_type_t<TTriggerObservable>;
63
64 TTriggerObservable m_until_observable;
65
66 template<constraint::subscriber_of_type<Type> TSub>
67 auto operator()(TSub&& in_subscriber) const
68 {
69 auto state = std::make_shared<take_until_state_with_serialized_spinlock>(in_subscriber.get_subscription());
70 // change subscriber to serialized to avoid manual using of mutex
71 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
72
73 // Subscribe to trigger observable
74 m_until_observable.subscribe(create_subscriber_with_state<TriggerType>(state->children_subscriptions.make_child(),
78 subscriber,
79 state));
80
81 auto subscription = state->children_subscriptions.make_child();
82 return create_subscriber_with_state<Type>(std::move(subscription),
83 take_until_on_next{},
86 std::move(subscriber),
87 std::move(state));
88 }
89};
90
91} // namespace rpp::details
Definition: early_unsubscribe.hpp:39
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19
"combine_latest" operator (an OperatorFn used by "lift").
Definition: take_until.hpp:61
Definition: take_until.hpp:37