Skip to content

Commit 580f430

Browse files
authored
Version 0.1.4 (#64)
1 parent 93890e3 commit 580f430

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2077
-451
lines changed

.github/workflows/ci.yml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@ jobs:
2424
strategy:
2525
matrix:
2626
conf:
27-
- name: Ubuntu (Clang 11 - TSAN)
27+
- name: Ubuntu (Clang 12 - TSAN)
2828
os: ubuntu-20.04
29-
cc: clang-11
30-
cxx: clang++-11
29+
cc: clang-12
30+
cxx: clang++-12
3131
tsan: YES
3232

33-
- name: Ubuntu (Clang 11 - no TSAN)
33+
- name: Ubuntu (Clang 12 - no TSAN)
3434
os: ubuntu-20.04
35-
cc: clang-11
36-
cxx: clang++-11
35+
cc: clang-12
36+
cxx: clang++-12
3737
tsan: NO
3838

3939
- name: macOS (Clang 11 - no TSAN)
@@ -79,10 +79,11 @@ jobs:
7979
"${{ steps.tools.outputs.ninja }}"
8080
${{ steps.cores.outputs.plus_one }}]==])
8181
82-
- name: Install clang 11
82+
- name: Install clang 12
83+
working-directory: ${{ env.HOME }}
8384
run: |
8485
sudo apt-get update
85-
sudo apt-get install clang-11 libc++-11-dev libc++abi-11-dev
86+
sudo apt-get install clang-12 libc++-12-dev libc++abi-12-dev
8687
if: ${{ startsWith(matrix.conf.os, 'ubuntu') }}
8788

8889
- name: Build examples

CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cmake_minimum_required(VERSION 3.16)
22

33
project(concurrencpp
4-
VERSION 0.1.3
4+
VERSION 0.1.4
55
LANGUAGES CXX)
66

