ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
subscribe_on.hpp
1
2// ReactivePlusPlus library
3//
4// Copyright Aleksey Loginov 2022 - present.
5// Distributed under the Boost Software License, Version 1.0.
6// (See accompanying file LICENSE_1_0.txt or copy at
7// https://www.boost.org/LICENSE_1_0.txt)
8//
9// Project home: https://github.com/victimsnino/ReactivePlusPlus
10//
11
12#pragma once
13
14#include <rpp/operators/fwd/subscribe_on.hpp>
15#include <rpp/sources/create.hpp>
16#include <rpp/subscribers/constraints.hpp>
17
18IMPLEMENTATION_FILE (subscribe_on_tag);
19
20namespace rpp::details
21{
22template<constraint::decayed_type Type, constraint::observable_of_type<Type> TObs, schedulers::constraint::scheduler TScheduler>
23auto subscribe_on_impl(TObs&& obs, const TScheduler& scheduler)
24{
25 return source::create<Type>([obs = std::forward<TObs>(obs), scheduler]<constraint::subscriber_of_type<Type> TSub>(TSub&& subscriber)
26 {
27 auto worker = scheduler.create_worker(subscriber.get_subscription());
28 worker.schedule([obs, subscriber = std::forward<TSub>(subscriber)]() mutable ->schedulers::optional_duration
29 {
30 obs.subscribe(std::move(subscriber));
31 return {};
32 });
33 });
34}
35} // namespace rpp::details