Skip to content

Commit fb695d2

Browse files
committed
Linux平台下适配recvmmsg接口
1 parent 79c10fe commit fb695d2

File tree

8 files changed

+229
-78
lines changed

8 files changed

+229
-78
lines changed

src/Network/BufferSock.cpp

+128
Original file line numberDiff line numberDiff line change
@@ -466,4 +466,132 @@ BufferList::Ptr BufferList::create(List<std::pair<Buffer::Ptr, bool> > list, Sen
466466
#endif
467467
}
468468

469+
#if defined(__linux) || defined(__linux__)
470+
class SocketRecvmmsgBuffer : public SocketRecvBuffer {
471+
public:
472+
SocketRecvmmsgBuffer(size_t count, size_t size)
473+
: _size(size)
474+
, _iovec(count)
475+
, _mmsgs(count)
476+
, _buffers(count)
477+
, _address(count) {
478+
for (auto i = 0u; i < count; ++i) {
479+
auto buf = BufferRaw::create();
480+
buf->setCapacity(size);
481+
482+
_buffers[i] = buf;
483+
auto &mmsg = _mmsgs[i];
484+
auto &addr = _address[i];
485+
mmsg.msg_len = 0;
486+
mmsg.msg_hdr.msg_name = &addr;
487+
mmsg.msg_hdr.msg_namelen = sizeof(addr);
488+
mmsg.msg_hdr.msg_iov = &_iovec[i];
489+
mmsg.msg_hdr.msg_iov->iov_base = buf->data();
490+
mmsg.msg_hdr.msg_iov->iov_len = buf->getCapacity() - 1;
491+
mmsg.msg_hdr.msg_iovlen = 1;
492+
mmsg.msg_hdr.msg_control = nullptr;
493+
mmsg.msg_hdr.msg_controllen = 0;
494+
mmsg.msg_hdr.msg_flags = 0;
495+
}
496+
}
497+
498+
ssize_t recvFromSocket(int fd, ssize_t &count) override {
499+
for (auto i = 0; i < _last_count; ++i) {
500+
auto &mmsg = _mmsgs[i];
501+
mmsg.msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
502+
auto &buf = _buffers[i];
503+
if (!buf) {
504+
auto raw = BufferRaw::create();
505+
raw->setCapacity(_size);
506+
buf = raw;
507+
mmsg.msg_hdr.msg_iov->iov_base = buf->data();
508+
}
509+
}
510+
do {
511+
count = recvmmsg(fd, &_mmsgs[0], _mmsgs.size(), 0, nullptr);
512+
} while (-1 == count && UV_EINTR == get_uv_error(true));
513+
514+
_last_count = count;
515+
if (count <= 0) {
516+
return count;
517+
}
518+
519+
ssize_t nread = 0;
520+
for (auto i = 0; i < count; ++i) {
521+
auto &mmsg = _mmsgs[i];
522+
nread += mmsg.msg_len;
523+
524+
auto buf = std::static_pointer_cast<BufferRaw>(_buffers[i]);
525+
buf->setSize(mmsg.msg_len);
526+
buf->data()[mmsg.msg_len] = '\0';
527+
}
528+
return nread;
529+
}
530+
531+
Buffer::Ptr &getBuffer(size_t index) override { return _buffers[index]; }
532+
533+
struct sockaddr_storage &getAddress(size_t index) override { return _address[index]; }
534+
535+
private:
536+
size_t _size;
537+
ssize_t _last_count { 0 };
538+
std::vector<struct iovec> _iovec;
539+
std::vector<struct mmsghdr> _mmsgs;
540+
std::vector<Buffer::Ptr> _buffers;
541+
std::vector<struct sockaddr_storage> _address;
542+
};
543+
#endif
544+
545+
class SocketRecvFromBuffer : public SocketRecvBuffer {
546+
public:
547+
SocketRecvFromBuffer(size_t size): _size(size) {}
548+
549+
ssize_t recvFromSocket(int fd, ssize_t &count) override {
550+
ssize_t nread;
551+
socklen_t len = sizeof(_address);
552+
if (!_buffer) {
553+
allocBuffer();
554+
}
555+
556+
do {
557+
nread = recvfrom(fd, _buffer->data(), _buffer->getCapacity() - 1, 0, (struct sockaddr *)&_address, &len);
558+
} while (-1 == nread && UV_EINTR == get_uv_error(true));
559+
560+
if (nread > 0) {
561+
count = 1;
562+
_buffer->data()[nread] = '\0';
563+
std::static_pointer_cast<BufferRaw>(_buffer)->setSize(nread);
564+
}
565+
return nread;
566+
}
567+
568+
Buffer::Ptr &getBuffer(size_t index) override { return _buffer; }
569+
570+
struct sockaddr_storage &getAddress(size_t index) override { return _address; }
571+
572+
private:
573+
void allocBuffer() {
574+
auto buf = BufferRaw::create();
575+
buf->setCapacity(_size);
576+
_buffer = std::move(buf);
577+
}
578+
579+
private:
580+
size_t _size;
581+
Buffer::Ptr _buffer;
582+
struct sockaddr_storage _address;
583+
};
584+
585+
static constexpr auto kPacketCount = 32;
586+
static constexpr auto kBufferCapacity = 4 * 1024u;
587+
588+
SocketRecvBuffer::Ptr SocketRecvBuffer::create(bool is_udp) {
589+
#if defined(__linux) || defined(__linux__)
590+
if (is_udp) {
591+
return std::make_shared<SocketRecvmmsgBuffer>(kPacketCount, kBufferCapacity);
592+
}
593+
#endif
594+
return std::make_shared<SocketRecvFromBuffer>(kPacketCount * kBufferCapacity);
595+
}
596+
469597
} //toolkit