77
include(cmake/coroutineOptions.cmake)
@@ -55,6 +55,7 @@ set(concurrencpp_headers
5555
include/concurrencpp/results/impl/result_state.h
5656
include/concurrencpp/results/impl/shared_result_state.h
5757
include/concurrencpp/results/impl/lazy_result_state.h
58+
include/concurrencpp/results/impl/generator_state.h
5859
include/concurrencpp/results/constants.h
5960
include/concurrencpp/results/make_result.h
6061
include/concurrencpp/results/promises.h
@@ -67,10 +68,12 @@ set(concurrencpp_headers
6768
include/concurrencpp/results/result_fwd_declarations.h
6869
include/concurrencpp/results/when_result.h
6970
include/concurrencpp/results/resume_on.h
71+
include/concurrencpp/results/generator.h
7072
include/concurrencpp/runtime/constants.h
7173
include/concurrencpp/runtime/runtime.h
7274
include/concurrencpp/threads/binary_semaphore.h
7375
include/concurrencpp/threads/thread.h
76+
include/concurrencpp/threads/cache_line.h
7477
include/concurrencpp/timers/constants.h
7578
include/concurrencpp/timers/timer.h
7679
include/concurrencpp/timers/timer_queue.h

README.md

Lines changed: 459 additions & 170 deletions
Large diffs are not rendered by default.

example/13_generator/CMakeLists.txt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
cmake_minimum_required(VERSION 3.16)
2+
3+
project(13_generator LANGUAGES CXX)
4+
5+
include(FetchContent)
6+
FetchContent_Declare(concurrencpp SOURCE_DIR "${CMAKE_CURRENT_LIST_DIR}/../..")
7+
FetchContent_MakeAvailable(concurrencpp)
8+
9+
include(../../cmake/coroutineOptions.cmake)
10+
11+
add_executable(13_generator source/main.cpp)
12+
13+
target_compile_features(13_generator PRIVATE cxx_std_20)
14+
15+
target_link_libraries(13_generator PRIVATE concurrencpp::concurrencpp)
16+
17+
target_coroutine_options(13_generator)

example/13_generator/source/main.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#include <fstream>
2+
#include <iostream>
3+
#include <filesystem>
4+
5+
#include "concurrencpp/concurrencpp.h"
6+
7+
concurrencpp::generator<std::string_view> read_lines(std::string_view text) {
8+
std::string_view::size_type pos = 0;
9+
std::string_view::size_type prev = 0;
10+
11+
while ((pos = text.find('\n', prev)) != std::string::npos) {
12+
co_yield text.substr(prev, pos - prev);
13+
prev = pos + 1;
14+
}
15+
16+
co_yield text.substr(prev);
17+
}
18+
19+
concurrencpp::result<void> read_file_lines(const std::filesystem::path& path,
20+
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor,
21+
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor) {
22+
// make sure we don't block in a thread that is used for cpu-processing
23+
co_await concurrencpp::resume_on(background_executor);
24+
25+
std::ifstream stream(path.c_str(), std::ios::binary | std::ios::in);
26+
std::string file_content(std::istreambuf_iterator<char>(stream), {});
27+
28+
// make sure we don't process cpu-bound tasks on the background executor
29+
co_await concurrencpp::resume_on(thread_pool_executor);
30+
31+
for (const auto& line : read_lines(file_content)) {
32+
std::cout << "read a new line. size: " << line.size() << std::endl;
33+
std::cout << "line: " << std::endl;
34+
std::cout << line;
35+
std::cout << "\n==============" << std::endl;
36+
}
37+
}
38+
39+
int main(const int argc, const char* argv[]) {
40+
if (argc < 2) {
41+
const auto help_msg = "please pass all necessary arguments\n argv[1] - the file to be read\n";
42+
std::cerr << help_msg << std::endl;
43+
return -1;
44+
}
45+
46+
const auto file_path = std::string(argv[1]);
47+
48+
concurrencpp::runtime runtime;
49+
const auto thread_pool_executor = runtime.thread_pool_executor();
50+
const auto background_executor = runtime.background_executor();
51+
52+
read_file_lines(file_path, thread_pool_executor, background_executor).get();
53+
return 0;
54+
}

example/5_prime_number_finder/source/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ concurrencpp::result<std::vector<int>> find_prime_numbers(std::shared_ptr<concur
6060
found_primes_in_range.emplace_back(std::move(result));
6161
}
6262

63-
auto all_done = co_await concurrencpp::when_all(found_primes_in_range.begin(), found_primes_in_range.end());
63+
auto all_done = co_await concurrencpp::when_all(executor, found_primes_in_range.begin(), found_primes_in_range.end());
6464

6565
std::vector<int> found_primes;
6666
for (auto& done_result : all_done) {

example/7_when_all/source/main.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ int example_job(int task_num, int dummy_value, int sleeping_time_ms) {
1919
return dummy_value;
2020
}
2121

22-
concurrencpp::result<void> consume_all_tasks(std::vector<concurrencpp::result<int>> results) {
23-
auto all_done = co_await concurrencpp::when_all(results.begin(), results.end());
22+
concurrencpp::result<void> consume_all_tasks(std::shared_ptr<concurrencpp::thread_pool_executor> resume_executor,
23+
std::vector<concurrencpp::result<int>> results) {
24+
auto all_done = co_await concurrencpp::when_all(resume_executor, results.begin(), results.end());
2425

2526
for (auto& done_result : all_done) {
2627
std::cout << co_await done_result << std::endl;
@@ -32,6 +33,7 @@ concurrencpp::result<void> consume_all_tasks(std::vector<concurrencpp::result<in
3233
int main(int argc, const char* argv[]) {
3334
concurrencpp::runtime runtime;
3435
auto background_executor = runtime.background_executor();
36+
auto thread_pool_executor = runtime.thread_pool_executor();
3537
std::vector<concurrencpp::result<int>> results;
3638

3739
std::srand(static_cast<unsigned>(std::time(nullptr)));
@@ -41,6 +43,6 @@ int main(int argc, const char* argv[]) {
4143
results.emplace_back(background_executor->submit(example_job, i, i * 15, sleeping_time_ms));
4244
}
4345

44-
consume_all_tasks(std::move(results)).get();
46+
consume_all_tasks(thread_pool_executor, std::move(results)).get();
4547
return 0;
4648
}

example/8_when_any/source/main.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ int example_job(int task_num, int dummy_value, int sleeping_time_ms) {
1919
return dummy_value;
2020
}
2121

22-
concurrencpp::result<void> consume_tasks_as_they_finish(std::vector<concurrencpp::result<int>> results) {
22+
concurrencpp::result<void> consume_tasks_as_they_finish(std::shared_ptr<concurrencpp::thread_pool_executor> resume_executor,
23+
std::vector<concurrencpp::result<int>> results) {
2324
while (!results.empty()) {
24-
auto when_any = co_await concurrencpp::when_any(results.begin(), results.end());
25+
auto when_any = co_await concurrencpp::when_any(resume_executor, results.begin(), results.end());
2526
auto finished_task = std::move(when_any.results[when_any.index]);
2627

2728
const auto done_value = co_await finished_task;
@@ -39,6 +40,7 @@ concurrencpp::result<void> consume_tasks_as_they_finish(std::vector<concurrencpp
3940
int main(int argc, const char* argv[]) {
4041
concurrencpp::runtime runtime;
4142
auto background_executor = runtime.background_executor();
43+
auto thread_pool_executor = runtime.thread_pool_executor();
4244
std::vector<concurrencpp::result<int>> results;
4345

4446
std::srand(static_cast<unsigned>(std::time(nullptr)));
@@ -48,6 +50,6 @@ int main(int argc, const char* argv[]) {
4850
results.emplace_back(background_executor->submit(example_job, i, i * 15, sleeping_time_ms));
4951
}
5052

51-
consume_tasks_as_they_finish(std::move(results)).get();
53+
consume_tasks_as_they_finish(thread_pool_executor, std::move(results)).get();
5254
return 0;
5355
}

example/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ foreach(example IN ITEMS
1515
10_regular_timer
1616
11_oneshot_timer
1717
12_delay_object
18+
13_generator
1819
)
1920
add_subdirectory("${CMAKE_CURRENT_SOURCE_DIR}/${example}"
2021
"${CMAKE_CURRENT_BINARY_DIR}/${example}")

include/concurrencpp/concurrencpp.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "concurrencpp/results/shared_result_awaitable.h"
1616
#include "concurrencpp/results/promises.h"
1717
#include "concurrencpp/results/resume_on.h"
18+
#include "concurrencpp/results/generator.h"
1819
#include "concurrencpp/executors/executor_all.h"
1920

2021
#endif

include/concurrencpp/errors.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ namespace concurrencpp::errors {
2424
using empty_object::empty_object;
2525
};
2626

27+
struct empty_generator : public empty_object {
28+
using empty_object::empty_object;
29+
};
30+
2731
struct broken_task : public std::runtime_error {
2832
using runtime_error::runtime_error;
2933
};

include/concurrencpp/executors/executor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ namespace concurrencpp {
9898

9999
virtual int max_concurrency_level() const noexcept = 0;
100100

101-
virtual bool shutdown_requested() const noexcept = 0;
102-
virtual void shutdown() noexcept = 0;
101+
virtual bool shutdown_requested() const = 0;
102+
virtual void shutdown() = 0;
103103

104104
template<class callable_type, class... argument_types>
105105
void post(callable_type&& callable, argument_types&&... arguments) {

include/concurrencpp/executors/inline_executor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ namespace concurrencpp {
3535
return details::consts::k_inline_executor_max_concurrency_level;
3636
}
3737

38-
void shutdown() noexcept override {
38+
void shutdown() override {
3939
m_abort.store(true, std::memory_order_relaxed);
4040
}
4141

42-
bool shutdown_requested() const noexcept override {
42+
bool shutdown_requested() const override {
4343
return m_abort.load(std::memory_order_relaxed);
4444
}
4545
};

include/concurrencpp/executors/manual_executor.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
#ifndef CONCURRENCPP_MANUAL_EXECUTOR_H
22
#define CONCURRENCPP_MANUAL_EXECUTOR_H
33

4+
#include "concurrencpp/threads/cache_line.h"
45
#include "concurrencpp/executors/derivable_executor.h"
56

67
#include <deque>
78
#include <chrono>
89

910
namespace concurrencpp {
10-
class alignas(64) manual_executor final : public derivable_executor<manual_executor> {
11+
class alignas(CRCPP_CACHE_LINE_ALIGNMENT) manual_executor final : public derivable_executor<manual_executor> {
1112

1213
private:
1314
mutable std::mutex m_lock;
@@ -18,13 +19,13 @@ namespace concurrencpp {
1819

1920
template<class clock_type, class duration_type>
2021
static std::chrono::system_clock::time_point to_system_time_point(
21-
std::chrono::time_point<clock_type, duration_type> time_point) {
22+
std::chrono::time_point<clock_type, duration_type> time_point) noexcept(noexcept(clock_type::now())) {
2223
const auto src_now = clock_type::now();
2324
const auto dst_now = std::chrono::system_clock::now();
2425
return dst_now + std::chrono::duration_cast<std::chrono::milliseconds>(time_point - src_now);
2526
}
2627

27-
static std::chrono::system_clock::time_point time_point_from_now(std::chrono::milliseconds ms) {
28+
static std::chrono::system_clock::time_point time_point_from_now(std::chrono::milliseconds ms) noexcept {
2829
return std::chrono::system_clock::now() + ms;
2930
}
3031

@@ -42,11 +43,11 @@ namespace concurrencpp {
4243

4344
int max_concurrency_level() const noexcept override;
4445

45-
void shutdown() noexcept override;
46-
bool shutdown_requested() const noexcept override;
46+
void shutdown() override;
47+
bool shutdown_requested() const override;
4748

48-
size_t size() const noexcept;
49-
bool empty() const noexcept;
49+
size_t size() const;
50+
bool empty() const;
5051

5152
size_t clear();
5253

include/concurrencpp/executors/thread_executor.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define CONCURRENCPP_THREAD_EXECUTOR_H
33

44
#include "concurrencpp/threads/thread.h"
5+
#include "concurrencpp/threads/cache_line.h"
56
#include "concurrencpp/executors/derivable_executor.h"
67

78
#include <list>
@@ -10,7 +11,7 @@
1011
#include <condition_variable>
1112

1213
namespace concurrencpp {
13-
class alignas(64) thread_executor final : public derivable_executor<thread_executor> {
14+
class alignas(CRCPP_CACHE_LINE_ALIGNMENT) thread_executor final : public derivable_executor<thread_executor> {
1415

1516
private:
1617
std::mutex m_lock;
@@ -32,8 +33,8 @@ namespace concurrencpp {
3233

3334
int max_concurrency_level() const noexcept override;
3435

35-
bool shutdown_requested() const noexcept override;
36-
void shutdown() noexcept override;
36+
bool shutdown_requested() const override;
37+
void shutdown() override;
3738
};
3839
} // namespace concurrencpp
3940

0 commit comments

Comments
 (0)