ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
subscribe_on.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/observables/observable.hpp>
17
18namespace rpp::operators::details
19{
20 template<typename TObservableChainStrategy>
22 {
23 RPP_NO_UNIQUE_ADDRESS TObservableChainStrategy observable;
24
25 using Type = typename TObservableChainStrategy::value_type;
26
27 template<rpp::constraint::observer_strategy<Type> ObserverStrategy>
28 rpp::schedulers::optional_delay_from_now operator()(observer<Type, ObserverStrategy>& observer) const
29 {
30 observable.subscribe(std::move(observer));
31 return rpp::schedulers::optional_delay_from_now{};
32 }
33 };
34
35 template<rpp::schedulers::constraint::scheduler TScheduler>
37 {
38 template<rpp::constraint::decayed_type T>
40 {
41 using result_type = T;
42 };
43
44 template<rpp::details::observables::constraint::disposables_strategy Prev>
45 using updated_optimal_disposables_strategy = Prev;
46
47 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
48
49 template<rpp::constraint::observer Observer, typename... Strategies>
50 void subscribe(Observer&& observer, const rpp::details::observables::chain<Strategies...>& observable_strategy) const
51 {
52 const auto worker = scheduler.create_worker();
53 worker.schedule(subscribe_on_schedulable<rpp::details::observables::chain<Strategies...>>{observable_strategy}, std::forward<Observer>(observer));
54 }
55 };
56} // namespace rpp::operators::details
57
58namespace rpp::operators
59{
74 template<rpp::schedulers::constraint::scheduler Scheduler>
75 auto subscribe_on(Scheduler&& scheduler)
76 {
77 return details::subscribe_on_t<std::decay_t<Scheduler>>{std::forward<Scheduler>(scheduler)};
78 }
79} // namespace rpp::operators
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this class...
Definition observer.hpp:172
Definition fwd.hpp:250
auto subscribe_on(Scheduler &&scheduler)
OnSubscribe function for this observable will be scheduled via provided scheduler.
Definition subscribe_on.hpp:75
Definition subscribe_on.hpp:37