From da54963163baf9213c8cd34da6ec3c533ab1ef9d Mon Sep 17 00:00:00 2001 From: wu-changsheng Date: Tue, 20 Dec 2022 15:42:33 +0800 Subject: [PATCH 2/2] expand-data-recv-buff --- src/common/dpdk_common.h | 2 +- src/lstack/core/lstack_lwip.c | 202 ++++++++++++++++++++++++------- src/lstack/include/lstack_dpdk.h | 2 + 3 files changed, 164 insertions(+), 42 deletions(-) diff --git a/src/common/dpdk_common.h b/src/common/dpdk_common.h index 63d651d..c93f506 100644 --- a/src/common/dpdk_common.h +++ b/src/common/dpdk_common.h @@ -193,7 +193,7 @@ static __rte_always_inline uint32_t gazelle_ring_read(struct rte_ring *r, void * __rte_ring_dequeue_elems(r, prod, obj_table, sizeof(void *), n); - r->prod.head = prod + n; + __atomic_store_n(&r->prod.head, prod + n, __ATOMIC_RELEASE); return n; } diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index 0b9b684..32d21b6 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -83,6 +83,11 @@ static void reset_sock_data(struct lwip_sock *sock) sock->send_lastdata = NULL; } + if (sock->lwip_lastdata) { + free_list_pbuf(sock->lwip_lastdata); + sock->lwip_lastdata = NULL; + } + if (sock->send_pre_del) { pbuf_free(sock->send_pre_del); sock->send_pre_del = NULL; @@ -95,6 +100,7 @@ static void reset_sock_data(struct lwip_sock *sock) sock->events = 0; sock->in_send = 0; sock->remain_len = 0; + sock->read_wait = false; if (sock->recv_lastdata) { pbuf_free(sock->recv_lastdata); @@ -185,7 +191,6 @@ void gazelle_init_sock(int32_t fd) init_list_node_null(&sock->recv_list); init_list_node_null(&sock->event_list); init_list_node_null(&sock->send_list); - pthread_spin_init(&sock->sock_lock, PTHREAD_PROCESS_PRIVATE); } void gazelle_clean_sock(int32_t fd) @@ -207,7 +212,6 @@ void gazelle_clean_sock(int32_t fd) list_del_node_null(&sock->recv_list); list_del_node_null(&sock->send_list); - pthread_spin_destroy(&sock->sock_lock); } void gazelle_free_pbuf(struct pbuf *pbuf) @@ -636,64 +640,166 @@ static inline void free_recv_ring_readover(struct rte_ring *ring) } } -ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) +static inline struct pbuf *gazelle_ring_enqueuelast(struct rte_ring *r) { - if (sock->conn->recvmbox == NULL) { - return 0; + struct pbuf *last_pbuf = NULL; + volatile uint32_t head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE); + uint32_t last = r->cons.head - 1; + if (last == head || last - head > r->capacity) { + return NULL; } - free_recv_ring_readover(sock->recv_ring); + __rte_ring_dequeue_elems(r, last, (void **)&last_pbuf, sizeof(void *), 1); + __atomic_store_n(&last_pbuf->in_write, 1, __ATOMIC_RELEASE); - uint32_t free_count = gazelle_ring_free_count(sock->recv_ring); - if (free_count == 0) { - GAZELLE_RETURN(EAGAIN); + rte_mb(); + + head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE); + if (last == head || last - head > r->capacity) { + __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE); + return NULL; } - uint32_t data_count = rte_ring_count(sock->conn->recvmbox->ring); - uint32_t read_num = LWIP_MIN(free_count, data_count); - read_num = LWIP_MIN(read_num, SOCK_RECV_RING_SIZE); - struct pbuf *pbufs[SOCK_RECV_RING_SIZE]; - uint32_t read_count = 0; - ssize_t recv_len = 0; + return last_pbuf; +} + +static inline struct pbuf *pbuf_last(struct pbuf *pbuf) +{ + while (pbuf->next) { + pbuf = pbuf->next; + } + return pbuf; +} + +static struct pbuf *merge_pbufs(struct pbuf *pbufs[], uint32_t data_count, uint32_t data_len) +{ + struct pbuf *pre_last = (pbufs[0]->last) ? pbufs[0]->last : pbuf_last(pbufs[0]); + + if (data_count <= 1) { + pbufs[0]->last = pre_last; + return pbufs[0]; + } + + for (uint32_t i = 1; i < data_count; i++) { + pre_last->next = pbufs[i]; + pre_last = pbuf_last(pbufs[i]); + } + + pbufs[0]->tot_len = data_len; + pbufs[0]->last = pre_last; + + return pbufs[0]; +} - for (uint32_t i = 0; i < read_num; i++) { +static int32_t get_lwip_pbufs(struct lwip_sock *sock, struct pbuf *pbufs[], uint32_t *data_count, u8_t apiflags) +{ + uint32_t data_len = 0; + + for (uint32_t i = 0; i < *data_count; i++) { err_t err = netconn_recv_tcp_pbuf_flags(sock->conn, &pbufs[i], apiflags); if (err != ERR_OK) { - if (recv_len > 0) { + *data_count = i; + if (data_len > 0) { /* already received data, return that (this trusts in getting the same error from netconn layer again next time netconn_recv is called) */ break; } - - return (err == ERR_CLSD) ? 0 : -1; + return (err == ERR_CLSD) ? -1 : 0; } - recv_len += pbufs[i]->tot_len; - read_count++; + pbufs[i]->last = NULL; + pbufs[i]->in_write = 0; + data_len += pbufs[i]->tot_len; /* once we have some data to return, only add more if we don't need to wait */ apiflags |= NETCONN_DONTBLOCK | NETCONN_NOFIN; } - if (!(flags & MSG_PEEK)) { - uint32_t enqueue_num = gazelle_ring_sp_enqueue(sock->recv_ring, (void **)pbufs, read_count); - for (uint32_t i = enqueue_num; i < read_count; i++) { - /* update receive window */ - tcp_recved(sock->conn->pcb.tcp, pbufs[i]->tot_len); - pbuf_free(pbufs[i]); - sock->stack->stats.read_lwip_drop++; - } + return (int32_t)data_len; +} + +static void put_pbufs_into_recv_ring(struct lwip_sock *sock, struct pbuf *pbufs[], + uint32_t data_count, uint32_t data_len) +{ + uint32_t free_count = gazelle_ring_free_count(sock->recv_ring); + + if (data_count <= free_count) { + (void)gazelle_ring_sp_enqueue(sock->recv_ring, (void **)pbufs, data_count); + return; } - for (uint32_t i = 0; get_protocol_stack_group()->latency_start && i < read_count; i++) { - calculate_lstack_latency(&sock->stack->latency, pbufs[i], GAZELLE_LATENCY_LWIP); + struct pbuf *new_pbuf = merge_pbufs(pbufs, data_count, data_len); + + if (free_count) { + (void)gazelle_ring_sp_enqueue(sock->recv_ring, (void **)&new_pbuf, 1); + return; + } + + struct pbuf *last_pbuf = gazelle_ring_enqueuelast(sock->recv_ring); + if (last_pbuf == NULL) { + sock->lwip_lastdata = new_pbuf; + return; } - sock->stack->stats.read_lwip_cnt += read_count; - if (recv_len == 0) { + if (last_pbuf->last == NULL) { + last_pbuf->last = pbuf_last(last_pbuf); + } + last_pbuf->last->next = new_pbuf; + last_pbuf->tot_len += new_pbuf->tot_len; + last_pbuf->last = new_pbuf->last; + gazelle_ring_lastover(last_pbuf); + + if (last_pbuf->tot_len > SOCK_READ_MAXLEN) { + sock->read_wait = true; + } +} + +ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) +{ + if (sock->conn->recvmbox == NULL) { + return 0; + } + + free_recv_ring_readover(sock->recv_ring); + + if (sock->read_wait) { GAZELLE_RETURN(EAGAIN); } - return recv_len; + + struct pbuf *pbufs[SOCK_RECV_RING_SIZE]; + uint32_t data_count = rte_ring_count(sock->conn->recvmbox->ring); + int32_t data_len = get_lwip_pbufs(sock, pbufs, &data_count, apiflags); + if (unlikely(data_len < 0)) { + /* ERR_CLSD */ + return 0; + } else if (unlikely(data_len == 0) && !sock->lwip_lastdata) { + GAZELLE_RETURN(EAGAIN); + } + + if (get_protocol_stack_group()->latency_start) { + for (uint32_t i = 0; i < data_count; i++) { + calculate_lstack_latency(&sock->stack->latency, pbufs[i], GAZELLE_LATENCY_LWIP); + } + } + + if (data_count) { + uint32_t last_len = 0; + if (sock->lwip_lastdata) { + last_len = sock->lwip_lastdata->tot_len; + sock->lwip_lastdata->last->next = pbufs[0]; + sock->lwip_lastdata->tot_len += pbufs[0]->tot_len; + sock->lwip_lastdata->last = pbuf_last(pbufs[0]); + pbufs[0] = sock->lwip_lastdata; + sock->lwip_lastdata = NULL; + } + put_pbufs_into_recv_ring(sock, pbufs, data_count, data_len + last_len); + } else { + put_pbufs_into_recv_ring(sock, &sock->lwip_lastdata, 1, sock->lwip_lastdata->tot_len); + sock->lwip_lastdata = NULL; + } + sock->stack->stats.read_lwip_cnt += data_count; + + return data_len; } static int32_t check_msg_vaild(const struct msghdr *message) @@ -725,9 +831,9 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags) } for (int32_t i = 0; i < message->msg_iovlen; i++) { - if (message->msg_iov[i].iov_len == 0){ - continue; - } + if (message->msg_iov[i].iov_len == 0) { + continue; + } ssize_t recvd_local = read_stack_data(s, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, flags); if (recvd_local > 0) { @@ -828,7 +934,7 @@ static inline void del_data_in_event(struct lwip_sock *sock) static struct pbuf *pbuf_free_partial(struct pbuf *pbuf, uint16_t free_len) { - uint16_t tot_len = pbuf->tot_len - free_len; + uint32_t tot_len = pbuf->tot_len - free_len; while (free_len && pbuf) { if (free_len >= pbuf->len) { @@ -840,7 +946,9 @@ static struct pbuf *pbuf_free_partial(struct pbuf *pbuf, uint16_t free_len) } } - pbuf->tot_len = tot_len; + if (pbuf) { + pbuf->tot_len = tot_len; + } return pbuf; } @@ -849,7 +957,7 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags) size_t recv_left = len; struct pbuf *pbuf = NULL; ssize_t recvd = 0; - uint16_t copy_len; + uint32_t copy_len; struct lwip_sock *sock = get_socket_by_fd(fd); bool latency_enable = get_protocol_stack_group()->latency_start; @@ -870,8 +978,15 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags) break; } } + if (__atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE)) { + sock->recv_lastdata = pbuf; + break; + } - copy_len = (recv_left > pbuf->tot_len) ? pbuf->tot_len : (uint16_t)recv_left; + copy_len = (recv_left > pbuf->tot_len) ? pbuf->tot_len : recv_left; + if (copy_len > UINT16_MAX) { + copy_len = UINT16_MAX; + } pbuf_copy_partial(pbuf, (char *)buf + recvd, copy_len, 0); recvd += copy_len; @@ -879,6 +994,7 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags) if (pbuf->tot_len > copy_len) { sock->recv_lastdata = pbuf_free_partial(pbuf, copy_len); + break; } else { if (sock->wakeup) { sock->wakeup->stat.app_read_cnt += 1; @@ -890,6 +1006,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags) } } + if (sock->read_wait) { + sock->read_wait = false; + } + /* rte_ring_count reduce lock */ if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLIN)) { del_data_in_event(sock); diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h index c3bc527..ac068b8 100644 --- a/src/lstack/include/lstack_dpdk.h +++ b/src/lstack/include/lstack_dpdk.h @@ -24,6 +24,8 @@ #define MAX_PACKET_SZ 2048 +#define SOCK_READ_MAXLEN 0x200000 + #define RING_SIZE(x) ((x) - 1) #define MBUF_SZ (MAX_PACKET_SZ + RTE_PKTMBUF_HEADROOM) -- 2.23.0