13#include <rpp/defs.hpp>
14#include <rpp/observables/fwd.hpp>
15#include <rpp/observables/interface_observable.hpp>
16#include <rpp/schedulers/trampoline_scheduler.hpp>
17#include <rpp/subscribers/dynamic_subscriber.hpp>
18#include <rpp/utils/operator_declaration.hpp>
19#include <rpp/utils/utilities.hpp>
37template<constraint::decayed_type Type, constraint::on_subscribe_fn<Type> OnSubscribeFn>
42 : m_on_subscribe{std::move(on_subscribe) } {}
45 : m_on_subscribe{on_subscribe } {}
54 template <
typename...Args>
56 template <
typename...Args>
/// Friend declaration: grants the details::member_overload specialization keyed on
/// details::subscribe_tag (for this exact observable type) access to the class's
/// non-public members.
/// NOTE(review): presumably so it can reach subscribe_impl; the access specifiers
/// are not visible in this listing - confirm.
59 friend struct details::member_overload<Type, specific_observable<Type, OnSubscribeFn>, details::subscribe_tag>;
/// \brief Attaches \p subscriber to this observable and hands back its subscription.
///
/// actual_subscribe() is invoked only while subscriber.is_subscribed() reports true;
/// whether or not it was invoked, the subscriber's own subscription is returned.
///
/// \tparam TSub subscriber type satisfying constraint::subscriber_of_type<Type>
/// \param subscriber subscriber to attach
/// \return the value of subscriber.get_subscription()
64 template<constraint::subscriber_of_type<Type> TSub>
65 composite_subscription subscribe_impl(const TSub& subscriber) const
67 if (subscriber.is_subscribed())
68 actual_subscribe(subscriber);
70 return subscriber.get_subscription();
/// \brief Invokes the stored on-subscribe callable with \p subscriber.
///
/// If the subscriber is still subscribed when the error path is reached, the
/// in-flight exception is forwarded to it through on_error().
///
/// \tparam TSub subscriber type satisfying constraint::subscriber_of_type<Type>
/// \param subscriber subscriber passed to m_on_subscribe
73 template<constraint::subscriber_of_type<Type> TSub>
74 void actual_subscribe(const TSub& subscriber) const
// Acquired before m_on_subscribe runs and kept alive for the rest of the call.
// NOTE(review): judging by the helper's name, this claims the current_thread
// scheduler queue and drains it on exit when it was not already owned - confirm
// against trampoline_scheduler.hpp.
81 const auto drain_on_exit_if_needed = schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
84 m_on_subscribe(subscriber);
// NOTE(review): std::current_exception() implies the two lines below sit inside a
// catch handler; the try/catch lines themselves are not part of this listing.
88 if (subscriber.is_subscribed())
89 subscriber.on_error(std::current_exception());
/// The on-subscribe callable: initialised by the constructors (moved or copied in)
/// and invoked with the subscriber from actual_subscribe().
/// NOTE(review): RPP_NO_UNIQUE_ADDRESS is defined outside this listing; presumably
/// it maps to [[no_unique_address]] - confirm.
99 RPP_NO_UNIQUE_ADDRESS OnSubscribeFn m_on_subscribe;
/// Deduction guide for constructing a specific_observable from a bare callable.
/// Type is taken from the callable's argument (utils::function_argument_t fed into
/// utils::extract_subscriber_type_t), and OnSubscribeFn is the callable's own type.
102template<
typename OnSub>
103specific_observable(OnSub on_subscribe) -> specific_observable<utils::extract_subscriber_type_t<utils::function_argument_t<OnSub>>, OnSub>;
Type-less observable (or partially untyped) that has the notion of Type but hides the notion of the on_subscribe type from the C++ compiler.
Definition: dynamic_observable.hpp:89
Type-full observable (or typed) that has the notion of Type and upstream observables for C++ compiler...
Definition: specific_observable.hpp:39
auto as_dynamic() const &
Converts rpp::specific_observable to rpp::dynamic_observable via type-erasure mechanism.
Definition: specific_observable.hpp:55
Base part of observable. Mostly used to provide some interface functions used by all observables.
Definition: interface_observable.hpp:74