ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
specific_observable.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/defs.hpp>
14#include <rpp/observables/fwd.hpp>
15#include <rpp/observables/interface_observable.hpp> // base_class
16#include <rpp/schedulers/trampoline_scheduler.hpp>
17#include <rpp/subscribers/dynamic_subscriber.hpp>
18#include <rpp/utils/operator_declaration.hpp> // for header include
19#include <rpp/utils/utilities.hpp> // copy_assignable_callable
20
21#include <utility>
22
23namespace rpp
24{
37template<constraint::decayed_type Type, constraint::on_subscribe_fn<Type> OnSubscribeFn>
38class specific_observable : public interface_observable<Type, specific_observable<Type, OnSubscribeFn>>
39{
40public:
41 specific_observable(OnSubscribeFn&& on_subscribe)
42 : m_on_subscribe{std::move(on_subscribe) } {}
43
44 specific_observable(const OnSubscribeFn& on_subscribe)
45 : m_on_subscribe{on_subscribe } {}
46
47 specific_observable(const specific_observable& other) = default;
48 specific_observable(specific_observable&& other) noexcept = default;
49 specific_observable& operator=(const specific_observable& other) = default;
50 specific_observable& operator=(specific_observable&& other) noexcept = default;
54 template <typename...Args>
55 [[nodiscard]] auto as_dynamic() const & requires details::is_header_included<details::dynamic_observable_tag, Args...> { return rpp::dynamic_observable<Type>{*this}; }
56 template <typename...Args>
57 [[nodiscard]] auto as_dynamic() && requires details::is_header_included<details::dynamic_observable_tag, Args...> { return rpp::dynamic_observable<Type>{std::move(*this)}; }
58
59 friend struct details::member_overload<Type, specific_observable<Type, OnSubscribeFn>, details::subscribe_tag>;
60
61private:
62
63 // used by rpp::details::member_overload<Type, specific_observable<Type, OnSubscribeFn>, rpp::details::subscribe_tag>;
64 template<constraint::subscriber_of_type<Type> TSub>
65 composite_subscription subscribe_impl(const TSub& subscriber) const
66 {
67 if (subscriber.is_subscribed())
68 actual_subscribe(subscriber);
69
70 return subscriber.get_subscription();
71 }
72
73 template<constraint::subscriber_of_type<Type> TSub>
74 void actual_subscribe(const TSub& subscriber) const
75 {
76 // take ownership over current thread as early as possible to delay all next "current_thread" schedulings. For example, scheduling of emissions from "just" to delay it till whole chain is subscribed and ready to listened emissions
77 // For example, if we have
78 // rpp::source::just(rpp::schedulers::current_thread{}, 1,2).combine_latest(rpp::source::just(rpp::schedulers::current_thread{}, 1,2))
79 //
80 // then we expect to see emissions like (1,1) (2,1) (2,2) instead of (2,1) (2,2). TO do it we need to "take ownership" over queue to prevent ANY immediate schedulings from ANY next subscriptions
81 const auto drain_on_exit_if_needed = schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
82 try
83 {
84 m_on_subscribe(subscriber);
85 }
86 catch (...)
87 {
88 if (subscriber.is_subscribed())
89 subscriber.on_error(std::current_exception());
90 else
91 throw;
92 }
93 }
94
95private:
99 RPP_NO_UNIQUE_ADDRESS OnSubscribeFn m_on_subscribe;
100};
101
102template<typename OnSub>
103specific_observable(OnSub on_subscribe) -> specific_observable<utils::extract_subscriber_type_t<utils::function_argument_t<OnSub>>, OnSub>;
104} // namespace rpp
Type-less observable (or partially untyped) that has the notion of Type but hides the notion of on_su...
Definition: dynamic_observable.hpp:89
Type-full observable (or typed) that has the notion of Type and upstream observables for C++ compiler...
Definition: specific_observable.hpp:39
auto as_dynamic() const &
Converts rpp::specific_observable to rpp::dynamic_observable via type-erasure mechanism.
Definition: specific_observable.hpp:55
Base part of observable. Mostly used to provide some interface functions used by all observables.
Definition: interface_observable.hpp:74