278 lines
8.7 KiB
Diff
278 lines
8.7 KiB
Diff
From 472e2f00b3fda7dad4396704fd94715d91be4642 Mon Sep 17 00:00:00 2001
|
|
From: jiangheng <jiangheng14@huawei.com>
|
|
Date: Wed, 21 Feb 2024 04:25:43 +0800
|
|
Subject: [PATCH] diff udp and tcp read from stack
|
|
|
|
---
|
|
src/lstack/core/lstack_lwip.c | 211 +++++++++++++++++++++++-----------
|
|
1 file changed, 146 insertions(+), 65 deletions(-)
|
|
|
|
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
|
|
index 50a3389..0b339fe 100644
|
|
--- a/src/lstack/core/lstack_lwip.c
|
|
+++ b/src/lstack/core/lstack_lwip.c
|
|
@@ -831,74 +831,96 @@ static struct pbuf *pbuf_free_partial(struct pbuf *pbuf, uint16_t free_len)
|
|
return pbuf;
|
|
}
|
|
|
|
-ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags,
|
|
- struct sockaddr *addr, socklen_t *addrlen)
|
|
+static bool recv_break_for_err(struct lwip_sock *sock)
|
|
{
|
|
- size_t recv_left = len;
|
|
- struct pbuf *pbuf = NULL;
|
|
- ssize_t recvd = 0;
|
|
- uint32_t copy_len;
|
|
- struct lwip_sock *sock = get_socket_by_fd(fd);
|
|
- bool latency_enable = get_protocol_stack_group()->latency_start;
|
|
-
|
|
- if (sock->errevent > 0 && !NETCONN_IS_DATAIN(sock)) {
|
|
- errno = err_to_errno(netconn_err(sock->conn));
|
|
- return -1;
|
|
- }
|
|
+ bool break_wait = (sock->errevent > 0) && (!NETCONN_IS_DATAIN(sock));
|
|
+ errno = err_to_errno(netconn_err(sock->conn));
|
|
+ return break_wait;
|
|
+}
|
|
|
|
- thread_bind_stack(sock);
|
|
+static void recv_block_wait(struct lwip_sock *sock)
|
|
+{
|
|
+ lstack_block_wait(sock->wakeup);
|
|
+}
|
|
|
|
- if (sock->same_node_rx_ring != NULL) {
|
|
- return gazelle_same_node_ring_recv(sock, buf, len, flags);
|
|
+/*
|
|
+ * return 0 on success, -1 on error
|
|
+ * pbuf may be NULL (tcp fin packet)
|
|
+ */
|
|
+static int recv_ring_get_one(struct lwip_sock *sock, bool noblock, struct pbuf **pbuf)
|
|
+{
|
|
+ if (sock->recv_lastdata != NULL) {
|
|
+ *pbuf = sock->recv_lastdata;
|
|
+ sock->recv_lastdata = NULL;
|
|
+ return 0;
|
|
}
|
|
|
|
- while (recv_left > 0) {
|
|
- if (sock->recv_lastdata) {
|
|
- pbuf = sock->recv_lastdata;
|
|
- sock->recv_lastdata = NULL;
|
|
+ if (noblock) {
|
|
+ if (gazelle_ring_read(sock->recv_ring, (void **)pbuf, 1) != 1) {
|
|
+ errno = EAGAIN;
|
|
+ return -1;
|
|
} else {
|
|
- if (netconn_is_nonblocking(sock->conn)) {
|
|
- if (gazelle_ring_read(sock->recv_ring, (void **)&pbuf, 1) != 1) {
|
|
- break;
|
|
- }
|
|
- } else {
|
|
- while (gazelle_ring_read(sock->recv_ring, (void **)&pbuf, 1) != 1 && recvd == 0) {
|
|
- /* if the connection is disconnected, recv return 0 */
|
|
- if (sock->errevent > 0 && !NETCONN_IS_DATAIN(sock)) {
|
|
- errno = err_to_errno(netconn_err(sock->conn));
|
|
- return -1;
|
|
- }
|
|
-
|
|
- lstack_block_wait(sock->wakeup);
|
|
- }
|
|
+ return 0;
|
|
+ }
|
|
+ } else {
|
|
+ while (gazelle_ring_read(sock->recv_ring, (void **)pbuf, 1) != 1) {
|
|
+ if (recv_break_for_err(sock)) {
|
|
+ return -1;
|
|
}
|
|
+ recv_block_wait(sock);
|
|
}
|
|
+ return 0;
|
|
+ }
|
|
+}
|
|
|
|
- /* if udp recv a packet whose len is 0, return 0 */
|
|
- if (NETCONN_IS_UDP(sock) && pbuf->tot_len == 0) {
|
|
- return 0;
|
|
+/* return true: fin is delivered to the user, false: fin is left pending */
|
|
+static bool recv_ring_handle_fin(struct lwip_sock *sock, struct pbuf *pbuf, ssize_t recvd)
|
|
+{
|
|
+ if (pbuf == NULL) {
|
|
+ if (recvd > 0) {
|
|
+ /* handle data first, then handle fin */
|
|
+ sock->recv_lastdata = (void *)&fin_packet;
|
|
+ gazelle_ring_read_over(sock->recv_ring);
|
|
+ return false;
|
|
}
|
|
+ gazelle_ring_read_over(sock->recv_ring);
|
|
+ return true;
|
|
+ }
|
|
+ /* pending fin */
|
|
+ if (pbuf == (void *)&fin_packet) {
|
|
+ return true;
|
|
+ }
|
|
|
|
- /* fin */
|
|
- if (unlikely(pbuf == NULL)) {
|
|
- if (recvd > 0) {
|
|
- /* read data first, then read fin */
|
|
- sock->recv_lastdata = (void *)&fin_packet;
|
|
- gazelle_ring_read_over(sock->recv_ring);
|
|
- break;
|
|
- }
|
|
- gazelle_ring_read_over(sock->recv_ring);
|
|
- return 0;
|
|
+ return false;
|
|
+}
|
|
+
|
|
+static ssize_t recv_ring_tcp_read(struct lwip_sock *sock, void *buf, size_t len, bool noblock)
|
|
+{
|
|
+ ssize_t recvd = 0;
|
|
+ size_t recv_left = len;
|
|
+ uint32_t copy_len;
|
|
+ struct pbuf *pbuf = NULL;
|
|
+
|
|
+ if (len == 0) {
|
|
+ return 0;
|
|
+ }
|
|
+
|
|
+ while (recv_left > 0) {
|
|
+ if (recv_ring_get_one(sock, noblock, &pbuf) != 0) {
|
|
+ break;
|
|
}
|
|
|
|
- /* pending fin */
|
|
- if (unlikely(pbuf == (void *)&fin_packet)) {
|
|
- return 0;
|
|
+ if (unlikely((pbuf == NULL) || (pbuf == (void *)&fin_packet))) {
|
|
+ if (recv_ring_handle_fin(sock, pbuf, recvd)) {
|
|
+ return 0;
|
|
+ } else {
|
|
+ break; /* recvd > 0, pending fin, handle data */
|
|
+ }
|
|
}
|
|
|
|
copy_len = (recv_left > pbuf->tot_len) ? pbuf->tot_len : recv_left;
|
|
if (copy_len > UINT16_MAX) {
|
|
- copy_len = UINT16_MAX;
|
|
+ copy_len = UINT16_MAX; /* it's impossible to get here */
|
|
}
|
|
pbuf_copy_partial(pbuf, (char *)buf + recvd, copy_len, 0);
|
|
|
|
@@ -907,39 +929,98 @@ ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags
|
|
|
|
if (pbuf->tot_len > copy_len) {
|
|
sock->recv_lastdata = pbuf_free_partial(pbuf, copy_len);
|
|
- break;
|
|
} else {
|
|
if (sock->wakeup) {
|
|
sock->wakeup->stat.app_read_cnt += 1;
|
|
}
|
|
- if (latency_enable) {
|
|
+
|
|
+ if (get_protocol_stack_group()->latency_start) {
|
|
calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_READ_LSTACK);
|
|
}
|
|
- gazelle_ring_read_over(sock->recv_ring);
|
|
|
|
- /* in udp, if pbuf remaining len less than copy_len, discard these packets */
|
|
- if (recvd > 0 && NETCONN_IS_UDP(sock)) {
|
|
- sock->stack->stats.sock_rx_drop++;
|
|
- break;
|
|
- }
|
|
+ gazelle_ring_read_over(sock->recv_ring);
|
|
}
|
|
}
|
|
|
|
- /* rte_ring_count reduce lock */
|
|
- if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLIN)
|
|
- && (!NETCONN_IS_DATAIN(sock))) {
|
|
- del_sock_event(sock, EPOLLIN);
|
|
+ if (recvd > 0) {
|
|
+ errno = 0;
|
|
+ } else {
|
|
+ recvd = -1;
|
|
}
|
|
|
|
+ return recvd;
|
|
+}
|
|
+
|
|
+static ssize_t recv_ring_udp_read(struct lwip_sock *sock, void *buf, size_t len, bool noblock,
|
|
+ struct sockaddr *addr, socklen_t *addrlen)
|
|
+{
|
|
+ size_t recv_left = len;
|
|
+ struct pbuf *pbuf = NULL;
|
|
+ uint32_t copy_len;
|
|
+
|
|
+ sock->recv_lastdata = NULL;
|
|
+
|
|
+ if (recv_ring_get_one(sock, noblock, &pbuf) != 0) {
|
|
+ /* errno has already been set */
|
|
+ return -1;
|
|
+ }
|
|
+
|
|
+ copy_len = (recv_left > pbuf->tot_len) ? pbuf->tot_len : recv_left;
|
|
+ pbuf_copy_partial(pbuf, (char *)buf, copy_len, 0);
|
|
+ /* drop remaining data, if any */
|
|
+ gazelle_ring_read_over(sock->recv_ring);
|
|
+
|
|
if (pbuf && addr && addrlen) {
|
|
lwip_sock_make_addr(sock->conn, &(pbuf->addr), pbuf->port, addr, addrlen);
|
|
}
|
|
|
|
- if (recvd == 0) {
|
|
+ if (copy_len < pbuf->tot_len) {
|
|
+ sock->stack->stats.sock_rx_drop++;
|
|
+ }
|
|
+
|
|
+ if (sock->wakeup) {
|
|
+ sock->wakeup->stat.app_read_cnt++;
|
|
+ }
|
|
+ if (get_protocol_stack_group()->latency_start) {
|
|
+ calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_READ_LSTACK);
|
|
+ }
|
|
+
|
|
+ return copy_len;
|
|
+}
|
|
+
|
|
+ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags,
|
|
+ struct sockaddr *addr, socklen_t *addrlen)
|
|
+{
|
|
+ ssize_t recvd = 0;
|
|
+ struct lwip_sock *sock = get_socket_by_fd(fd);
|
|
+
|
|
+ if (recv_break_for_err(sock)) {
|
|
+ return -1;
|
|
+ }
|
|
+
|
|
+ thread_bind_stack(sock);
|
|
+
|
|
+ if (sock->same_node_rx_ring != NULL) {
|
|
+ return gazelle_same_node_ring_recv(sock, buf, len, flags);
|
|
+ }
|
|
+
|
|
+ if (NETCONN_IS_UDP(sock)) {
|
|
+ recvd = recv_ring_udp_read(sock, buf, len, netconn_is_nonblocking(sock->conn), addr, addrlen);
|
|
+ } else {
|
|
+ recvd = recv_ring_tcp_read(sock, buf, len, netconn_is_nonblocking(sock->conn));
|
|
+ }
|
|
+
|
|
+ /* rte_ring_count reduce lock */
|
|
+ if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLIN)
|
|
+ && (!NETCONN_IS_DATAIN(sock))) {
|
|
+ del_sock_event(sock, EPOLLIN);
|
|
+ }
|
|
+
|
|
+ if (recvd < 0) {
|
|
if (sock->wakeup) {
|
|
sock->wakeup->stat.read_null++;
|
|
}
|
|
- GAZELLE_RETURN(EAGAIN);
|
|
+ return -1;
|
|
}
|
|
return recvd;
|
|
}
|
|
--
|
|
2.27.0
|
|
|