Skip to content

Commit 59e070a

Browse files
alexmalyshevlehecka
authored andcommitted
Update Observables to match Flowables, make it actually work (#407)
It didn't really compile in the first place... Copy some Flowable tests over to use Observable.
1 parent 6bf1f6f commit 59e070a

File tree

4 files changed

+239
-44
lines changed

4 files changed

+239
-44
lines changed

experimental/yarpl/include/yarpl/Flowables.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class Flowables {
9191
}
9292

9393
template <typename T>
94-
static Reference<Flowable<T>> error(const std::exception_ptr ex) {
94+
static Reference<Flowable<T>> error(std::exception_ptr ex) {
9595
auto lambda = [ex](Subscriber<T>& subscriber, int64_t) {
9696
subscriber.onError(ex);
9797
return std::make_tuple(static_cast<int64_t>(0), true);

experimental/yarpl/include/yarpl/Observables.h

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,57 +10,36 @@ namespace observable {
1010
class Observables {
1111
public:
1212
static Reference<Observable<int64_t>> range(int64_t start, int64_t end) {
13-
auto lambda = [ start, end, i = start ](
14-
Subscriber<int64_t> & subscriber, int64_t requested) mutable {
15-
int64_t emitted = 0;
16-
bool done = false;
17-
18-
while (i < end && emitted < requested) {
19-
subscriber.onNext(i++);
20-
++emitted;
13+
auto lambda = [ start, end ](Reference<Observer<int64_t>> observer) {
14+
for (int64_t i = start; i < end; ++i) {
15+
observer->onNext(i);
2116
}
22-
23-
if (i >= end) {
24-
subscriber.onComplete();
25-
done = true;
26-
}
27-
28-
return std::make_tuple(requested, done);
17+
observer->onComplete();
2918
};
3019

3120
return Observable<int64_t>::create(std::move(lambda));
3221
}
3322

3423
template <typename T>
3524
static Reference<Observable<T>> just(const T& value) {
36-
auto lambda = [value](Subscriber<T>& subscriber, int64_t) {
25+
auto lambda = [value](Reference<Observer<T>> observer) {
3726
// # requested should be > 0. Ignoring the actual parameter.
38-
subscriber.onNext(value);
39-
subscriber.onComplete();
40-
return std::make_tuple(static_cast<int64_t>(1), true);
27+
observer->onNext(value);
28+
observer->onComplete();
4129
};
4230

4331
return Observable<T>::create(std::move(lambda));
4432
}
4533

4634
template <typename T>
47-
static Reference<Observable<T>> just(std::initializer_list<T> list) {
48-
auto lambda = [ list, it = list.begin() ](
49-
Subscriber<T> & subscriber, int64_t requested) mutable {
50-
int64_t emitted = 0;
51-
bool done = false;
52-
53-
while (it != list.end() && emitted < requested) {
54-
subscriber.onNext(*it++);
55-
++emitted;
56-
}
35+
static Reference<Observable<T>> justN(std::initializer_list<T> list) {
36+
std::vector<T> vec(list);
5737

58-
if (it == list.end()) {
59-
subscriber.onComplete();
60-
done = true;
38+
auto lambda = [v = std::move(vec)](Reference<Observer<T>> observer) {
39+
for (auto const& elem : v) {
40+
observer->onNext(elem);
6141
}
62-
63-
return std::make_tuple(static_cast<int64_t>(emitted), done);
42+
observer->onComplete();
6443
};
6544

6645
return Observable<T>::create(std::move(lambda));
@@ -70,14 +49,37 @@ class Observables {
7049
typename T,
7150
typename OnSubscribe,
7251
typename = typename std::enable_if<std::is_callable<
73-
OnSubscribe(Reference<Subscriber<T>>),
52+
OnSubscribe(Reference<Observer<T>>),
7453
void>::value>::type>
75-
7654
static Reference<Observable<T>> create(OnSubscribe&& function) {
7755
return Reference<Observable<T>>(new FromPublisherOperator<T, OnSubscribe>(
7856
std::forward<OnSubscribe>(function)));
7957
}
8058

59+
template <typename T>
60+
static Reference<Observable<T>> empty() {
61+
auto lambda = [](Reference<Observer<T>> observer) {
62+
observer->onComplete();
63+
};
64+
return Observable<T>::create(std::move(lambda));
65+
}
66+
67+
template <typename T>
68+
static Reference<Observable<T>> error(std::exception_ptr ex) {
69+
auto lambda = [ex](Reference<Observer<T>> observer) {
70+
observer->onError(ex);
71+
};
72+
return Observable<T>::create(std::move(lambda));
73+
}
74+
75+
template <typename T, typename ExceptionType>
76+
static Reference<Observable<T>> error(const ExceptionType& ex) {
77+
auto lambda = [ex](Reference<Observer<T>> observer) {
78+
observer->onError(std::make_exception_ptr(ex));
79+
};
80+
return Observable<T>::create(std::move(lambda));
81+
}
82+
8183
private:
8284
Observables() = delete;
8385
};

experimental/yarpl/test/FlowableTest.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ namespace yarpl {
1010
namespace flowable {
1111
namespace {
1212

13+
void unreachable() {
14+
EXPECT_TRUE(false);
15+
}
16+
1317
template <typename T>
1418
class CollectingSubscriber : public Subscriber<T> {
1519
public:
@@ -56,15 +60,15 @@ class CollectingSubscriber : public Subscriber<T> {
5660
return error_;
5761
}
5862

59-
std::string errorMsg() const {
63+
const std::string& errorMsg() const {
6064
return errorMsg_;
6165
}
6266

6367
private:
6468
std::vector<T> values_;
69+
std::string errorMsg_;
6570
bool complete_{false};
6671
bool error_{false};
67-
std::string errorMsg_;
6872
};
6973

7074
/// Construct a pipeline with a collecting subscriber against the supplied
@@ -194,10 +198,6 @@ TEST(FlowableTest, FlowableEmpty) {
194198
EXPECT_EQ(collector->error(), false);
195199
}
196200

197-
void unreachable() {
198-
EXPECT_TRUE(false);
199-
}
200-
201201
TEST(FlowableTest, SubscribersComplete) {
202202
EXPECT_EQ(0u, Refcounted::objects());
203203

experimental/yarpl/test/Observable_test.cpp

Lines changed: 194 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,90 @@
33
#include <folly/Baton.h>
44
#include <gtest/gtest.h>
55
#include <atomic>
6-
#include "Tuple.h"
6+
77
#include "yarpl/Observable.h"
8+
#include "yarpl/Observables.h"
89
#include "yarpl/ThreadScheduler.h"
910
#include "yarpl/flowable/Subscriber.h"
1011
#include "yarpl/flowable/Subscribers.h"
1112
#include "yarpl/observable/Observers.h"
1213
#include "yarpl/observable/Subscriptions.h"
1314

15+
#include "Tuple.h"
16+
1417
// TODO can we eliminate need to import both of these?
1518
using namespace yarpl;
1619
using namespace yarpl::observable;
1720

21+
namespace {
22+
23+
void unreachable() {
24+
EXPECT_TRUE(false);
25+
}
26+
27+
template <typename T>
28+
class CollectingObserver : public Observer<T> {
29+
public:
30+
static_assert(
31+
std::is_copy_constructible<T>::value,
32+
"CollectingSubscriber needs to copy the value in order to collect it");
33+
34+
void onNext(T next) override {
35+
Observer<T>::onNext(next);
36+
values_.push_back(std::move(next));
37+
}
38+
39+
void onComplete() override {
40+
Observer<T>::onComplete();
41+
complete_ = true;
42+
}
43+
44+
void onError(std::exception_ptr ex) override {
45+
Observer<T>::onError(ex);
46+
error_ = true;
47+
48+
try {
49+
std::rethrow_exception(ex);
50+
} catch (const std::exception& e) {
51+
errorMsg_ = e.what();
52+
}
53+
}
54+
55+
const std::vector<T>& values() const {
56+
return values_;
57+
}
58+
59+
bool complete() const {
60+
return complete_;
61+
}
62+
63+
bool error() const {
64+
return error_;
65+
}
66+
67+
const std::string& errorMsg() const {
68+
return errorMsg_;
69+
}
70+
71+
private:
72+
std::vector<T> values_;
73+
std::string errorMsg_;
74+
bool complete_{false};
75+
bool error_{false};
76+
};
77+
78+
/// Construct a pipeline with a collecting observer against the supplied
79+
/// observable. Return the items that were sent to the observer. If some
80+
/// exception was sent, the exception is thrown.
81+
template <typename T>
82+
std::vector<T> run(Reference<Observable<T>> observable) {
83+
auto collector = make_ref<CollectingObserver<T>>();
84+
observable->subscribe(collector);
85+
return collector->values();
86+
}
87+
88+
} // namespace
89+
1890
TEST(Observable, SingleOnNext) {
1991
{
2092
ASSERT_EQ(std::size_t{0}, Refcounted::objects());
@@ -262,3 +334,124 @@ TEST(Observable, toFlowableWithCancel) {
262334
}
263335
ASSERT_EQ(std::size_t{0}, Refcounted::objects());
264336
}
337+
338+
TEST(Observable, Just) {
339+
ASSERT_EQ(0u, Refcounted::objects());
340+
341+
EXPECT_EQ(run(Observables::just(22)), std::vector<int>{22});
342+
EXPECT_EQ(
343+
run(Observables::justN({12, 34, 56, 98})),
344+
std::vector<int>({12, 34, 56, 98}));
345+
EXPECT_EQ(
346+
run(Observables::justN({"ab", "pq", "yz"})),
347+
std::vector<const char*>({"ab", "pq", "yz"}));
348+
349+
EXPECT_EQ(0u, Refcounted::objects());
350+
}
351+
352+
TEST(Observable, Range) {
353+
ASSERT_EQ(0u, Refcounted::objects());
354+
355+
auto observable = Observables::range(10, 14);
356+
EXPECT_EQ(run(std::move(observable)), std::vector<int64_t>({10, 11, 12, 13}));
357+
358+
EXPECT_EQ(0u, Refcounted::objects());
359+
}
360+
361+
TEST(Observable, RangeWithMap) {
362+
ASSERT_EQ(0u, Refcounted::objects());
363+
364+
auto observable = Observables::range(1, 4)
365+
->map([](int64_t v) { return v * v; })
366+
->map([](int64_t v) { return v * v; })
367+
->map([](int64_t v) { return std::to_string(v); });
368+
EXPECT_EQ(
369+
run(std::move(observable)), std::vector<std::string>({"1", "16", "81"}));
370+
371+
EXPECT_EQ(0u, Refcounted::objects());
372+
}
373+
374+
// TODO: Hits ASAN errors.
375+
TEST(Observable, DISABLED_SimpleTake) {
376+
ASSERT_EQ(0u, Refcounted::objects());
377+
378+
EXPECT_EQ(
379+
run(Observables::range(0, 100)->take(3)),
380+
std::vector<int64_t>({0, 1, 2}));
381+
382+
EXPECT_EQ(0u, Refcounted::objects());
383+
}
384+
385+
TEST(Observable, Error) {
386+
auto observable = Observables::error<int>(std::runtime_error("something broke!"));
387+
auto collector = make_ref<CollectingObserver<int>>();
388+
observable->subscribe(collector);
389+
390+
EXPECT_EQ(collector->complete(), false);
391+
EXPECT_EQ(collector->error(), true);
392+
EXPECT_EQ(collector->errorMsg(), "something broke!");
393+
}
394+
395+
TEST(Observable, ErrorPtr) {
396+
auto observable = Observables::error<int>(
397+
std::make_exception_ptr(std::runtime_error("something broke!")));
398+
auto collector = make_ref<CollectingObserver<int>>();
399+
observable->subscribe(collector);
400+
401+
EXPECT_EQ(collector->complete(), false);
402+
EXPECT_EQ(collector->error(), true);
403+
EXPECT_EQ(collector->errorMsg(), "something broke!");
404+
}
405+
406+
TEST(Observable, Empty) {
407+
auto observable = Observables::empty<int>();
408+
auto collector = make_ref<CollectingObserver<int>>();
409+
observable->subscribe(collector);
410+
411+
EXPECT_EQ(collector->complete(), true);
412+
EXPECT_EQ(collector->error(), false);
413+
}
414+
415+
TEST(Observable, ObserversComplete) {
416+
EXPECT_EQ(0u, Refcounted::objects());
417+
418+
auto observable = Observables::empty<int>();
419+
EXPECT_EQ(1u, Refcounted::objects());
420+
421+
bool completed = false;
422+
423+
auto observer = Observers::create<int>(
424+
[](int) { unreachable(); },
425+
[](std::exception_ptr) { unreachable(); },
426+
[&] { completed = true; }
427+
);
428+
429+
observable->subscribe(std::move(observer));
430+
observable.reset();
431+
432+
EXPECT_EQ(0u, Refcounted::objects());
433+
434+
EXPECT_TRUE(completed);
435+
}
436+
437+
TEST(Observable, ObserversError) {
438+
EXPECT_EQ(0u, Refcounted::objects());
439+
440+
auto observable = Observables::error<int>(std::runtime_error("Whoops"));
441+
EXPECT_EQ(1u, Refcounted::objects());
442+
443+
bool errored = false;
444+
445+
auto observer = Observers::create<int>(
446+
[](int) { unreachable(); },
447+
[&](std::exception_ptr) { errored = true; },
448+
[] { unreachable(); }
449+
);
450+
451+
observable->subscribe(std::move(observer));
452+
observable.reset();
453+
454+
EXPECT_EQ(0u, Refcounted::objects());
455+
456+
EXPECT_TRUE(errored);
457+
}

0 commit comments

Comments
 (0)