src/Network/BufferSock.h

+13
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,18 @@ class BufferList : public noncopyable {
6969
ObjectStatistic<BufferList> _statistic;
7070
};
7171

72+
class SocketRecvBuffer {
73+
public:
74+
using Ptr = std::shared_ptr<SocketRecvBuffer>;
75+
76+
virtual ~SocketRecvBuffer() = default;
77+
78+
virtual ssize_t recvFromSocket(int fd, ssize_t &count) = 0;
79+
virtual Buffer::Ptr &getBuffer(size_t index) = 0;
80+
virtual struct sockaddr_storage &getAddress(size_t index) = 0;
81+
82+
static Ptr create(bool is_udp);
83+
};
84+
7285
}
7386
#endif //ZLTOOLKIT_BUFFERSOCK_H

src/Network/Socket.cpp

+27-23
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,27 @@ Socket::~Socket() {
8080
}
8181

8282
void Socket::setOnRead(onReadCB cb) {
83+
onMultiReadCB cb2;
84+
if (cb) {
85+
cb2 = [cb](Buffer::Ptr *buf, struct sockaddr_storage *addr, size_t count) {
86+
for (auto i = 0u; i < count; ++i) {
87+
cb(buf[i], (struct sockaddr *)(addr + i), sizeof(struct sockaddr_storage));
88+
}
89+
};
90+
}
91+
setOnMultiRead(std::move(cb2));
92+
}
93+
94+
void Socket::setOnMultiRead(onMultiReadCB cb) {
8395
LOCK_GUARD(_mtx_event);
8496
if (cb) {
85-
_on_read = std::move(cb);
97+
_on_multi_read = std::move(cb);
8698
} else {
87-
_on_read = [](const Buffer::Ptr &buf, struct sockaddr *, int) { WarnL << "Socket not set read callback, data ignored: " << buf->size(); };
99+
_on_multi_read = [](Buffer::Ptr *buf, struct sockaddr_storage *addr, size_t count) {
100+
for (auto i = 0u; i < count; ++i) {
101+
WarnL << "Socket not set read callback, data ignored: " << buf[i]->size();
102+
}
103+
};
88104
}
89105
}
90106

@@ -246,7 +262,7 @@ bool Socket::attachEvent(const SockNum::Ptr &sock) {
246262
}
247263

