Skip to content

Commit 0fad684

Browse files
authored
Merge pull request #9 from David-Haim/v.0.0.6
v.0.0.6
2 parents 1dc07c0 + 3bf27dc commit 0fad684

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1780
-3229
lines changed

README.md

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
# concurrencpp, the C++ concurrency library
32

43
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
@@ -247,6 +246,8 @@ This executor is good for long running tasks, like objects that run a work loop,
247246

248247
* **manual executor** - an executor that does not execute coroutines by itself. Application code can execute previously enqueued tasks by manually invoking its execution methods.
249248

249+
* **derivable executor** - a base class for user defined executors. Although inheriting directly from `concurrencpp::executor` is possible, `derivable_executor` uses the `CRTP` pattern that provides some optimization opportunities for the compiler.
250+
250251
* **inline executor** - mainly used to override the behavior of other executors. Enqueuing a task is equivalent to invoking it inline.
251252

252253
#### Using executors
@@ -358,6 +359,7 @@ class result{
358359
In either way, after resuming, if the result is a valid value, it is returned.
359360
Otherwise, operator co_await rethrows the asynchronous exception.
360361
Throws concurrencpp::errors::empty_result if *this is empty.
362+
Throws std::invalid_argument if executor is null.
361363
If this result is ready and force_rescheduling=true, throws any exception that executor::enqueue may throw.
362364
*/
363365
auto await_via(
@@ -380,6 +382,7 @@ class result{
380382
if force_rescheduling = false, then the current coroutine resumes immediately in the calling thread of execution.
381383
In either way, after resuming, *this is returned in a non-empty form and guaranteed that its status is not result_status::idle.
382384
Throws concurrencpp::errors::empty_result if *this is empty.
385+
Throws std::invalid_argument if executor is null.
383386
If this result is ready and force_rescheduling=true, throws any exception that executor::enqueue may throw.
384387
*/
385388
auto resolve_via(
@@ -574,7 +577,8 @@ result<void> make_ready_result();
574577

575578
/*
576579
Creates a ready result object from an exception pointer.
577-
The result object will re-throw exception_ptr when calling get, await or await_via
580+
The result object will re-throw exception_ptr when calling get, await or await_via.
581+
Throws std::invalid_argument if exception_ptr is null.
578582
*/
579583
template<class type>
580584
result<type> make_exceptional_result(std::exception_ptr exception_ptr);
@@ -642,8 +646,7 @@ when_any(iterator_type begin, iterator_type end);
642646
### Timers and Timer queues
643647
644648
concurrencpp also provides timers and timer queues.
645-
Timers are objects that schedule actions to run on an executor within a well-defined interval of time. There are three types of timers - *regular timers*, *onshot-timers* and *delay objects*.
646-
A timer queue is a concurrencpp worker that manages a collection of timers and processes them in just one thread of execution. In order to create timers, one must use the timer queue in conjunction with an executor.
649+
Timers are objects that define actions which run on an executor within a well-defined interval of time. There are three types of timers - *regular timers*, *onshot-timers* and *delay objects*.
647650
648651
Timers have four properties that describe them:
649652
@@ -652,16 +655,38 @@ Timers have four properties that describe them:
652655
1. Due time - from the time of creation, the interval in milliseconds in which the timer will be scheduled to run for the first time
653656
1. Frequency - from the time the timer callable was scheduled for the first time, the interval in milliseconds the callable will be schedule to run periodically, until the timer is destructed or cancelled.
654657
658+
A timer queue is a concurrencpp worker that manages a collection of timers and processes them in just one thread of execution.
659+
When a timer deadline (whether its due-time or frequency) has reached, the timer queue "fires" the timer by scheduling its callable to run on the timer given executor.
660+
Just like executors, timer queues also adhere to the RAII concpet. When the runtime object gets out of scope, It shuts down the timer queue, cancelling all pending timers.
661+
After a timer queue has been shut down, any subsequent call to `make_timer`, `make_onshot_timer` and `make_delay_object` will throw an `errors::timer_queue_shutdown` exceptions.
662+
Applications must not try to shut down timer queues by themselves.
663+
655664
#### `timer_queue` API:
656665
```cpp
657666
class timer_queue {
658667
/*
659668
Destroyes *this and cancels all associated timers.
660669
*/
661670
~timer_queue() noexcept;
671+
672+
/*
673+
Shuts down this timer_queue.
674+
After this call, invocation of any method besides shutdown
675+
and shutdown_requested will throw an errors::timer_queue_shutdown.
676+
If shutdown had been called before, this method has no effect.
677+
*/
678+
void shutdown() noexcept;
679+
680+
/*
681+
Returns true if shutdown had been called before, false otherwise.
682+
*/
683+
bool shutdown_requested() const noexcept;
684+
662685
663686
/*
664-
Creates a new running timer where *this is the associated timer_queue
687+
Creates a new running timer where *this is the associated timer_queue.
688+
Throws std::invalid_argument if executor is null.
689+
Throws errors::timer_queue_shutdown if shutdown had been called before.
665690
*/
666691
template<class callable_type>
667692
timer make_timer(
@@ -671,7 +696,9 @@ class timer_queue {
671696
callable_type&& callable);
672697
673698
/*
674-
Creates a new running timer where *this is associated timer_queue
699+
Creates a new running timer where *this is associated timer_queue.
700+
Throws std::invalid_argument if executor is null.
701+
Throws errors::timer_queue_shutdown if shutdown had been called before.
675702
*/
676703
template<class callable_type, class ... argumet_types>
677704
timer make_timer(
@@ -682,7 +709,9 @@ class timer_queue {
682709
argumet_types&& ... arguments);
683710
684711
/*
685-
Creates a new one shot timer where *this is associated timer_queue
712+
Creates a new one shot timer where *this is associated timer_queue.
713+
Throws std::invalid_argument if executor is null.
714+
Throws errors::timer_queue_shutdown if shutdown had been called before.
686715
*/
687716
template<class callable_type>
688717
timer make_one_shot_timer(
@@ -691,7 +720,9 @@ class timer_queue {
691720
callable_type&& callable);
692721
693722
/*
694-
Creates a new one shot timer where *this is associated timer_queue
723+
Creates a new one shot timer where *this is associated timer_queue.
724+
Throws std::invalid_argument if executor is null.
725+
Throws errors::timer_queue_shutdown if shutdown had been called before.
695726
*/
696727
template<class callable_type, class ... argumet_types>
697728
timer make_one_shot_timer(
@@ -701,7 +732,9 @@ class timer_queue {
701732
argumet_types&& ... arguments);
702733
703734
/*
704-
Creates a new delay object where *this is associated timer_queue
735+
Creates a new delay object where *this is associated timer_queue.
736+
Throws std::invalid_argument if executor is null.
737+
Throws errors::timer_queue_shutdown if shutdown had been called before.
705738
*/
706739
result<void> make_delay_object(size_t due_time, std::shared_ptr<concurrencpp::executor> executor);
707740
};
@@ -937,7 +970,7 @@ class runtime {
937970

938971
#### Creating user-defined executors
939972

940-
As mentioned before, Applications can create their own custom executor type by implementing the `executor` interface. There are a few points to consider when implementing user defined executors:
973+
As mentioned before, Applications can create their own custom executor type by inheriting the `derivable_executor` class. There are a few points to consider when implementing user defined executors:
941974
The most important thing is to remember that executors are used from multiple threads, so implemented methods must be thread-safe.
942975
Another important thing is to handle shutdown correctly: `shutdown`, `shutdown_requested` and `enqueue` should all monitor the executor state and behave accordingly when invoked:
943976
* `shutdown` should tell underlying threads to quit and then join them. `shutdown` must also destroy each unexecuted `coroutine_handle` by calling `coroutine_handle::destroy`.
@@ -956,7 +989,7 @@ Another important thing is to handle shutdown correctly: `shutdown`, `shutdown_r
956989
#include <mutex>
957990
#include <condition_variable>
958991

959-
class logging_executor : public concurrencpp::executor {
992+
class logging_executor : public concurrencpp::derivable_executor<logging_executor> {
960993

961994
private:
962995
mutable std::mutex _lock;
@@ -990,7 +1023,7 @@ private:
9901023

9911024
public:
9921025
logging_executor(std::string_view prefix) :
993-
executor("logging_executor"),
1026+
derivable_executor<logging_executor>("logging_executor"),
9941027
_shutdown_requested(false),
9951028
_prefix(prefix) {
9961029
_thread = std::thread([this] {

concurrencpp/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ project(concurrencpp)
33

44
add_library(
55
concurrencpp
6-
src/executors/executor.cpp
6+
77
src/runtime/runtime.cpp
88
src/threads/thread.cpp
9+
src/results/promises.cpp
910
src/results/result_core.cpp
1011
src/executors/executor.cpp
12+
src/executors/executor.cpp
1113
src/executors/manual_executor.cpp
1214
src/executors/thread_executor.cpp
1315
src/executors/thread_pool_executor.cpp

concurrencpp/src/errors.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#include <stdexcept>
66

77
namespace concurrencpp::errors {
8-
98
struct empty_object : public std::runtime_error {
109
empty_object(const std::string& message) : runtime_error(message) {}
1110
};
@@ -37,6 +36,10 @@ namespace concurrencpp::errors {
3736
struct executor_shutdown : public std::runtime_error {
3837
executor_shutdown(const std::string& message) : runtime_error(message) {}
3938
};
39+
40+
struct timer_queue_shutdown : public std::runtime_error {
41+
timer_queue_shutdown(const std::string& message) : runtime_error(message) {}
42+
};
4043
}
4144

4245
#endif //ERRORS_H
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#ifndef CONCURRENCPP_DERIVABLE_EXECUTOR_H
2+
#define CONCURRENCPP_DERIVABLE_EXECUTOR_H
3+
4+
#include "executor.h"
5+
6+
namespace concurrencpp {
7+
template<class concrete_executor_type>
8+
class derivable_executor : public executor {
9+
10+
private:
11+
concrete_executor_type* self() noexcept {
12+
return static_cast<concrete_executor_type*>(this);
13+
}
14+
15+
public:
16+
derivable_executor(std::string_view name) : executor(name) {}
17+
18+
template<class callable_type, class ... argument_types>
19+
void post(callable_type&& callable, argument_types&& ... arguments) {
20+
return do_post(self(), std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
21+
}
22+
23+
template<class callable_type, class ... argument_types>
24+
auto submit(callable_type&& callable, argument_types&& ... arguments) {
25+
return do_submit(self(), std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
26+
}
27+
28+
template<class callable_type>
29+
void bulk_post(std::span<callable_type> callable_list) {
30+
return do_bulk_post(self(), callable_list);
31+
}
32+
33+
template<class callable_type, class return_type = std::invoke_result_t<callable_type>>
34+
std::vector<concurrencpp::result<return_type>> bulk_submit(std::span<callable_type> callable_list) {
35+
return do_bulk_submit(self(), callable_list);
36+
}
37+
};
38+
}
39+
40+
#endif

concurrencpp/src/executors/executor.h

Lines changed: 57 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ namespace concurrencpp {
1616
class executor {
1717

1818
private:
19-
template<class callable_type, class ... argument_types>
19+
template<class executor_type, class callable_type, class ... argument_types>
2020
static null_result post_bridge(
2121
executor_tag,
22-
executor*,
22+
executor_type*,
2323
callable_type callable,
2424
argument_types... arguments) {
2525
callable(arguments...);
@@ -35,10 +35,10 @@ namespace concurrencpp {
3535
co_return;
3636
}
3737

38-
template<class return_type, class callable_type, class ... argument_types>
38+
template<class return_type, class executor_type, class callable_type, class ... argument_types>
3939
static result<return_type> submit_bridge(
4040
executor_tag,
41-
executor*,
41+
executor_type*,
4242
callable_type callable,
4343
argument_types... arguments) {
4444
co_return callable(arguments...);
@@ -52,51 +52,37 @@ namespace concurrencpp {
5252
co_return callable();
5353
}
5454

55-
public:
56-
executor(std::string_view name) : name(name) {}
57-
58-
virtual ~executor() noexcept = default;
59-
60-
const std::string name;
61-
62-
virtual void enqueue(std::experimental::coroutine_handle<> task) = 0;
63-
virtual void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) = 0;
64-
65-
virtual int max_concurrency_level() const noexcept = 0;
66-
67-
virtual bool shutdown_requested() const noexcept = 0;
68-
virtual void shutdown() noexcept = 0;
69-
70-
template<class callable_type, class ... argument_types>
71-
void post(callable_type&& callable, argument_types&& ... arguments) {
55+
protected:
56+
template<class executor_type, class callable_type, class ... argument_types>
57+
static void do_post(executor_type* executor_ptr, callable_type&& callable, argument_types&& ... arguments) {
7258
static_assert(
7359
std::is_invocable_v<callable_type, argument_types...>,
74-
"concurrencpp::executor::post - <<callable_type>> is not invocable with <<argument_types...>>");
60+
"concurrencpp::executor::post - <<callable_type>> is not invokable with <<argument_types...>>");
7561

7662
post_bridge(
77-
executor_tag{},
78-
this,
63+
{},
64+
executor_ptr,
7965
std::forward<callable_type>(callable),
8066
std::forward<argument_types>(arguments)...);
8167
}
8268

83-
template<class callable_type, class ... argument_types>
84-
auto submit(callable_type&& callable, argument_types&& ... arguments) {
69+
template<class executor_type, class callable_type, class ... argument_types>
70+
static auto do_submit(executor_type* executor_ptr, callable_type&& callable, argument_types&& ... arguments) {
8571
static_assert(
8672
std::is_invocable_v<callable_type, argument_types...>,
87-
"concurrencpp::executor::submit - <<callable_type>> is not invocable with <<argument_types...>>");
73+
"concurrencpp::executor::submit - <<callable_type>> is not invokable with <<argument_types...>>");
8874

8975
using return_type = typename std::invoke_result_t<callable_type, argument_types...>;
9076

9177
return submit_bridge<return_type>(
92-
executor_tag{},
93-
this,
78+
{},
79+
executor_ptr,
9480
std::forward<callable_type>(callable),
9581
std::forward<argument_types>(arguments)...);
9682
}
9783

98-
template<class callable_type>
99-
void bulk_post(std::span<callable_type> callable_list) {
84+
template<class executor_type, class callable_type>
85+
static void do_bulk_post(executor_type* executor_ptr, std::span<callable_type> callable_list) {
10086
std::vector<std::experimental::coroutine_handle<>> accumulator;
10187
accumulator.reserve(callable_list.size());
10288

@@ -105,11 +91,12 @@ namespace concurrencpp {
10591
}
10692

10793
assert(!accumulator.empty());
108-
enqueue(accumulator);
94+
executor_ptr->enqueue(accumulator);
10995
}
11096

111-
template<class callable_type, class return_type = std::invoke_result_t<callable_type>>
112-
std::vector<concurrencpp::result<return_type>> bulk_submit(std::span<callable_type> callable_list) {
97+
template<class executor_type, class callable_type, class return_type = std::invoke_result_t<callable_type>>
98+
static std::vector<concurrencpp::result<return_type>>
99+
do_bulk_submit(executor_type* executor_ptr, std::span<callable_type> callable_list) {
113100
std::vector<std::experimental::coroutine_handle<>> accumulator;
114101
accumulator.reserve(callable_list.size());
115102

@@ -121,9 +108,44 @@ namespace concurrencpp {
121108
}
122109

123110
assert(!accumulator.empty());
124-
enqueue(accumulator);
111+
executor_ptr->enqueue(accumulator);
125112
return results;
126113
}
114+
115+
public:
116+
executor(std::string_view name) : name(name) {}
117+
118+
virtual ~executor() noexcept = default;
119+
120+
const std::string name;
121+
122+
virtual void enqueue(std::experimental::coroutine_handle<> task) = 0;
123+
virtual void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) = 0;
124+
125+
virtual int max_concurrency_level() const noexcept = 0;
126+
127+
virtual bool shutdown_requested() const noexcept = 0;
128+
virtual void shutdown() noexcept = 0;
129+
130+
template<class callable_type, class ... argument_types>
131+
void post(callable_type&& callable, argument_types&& ... arguments) {
132+
return do_post(this, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
133+
}
134+
135+
template<class callable_type, class ... argument_types>
136+
auto submit(callable_type&& callable, argument_types&& ... arguments) {
137+
return do_submit(this, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
138+
}
139+
140+
template<class callable_type>
141+
void bulk_post(std::span<callable_type> callable_list) {
142+
return do_bulk_post(this, callable_list);
143+
}
144+
145+
template<class callable_type, class return_type = std::invoke_result_t<callable_type>>
146+
std::vector<concurrencpp::result<return_type>> bulk_submit(std::span<callable_type> callable_list) {
147+
return do_bulk_submit(this, callable_list);
148+
}
127149
};
128150
}
129151

concurrencpp/src/executors/executor_all.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef CONCURRENCPP_EXECUTORS_ALL_H
22
#define CONCURRENCPP_EXECUTORS_ALL_H
33

4+
#include "derivable_executor.h"
45
#include "inline_executor.h"
56
#include "thread_pool_executor.h"
67
#include "thread_executor.h"

0 commit comments

Comments
 (0)