ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
blocking_observable.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/observables/fwd.hpp>
13
14#include <rpp/defs.hpp>
15#include <rpp/operators/details/strategy.hpp>
16
17#include <condition_variable>
18#include <mutex>
19
20namespace rpp::details::observables
21{
22 class blocking_disposable final : public base_disposable
23 {
24 public:
25 void wait()
26 {
27 std::unique_lock lock{m_mutex};
28 m_cv.wait(lock, [this] { return m_completed; });
29 }
30
31 void base_dispose_impl(interface_disposable::Mode) noexcept override
32 {
33 {
34 std::lock_guard lock{m_mutex};
35 m_completed = true;
36 }
37 m_cv.notify_all();
38 }
39
40 private:
41 std::mutex m_mutex{};
42 std::condition_variable m_cv{};
43 bool m_completed{};
44 };
45
46 template<rpp::constraint::decayed_type Type, rpp::constraint::observable_strategy<Type> Strategy>
47 class blocking_strategy
48 {
49 public:
50 using value_type = Type;
51 using optimal_disposables_strategy = typename Strategy::optimal_disposables_strategy::template add<1>;
52
53 blocking_strategy(observable<Type, Strategy>&& observable)
54 : m_original{std::move(observable)}
55 {
56 }
57
58 blocking_strategy(const observable<Type, Strategy>& observable)
59 : m_original{observable}
60 {
61 }
62
63 template<rpp::constraint::observer_strategy<Type> ObserverStrategy>
64 void subscribe(observer<Type, ObserverStrategy>&& obs) const
65 {
67 obs.set_upstream(d);
68 m_original.subscribe(std::move(obs));
69
70 if (!d.is_disposed())
71 if (const auto locked = d.lock())
72 locked->wait();
73 }
74
75 private:
76 RPP_NO_UNIQUE_ADDRESS observable<Type, Strategy> m_original;
77 };
78} // namespace rpp::details::observables
79
80namespace rpp
81{
90 template<constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
91 class blocking_observable : public observable<Type, details::observables::blocking_strategy<Type, Strategy>>
92 {
93 public:
94 using observable<Type, details::observables::blocking_strategy<Type, Strategy>>::observable;
95 };
96} // namespace rpp
Extension over rpp::observable with a set of blocking operators - it waits until completion of the underlying observable.
Definition blocking_observable.hpp:92
Definition blocking_observable.hpp:23
Definition blocking_observable.hpp:48
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172