248264
// tcp客户端或udp
249-
auto read_buffer = _poller->getSharedBuffer();
265+
auto read_buffer = _poller->getSharedBuffer(sock->type() == SockNum::Sock_UDP);
250266
auto result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self, sock, read_buffer](int event) {
251267
auto strong_self = weak_self.lock();
252268
if (!strong_self) {
@@ -267,20 +283,11 @@ bool Socket::attachEvent(const SockNum::Ptr &sock) {
267283
return -1 != result;
268284
}
269285

270-
ssize_t Socket::onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) noexcept {
271-
ssize_t ret = 0, nread = 0;
272-
auto data = buffer->data();
273-
// 最后一个字节设置为'\0'
274-
auto capacity = buffer->getCapacity() - 1;
275-
276-
struct sockaddr_storage addr;
277-
socklen_t len = sizeof(addr);
286+
ssize_t Socket::onRead(const SockNum::Ptr &sock, const SocketRecvBuffer::Ptr &buffer) noexcept {
287+
ssize_t ret = 0, nread = 0, count = 0;
278288

279289
while (_enable_recv) {
280-
do {
281-
nread = recvfrom(sock->rawFd(), data, capacity, 0, (struct sockaddr *)&addr, &len);
282-
} while (-1 == nread && UV_EINTR == get_uv_error(true));
283-
290+
nread = buffer->recvFromSocket(sock->rawFd(), count);
284291
if (nread == 0) {
285292
if (sock->type() == SockNum::Sock_TCP) {
286293
emitErr(SockException(Err_eof, "end of file"));
@@ -302,21 +309,18 @@ ssize_t Socket::onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) n
302309
return ret;
303310
}
304311

312+
ret += nread;
305313
if (_enable_speed) {
306314
// 更新接收速率
307315
_recv_speed += nread;
308316
}
309317

310-
ret += nread;
311-
data[nread] = '\0';
312-
// 设置buffer有效数据大小
313-
buffer->setSize(nread);
314-
315-
// 触发回调
316-
LOCK_GUARD(_mtx_event);
318+
auto &buf = buffer->getBuffer(0);
319+
auto &addr = buffer->getAddress(0);
317320
try {
318321
// 此处捕获异常,目的是防止数据未读尽,epoll边沿触发失效的问题
319-
_on_read(buffer, (struct sockaddr *)&addr, len);
322+
LOCK_GUARD(_mtx_event);
323+
_on_multi_read(&buf, &addr, count);
320324
} catch (std::exception &ex) {
321325
ErrorL << "Exception occurred when emit on_read: " << ex.what();
322326
}

src/Network/Socket.h

+36-33
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,9 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
282282
public:
283283
using Ptr = std::shared_ptr<Socket>;
284284
//接收数据回调
285-
using onReadCB = std::function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
285+
using onReadCB = std::function<void(Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
286+
using onMultiReadCB = std::function<void(Buffer::Ptr *buf, struct sockaddr_storage *addr, size_t count)>;
287+
286288
//发生错误回调
287289
using onErrCB = std::function<void(const SockException &err)>;
288290
//tcp监听接收到连接请求
@@ -352,6 +354,7 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
352354
* @param cb 回调对象
353355
*/
354356
void setOnRead(onReadCB cb);
357+
void setOnMultiRead(onMultiReadCB cb);
355358

356359
/**
357360
* 设置异常事件(包括eof等)回调
@@ -515,7 +518,7 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
515518

516519
void setSock(SockNum::Ptr sock);
517520
int onAccept(const SockNum::Ptr &sock, int event) noexcept;
518-
ssize_t onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) noexcept;
521+
ssize_t onRead(const SockNum::Ptr &sock, const SocketRecvBuffer::Ptr &buffer) noexcept;
519522
void onWriteAble(const SockNum::Ptr &sock);
520523
void onConnected(const SockNum::Ptr &sock, const onErrCB &cb);
521524
void onFlushed();
@@ -528,67 +531,67 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
528531
bool fromSock_l(SockNum::Ptr sock);
529532

530533
private:
531-
//send socket时的flag
534+
// send socket时的flag
532535
int _sock_flags = SOCKET_DEFAULE_FLAGS;
533-
//最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数
536+
// 最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数
534537
uint32_t _max_send_buffer_ms = SEND_TIME_OUT_SEC * 1000;
535-
//控制是否接收监听socket可读事件,关闭后可用于流量控制
536-
std::atomic<bool> _enable_recv {true};
537-
//标记该socket是否可写,socket写缓存满了就不可写
538-
std::atomic<bool> _sendable {true};
539-
//是否已经触发err回调了
538+
// 控制是否接收监听socket可读事件,关闭后可用于流量控制
539+
std::atomic<bool> _enable_recv { true };
540+
// 标记该socket是否可写,socket写缓存满了就不可写
541+
std::atomic<bool> _sendable { true };
542+
// 是否已经触发err回调了
540543
bool _err_emit = false;
541-
//是否启用网速统计
544+
// 是否启用网速统计
542545
bool _enable_speed = false;
543546
// udp发送目标地址
544547
std::shared_ptr<struct sockaddr_storage> _udp_send_dst;
545548

546-
//接收速率统计
549+
// 接收速率统计
547550
BytesSpeed _recv_speed;
548-
//发送速率统计
551+
// 发送速率统计
549552
BytesSpeed _send_speed;
550553

551-
//tcp连接超时定时器
554+
// tcp连接超时定时器
552555
Timer::Ptr _con_timer;
553-
//tcp连接结果回调对象
556+
// tcp连接结果回调对象
554557
std::shared_ptr<void> _async_con_cb;
555558

556-
//记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器
559+
// 记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器
557560
Ticker _send_flush_ticker;
558-
//socket fd的抽象类
561+
// socket fd的抽象类
559562
SockFD::Ptr _sock_fd;
560-
//本socket绑定的poller线程,事件触发于此线程
563+
// 本socket绑定的poller线程,事件触发于此线程
561564
EventPoller::Ptr _poller;
562-
//跨线程访问_sock_fd时需要上锁
565+
// 跨线程访问_sock_fd时需要上锁
563566
mutable MutexWrapper<std::recursive_mutex> _mtx_sock_fd;
564567

565-
//socket异常事件(比如说断开)
568+
// socket异常事件(比如说断开)
566569
onErrCB _on_err;
567-
//收到数据事件
568-
onReadCB _on_read;
569-
//socket缓存清空事件(可用于发送流速控制)
570+
// 收到数据事件
571+
onMultiReadCB _on_multi_read;
572+
// socket缓存清空事件(可用于发送流速控制)
570573
onFlush _on_flush;
571-
//tcp监听收到accept请求事件
574+
// tcp监听收到accept请求事件
572575
onAcceptCB _on_accept;
573-
//tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
576+
// tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
574577
onCreateSocket _on_before_accept;
575-
//设置上述回调函数的锁
578+
// 设置上述回调函数的锁
576579
MutexWrapper<std::recursive_mutex> _mtx_event;
577580

578-
//一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
579-
List<std::pair<Buffer::Ptr, bool> > _send_buf_waiting;
580-
//一级发送缓存锁
581+
// 一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
582+
List<std::pair<Buffer::Ptr, bool>> _send_buf_waiting;
583+
// 一级发送缓存锁
581584
MutexWrapper<std::recursive_mutex> _mtx_send_buf_waiting;
582-
//二级发送缓存, socket可写时,会把二级缓存批量写入到socket
585+
// 二级发送缓存, socket可写时,会把二级缓存批量写入到socket
583586
List<BufferList::Ptr> _send_buf_sending;
584-
//二级发送缓存锁
587+
// 二级发送缓存锁
585588
MutexWrapper<std::recursive_mutex> _mtx_send_buf_sending;
586-
//发送buffer结果回调
589+
// 发送buffer结果回调
587590
BufferList::SendResult _send_result;
588-
//对象个数统计
591+
// 对象个数统计
589592
ObjectStatistic<Socket> _statistic;
590593

591-
//链接缓存地址,防止tcp reset 导致无法获取对端的地址
594+
// 链接缓存地址,防止tcp reset 导致无法获取对端的地址
592595
struct sockaddr_storage _local_addr;
593596
struct sockaddr_storage _peer_addr;
594597
};

0 commit comments

Comments
 (0)