Skip to content

Commit e87afd4

Browse files
committed
Refactor ReactorPoll, Fixed a crash caused by adding and removing events in the Poll event handler on the Cygwin platform. Fix GH-5760 GH-5759 GH-5758
1 parent 3139c77 commit e87afd4

File tree

7 files changed

+88
-128
lines changed

7 files changed

+88
-128
lines changed

ext-src/swoole_pgsql.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525

2626
using swoole::Coroutine;
2727
using swoole::Reactor;
28+
using swoole::translate_events_to_poll;
2829
using swoole::coroutine::Socket;
29-
using swoole::coroutine::translate_events_to_poll;
3030

3131
static SW_THREAD_LOCAL bool swoole_pgsql_blocking = true;
3232

include/swoole_coroutine_system.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#pragma once
1919

2020
#include "swoole_coroutine.h"
21-
#include "swoole_file.h"
2221

2322
#include <vector>
2423

@@ -39,9 +38,6 @@ struct PollSocket {
3938
}
4039
};
4140

42-
int16_t translate_events_to_poll(int events);
43-
int translate_events_from_poll(int16_t events);
44-
4541
class System {
4642
public:
4743
static void init_reactor(Reactor *reactor);

include/swoole_reactor.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,9 +297,7 @@ class Reactor {
297297
return error_handler[fd_type] ? error_handler[fd_type] : default_error_handler;
298298
default:
299299
abort();
300-
break;
301300
}
302-
return nullptr;
303301
}
304302

305303
ReactorHandler get_error_handler(const FdType fd_type) const {
@@ -339,6 +337,10 @@ class Reactor {
339337
_socket->events = events;
340338
}
341339

340+
bool _exists(const network::Socket *_socket) {
341+
return sockets_.find(_socket->fd) != sockets_.end();
342+
}
343+
342344
void _del(network::Socket *_socket) {
343345
_socket->events = 0;
344346
_socket->removed = 1;
@@ -377,6 +379,9 @@ class Reactor {
377379
return events & SW_EVENT_ERROR;
378380
}
379381
};
382+
383+
int16_t translate_events_to_poll(int events);
384+
int translate_events_from_poll(int16_t events);
380385
} // namespace swoole
381386

382387
#define SW_REACTOR_CONTINUE \

src/coroutine/hook.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@
3030

3131
using swoole::AsyncEvent;
3232
using swoole::Coroutine;
33+
using swoole::translate_events_from_poll;
34+
using swoole::translate_events_to_poll;
3335
using swoole::async::dispatch;
3436
using swoole::coroutine::async;
3537
using swoole::coroutine::PollSocket;
3638
using swoole::coroutine::Socket;
3739
using swoole::coroutine::System;
38-
using swoole::coroutine::translate_events_from_poll;
39-
using swoole::coroutine::translate_events_to_poll;
4040

4141
#ifdef SW_USE_IOURING
4242
using swoole::Iouring;

src/coroutine/system.cc

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -379,36 +379,6 @@ static int socket_poll_error_callback(Reactor *reactor, Event *event) {
379379
return SW_OK;
380380
}
381381

