ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
blocking_observable.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/observables/constraints.hpp> // OriginalObservable type
13#include <rpp/subscribers/specific_subscriber.hpp> // create subscriber
14#include <rpp/observers/state_observer.hpp> // wrap subscribers
15
16#include <rpp/observables/details/member_overload.hpp> // overload operators
17#include <rpp/operators/fwd.hpp> // forwarding of member_overaloads
18#include <rpp/defs.hpp> // RPP_EMPTY_BASES
19
20#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
21
22#include <future>
23
24namespace rpp
25{
32template<constraint::decayed_type Type, constraint::observable_of_type<Type> OriginalObservable>
33class RPP_EMPTY_BASES blocking_observable final : public details::member_overload<Type, blocking_observable<Type, OriginalObservable>, details::subscribe_tag>
34{
35public:
36 blocking_observable(const OriginalObservable& original)
37 : m_original{original} {}
38
39 blocking_observable(OriginalObservable&& original)
40 : m_original{std::move(original)} {}
41
42 friend struct details::member_overload<Type, blocking_observable<Type, OriginalObservable>, details::subscribe_tag>;
43
44private:
45 template<constraint::subscriber_of_type<Type> TSub>
46 void subscribe_impl(TSub&& subscriber) const noexcept
47 {
48 if (!subscriber.is_subscribed())
49 return;
50
51 std::promise<bool> is_success{};
52 const auto future = is_success.get_future();
53 auto sub = subscriber.get_subscription();
54 m_original.subscribe(create_subscriber_with_state<Type>(std::move(sub),
55 utils::forwarding_on_next{},
56 [&](const std::exception_ptr& err, const auto& sub)
57 {
58 sub.on_error(err);
59 is_success.set_value(false);
60 },
61 [&](const auto& sub)
62 {
63 sub.on_completed();
64 is_success.set_value(true);
65 },
66 std::forward<TSub>(subscriber)));
67 future.wait();
68 }
69
70private:
71 OriginalObservable m_original;
72};
73
74template<constraint::observable TObs>
75blocking_observable(const TObs&)->blocking_observable<rpp::utils::extract_observable_type_t<TObs>, TObs>;
76} // namespace rpp
blocking alternative of observable: provides interface where each function do blocking subscribe on o...
Definition: blocking_observable.hpp:34
Definition: member_overload.hpp:19