From 8fcc0033d10f7bac263759794199b4ec96372a92 Mon Sep 17 00:00:00 2001 From: wu-changsheng Date: Sat, 3 Dec 2022 21:20:28 +0800 Subject: [PATCH 2/2] optimize app thread write buff block --- src/common/dpdk_common.h | 39 +-- src/common/gazelle_dfx_msg.h | 6 +- src/common/gazelle_opt.h | 2 + src/lstack/api/lstack_epoll.c | 2 +- src/lstack/core/lstack_dpdk.c | 9 + src/lstack/core/lstack_lwip.c | 389 +++++++++++++++++++------ src/lstack/core/lstack_stack_stat.c | 2 +- src/lstack/core/lstack_thread_rpc.c | 13 + src/lstack/include/lstack_dpdk.h | 10 +- src/lstack/include/lstack_lwip.h | 7 +- src/lstack/include/lstack_thread_rpc.h | 2 + src/lstack/netif/lstack_ethdev.c | 4 +- src/ltran/ltran_dfx.c | 17 +- 13 files changed, 366 insertions(+), 136 deletions(-) diff --git a/src/common/dpdk_common.h b/src/common/dpdk_common.h index 753c168..a0c304c 100644 --- a/src/common/dpdk_common.h +++ b/src/common/dpdk_common.h @@ -13,6 +13,7 @@ #ifndef __GAZELLE_DPDK_COMMON_H__ #define __GAZELLE_DPDK_COMMON_H__ +#include #include #include @@ -24,7 +25,7 @@ #define PTR_TO_PRIVATE(mbuf) RTE_PTR_ADD(mbuf, sizeof(struct rte_mbuf)) /* Layout: - * | rte_mbuf | pbuf | custom_free_function | payload | + * | rte_mbuf | pbuf | custom_free_function | tcp_seg | payload | **/ struct pbuf; static inline struct rte_mbuf *pbuf_to_mbuf(struct pbuf *p) @@ -148,7 +149,6 @@ static __rte_always_inline uint32_t gazelle_ring_sp_enqueue(struct rte_ring *r, return 0; } - __rte_ring_enqueue_elems(r, head, obj_table, sizeof(void *), n); __atomic_store_n(&r->cons.head, head + n, __ATOMIC_RELEASE); @@ -169,39 +169,13 @@ static __rte_always_inline uint32_t gazelle_ring_sc_dequeue(struct rte_ring *r, return 0; } - __rte_ring_dequeue_elems(r, cons, obj_table, sizeof(void *), n); - r->cons.tail = cons + n; - - return n; -} - -/* get ring obj dont dequeue */ -static __rte_always_inline uint32_t gazelle_ring_sc_peek(struct rte_ring *r, void **obj_table, uint32_t n) -{ - uint32_t prod = __atomic_load_n(&r->prod.tail, 
__ATOMIC_ACQUIRE); - uint32_t cons = r->cons.tail; - - uint32_t entries = prod - cons; - if (n > entries) { - n = entries; - } - if (unlikely(n == 0)) { - return 0; - } - - - __rte_ring_dequeue_elems(r, cons, obj_table, sizeof(void *), n); + __atomic_store_n(&r->cons.tail, cons + n, __ATOMIC_RELEASE); return n; } -static __rte_always_inline void gazelle_ring_dequeue_over(struct rte_ring *r, uint32_t n) -{ - r->cons.tail += n; -} - static __rte_always_inline uint32_t gazelle_ring_read(struct rte_ring *r, void **obj_table, uint32_t n) { uint32_t cons = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE); @@ -222,11 +196,6 @@ static __rte_always_inline uint32_t gazelle_ring_read(struct rte_ring *r, void * return n; } -static __rte_always_inline void gazelle_ring_read_n(struct rte_ring *r, uint32_t n) -{ - __atomic_store_n(&r->prod.tail, r->prod.tail + n, __ATOMIC_RELEASE); -} - static __rte_always_inline void gazelle_ring_read_over(struct rte_ring *r) { __atomic_store_n(&r->prod.tail, r->prod.head, __ATOMIC_RELEASE); @@ -240,7 +209,7 @@ static __rte_always_inline uint32_t gazelle_ring_readover_count(struct rte_ring static __rte_always_inline uint32_t gazelle_ring_readable_count(const struct rte_ring *r) { rte_smp_rmb(); - return r->cons.head - r->prod.tail; + return r->cons.head - r->prod.head; } static __rte_always_inline uint32_t gazelle_ring_count(const struct rte_ring *r) diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h index f6f8d0e..0bdd238 100644 --- a/src/common/gazelle_dfx_msg.h +++ b/src/common/gazelle_dfx_msg.h @@ -70,7 +70,7 @@ struct gazelle_stack_stat { struct gazelle_wakeup_stat { uint64_t app_events; - uint64_t app_write_idlefail; + uint64_t app_write_rpc; uint64_t app_write_cnt; uint64_t app_read_cnt; uint64_t read_null; @@ -157,8 +157,10 @@ struct gazelle_stat_lstack_conn_info { uint32_t send_ring_cnt; uint32_t recv_ring_cnt; uint32_t tcp_sub_state; - int32_t sem_cnt; + uint32_t send_back_cnt; int32_t fd; + uint64_t recv_all; + 
uint64_t send_all; }; struct gazelle_stat_lstack_conn { diff --git a/src/common/gazelle_opt.h b/src/common/gazelle_opt.h index 4262e90..aac1ec9 100644 --- a/src/common/gazelle_opt.h +++ b/src/common/gazelle_opt.h @@ -47,6 +47,8 @@ #define RTE_TEST_TX_DESC_DEFAULT 2048 #define RTE_TEST_RX_DESC_DEFAULT 4096 +#define MBUF_MAX_DATA_LEN 1460 + #define DPDK_PKT_BURST_SIZE 512 /* total:33 client, index 32 is invaild client */ diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c index 35df625..4ea6474 100644 --- a/src/lstack/api/lstack_epoll.c +++ b/src/lstack/api/lstack_epoll.c @@ -321,7 +321,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even if (CONN_TYPE_HAS_HOST(sock->conn)) { int32_t ret = posix_api->epoll_ctl_fn(epfd, op, fd, event); if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "fd=%d epfd=%d op=%d\n", fd, epfd, op); + LSTACK_LOG(ERR, LSTACK, "fd=%d epfd=%d op=%d errno=%d\n", fd, epfd, op, errno); } } diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c index 2e8fb45..b9f2793 100644 --- a/src/lstack/core/lstack_dpdk.c +++ b/src/lstack/core/lstack_dpdk.c @@ -194,6 +194,15 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num) return 0; } +int32_t gazelle_alloc_mbuf_with_reserve(struct rte_mempool *pool, struct rte_mbuf **mbufs, unsigned count) +{ + if (rte_mempool_avail_count(pool) < RESERVE_NIC_RECV) { + return -1; + } + + return rte_pktmbuf_alloc_bulk(pool, mbufs, count); +} + struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, int32_t queue_id) { char ring_name[RTE_RING_NAMESIZE] = {0}; diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index d30ecdc..2cda2d9 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -34,9 +34,6 @@ #include "dpdk_common.h" #include "lstack_lwip.h" -#define HALF_DIVISOR (2) -#define USED_IDLE_WATERMARK (VDEV_IDLE_QUEUE_SZ >> 2) - static void free_ring_pbuf(struct 
rte_ring *ring) { void *pbufs[SOCK_RECV_RING_SIZE]; @@ -55,20 +52,41 @@ static void free_ring_pbuf(struct rte_ring *ring) } while (gazelle_ring_readover_count(ring)); } +static void free_list_pbuf(struct pbuf *pbuf) +{ + while (pbuf) { + struct pbuf *del_pbuf = pbuf; + pbuf = pbuf->next; + + del_pbuf->next = NULL; + pbuf_free(del_pbuf); + } +} + static void reset_sock_data(struct lwip_sock *sock) { /* check null pointer in ring_free func */ if (sock->recv_ring) { free_ring_pbuf(sock->recv_ring); rte_ring_free(sock->recv_ring); + sock->recv_ring = NULL; } - sock->recv_ring = NULL; if (sock->send_ring) { free_ring_pbuf(sock->send_ring); rte_ring_free(sock->send_ring); + sock->send_ring = NULL; + } + + if (sock->send_lastdata) { + free_list_pbuf(sock->send_lastdata); + sock->send_lastdata = NULL; + } + + if (sock->send_pre_del) { + pbuf_free(sock->send_pre_del); + sock->send_pre_del = NULL; } - sock->send_ring = NULL; sock->stack = NULL; sock->wakeup = NULL; @@ -76,6 +94,7 @@ static void reset_sock_data(struct lwip_sock *sock) sock->epoll_events = 0; sock->events = 0; sock->in_send = 0; + sock->remain_len = 0; if (sock->recv_lastdata) { pbuf_free(sock->recv_lastdata); @@ -97,6 +116,9 @@ static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, u pbuf->l4_len = 0; pbuf->header_off = 0; pbuf->rexmit = 0; + pbuf->in_write = 0; + pbuf->head = 0; + pbuf->last = pbuf; } return pbuf; @@ -110,13 +132,13 @@ static bool replenish_send_idlembuf(struct protocol_stack *stack, struct rte_rin uint32_t replenish_cnt = gazelle_ring_free_count(ring); uint32_t alloc_num = LWIP_MIN(replenish_cnt, RING_SIZE(SOCK_SEND_RING_SIZE)); - if (rte_pktmbuf_alloc_bulk(stack->rxtx_pktmbuf_pool, (struct rte_mbuf **)pbuf, alloc_num) != 0) { + if (gazelle_alloc_mbuf_with_reserve(stack->rxtx_pktmbuf_pool, (struct rte_mbuf **)pbuf, alloc_num) != 0) { stack->stats.tx_allocmbuf_fail++; return true; } for (uint32_t i = 0; i < alloc_num; i++) { - pbuf[i] = init_mbuf_to_pbuf(pbuf[i], 
PBUF_TRANSPORT, TCP_MSS, PBUF_RAM); + pbuf[i] = init_mbuf_to_pbuf(pbuf[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM); } uint32_t num = gazelle_ring_sp_enqueue(ring, pbuf, alloc_num); @@ -158,6 +180,7 @@ void gazelle_init_sock(int32_t fd) init_list_node_null(&sock->recv_list); init_list_node_null(&sock->event_list); init_list_node_null(&sock->send_list); + pthread_spin_init(&sock->sock_lock, PTHREAD_PROCESS_PRIVATE); } void gazelle_clean_sock(int32_t fd) @@ -179,6 +202,7 @@ void gazelle_clean_sock(int32_t fd) list_del_node_null(&sock->recv_list); list_del_node_null(&sock->send_list); + pthread_spin_destroy(&sock->sock_lock); } void gazelle_free_pbuf(struct pbuf *pbuf) @@ -196,7 +220,7 @@ int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, { struct pbuf_custom *pbuf_custom = NULL; - int32_t ret = rte_pktmbuf_alloc_bulk(pool, mbufs, num); + int32_t ret = gazelle_alloc_mbuf_with_reserve(pool, mbufs, num); if (ret != 0) { return ret; } @@ -214,7 +238,7 @@ struct pbuf *lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type) struct rte_mbuf *mbuf; struct protocol_stack *stack = get_protocol_stack(); - if (rte_pktmbuf_alloc_bulk(stack->rxtx_pktmbuf_pool, &mbuf, 1) != 0) { + if (gazelle_alloc_mbuf_with_reserve(stack->rxtx_pktmbuf_pool, &mbuf, 1) != 0) { stack->stats.tx_allocmbuf_fail++; return NULL; } @@ -226,23 +250,55 @@ struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8 { struct pbuf *pbuf = NULL; - if (gazelle_ring_sc_peek(sock->send_ring, (void **)&pbuf, 1) != 1) { - *apiflags &= ~TCP_WRITE_FLAG_MORE; + if (unlikely(sock->send_pre_del)) { + pbuf = sock->send_pre_del; + if (pbuf->tot_len > remain_size || + (pbuf->head && __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE))) { + *apiflags &= ~TCP_WRITE_FLAG_MORE; + return NULL; + } + + if (pbuf->next) { + sock->send_lastdata = pbuf->next; + pbuf->next = NULL; + } + return pbuf; + } + + if (sock->send_lastdata) { + pbuf = sock->send_lastdata; + if 
(pbuf->tot_len > remain_size) { + *apiflags &= ~TCP_WRITE_FLAG_MORE; + return NULL; + } + sock->send_pre_del = pbuf; + sock->send_lastdata = pbuf->next; + pbuf->next = NULL; + return pbuf; + } + + gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf, 1); + if (pbuf == NULL) { return NULL; } + sock->send_pre_del = pbuf; - if (pbuf->tot_len > remain_size) { + if (pbuf->tot_len > remain_size || __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE)) { *apiflags &= ~TCP_WRITE_FLAG_MORE; + pbuf->head = 1; return NULL; } + sock->send_lastdata = pbuf->next; + pbuf->next = NULL; return pbuf; } -void write_lwip_over(struct lwip_sock *sock, uint32_t n) +void write_lwip_over(struct lwip_sock *sock) { - gazelle_ring_dequeue_over(sock->send_ring, n); - sock->stack->stats.write_lwip_cnt += n; + sock->send_pre_del = NULL; + sock->send_all++; + sock->stack->stats.write_lwip_cnt++; } static inline void del_data_out_event(struct lwip_sock *sock) @@ -261,21 +317,174 @@ static inline void del_data_out_event(struct lwip_sock *sock) pthread_spin_unlock(&sock->wakeup->event_list_lock); } -void write_stack_over(struct lwip_sock *sock) +static ssize_t do_app_write(struct pbuf *pbufs[], void *buf, size_t len, uint32_t write_num) { - if (sock->send_lastdata) { - sock->send_lastdata->tot_len = sock->send_lastdata->len = sock->send_datalen; - sock->send_lastdata = NULL; + ssize_t send_len = 0; + uint32_t i = 0; + + for (i = 0; i < write_num - 1; i++) { + rte_prefetch0(pbufs[i + 1]); + rte_prefetch0(pbufs[i + 1]->payload); + rte_prefetch0((char *)buf + send_len + MBUF_MAX_DATA_LEN); + pbuf_take(pbufs[i], (char *)buf + send_len, MBUF_MAX_DATA_LEN); + pbufs[i]->tot_len = pbufs[i]->len = MBUF_MAX_DATA_LEN; + send_len += MBUF_MAX_DATA_LEN; } + /* reduce the branch in loop */ + uint16_t copy_len = len - send_len; + pbuf_take(pbufs[i], (char *)buf + send_len, copy_len); + pbufs[i]->tot_len = pbufs[i]->len = copy_len; + send_len += copy_len; + + return send_len; +} + +static inline ssize_t 
app_direct_write(struct protocol_stack *stack, struct lwip_sock *sock, void *buf, + size_t len, uint32_t write_num) +{ + struct pbuf **pbufs = (struct pbuf **)malloc(write_num * sizeof(struct pbuf *)); + if (pbufs == NULL) { + return 0; + } + + /* first pbuf get from send_ring. and malloc pbufs attach to first pbuf */ + if (gazelle_alloc_mbuf_with_reserve(stack->rxtx_pktmbuf_pool, (struct rte_mbuf **)&pbufs[1], write_num - 1) != 0) { + stack->stats.tx_allocmbuf_fail++; + free(pbufs); + return 0; + } + + (void)gazelle_ring_read(sock->send_ring, (void **)&pbufs[0], 1); + + uint32_t i = 1; + for (; i < write_num - 1; i++) { + rte_prefetch0(mbuf_to_pbuf((void *)pbufs[i + 1])); + pbufs[i] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM); + pbufs[i - 1]->next = pbufs[i]; + } + if (write_num > 1) { + pbufs[i] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM); + pbufs[i - 1]->next = pbufs[i]; + } + + ssize_t send_len = do_app_write(pbufs, buf, len, write_num); + gazelle_ring_read_over(sock->send_ring); - if (sock->wakeup) { - sock->wakeup->stat.app_write_cnt++; - if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) { - del_data_out_event(sock); - } + pbufs[0]->last = pbufs[write_num - 1]; + sock->remain_len = 0; + free(pbufs); + return send_len; +} + +static inline ssize_t app_direct_attach(struct protocol_stack *stack, struct pbuf *attach_pbuf, void *buf, + size_t len, uint32_t write_num) +{ + struct pbuf **pbufs = (struct pbuf **)malloc(write_num * sizeof(struct pbuf *)); + if (pbufs == NULL) { + return 0; + } + + /* first pbuf get from send_ring. 
and malloc pbufs attach to first pbuf */ + if (gazelle_alloc_mbuf_with_reserve(stack->rxtx_pktmbuf_pool, (struct rte_mbuf **)pbufs, write_num) != 0) { + stack->stats.tx_allocmbuf_fail++; + free(pbufs); + return 0; + } + + pbufs[0] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[0], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM); + uint32_t i = 1; + for (; i < write_num - 1; i++) { + rte_prefetch0(mbuf_to_pbuf((void *)pbufs[i + 1])); + pbufs[i] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM); + pbufs[i - 1]->next = pbufs[i]; } + if (write_num > 1) { + pbufs[i] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM); + pbufs[i - 1]->next = pbufs[i]; + } + + ssize_t send_len = do_app_write(pbufs, buf, len, write_num); + + attach_pbuf->last->next = pbufs[0]; + attach_pbuf->last = pbufs[write_num - 1]; + + free(pbufs); + return send_len; +} + +static inline ssize_t app_buff_write(struct lwip_sock *sock, void *buf, size_t len, uint32_t write_num) +{ + struct pbuf *pbufs[SOCK_SEND_RING_SIZE]; + + (void)gazelle_ring_read(sock->send_ring, (void **)pbufs, write_num); + + ssize_t send_len = do_app_write(pbufs, buf, len, write_num); + + gazelle_ring_read_over(sock->send_ring); + + sock->remain_len = MBUF_MAX_DATA_LEN - pbufs[write_num - 1]->len; + return send_len; +} + +static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r) +{ + struct pbuf *last_pbuf = NULL; + volatile uint32_t tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE); + uint32_t last = r->prod.tail - 1; + if (last == tail || last - tail > r->capacity) { + return NULL; + } + + __rte_ring_dequeue_elems(r, last, (void **)&last_pbuf, sizeof(void *), 1); + __atomic_store_n(&last_pbuf->in_write, 1, __ATOMIC_RELEASE); + + rte_mb(); + + tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE); + if (last == tail || last - tail > r->capacity) { + __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE); + return NULL; + } + + 
return last_pbuf; +} + +static inline void gazelle_ring_lastover(struct pbuf *last_pbuf) +{ + __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE); +} + +static inline size_t merge_data_lastpbuf(struct lwip_sock *sock, void *buf, size_t len) +{ + struct pbuf *last_pbuf = gazelle_ring_readlast(sock->send_ring); + if (last_pbuf == NULL) { + sock->remain_len = 0; + return 0; + } + + if (last_pbuf->next || last_pbuf->len >= MBUF_MAX_DATA_LEN) { + sock->remain_len = 0; + gazelle_ring_lastover(last_pbuf); + return 0; + } + + size_t send_len = MBUF_MAX_DATA_LEN - last_pbuf->len; + if (send_len >= len) { + sock->remain_len = send_len - len; + send_len = len; + } else { + sock->remain_len = 0; + } + + uint16_t offset = last_pbuf->len; + last_pbuf->tot_len = last_pbuf->len = offset + send_len; + pbuf_take_at(last_pbuf, buf, send_len, offset); + + gazelle_ring_lastover(last_pbuf); + + return send_len; } ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) @@ -284,44 +493,49 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) GAZELLE_RETURN(ENOTCONN); } - struct pbuf *pbuf = NULL; + struct protocol_stack *stack = sock->stack; + struct wakeup_poll *wakeup = sock->wakeup; + if (!stack|| len == 0 || !wakeup) { + return 0; + } + ssize_t send_len = 0; - uint32_t send_pkt = 0; - while (send_len < len) { - if (sock->send_lastdata) { - pbuf = sock->send_lastdata; - } else { - if (gazelle_ring_read(sock->send_ring, (void **)&pbuf, 1) != 1) { - if (sock->wakeup) { - sock->wakeup->stat.app_write_idlefail++; - } - break; - } - sock->send_lastdata = pbuf; - sock->send_datalen = 0; + /* merge data into last pbuf */ + if (sock->remain_len) { + send_len = merge_data_lastpbuf(sock, (char *)buf, len); + if (send_len >= len) { + return len; } + } - uint16_t remian_len = pbuf->len - sock->send_datalen; - uint16_t copy_len = (len - send_len > remian_len) ? 
remian_len : (len - send_len); - pbuf_take_at(pbuf, (char *)buf + send_len, copy_len, sock->send_datalen); - sock->send_datalen += copy_len; - if (sock->send_datalen >= pbuf->len) { - sock->send_lastdata = NULL; - pbuf->tot_len = pbuf->len = sock->send_datalen; - send_pkt++; - } + uint32_t write_num = (len - send_len + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN; + uint32_t write_avail = gazelle_ring_readable_count(sock->send_ring); - send_len += copy_len; + /* send_ring is full, data attach last pbuf */ + if (write_avail == 0) { + struct pbuf *last_pbuf = gazelle_ring_readlast(sock->send_ring); + if (last_pbuf) { + send_len += app_direct_attach(stack, last_pbuf, (char *)buf + send_len, len - send_len, write_num); + gazelle_ring_lastover(last_pbuf); + wakeup->stat.app_write_cnt += write_num; + } else { + (void)rpc_call_replenish(stack, sock); + wakeup->stat.app_write_rpc++; + } + sock->remain_len = 0; + return send_len; } - if (sock->wakeup) { - sock->wakeup->stat.app_write_cnt += send_pkt; - } + /* send_ring have idle */ + send_len += (write_num <= write_avail) ? app_buff_write(sock, (char *)buf + send_len, len - send_len, write_num) : + app_direct_write(stack, sock, (char *)buf + send_len, len - send_len, write_num); + wakeup->stat.app_write_cnt += write_num; - if (send_len == 0) { - usleep(100); + if (wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) { + del_data_out_event(sock); } + return send_len; } @@ -340,6 +554,14 @@ static inline bool replenish_send_ring(struct protocol_stack *stack, struct lwip return replenish_again; } +void rpc_replenish(struct rpc_msg *msg) +{ + struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_0].p; + struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_1].p; + + msg->result = replenish_send_ring(stack, sock); +} + static inline bool do_lwip_send(struct protocol_stack *stack, int32_t fd, struct lwip_sock *sock, int32_t flags) { /* send all send_ring, so len set lwip send max. 
*/ @@ -375,6 +597,7 @@ void stack_send(struct rpc_msg *msg) list_add_node(&stack->send_list, &sock->send_list); __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE); } + stack->stats.send_self_rpc++; } } @@ -389,6 +612,13 @@ void send_stack_list(struct protocol_stack *stack, uint32_t send_max) list_for_each_safe(node, temp, &stack->send_list) { sock = container_of(node, struct lwip_sock, send_list); + if (++read_num > send_max) { + /* list head move to next send */ + list_del_node(&stack->send_list); + list_add_node(&sock->send_list, &stack->send_list); + break; + } + __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE); rte_mb(); @@ -397,22 +627,6 @@ void send_stack_list(struct protocol_stack *stack, uint32_t send_max) continue; } - if (tcp_sndbuf(sock->conn->pcb.tcp) < TCP_MSS) { - __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE); - continue; - } - - if (!NETCONN_IS_DATAOUT(sock)) { - replenish_again = replenish_send_ring(stack, sock); - if (replenish_again) { - __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE); - continue; - } - - list_del_node_null(&sock->send_list); - continue; - } - replenish_again = do_lwip_send(stack, sock->conn->socket, sock, 0); if (!NETCONN_IS_DATAOUT(sock) && !replenish_again) { @@ -420,10 +634,6 @@ void send_stack_list(struct protocol_stack *stack, uint32_t send_max) } else { __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE); } - - if (++read_num >= send_max) { - break; - } } } @@ -491,6 +701,7 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) calculate_lstack_latency(&sock->stack->latency, pbufs[i], GAZELLE_LATENCY_LWIP); } + sock->recv_all += read_count; sock->stack->stats.read_lwip_cnt += read_count; if (recv_len == 0) { GAZELLE_RETURN(EAGAIN); @@ -572,7 +783,6 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags) if (send <= 0) { return send; } - write_stack_over(sock); notice_stack_send(sock, fd, send, flags); return send; @@ -608,7 +818,6 @@ ssize_t 
sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags) } if (buflen > 0) { - write_stack_over(sock); notice_stack_send(sock, s, buflen, flags); } return buflen; @@ -724,10 +933,16 @@ void read_recv_list(struct protocol_stack *stack, uint32_t max_num) struct lwip_sock *sock; uint32_t read_num = 0; - struct list_node *last_node = list->prev; list_for_each_safe(node, temp, list) { sock = container_of(node, struct lwip_sock, recv_list); + if (++read_num > max_num) { + /* read budget (max_num socks) used up: move the list head next to this sock for the next recv pass */ + list_del_node(&stack->recv_list); + list_add_node(&sock->recv_list, &stack->recv_list); + break; + } + if (sock->conn == NULL || sock->conn->recvmbox == NULL || rte_ring_count(sock->conn->recvmbox->ring) == 0) { list_del_node_null(&sock->recv_list); continue; @@ -741,11 +956,6 @@ void read_recv_list(struct protocol_stack *stack, uint32_t max_num) } else if (len > 0) { add_sock_event(sock, EPOLLIN); } - - /* last_node:recv only once per sock. max_num avoid cost too much time this loop */ - if (++read_num >= max_num || last_node == node) { - break; - } } } @@ -772,6 +982,19 @@ void gazelle_connected_callback(struct netconn *conn) add_sock_event(sock, EPOLLOUT); } +static uint32_t send_back_count(struct lwip_sock *sock) +{ + uint32_t count = 0; + struct pbuf *pbuf = sock->send_lastdata; + + while (pbuf) { + count++; + pbuf = pbuf->next; + } + + return count; +} + static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const struct tcp_pcb *pcb) { struct netconn *netconn = (struct netconn *)pcb->callback_arg; @@ -791,8 +1014,10 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) { conn->recv_ring_cnt = gazelle_ring_readable_count(sock->recv_ring); conn->recv_ring_cnt += (sock->recv_lastdata) ? 
1 : 0; - + conn->send_back_cnt = send_back_count(sock); conn->send_ring_cnt = gazelle_ring_readover_count(sock->send_ring); + conn->recv_all = sock->recv_all; + conn->send_all = sock->send_all; } } } diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c index 6261fa9..45f84a7 100644 --- a/src/lstack/core/lstack_stack_stat.c +++ b/src/lstack/core/lstack_stack_stat.c @@ -106,7 +106,7 @@ static void get_wakeup_stat(struct protocol_stack_group *stack_group, struct pro stat->app_events += wakeup->stat.app_events; stat->read_null += wakeup->stat.read_null; stat->app_write_cnt += wakeup->stat.app_write_cnt; - stat->app_write_idlefail += wakeup->stat.app_write_idlefail; + stat->app_write_rpc += wakeup->stat.app_write_rpc; stat->app_read_cnt += wakeup->stat.app_read_cnt; } } diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index 295baf3..29ca4e4 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -440,6 +440,19 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp) return rpc_sync_call(&stack->rpc_queue, msg); } +int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock) +{ + struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_replenish); + if (msg == NULL) { + return -1; + } + + msg->args[MSG_ARG_0].p = stack; + msg->args[MSG_ARG_1].p = sock; + + return rpc_sync_call(&stack->rpc_queue, msg); +} + int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h index 8d68e06..c042ef5 100644 --- a/src/lstack/include/lstack_dpdk.h +++ b/src/lstack/include/lstack_dpdk.h @@ -15,13 +15,15 @@ #include "gazelle_opt.h" -#define RXTX_NB_MBUF (128 * 2000) /* mbuf per connect * connect num */ +#define RXTX_NB_MBUF (256 * 2000) /* mbuf per connect * connect num. 
size of mbuf is 2536 Byte */ #define RXTX_CACHE_SZ (VDEV_RX_QUEUE_SZ) #define KNI_NB_MBUF (DEFAULT_RING_SIZE << 4) -#define MBUF_HEADER_LEN 64 +#define RESERVE_NIC_RECV (1024) -#define MAX_PACKET_SZ 2048 +#define MBUF_HEADER_LEN 64 + +#define MAX_PACKET_SZ 2048 #define RING_SIZE(x) ((x) - 1) @@ -37,6 +39,7 @@ int thread_affinity_init(int cpu_id); struct protocol_stack; struct rte_mempool; struct rte_ring; +struct rte_mbuf; int32_t fill_mbuf_to_ring(struct rte_mempool *mempool, struct rte_ring *ring, uint32_t mbuf_num); int32_t dpdk_eal_init(void); int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num); @@ -49,5 +52,6 @@ void dpdk_skip_nic_init(void); int32_t dpdk_init_lstack_kni(void); void dpdk_restore_pci(void); bool port_in_stack_queue(uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); +int32_t gazelle_alloc_mbuf_with_reserve(struct rte_mempool *pool, struct rte_mbuf **mbufs, unsigned count); #endif /* GAZELLE_DPDK_H */ diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h index 968eff2..b24006a 100644 --- a/src/lstack/include/lstack_lwip.h +++ b/src/lstack/include/lstack_lwip.h @@ -15,19 +15,20 @@ #define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox)) #define NETCONN_IS_DATAIN(sock) ((gazelle_ring_readable_count((sock)->recv_ring) || (sock)->recv_lastdata)) -#define NETCONN_IS_DATAOUT(sock) gazelle_ring_readover_count((sock)->send_ring) +#define NETCONN_IS_DATAOUT(sock) (gazelle_ring_readover_count((sock)->send_ring) || (sock)->send_lastdata || (sock)->send_pre_del) #define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring) struct lwip_sock; struct rte_mempool; struct rpc_msg; struct rte_mbuf; +struct protocol_stack; void create_shadow_fd(struct rpc_msg *msg); void gazelle_init_sock(int32_t fd); int32_t gazelle_socket(int domain, int type, int protocol); void gazelle_clean_sock(int32_t fd); struct pbuf 
*write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags); -void write_lwip_over(struct lwip_sock *sock, uint32_t n); +void write_lwip_over(struct lwip_sock *sock); ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len); ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags); ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, uint8_t apiflags); @@ -39,10 +40,12 @@ void get_lwip_conntable(struct rpc_msg *msg); void get_lwip_connnum(struct rpc_msg *msg); void stack_recvlist_count(struct rpc_msg *msg); void stack_send(struct rpc_msg *msg); +void app_rpc_write(struct rpc_msg *msg); int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num); void gazelle_free_pbuf(struct pbuf *pbuf); ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags); ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags); ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags); +void rpc_replenish(struct rpc_msg *msg); #endif diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index 6928f98..2c1202e 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -51,6 +51,7 @@ struct rpc_msg { struct protocol_stack; struct rte_mbuf; struct wakeup_poll; +struct lwip_sock; void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num); void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup); int32_t rpc_call_msgcnt(struct protocol_stack *stack); @@ -75,5 +76,6 @@ int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, sockle int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen); int32_t rpc_call_fcntl(int fd, int cmd, long val); int32_t rpc_call_ioctl(int fd, long cmd, void *argp); +int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock 
*sock); #endif diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c index 87fe9ae..7984ded 100644 --- a/src/lstack/netif/lstack_ethdev.c +++ b/src/lstack/netif/lstack_ethdev.c @@ -82,7 +82,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack) } } -#define READ_PKTS_MAX 32 +#define READ_PKTS_MAX 128 int32_t eth_dev_poll(void) { uint32_t nr_pkts; @@ -183,7 +183,7 @@ static err_t eth_dev_output(struct netif *netif, struct pbuf *pbuf) if (likely(first_mbuf->pkt_len > MBUF_MAX_LEN)) { mbuf->ol_flags |= RTE_MBUF_F_TX_TCP_SEG; - mbuf->tso_segsz = TCP_MSS; + mbuf->tso_segsz = MBUF_MAX_DATA_LEN; } mbuf->l2_len = first_pbuf->l2_len; mbuf->l3_len = first_pbuf->l3_len; diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c index c505822..54839af 100644 --- a/src/ltran/ltran_dfx.c +++ b/src/ltran/ltran_dfx.c @@ -568,7 +568,7 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat) printf("read_lwip_drop: %-13"PRIu64" \n", lstack_stat->data.pkts.stack_stat.read_lwip_drop); printf("app_write: %-18"PRIu64" ", lstack_stat->data.pkts.wakeup_stat.app_write_cnt); printf("write_lwip: %-17"PRIu64" ", lstack_stat->data.pkts.stack_stat.write_lwip_cnt); - printf("app_get_idlefail: %-11"PRIu64" \n", lstack_stat->data.pkts.wakeup_stat.app_write_idlefail); + printf("app_write_rpc: %-14"PRIu64" \n", lstack_stat->data.pkts.wakeup_stat.app_write_rpc); printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list_cnt); printf("send_list: %-18"PRIu64" ", lstack_stat->data.pkts.send_list_cnt); printf("conn_num: %-19hu \n", lstack_stat->data.pkts.conn_num); @@ -875,8 +875,8 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_ printf("Active Internet connections (servers and established)\n"); do { printf("\n------ stack tid: %6u ------\n", stat->tid); - printf("No. Proto recv_cnt recv_ring in_send send_ring sem_cnt fd Local Address " - " Foreign Address State\n"); + printf("No. 
Proto lwip_recv recv_ring recv_all in_send send_ring send_back_cnt send_all " + "fd Local Address Foreign Address State\n"); uint32_t unread_pkts = 0; uint32_t unsend_pkts = 0; for (i = 0; i < conn->conn_num && i < GAZELLE_LSTACK_MAX_CONN; i++) { @@ -885,20 +885,21 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_ rip.s_addr = conn_info->rip; lip.s_addr = conn_info->lip; if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) { - printf("%-6utcp %-10u%-11u%-9u%-11u%-9d%-7d%s:%hu %s:%hu %s\n", i, conn_info->recv_cnt, - conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->sem_cnt, + printf("%-6utcp %-11u%-11u%-10"PRIu64"%-9u%-11u%-15u%-10"PRIu64"%-7d%s:%hu %s:%hu %s\n", i, /* recv_all/send_all are uint64_t, send_back_cnt is uint32_t */ + conn_info->recv_cnt, conn_info->recv_ring_cnt, conn_info->recv_all, conn_info->in_send, + conn_info->send_ring_cnt, conn_info->send_back_cnt, conn_info->send_all, conn_info->fd, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port, tcp_state_to_str(conn_info->tcp_sub_state)); } else if (conn_info->state == GAZELLE_LISTEN_LIST) { - printf("%-6utcp %-50u%-7d%s:%hu 0.0.0.0:* LISTEN\n", i, conn_info->recv_cnt, + printf("%-6utcp %-57u%-7d%s:%hu 0.0.0.0:* LISTEN\n", i, conn_info->recv_cnt, conn_info->fd, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port); } else { printf("Got unknow tcp conn::%s:%5hu, state:%u\n", inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, conn_info->state); } - unread_pkts += conn_info->recv_ring_cnt; - unsend_pkts += conn_info->send_ring_cnt; + unread_pkts += conn_info->recv_ring_cnt + conn_info->recv_cnt; + unsend_pkts += conn_info->send_ring_cnt + conn_info->in_send + conn_info->send_back_cnt; } if (conn->conn_num > 0) { printf("Total unread pkts:%u unsend pkts:%u\n", unread_pkts, unsend_pkts); -- 2.23.0