382-
int16_t translate_events_to_poll(int events) {
383-
int16_t poll_events = 0;
384-
385-
if (events & SW_EVENT_READ) {
386-
poll_events |= POLLIN;
387-
}
388-
if (events & SW_EVENT_WRITE) {
389-
poll_events |= POLLOUT;
390-
}
391-
392-
return poll_events;
393-
}
394-
395-
int translate_events_from_poll(int16_t events) {
396-
int sw_events = 0;
397-
398-
if (events & POLLIN) {
399-
sw_events |= SW_EVENT_READ;
400-
}
401-
if (events & POLLOUT) {
402-
sw_events |= SW_EVENT_WRITE;
403-
}
404-
// ignore ERR and HUP, because event is already processed at IN and OUT handler.
405-
if ((((events & POLLERR) || (events & POLLHUP)) && !((events & POLLIN) || (events & POLLOUT)))) {
406-
sw_events |= SW_EVENT_ERROR;
407-
}
408-
409-
return sw_events;
410-
}
411-
412382
bool System::socket_poll(std::unordered_map<int, PollSocket> &fds, double timeout) {
413383
if (timeout == 0) {
414384
struct pollfd *event_list = (struct pollfd *) sw_calloc(fds.size(), sizeof(struct pollfd));

src/reactor/base.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ Reactor::Reactor(int max_event, Type _type) {
7777
switch (type_) {
7878
#ifdef HAVE_EPOLL
7979
case TYPE_EPOLL:
80-
impl = make_reactor_epoll(this, max_event);
80+
impl = make_reactor_poll(this, max_event);
8181
break;
8282
#endif
8383
#ifdef HAVE_KQUEUE

src/reactor/poll.cc

Lines changed: 77 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,12 @@
2121
#include "swoole_reactor.h"
2222

2323
namespace swoole {
24-
2524
using network::Socket;
2625

2726
class ReactorPoll final : public ReactorImpl {
28-
uint32_t max_fd_num;
29-
Socket **fds_;
3027
pollfd *events_;
31-
bool exists(int fd) const;
32-
void set_events(int index, int events) const;
28+
int max_events_;
29+
int set_events() const;
3330

3431
public:
3532
ReactorPoll(Reactor *_reactor, int max_events);
@@ -48,34 +45,30 @@ ReactorImpl *make_reactor_poll(Reactor *_reactor, int max_events) {
4845
}
4946

5047
ReactorPoll::ReactorPoll(Reactor *_reactor, int max_events) : ReactorImpl(_reactor) {
51-
fds_ = new Socket *[max_events];
5248
events_ = new pollfd[max_events];
53-
54-
max_fd_num = max_events;
49+
max_events_ = max_events;
5550
reactor_->max_event_num = max_events;
5651
}
5752

5853
ReactorPoll::~ReactorPoll() {
59-
delete[] fds_;
6054
delete[] events_;
6155
}
6256

63-
void ReactorPoll::set_events(const int index, const int events) const {
64-
events_[index].events = 0;
65-
if (Reactor::isset_read_event(events)) {
66-
events_[index].events |= POLLIN;
67-
}
68-
if (Reactor::isset_write_event(events)) {
69-
events_[index].events |= POLLOUT;
70-
}
71-
if (Reactor::isset_error_event(events)) {
72-
events_[index].events |= POLLHUP;
57+
int ReactorPoll::set_events() const {
58+
const auto sockets = reactor_->get_sockets();
59+
int count = 0;
60+
for (const auto pair : sockets) {
61+
const auto _socket = pair.second;
62+
events_[count].fd = _socket->fd;
63+
events_[count].events = translate_events_to_poll(_socket->events);
64+
events_[count].revents = 0;
65+
count++;
7366
}
67+
return count;
7468
}
7569

7670
int ReactorPoll::add(Socket *socket, const int events) {
77-
int fd = socket->fd;
78-
if (exists(fd)) {
71+
if (reactor_->_exists(socket)) {
7972
swoole_error_log(
8073
SW_LOG_WARNING,
8174
SW_ERROR_EVENT_ADD_FAILED,
@@ -89,45 +82,38 @@ int ReactorPoll::add(Socket *socket, const int events) {
8982
return SW_ERR;
9083
}
9184

92-
const int cur = reactor_->get_event_num();
93-
if (reactor_->get_event_num() == max_fd_num) {
85+
if (reactor_->get_event_num() == static_cast<size_t>(max_events_)) {
9486
swoole_error_log(
95-
SW_LOG_WARNING, SW_ERROR_EVENT_ADD_FAILED, "too many sockets, the max events is %d", max_fd_num);
87+
SW_LOG_WARNING, SW_ERROR_EVENT_ADD_FAILED, "too many sockets, the max events is %d", max_events_);
9688
swoole_print_backtrace_on_error();
9789
return SW_ERR;
9890
}
9991

92+
swoole_trace("fd=%d, events=%d", socket->fd, events);
10093
reactor_->_add(socket, events);
10194

102-
swoole_trace("fd=%d, events=%d", fd, events);
103-
104-
fds_[cur] = socket;
105-
events_[cur].fd = fd;
106-
set_events(cur, events);
107-
10895
return SW_OK;
10996
}
11097

11198
int ReactorPoll::set(Socket *socket, int events) {
112-
swoole_trace("fd=%d, events=%d", socket->fd, events);
113-
114-
SW_LOOP_N(reactor_->get_event_num()) {
115-
if (events_[i].fd == socket->fd) {
116-
set_events(i, events);
117-
reactor_->_set(socket, events);
118-
return SW_OK;
119-
}
99+
if (!reactor_->_exists(socket)) {
100+
swoole_error_log(
101+
SW_LOG_WARNING,
102+
SW_ERROR_SOCKET_NOT_EXISTS,
103+
"[Reactor#%d] failed to set events[fd=%d, fd_type=%d, events=%d], the socket#%d has already been removed",
104+
reactor_->id,
105+
socket->fd,
106+
socket->fd_type,
107+
events,
108+
socket->fd);
109+
swoole_print_backtrace_on_error();
110+
return SW_ERR;
120111
}
121112

122-
swoole_error_log(SW_LOG_WARNING,
123-
SW_ERROR_SOCKET_NOT_EXISTS,
124-
"[Reactor#%d] failed to set events[fd=%d, fd_type=%d, events=%d], the socket#%d is not exists",
125-
reactor_->id,
126-
socket->fd,
127-
socket->fd_type,
128-
events,
129-
socket->fd);
130-
return SW_ERR;
113+
swoole_trace("fd=%d, events=%d", socket->fd, events);
114+
reactor_->_set(socket, events);
115+
116+
return SW_OK;
131117
}
132118

133119
int ReactorPoll::del(Socket *socket) {
@@ -144,62 +130,46 @@ int ReactorPoll::del(Socket *socket) {
144130
return SW_ERR;
145131
}
146132

147-
for (uint32_t i = 0; i < reactor_->get_event_num(); i++) {
148-
if (events_[i].fd == socket->fd) {
149-
for (; i < reactor_->get_event_num(); i++) {
150-
if (i == reactor_->get_event_num() - 1) {
151-
fds_[i] = nullptr;
152-
events_[i].fd = 0;
153-
events_[i].events = 0;
154-
} else {
155-
fds_[i] = fds_[i + 1];
156-
events_[i] = events_[i + 1];
157-
}
158-
}
159-
reactor_->_del(socket);
160-
return SW_OK;
161-
}
133+
if (!reactor_->_exists(socket)) {
134+
swoole_error_log(SW_LOG_WARNING,
135+
SW_ERROR_SOCKET_NOT_EXISTS,
136+
"[Reactor#%d] failed to delete events[fd=%d, fd_type=%d], the socket#%d is not exists",
137+
reactor_->id,
138+
socket->fd,
139+
socket->fd_type,
140+
socket->fd);
141+
swoole_print_backtrace_on_error();
142+
return SW_ERR;
162143
}
163144

164-
swoole_error_log(SW_LOG_WARNING,
165-
SW_ERROR_SOCKET_NOT_EXISTS,
166-
"[Reactor#%d] failed to delete events[fd=%d, fd_type=%d], the socket#%d is not exists",
167-
reactor_->id,
168-
socket->fd,
169-
socket->fd_type,
170-
socket->fd);
171-
swoole_print_backtrace_on_error();
172-
173-
return SW_ERR;
145+
reactor_->_del(socket);
146+
return SW_OK;
174147
}
175148

176149
int ReactorPoll::wait() {
177150
Event event;
178151
ReactorHandler handler;
179152

180-
int ret;
181153
reactor_->before_wait();
182154

183155
while (reactor_->running) {
184156
reactor_->execute_begin_callback();
185-
186-
ret = poll(events_, reactor_->get_event_num(), reactor_->get_timeout_msec());
157+
const int event_num = set_events();
158+
int ret = poll(events_, event_num, reactor_->get_timeout_msec());
187159
if (ret < 0) {
188160
if (!reactor_->catch_error()) {
189-
swoole_sys_warning("[Reactor#%d] poll(nfds=%zu, timeout=%d) failed",
161+
swoole_sys_warning("[Reactor#%d] poll(nfds=%d, timeout=%d) failed",
190162
reactor_->id,
191-
reactor_->get_event_num(),
163+
event_num,
192164
reactor_->get_timeout_msec());
193165
break;
194-
} else {
195-
goto _continue;
196166
}
197167
} else if (ret == 0) {
198168
reactor_->execute_end_callbacks(true);
199169
SW_REACTOR_CONTINUE;
200170
} else {
201-
for (uint32_t i = 0; i < reactor_->get_event_num(); i++) {
202-
event.socket = fds_[i];
171+
for (int i = 0; i < event_num; i++) {
172+
event.socket = reactor_->get_socket(events_[i].fd);
203173
event.fd = events_[i].fd;
204174
event.reactor_id = reactor_->id;
205175
event.type = event.socket->fd_type;
@@ -245,20 +215,39 @@ int ReactorPoll::wait() {
245215
}
246216
}
247217
}
248-
_continue:
249218
reactor_->execute_end_callbacks(false);
250219
SW_REACTOR_CONTINUE;
251220
}
252221
return SW_OK;
253222
}
254223

255-
bool ReactorPoll::exists(int fd) const {
256-
for (uint32_t i = 0; i < reactor_->get_event_num(); i++) {
257-
if (events_[i].fd == fd) {
258-
return true;
259-
}
224+
int16_t translate_events_to_poll(int events) {
225+
int16_t poll_events = 0;
226+
227+
if (events & SW_EVENT_READ) {
228+
poll_events |= POLLIN;
229+
}
230+
if (events & SW_EVENT_WRITE) {
231+
poll_events |= POLLOUT;
260232
}
261-
return false;
233+
234+
return poll_events;
262235
}
263236

237+
int translate_events_from_poll(int16_t events) {
238+
int sw_events = 0;
239+
240+
if (events & POLLIN) {
241+
sw_events |= SW_EVENT_READ;
242+
}
243+
if (events & POLLOUT) {
244+
sw_events |= SW_EVENT_WRITE;
245+
}
246+
// ignore ERR and HUP, because event is already processed at IN and OUT handler.
247+
if ((((events & POLLERR) || (events & POLLHUP)) && !((events & POLLIN) || (events & POLLOUT)))) {
248+
sw_events |= SW_EVENT_ERROR;
249+
}
250+
251+
return sw_events;
252+
}
264253
} // namespace swoole

0 commit comments

Comments
 (0)