ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
lift.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include "rpp/observables/specific_observable.hpp"
14#include "rpp/subscribers/constraints.hpp"
15#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
16#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
17#include <rpp/operators/fwd/lift.hpp> // own forwarding
18#include <rpp/sources/create.hpp> // create observable
19#include <utility>
20
21IMPLEMENTATION_FILE(lift_tag);
22
23namespace rpp::details
24{
25template<constraint::decayed_type Type, constraint::decayed_type OnNext, constraint::decayed_type OnError, constraint::decayed_type OnCompleted>
27{
28 RPP_NO_UNIQUE_ADDRESS OnNext on_next;
29 RPP_NO_UNIQUE_ADDRESS OnError on_error;
30 RPP_NO_UNIQUE_ADDRESS OnCompleted on_completed;
31
32 template<constraint::subscriber TSub>
33 auto operator()(TSub&& subscriber) const
34 {
35 auto subscription = subscriber.get_subscription();
36 return create_subscriber_with_state<Type>(std::move(subscription),
37 on_next,
38 on_error,
39 on_completed,
40 std::forward<TSub>(subscriber));
41 }
42};
43
44template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn>
45using subscriber_type_of_list_fn = utils::extract_subscriber_type_t<utils::decayed_invoke_result_t<OperatorFn, dynamic_subscriber<NewType>>>;
46
54template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ...ChildLiftArgs>
55struct lift_on_subscribe : public lift_on_subscribe<NewType, OperatorFn, lift_on_subscribe<subscriber_type_of_list_fn<NewType, OperatorFn>, ChildLiftArgs...>> {};
56
57template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename TOnSubscribe>
58struct lift_on_subscribe<NewType, OperatorFn, TOnSubscribe>
59{
60 using T = subscriber_type_of_list_fn<NewType, OperatorFn>;
61 RPP_NO_UNIQUE_ADDRESS specific_observable<T, TOnSubscribe> _this;
62 RPP_NO_UNIQUE_ADDRESS OperatorFn op;
63
64 template<constraint::subscriber_of_type<NewType> TSub>
65 void operator()(TSub&& subscriber) const
66 {
67 _this.subscribe(op(std::forward<TSub>(subscriber)));
68 }
69};
70
71template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ObservableValue, typename ...ChildLiftArgs>
72auto lift_impl_internal(OperatorFn&& op, specific_observable<ObservableValue, lift_on_subscribe<ObservableValue, ChildLiftArgs...>>&& _this)
73{
74 return observable::create<NewType, lift_on_subscribe<NewType, std::decay_t<OperatorFn>, ChildLiftArgs...>>({ std::move(_this), std::forward<OperatorFn>(op) });
75}
76
77template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ObservableValue, typename ...ChildLiftArgs>
78auto lift_impl_internal(OperatorFn&& op, const specific_observable<ObservableValue, lift_on_subscribe<ObservableValue, ChildLiftArgs...>>& _this)
79{
80 return observable::create<NewType, lift_on_subscribe<NewType, std::decay_t<OperatorFn>, ChildLiftArgs...>>({ _this, std::forward<OperatorFn>(op) });
81}
82
83template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ObservableValue, typename OnSubscribe>
84auto lift_impl_internal(OperatorFn&& op, specific_observable<ObservableValue, OnSubscribe>&& _this)
85{
86 return observable::create<NewType, lift_on_subscribe<NewType, std::decay_t<OperatorFn>, OnSubscribe>>({ std::move(_this), std::forward<OperatorFn>(op) });
87}
88
89template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename ObservableValue, typename OnSubscribe>
90auto lift_impl_internal(OperatorFn&& op, const specific_observable<ObservableValue, OnSubscribe>& _this)
91{
92 return observable::create<NewType, lift_on_subscribe<NewType, std::decay_t<OperatorFn>, OnSubscribe>>({ _this, std::forward<OperatorFn>(op) });
93}
94
95template<constraint::decayed_type NewType, lift_fn<NewType> OperatorFn, typename TObs>
96auto lift_impl(OperatorFn&& op, TObs&& _this)
97{
98 return lift_impl_internal<NewType>(std::forward<OperatorFn>(op), std::forward<TObs>(_this));
99}
100} // namespace rpp::details
Type-full observable (or typed) that has the notion of Type and upstream observables for C++ compiler...
Definition: specific_observable.hpp:39
Functor of "lift" operator for on_subscribe overload function.
Definition: lift.hpp:55