Skip to content

Commit 2b8a0bb

Browse files
committed
UPD | event loop
1 parent 8cf76e5 commit 2b8a0bb

File tree

9 files changed

+185
-63
lines changed

9 files changed

+185
-63
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -780,6 +780,7 @@ set(MANAPIHTTP_HTTP_FILES
780780
src/include/ManapiEventLoopCurl.hpp
781781
include/std/ManapiRef.hpp
782782
include/std/ManapiScopePtr.hpp
783+
include/cache/ManapiTL.hpp
783784
)
784785

785786
set(MANAPIHTTP_MODULES

include/ManapiEventLoop.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ namespace manapi {
104104

105105
class event_loop : public std::enable_shared_from_this<event_loop> {
106106
friend async::context;
107+
friend timerpool;
107108
public:
108109
event_loop();
109110

@@ -335,6 +336,10 @@ namespace manapi {
335336
MANAPIHTTP_NODISCARD bool is_active () const MANAPIHTTP_NOEXCEPT;
336337

337338
void custom_event_loop (std::move_only_function<void()> block_cb) MANAPIHTTP_NOEXCEPT;
339+
340+
void increase_deps () MANAPIHTTP_NOEXCEPT;
341+
342+
void decrease_deps () MANAPIHTTP_NOEXCEPT;
338343
protected:
339344
#if MANAPIHTTP_CURL_DEPENDENCY
340345
void handle_curl_exec_connections ();
@@ -347,6 +352,8 @@ namespace manapi {
347352
#if MANAPIHTTP_CURL_DEPENDENCY
348353
void wait_all_ (bool shutdown) MANAPIHTTP_NOEXCEPT;
349354

355+
void timerpool_init_ (std::shared_ptr<manapi::timerpool> tp);
356+
350357
static manapi::ev::status_or<std::shared_ptr<ev::io>> handle_curl_watcher_gen(event_loop *data, socket_t fd) MANAPIHTTP_NOEXCEPT;
351358

352359
static socket_t handle_curl_open_socket (void *cbp, int socktype, void *addr);
@@ -406,5 +413,7 @@ namespace manapi {
406413
std::shared_ptr<manapi::logger> logger_;
407414

408415
std::move_only_function<void()> custom_event_loop_;
416+
417+
std::size_t deps_;
409418
};
410419
}

include/ManapiTimerPool.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ namespace manapi {
3232
* timerpool based on libuv timer event
3333
* @param events event loop
3434
*/
35-
static manapi::status_or<timerpool> create (std::shared_ptr<event_loop> events) MANAPIHTTP_NOEXCEPT;
35+
static manapi::status_or<std::shared_ptr<timerpool>> create (std::shared_ptr<event_loop> events) MANAPIHTTP_NOEXCEPT;
3636

3737
/* deconstructor */
3838
~timerpool();

include/cache/ManapiTL.hpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#pragma once
2+
3+
#include <chrono>
4+
#include <map>
5+
#include <set>
6+
7+
#include <manapihttp/ManapiErrors.hpp>
8+
9+
namespace manapi {
10+
template<typename K, typename V>
11+
class tl_cache {
12+
typedef typename std::pair<K, V> key_value_pair_t;
13+
public:
14+
tl_cache () = default;
15+
16+
MANAPIHTTP_NODISCARD std::size_t size () const MANAPIHTTP_NOEXCEPT {
17+
return this->m_data.size();
18+
}
19+
20+
MANAPIHTTP_NODISCARD bool contains (const K &key) const MANAPIHTTP_NOEXCEPT {
21+
return this->m_data.contains(key);
22+
}
23+
24+
void put (const K &key, const V &value, std::chrono::milliseconds duration) {
25+
this->cleanup();
26+
auto id = std::make_pair(std::chrono::steady_clock::now() + duration, value);
27+
auto it = this->m_data.insert({key, id});
28+
if (!it.second) {
29+
this->m_sorted.erase(std::make_pair(it.first->second.first, it.first->first));
30+
it.first->second = id;
31+
}
32+
try {
33+
this->m_sorted.insert(std::make_pair(id.first, it.first->first));
34+
}
35+
catch (...) {
36+
this->m_data.erase(it.first);
37+
std::rethrow_exception(std::current_exception());
38+
}
39+
}
40+
41+
MANAPIHTTP_NODISCARD manapi::status_or<V*> get (const K &key) MANAPIHTTP_NOEXCEPT {
42+
this->cleanup();
43+
44+
auto it = this->m_data.find(key);
45+
if (it == this->m_data.end())
46+
return manapi::status_not_found();
47+
return &it->second.second;
48+
}
49+
50+
void remove (const K &key) MANAPIHTTP_NOEXCEPT {
51+
auto it = this->m_data.find(key);
52+
if (it != this->m_data.end()) {
53+
this->m_sorted.erase(std::make_pair(it->second.first, it->first));
54+
this->m_data.erase(it);
55+
}
56+
57+
this->cleanup();
58+
}
59+
60+
void clear () MANAPIHTTP_NOEXCEPT {
61+
this->m_data.clear();
62+
this->m_sorted.clear();
63+
}
64+
65+
void cleanup () MANAPIHTTP_NOEXCEPT {
66+
auto now = std::chrono::steady_clock::now();
67+
while (!this->m_sorted.empty()) {
68+
auto it = this->m_sorted.begin();
69+
if (it->first >= now) break;
70+
this->remove(it->second);
71+
}
72+
}
73+
private:
74+
std::map<K, std::pair<std::chrono::steady_clock::time_point, V>> m_data;
75+
std::set<std::pair<std::chrono::steady_clock::time_point, K>> m_sorted;
76+
};
77+
}

src/ManapiEventLoop.cpp

Lines changed: 64 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ namespace manapi::ev::internal {
355355
// std::move_only_function<void()> adding_curl_async_cb{nullptr};
356356
// std::queue<std::shared_ptr<ev::io>> curl_fds{};
357357
std::map<CURL*, curl_res_value_t> curl_res{};
358-
std::shared_ptr<ev::timer> timeout_watcher{nullptr};
358+
manapi::timer timeout_watcher{nullptr};
359359
std::map<socket_t, std::shared_ptr<ev::io>> watchers;
360360
};
361361
#endif
@@ -655,7 +655,10 @@ void manapi::ev::callback_close_cb(uv_handle_t *s) MANAPIHTTP_NOEXCEPT {
655655
}
656656
}
657657

658-
manapi::event_loop::event_loop() = default;
658+
manapi::event_loop::event_loop() {
659+
this->deps_ = 0;
660+
this->flags = 0;
661+
}
659662

660663
manapi::ev::status_or<std::shared_ptr<manapi::event_loop>> manapi::event_loop::create(std::shared_ptr<threadpool> taskpool, std::shared_ptr<manapi::logger> logger) {
661664
auto ev = std::make_shared<manapi::event_loop>();
@@ -714,6 +717,7 @@ manapi::ev::status_or<std::shared_ptr<manapi::event_loop>> manapi::event_loop::c
714717
}
715718

716719
ev->callback_watcher_->adding_async = adding_async_res.unwrap();
720+
ev->callback_watcher_->adding_async->unref();
717721
ev->callback_watcher_->adding_async_cb = [ev = ev.get()] ()
718722
-> void { ev->callback_watcher_->adding_async->send(); };
719723

@@ -740,30 +744,6 @@ manapi::ev::status_or<std::shared_ptr<manapi::event_loop>> manapi::event_loop::c
740744
goto err;
741745
}
742746

743-
#if MANAPIHTTP_CURL_DEPENDENCY
744-
745-
/* curl fetch timeout */
746-
auto timeout_watcher_res = ev->create_watcher_timer([ev = ev.get()] (const std::shared_ptr<ev::timer> &w) -> void {
747-
ev->handle_curl_exec_connections();
748-
ev->handle_curl_check_connections();
749-
750-
w->repeat(50);
751-
w->again();
752-
});
753-
754-
if (!timeout_watcher_res) {
755-
status = timeout_watcher_res.err();
756-
goto err;
757-
}
758-
759-
ev->curl_watcher->timeout_watcher = timeout_watcher_res.unwrap();
760-
761-
if (auto rhs = ev->curl_watcher->timeout_watcher->start(50, 0)) {
762-
status = manapi::ev::status_internal("timeout_watcher::start", rhs);
763-
goto err;
764-
}
765-
#endif
766-
767747
/* interrupted */
768748
auto interrupt_watcher_res = ev->create_watcher_async([ev = ev.get()] (const std::shared_ptr<ev::async> &w)
769749
-> void { async::run(ev->stop()); });
@@ -783,30 +763,25 @@ manapi::ev::status_or<std::shared_ptr<manapi::event_loop>> manapi::event_loop::c
783763
}
784764

785765
err: {
786-
if (ev) {
787-
if (ev->callback_watcher_) {
788-
std::lock_guard<std::mutex> lk (ev->callback_watcher_->adding_mx);
789-
ev->stop_watcher(std::move(ev->callback_watcher_->adding_async));
790-
ev->callback_watcher_->adding_async_cb = nullptr;
791-
}
792-
#if MANAPIHTTP_CURL_DEPENDENCY
793-
if (ev->curl_watcher) {
794-
ev->stop_watcher(std::move(ev->curl_watcher->timeout_watcher));
795-
}
796-
#endif
797-
ev->stop_watcher(std::move(ev->interrupted_watcher_));
798-
ev->stop_watcher(std::move(ev->prepare_tasks_));
799-
ev->stop_watcher(std::move(ev->idle_tasks_));
800-
}
801766
return std::move(status);
802767
}
803768
}
804769

805770
manapi::event_loop::~event_loop() {
771+
if (this->callback_watcher_) {
772+
std::lock_guard<std::mutex> lk (this->callback_watcher_->adding_mx);
773+
this->stop_watcher(std::move(this->callback_watcher_->adding_async));
774+
this->callback_watcher_->adding_async_cb = nullptr;
775+
}
776+
this->stop_watcher(std::move(this->interrupted_watcher_));
777+
this->stop_watcher(std::move(this->prepare_tasks_));
778+
this->stop_watcher(std::move(this->idle_tasks_));
779+
806780
#if MANAPIHTTP_CURL_DEPENDENCY
807781
this->curl_watcher->curl_multi.reset();
808782
#endif
809783
this->etaskpool_.reset();
784+
810785
}
811786

812787
void manapi::event_loop::setup_handle_interrupt() MANAPIHTTP_NOEXCEPT {
@@ -849,19 +824,9 @@ void manapi::event_loop::wait_all_(bool shutdown) MANAPIHTTP_NOEXCEPT {
849824
}
850825

851826
this->stop_watcher(std::move(this->interrupted_watcher_));
852-
this->stop_watcher(std::move(this->callback_watcher_->adding_async));
853827
this->stop_watcher(std::move(this->idle_tasks_));
854828
this->stop_watcher(std::move(this->prepare_tasks_));
855-
#if MANAPIHTTP_CURL_DEPENDENCY
856-
this->stop_watcher(std::move(this->curl_watcher->timeout_watcher));
857-
#endif
858-
manapi::timer checker;
859-
MANAPIHTTP_MUST_ALLOC_START
860-
checker = manapi::async::current()->timerpool()->append_interval_sync(500, [this] (const manapi::timer &) -> void {
861-
this->handle_curl_exec_connections();
862-
this->handle_curl_check_connections();
863-
}).unwrap();
864-
MANAPIHTTP_MUST_ALLOC_END
829+
865830
if (this->loop_) {
866831
auto etaskpool = dynamic_cast<ethreadpool *>(this->etaskpool_.get());
867832

@@ -909,6 +874,7 @@ void manapi::event_loop::wait_all_(bool shutdown) MANAPIHTTP_NOEXCEPT {
909874
});
910875
MANAPIHTTP_MUST_ALLOC_END
911876

877+
this->flags |= EVENT_LOOP_FLAG_ACTIVE;
912878
while (etaskpool->try_task()) {}
913879

914880
etaskpool->set_notify();
@@ -918,24 +884,38 @@ void manapi::event_loop::wait_all_(bool shutdown) MANAPIHTTP_NOEXCEPT {
918884

919885
manapi::async::current()->timerpool()->stop();
920886

921-
this->flags |= EVENT_LOOP_FLAG_ACTIVE;
922887
auto const rhs = ::uv_run(this->loop(), UV_RUN_DEFAULT);
923888
if (this->flags & EVENT_LOOP_FLAG_ACTIVE) {
924889
this->flags ^= EVENT_LOOP_FLAG_ACTIVE;
925890
}
891+
926892
if (!rhs && !::uv_loop_alive(this->loop()) && !this->etaskpool_->tasks_size()) {
927-
manapi_log_trace(manapi::debug::LOG_TRACE_HIGH,
928-
"%s:%s", "eventloop", "uv_loop_alive has returned 0");
929-
break;
893+
894+
std::lock_guard<std::mutex> lk (this->callback_watcher_->adding_mx);
895+
if (shutdown && this->callback_watcher_->adding_async) {
896+
if (!uv_has_ref((uv_handle_t *)this->callback_watcher_->adding_async->custom())) {
897+
this->callback_watcher_->adding_async->ref();
898+
}
899+
if (!this->deps_) {
900+
this->stop_watcher(std::move(this->callback_watcher_->adding_async));
901+
}
902+
}
903+
else {
904+
manapi_log_trace(manapi::debug::LOG_TRACE_HIGH,
905+
"%s:%s", "eventloop", "uv_loop_alive has returned 0");
906+
break;
907+
}
930908
}
931909
}
932910

933-
checker.stop();
934911
etaskpool->set_notify_cb(nullptr);
935912

936913
if (shutdown) {
937914
manapi_log_trace(manapi::debug::LOG_TRACE_MEDIUM, "eventloop:well done");
938915

916+
if (this->curl_watcher->timeout_watcher)
917+
this->curl_watcher->timeout_watcher.stop();
918+
939919
if (auto rhs = ::uv_loop_close(this->loop_.get())) {
940920
#ifndef MANAPIHTTP_DISABLE_TRACE_HARD
941921
::uv_print_all_handles(this->loop(), stderr);
@@ -956,6 +936,16 @@ void manapi::event_loop::wait_all_(bool shutdown) MANAPIHTTP_NOEXCEPT {
956936
}
957937
}
958938

939+
void manapi::event_loop::timerpool_init_(std::shared_ptr<manapi::timerpool> tp) {
940+
#if MANAPIHTTP_CURL_DEPENDENCY
941+
/* curl fetch timeout */
942+
this->curl_watcher->timeout_watcher = tp->append_interval_sync(200, [this] (manapi::timer t) -> void {
943+
this->handle_curl_exec_connections();
944+
this->handle_curl_check_connections();
945+
}).unwrap();
946+
#endif
947+
}
948+
959949
size_t manapi::event_loop::subscribe_finish(int priority, std::move_only_function<manapi::future<void>()> cb) {
960950
auto id = *reinterpret_cast<const std::size_t *> (&cb);
961951

@@ -1145,6 +1135,7 @@ manapi::status manapi::event_loop::custom_callback(std::move_only_function<void(
11451135
}
11461136
this->callback_watcher_->callback_data.push_back({nullptr});
11471137
this->callback_watcher_->callback_data.back().cb = std::move(*cb);
1138+
this->deps_++;
11481139
lk.unlock();
11491140
this->callback_watcher_->adding_mcv.unlock();
11501141
this->callback_watcher_->adding_async_cb();
@@ -1201,6 +1192,8 @@ void manapi::event_loop::custom_watcher_callback_async(const std::shared_ptr<ev:
12011192
auto data = std::move(list.front());
12021193
list.pop_front();
12031194

1195+
this->decrease_deps();
1196+
12041197
try {
12051198
data.cb(this);
12061199
}
@@ -1615,6 +1608,20 @@ void manapi::event_loop::custom_event_loop(std::move_only_function<void()> block
16151608
this->custom_event_loop_ = std::move(block_cb);
16161609
}
16171610

1611+
void manapi::event_loop::increase_deps() MANAPIHTTP_NOEXCEPT {
1612+
std::lock_guard<std::mutex> lk (this->callback_watcher_->adding_mx);
1613+
assert(this->callback_watcher_->adding_async);
1614+
this->deps_++;
1615+
}
1616+
1617+
void manapi::event_loop::decrease_deps() MANAPIHTTP_NOEXCEPT {
1618+
std::lock_guard<std::mutex> lk (this->callback_watcher_->adding_mx);
1619+
assert(this->callback_watcher_->adding_async);
1620+
if (!--this->deps_ && uv_has_ref((uv_handle_t *)this->callback_watcher_->adding_async->custom())) {
1621+
this->callback_watcher_->adding_async->unref();
1622+
}
1623+
}
1624+
16181625
manapi::ev::status_or<std::shared_ptr<manapi::ev::io>> manapi::event_loop::create_watcher_fd(int fd, ev::io_cb callback) MANAPIHTTP_NOEXCEPT {
16191626
try {
16201627
auto w = std::make_shared<ev::io>();

src/ManapiGrpc.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,7 @@ const grpc_event_engine::experimental::EventEngine::ResolvedAddress & manapi::ne
821821

822822
manapi::net::wgrpc::event_engine_wrapper::event_engine_wrapper() : grpc_event_engine::experimental::EventEngine() {
823823
this->primary = manapi::async::current();
824+
this->flags = 0;
824825
}
825826

826827
manapi::net::wgrpc::event_engine_wrapper::~event_engine_wrapper() = default;

src/ManapiTimerPool.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ manapi::timerpool::timerpool() {
2929

3030
}
3131

32-
manapi::status_or<manapi::timerpool> manapi::timerpool::create(std::shared_ptr<event_loop> events) MANAPIHTTP_NOEXCEPT {
32+
manapi::status_or<std::shared_ptr<manapi::timerpool>> manapi::timerpool::create(std::shared_ptr<event_loop> events) MANAPIHTTP_NOEXCEPT {
3333
try {
34-
timerpool d;
35-
d.data_ = std::make_shared<data_t>(sorted_storage(), std::move(events), 0, nullptr);
34+
auto d = std::make_shared<timerpool>();
35+
d->data_ = std::make_shared<data_t>(sorted_storage(), (events), 0, nullptr);
36+
events->timerpool_init_(d);
3637
return std::move(d);
3738
}
3839
catch (std::exception const &e) {

0 commit comments

Comments
 (0)