Skip to content

Commit de342fe

Browse files
committed
UPD | event loop & priorities with subscribe_finish(...)
1 parent 506b393 commit de342fe

File tree

11 files changed

+101
-51
lines changed

11 files changed

+101
-51
lines changed

conan/tquic/all/conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def source(self):
7575
boringssl = os.path.join('src', 'deps', 'boringssl')
7676

7777
if len(os.listdir(boringssl)) == 0:
78-
# boringssl not exists! Lets fix that!
78+
# boringssl doesn't exist! Lets fix that!
7979
get(self, 'https://github.com/google/boringssl/archive/refs/tags/0.20241209.0.zip', destination=boringssl, strip_root=True)
8080

8181
def generate(self):

include/ManapiEventLoop.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ namespace manapi {
115115

116116
manapi::future<> stop ();
117117

118-
size_t subscribe_finish (std::move_only_function<manapi::future<void>()> cb);
118+
size_t subscribe_finish (int priority, std::move_only_function<manapi::future<void>()> cb);
119119

120120
void unsubscribe_finish (std::size_t id);
121121

@@ -345,7 +345,7 @@ namespace manapi {
345345
void custom_watcher_callback_async (const std::shared_ptr<ev::async> &w);
346346
private:
347347
#if MANAPIHTTP_CURL_DEPENDENCY
348-
void wait_all_ () MANAPIHTTP_NOEXCEPT;
348+
void wait_all_ (bool shutdown) MANAPIHTTP_NOEXCEPT;
349349

350350
static manapi::ev::status_or<std::shared_ptr<ev::io>> handle_curl_watcher_gen(event_loop *data, socket_t fd) MANAPIHTTP_NOEXCEPT;
351351

@@ -387,7 +387,7 @@ namespace manapi {
387387

388388
std::shared_ptr<threadpool> taskpool_;
389389

390-
std::map <size_t, std::move_only_function<manapi::future<void>()>> map_finish_cb;
390+
std::map <size_t, std::pair<int, std::move_only_function<manapi::future<void>()>>> map_finish_cb;
391391

392392
std::map <size_t, std::move_only_function<void()>> map_clean_up_cb;
393393

include/cache/ManapiLRU.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#pragma once
22

33
#include <cassert>
4-
#include <unordered_map>
4+
#include <map>
55
#include <cstddef>
66

77
#include "./../ManapiErrors.hpp"
@@ -124,6 +124,6 @@ namespace manapi {
124124

125125
manapi::chain<key_value_pair_t> m_list;
126126

127-
std::unordered_map<K, item_t> m_items;
127+
std::map<K, item_t> m_items;
128128
};
129129
}

include/ext/pq/AsyncPostgreValue.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ namespace manapi::ext::pq {
225225

226226
template<typename T>
227227
struct string_traits {
228-
static inline T from_string (std::string_view text_) { return std::string{text_}; }
228+
static inline T from_string (std::string_view text_) { return T{text_}; }
229229
static inline void to_string (std::string_view text_, T const &value) {
230230
assert(text_.size() >= value.size()); memcpy((void*)text_.data(), value.data(), value.size());
231231
}

include/ext/pq/AsyncPostgreValueTypes.hpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,17 +156,21 @@ namespace manapi::ext::pq {
156156

157157
/* string */
158158
template<> struct string_traits <const char *> {
159-
static constexpr std::string from_string (std::string_view text_) {
160-
return std::string{text_};
159+
static constexpr const char *from_string (std::string_view text_) {
160+
return text_.data();
161161
}
162162
static constexpr void to_string (std::string_view text_, const char *value) {
163163
memcpy((void*)text_.data(), value, strlen(value));
164164
}
165165
MANAPIHTTP_NODISCARD static size_t size (const char *value) {
166-
return strlen(value);
166+
return ::strlen(value);
167167
}
168168
};
169169

170+
template<> MANAPIHTTP_NODISCARD inline std::string_view from_string (std::string_view text_) {
171+
return string_traits<std::string_view>::from_string(text_);
172+
}
173+
170174
template<> MANAPIHTTP_NODISCARD inline unsigned int from_string (std::string_view text_) {
171175
return integral_traits<unsigned int>::from_string(text_);
172176
}
@@ -253,9 +257,14 @@ namespace manapi::ext::pq {
253257
return string_traits<std::string>::size(v);
254258
}
255259

260+
template<> MANAPIHTTP_NODISCARD inline size_t size_of (const std::string_view &v) {
261+
return string_traits<std::string_view>::size(v);
262+
}
263+
256264
template<> MANAPIHTTP_NODISCARD inline size_t size_of (const pq::blob &v) {
257265
return string_traits<pq::blob>::size(v);
258266
}
267+
259268
template<> MANAPIHTTP_NODISCARD inline size_t size_of (const pq::text &v) {
260269
return string_traits<pq::text>::size(v);
261270
}
@@ -340,6 +349,10 @@ namespace manapi::ext::pq {
340349
return float_traits <float>::to_string(text_, v);
341350
}
342351

352+
template<> inline void to_string (std::string_view text_, const std::string_view &v) {
353+
return string_traits <std::string_view>::to_string(text_, v);
354+
}
355+
343356
template<> inline void to_string (std::string_view text_, const pq::uuid &v) {
344357
auto uuid_ = manapi::crypto::strhex2strdec(v).unwrap();
345358
assert(text_.size() >= uuid_.size());

main.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,18 @@ int main() {
1818
ctx->run(0, [server_ctx] (auto cb) -> void {
1919
using http = manapi::net::http::server;
2020

21+
manapi::before_delete bd ([] () -> void {
22+
manapi::async::current()->etaskpool()->append_task([] () -> void {
23+
std::cout << "hello world!\n";
24+
});
25+
});
26+
27+
manapi::async::current()->eventloop()->subscribe_finish(500, [] () -> manapi::future<> {
28+
29+
std::cout << "finish it!\n";
30+
co_return;
31+
});
32+
2133
auto route = manapi::net::http::server::create(server_ctx).unwrap();
2234
auto db = manapi::ext::pq::db::create().unwrap();
2335

@@ -27,7 +39,8 @@ int main() {
2739
manapi::ext::pq::result res = manapi::unwrap(co_await db->exec(manapi::ext::pq::kSlave, "SELECT * FROM test;"));
2840
std::string content;
2941
for (auto row : res) {
30-
content += row["text"].as<std::string>() + "\n";
42+
content += row["text"].as<std::string_view>();
43+
content += " | ";
3144
}
3245

3346
resp.replacers({

src/ManapiEventLoop.cpp

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -843,7 +843,7 @@ manapi::future<> manapi::event_loop::stop() {
843843
manapi_log_trace(manapi::debug::LOG_TRACE_MEDIUM, "eventloop:stop() has been finished");
844844
}
845845

846-
void manapi::event_loop::wait_all_() MANAPIHTTP_NOEXCEPT {
846+
void manapi::event_loop::wait_all_(bool shutdown) MANAPIHTTP_NOEXCEPT {
847847
if (!(this->flags & EVENT_LOOP_FLAG_STOPPING)) {
848848
manapi::async::run (this->stop());
849849
}
@@ -855,23 +855,22 @@ void manapi::event_loop::wait_all_() MANAPIHTTP_NOEXCEPT {
855855
#if MANAPIHTTP_CURL_DEPENDENCY
856856
this->stop_watcher(std::move(this->curl_watcher->timeout_watcher));
857857
#endif
858-
std::shared_ptr<ev::idle> idle_tasks;
859-
858+
manapi::timer checker;
860859
MANAPIHTTP_MUST_ALLOC_START
861-
manapi::async::current()->timerpool()->append_interval_sync(500, [this] (const manapi::timer &) -> void {
860+
checker = manapi::async::current()->timerpool()->append_interval_sync(500, [this] (const manapi::timer &) -> void {
862861
this->handle_curl_exec_connections();
863862
this->handle_curl_check_connections();
864-
});
863+
}).unwrap();
865864
MANAPIHTTP_MUST_ALLOC_END
866865
if (this->loop_) {
867-
while (::uv_loop_alive(this->loop()) || this->etaskpool_->tasks_size()) {
868-
auto etaskpool = dynamic_cast<ethreadpool *>(this->etaskpool_.get());
866+
auto etaskpool = dynamic_cast<ethreadpool *>(this->etaskpool_.get());
869867

868+
while (true) {
870869
MANAPIHTTP_MUST_ALLOC_START
871-
etaskpool->set_notify_cb([&] () -> void {
870+
etaskpool->set_notify_cb([etaskpool, this] () -> void {
872871
MANAPIHTTP_MUST_ALLOC_START
873872
auto idle_tasks_res = this->create_watcher_idle(
874-
[&idle_tasks, etaskpool, this] (const ev::shared_idle &w) -> void {
873+
[etaskpool, this] (const ev::shared_idle &w) -> void {
875874
while (true) {
876875
if (!etaskpool->try_task()) {
877876
break;
@@ -880,7 +879,7 @@ void manapi::event_loop::wait_all_() MANAPIHTTP_NOEXCEPT {
880879
etaskpool->set_notify();
881880

882881
auto const loop_ = this->loop();
883-
this->stop_watcher(std::move(idle_tasks));
882+
this->stop_watcher(w);
884883

885884
// try again (idle_tasks can be the last one)
886885
manapi::async::current()->timerpool()->stop();
@@ -891,11 +890,17 @@ void manapi::event_loop::wait_all_() MANAPIHTTP_NOEXCEPT {
891890
});
892891

893892
if (!idle_tasks_res) {
893+
if (!this->loop_) {
894+
manapi_log_warn("New tasks were added when event loop is deactivated");
895+
manapi::print_stacktrace();
896+
return;
897+
}
898+
894899
idle_tasks_res.err().log();
895900
throw std::bad_alloc{};
896901
}
897902

898-
idle_tasks = idle_tasks_res.unwrap();
903+
auto idle_tasks = idle_tasks_res.unwrap();
899904
if (auto rhs = idle_tasks->start()) {
900905
manapi_log_error("%s:%s failed due to %s", "event loop", "idle taskpool start",
901906
ev::strerror(rhs));
@@ -925,31 +930,36 @@ void manapi::event_loop::wait_all_() MANAPIHTTP_NOEXCEPT {
925930
}
926931
}
927932

928-
manapi_log_trace(manapi::debug::LOG_TRACE_MEDIUM, "eventloop:well done");
933+
checker.stop();
934+
etaskpool->set_notify_cb(nullptr);
935+
936+
if (shutdown) {
937+
manapi_log_trace(manapi::debug::LOG_TRACE_MEDIUM, "eventloop:well done");
929938

930-
if (auto rhs = ::uv_loop_close(this->loop_.get())) {
939+
if (auto rhs = ::uv_loop_close(this->loop_.get())) {
931940
#ifndef MANAPIHTTP_DISABLE_TRACE_HARD
932-
::uv_print_all_handles(this->loop(), stderr);
941+
::uv_print_all_handles(this->loop(), stderr);
933942
#endif
934-
manapi_log_error("%s failed due to %s", "uv_loop_close", ev::strerror(rhs));
935-
}
936-
else {
937-
this->loop_.reset();
938-
}
943+
manapi_log_error("%s failed due to %s", "uv_loop_close", ev::strerror(rhs));
944+
}
945+
else {
946+
this->loop_.reset();
947+
}
939948

940-
if (this->flags & EVENT_LOOP_FLAG_STOPPING) {
941-
this->flags ^= EVENT_LOOP_FLAG_STOPPING;
942-
}
943-
if (this->flags & EVENT_LOOP_FLAG_ACTIVE) {
944-
this->flags ^= EVENT_LOOP_FLAG_ACTIVE;
949+
if (this->flags & EVENT_LOOP_FLAG_STOPPING) {
950+
this->flags ^= EVENT_LOOP_FLAG_STOPPING;
951+
}
952+
if (this->flags & EVENT_LOOP_FLAG_ACTIVE) {
953+
this->flags ^= EVENT_LOOP_FLAG_ACTIVE;
954+
}
945955
}
946956
}
947957
}
948958

949-
size_t manapi::event_loop::subscribe_finish(std::move_only_function<manapi::future<void>()> cb) {
959+
size_t manapi::event_loop::subscribe_finish(int priority, std::move_only_function<manapi::future<void>()> cb) {
950960
auto id = *reinterpret_cast<const std::size_t *> (&cb);
951961

952-
if (!this->map_finish_cb.insert({id, std::move(cb)}).second) {
962+
if (!this->map_finish_cb.insert({id, std::make_pair(priority, std::move(cb))}).second) {
953963
THROW_MANAPIHTTP_EXCEPTION (ERR_INTERNAL, "index {} exists", id);
954964
}
955965

@@ -1080,14 +1090,27 @@ manapi::ev::status_or<std::shared_ptr<manapi::ev::udp>> manapi::event_loop::crea
10801090
}
10811091

10821092
manapi::future<> manapi::event_loop::_call_on_finish_cb() {
1093+
using item_t = std::pair<int, std::move_only_function<manapi::future<void>()>>;
1094+
std::vector<item_t> values;
1095+
values.reserve(this->map_finish_cb.size());
10831096
while (!this->map_finish_cb.empty()) {
1084-
const auto it = this->map_finish_cb.begin();
1085-
std::size_t address = it->first;
1086-
if (it->second) {
1087-
auto callback = std::move(it->second);
1088-
co_await async::invoke(std::move(callback));
1097+
const auto it = this->map_finish_cb.extract(this->map_finish_cb.begin());
1098+
values.push_back(std::move(it.mapped()));
1099+
}
1100+
1101+
std::sort(values.begin(), values.end(), +[](const item_t& a, const item_t& b)
1102+
-> bool { return a.first > b.first; });
1103+
1104+
for (auto &it : values) {
1105+
if (it.second) {
1106+
try {
1107+
auto callback = std::move(it.second);
1108+
co_await async::invoke(std::move(callback));
1109+
}
1110+
catch (std::exception const &e) {
1111+
manapi_log_ferror("finish callback failed msg=%s", e.what());
1112+
}
10891113
}
1090-
this->map_finish_cb.erase(address);
10911114
}
10921115
}
10931116

src/ManapiGrpc.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,6 +1682,7 @@ manapi::future<manapi::status> manapi::net::wgrpc::server::start(std::move_only_
16821682

16831683
try {
16841684
this->data_->finishid = manapi::async::current()->eventloop()->subscribe_finish(
1685+
std::numeric_limits<int>::min(),
16851686
[data = this->data_] () -> manapi::future<> {
16861687
co_await data->ctx.storage().unsubscribe(std::move(data->worker));
16871688
co_await server::stop_(data);

src/ManapiHttp.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ manapi::future<manapi::status> manapi::net::http::server::start() {
7272
co_return status_already_exists("already running");
7373
}
7474

75-
data2->event_id = async::current()->eventloop()->subscribe_finish([data2] ()
75+
data2->event_id = async::current()->eventloop()->subscribe_finish(std::numeric_limits<int>::min(), [data2] ()
7676
-> future<> {
7777
auto res = co_await stop_(data2, true);
7878
manapi_log_trace(manapi::debug::LOG_TRACE_HIGH, "http:Stop status=%.*s", res.msg().size(), res.msg().data());
@@ -138,7 +138,7 @@ manapi::future<manapi::status> manapi::net::http::server::stop_(std::shared_ptr<
138138
auto lk = co_await data->mx->lock_guard();
139139

140140
if (data->stopping.exchange(true)) {
141-
co_return status_not_found("not exists");
141+
co_return status_not_found("doesn't exist");
142142
}
143143

144144
if (!evloop) {

src/json/ManapiJsonMask.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,7 @@ manapi::json_error::status manapi::json_mask::recursive_valid(const manapi::json
942942
if (fit == p.end())
943943
{
944944
if (it.second["none"].as_bool()) { continue; }
945-
return json_error::status_invalid_argument("json_mask: item not exists", std::format("key = {}", it.first), 0, json_format_path(path));
945+
return json_error::status_invalid_argument("json_mask: item doesn't exist", std::format("key = {}", it.first), 0, json_format_path(path));
946946
}
947947
ev::buff_t path_part;
948948
path_part.base = (char*)it.first.data();
@@ -964,7 +964,7 @@ manapi::json_error::status manapi::json_mask::recursive_valid(const manapi::json
964964
auto fit = p.find(it.first);
965965
// dont exists
966966
if (fit == p.end())
967-
return json_error::status_invalid_argument("json_mask: item not exists", std::format("key = {}", it.first), 0, json_format_path(path));
967+
return json_error::status_invalid_argument("json_mask: item doesn't exist", std::format("key = {}", it.first), 0, json_format_path(path));
968968
ev::buff_t path_part;
969969
path_part.base = (char*)it.first.data();
970970
path_part.len = static_cast<decltype(path_part.len)>(it.first.size());

0 commit comments

Comments
 (0)