提升 Node.js UDP 的性能?
網絡 IO 性能一直是熱門的討論話題,不管是操作系統還是應用軟件也在不斷優化網絡 IO 的性能。我們知道系統調用是相對耗時的,所以性能優化的一種方式就是減少系統調用。本文介紹最近嘗試在 Node.js 中通過引入 recvmmsg 提升 UDP 接收數據性能的相關內容,相比之前每調一次系統調用讀取一個數據包,recvmmsg 可以一次讀取多個包,從而減少系統調用提升性能。
UDP 的實現
下面是在 Node.js 中使用 UDP 的例子。
const dgram = require('dgram')
const socket = dgram.createSocket({type: 'udp4', msgCount: 10});
socket.bind(9999, function() {
socket.on('message',function() {
});
})其底層實現如下。
- 初始化 udp handle。
r = uv_udp_init(env->event_loop(), &handle_);- 注冊讀事件。
int UDPWrap::RecvStart() {
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
return err;
}uv_udp_recv_start 用于注冊讀事件,當有數據到來時執行 OnAlloc 分配內存,然后執行 OnRecv 消費數據。
- 分配保存數據的內存。 OnAlloc 實現如下:
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
return env()->allocate_managed_buffer(suggested_size);
}
uv_buf_t Environment::allocate_managed_buffer(const size_t suggested_size) {
std::unique_ptr<BackingStore> bs = ArrayBuffer::NewBackingStore(
isolate(),
suggested_size,
BackingStoreInitializationMode::kUninitialized);
uv_buf_t buf = uv_buf_init(static_cast<char*>(bs->Data()), bs->ByteLength());
released_allocated_buffers_.emplace(buf.base, std::move(bs));
return buf;
}Node.js 通過 V8 的 BackingStore 申請內存,然后把這塊內存傳給 Libuv 讀取數據,最后記錄內存首地址和 BackingStore 的映射關系,后續會用到。
- 消費數據 Libuv 讀取數據后會調 OnRecv 通知調用者。
void UDPWrap::OnRecv(ssize_t nread,
const uv_buf_t& buf_,
const sockaddr* addr,
unsigned int flags) {
Environment* env = this->env();
Isolate* isolate = env->isolate();
// 通過 buf 找到對應的 BackingStore,然后回調 JS
std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
Local<Value> argv[] = {
Integer::New(isolate, static_cast<int32_t>(nread)),
object(),
Undefined(isolate),
Undefined(isolate)
};
if (nread < 0) {
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
return;
} else if (nread == 0) {
bs = ArrayBuffer::NewBackingStore(isolate, 0);
} else if (static_cast<size_t>(nread) != bs->ByteLength()) { // 讀取的數據和預分配的不一致,需要創建新的 BackingStore 并把數據復制過去
std::unique_ptr<BackingStore> old_bs = std::move(bs);
bs = ArrayBuffer::NewBackingStore(
isolate, nread, BackingStoreInitializationMode::kUninitialized);
memcpy(bs->Data(), old_bs->Data(), nread);
}
// ...
// ...
}可以看到 UDP 的實現不算復雜。接著看如何在這個基礎上引入 recvmmsg 能力。
引入 recvmmsg
Libuv 本身支持 recvmmsg,所以我們不需要關系平臺兼容性問題(目前支持多個平臺,但并不支持在所有平臺中使用)。
- 設置使用 recvmmsg。
env->event_loop(), &handle_, AF_UNSPEC | UV_UDP_RECVMMSG);通過 UV_UDP_RECVMMSG 標記可以使得 Libuv 使用 recvmmsg 接收 UDP 數據。
- 分配內存。 分配內存這里有點棘手,使用 UV_UDP_RECVMMSG 時需要分配一大塊內容,足以保存多個數據包的數據,而之前的實現中,每次是通過 BackingStore 分配一塊獨立的內容,用完后就釋放的。所以這里不能使用 BackingStore 了,直接使用原生的內存分配。
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
if (using_recvmmsg()) {
// msg_count_ 表示每次讀取多少個數據包
suggested_size *= msg_count_;
// 在當前對象記錄正在使用的內存,后面分析
mmsg_buf_ = uv_buf_init(reinterpret_cast<char*>(malloc(suggested_size)),
suggested_size);
return mmsg_buf_;
}
return env()->allocate_managed_buffer(suggested_size);
}通過 malloc 分配接收多個數據包所需要的一大片內存,每個數據包對應其中的一個分片。
- 消費數據 消費數據和之前的邏輯也有些不同,之前是消費完數據后就可以釋放對應的內存了,但是引入 recvmmsg 后,每次是分配一大塊內存的,每個數據包對應其中的一個分片,所以消費完一個數據包還不能釋放整個內存,需要全部數據包才能釋放這個內容。Libuv 的實現如下:
// buf->len 對應 OnAlloc 分片的一大片內存,除以 UV__UDP_DGRAM_MAXSIZ 表示一次最多接收幾個數據包
chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
if (chunks > ARRAY_SIZE(iov))
chunks = ARRAY_SIZE(iov);
for (k = 0; k < chunks; ++k) {
iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
// ...
}
do
nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
while (nread == -1 && errno == EINTR);
if (nread < 1) {
if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
handle->recv_cb(handle, 0, buf, NULL, 0);
else
handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
} else {
/* pass each chunk to the application */
for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
flags = UV_UDP_MMSG_CHUNK;
if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
flags |= UV_UDP_PARTIAL;
chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
handle->recv_cb(handle,
msgs[k].msg_len,
&chunk_buf,
msgs[k].msg_hdr.msg_name,
flags);
}
/* one last callback so the original buffer is freed */
if (handle->recv_cb != NULL)
handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
}另外之前是通過 BackingStore 分配內存來和 uv_buf_t 關聯來實現的,我們這里則是通過 malloc 直接分配的內容,所以接收數據時就不能通過 uv_buf_t 來查看對應的 BackingStore 了。
void UDPWrap::OnRecv(ssize_t nread,
const uv_buf_t& buf_,
const sockaddr* addr,
unsigned int flags) {
Environment* env = this->env();
Isolate* isolate = env->isolate();
std::unique_ptr<BackingStore> bs;
// 處理完本輪所有數據包,釋放內存
auto cleanup = OnScopeLeave([&]() {
if (using_recvmmsg() && (nread <= 0 || (flags & UV_UDP_MMSG_FREE))) {
release_buf();
}
});
// 歷史邏輯
if (!using_recvmmsg()) {
bs = env->release_managed_buffer(buf_);
}
Local<Value> argv[] = {
Integer::New(isolate, static_cast<int32_t>(nread)),
object(),
Undefined(isolate),
Undefined(isolate)
};
if (nread < 0) {
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
return;
} else if (nread == 0) {
bs = ArrayBuffer::NewBackingStore(isolate, 0);
} else if (using_recvmmsg()) {
// 創建一個 BackingStore 并把數據包的數據復制過去
bs = ArrayBuffer::NewBackingStore(
isolate, nread, BackingStoreInitializationMode::kUninitialized);
memcpy(bs->Data(), buf_.base, nread);
} else if (static_cast<size_t>(nread) != bs->ByteLength()) {
std::unique_ptr<BackingStore> old_bs = std::move(bs);
bs = ArrayBuffer::NewBackingStore(
isolate, nread, BackingStoreInitializationMode::kUninitialized);
memcpy(bs->Data(), old_bs->Data(), nread);
}
// ...
}上面的改動包括兩方面,一是兼容之前的收到數據時創建 BackingStore 的邏輯,這樣可以很大程度地復用之前的邏輯,二是判斷是否需要釋放內存。
- 避免內存泄漏 從上面的處理中可以看到,當本輪的所有數據包處理完后,我們就會正常釋放申請的大片內存,但是存在一個問題是假設分配了 1MB * 10 大小的內存,然后通過 recvmmsg 從操作系統獲取了 10 個數據包,但是處理第一個數據包時用戶調了 close 關閉了 socket,這樣會導致 Libuv 不會回調 Node.js 了,從而導致之前申請的大片內存沒有釋放,造成內存泄漏。所以在申請內存時需要記錄這個內存,然后 socket 關閉時釋放(獲取處理完所有的數據包后釋放)。
UDPWrap::~UDPWrap() {
// Libuv does not release the memory of memory which allocated
// by handle->alloc_cb when we call close in handle->read_cb,
// so we should release the memory here if necessary.
// 開啟了 recvmmsg 并且沒有釋放申請的內存則釋放
if (uv_udp_using_recvmmsg(reinterpret_cast<uv_udp_t*>(&handle_)) {
release_buf();
}
}
void release_buf() {
if (mmsg_buf_.base != nullptr) {
free(mmsg_buf_.base);
mmsg_buf_ = uv_buf_init(nullptr, 0);
}
}總結
recvmmsg 通過減少系統調用提升性能,但是不一定在什么場景下都有可觀的效果,比如在 QPS 高時可能比較有意義,另外因為每次都需要提前分配內容,所以數據量小時可能會導致每次分配過多無用的內存,然后又被釋放。該特性目前還沒有在 Node.js 中進行性能測試。
- PR:https://github.com/nodejs/node/pull/59126
























