From c095a3145da08e976bb6e1e2a2abc6bc3d3aac31 Mon Sep 17 00:00:00 2001 From: jianheng Date: Sun, 12 Mar 2023 01:30:34 +0800 Subject: [PATCH] add pbuf lock when aggregating pbufs and reduce invalid send rpc count --- src/common/gazelle_dfx_msg.h | 1 - src/lstack/core/lstack_cfg.c | 8 -- src/lstack/core/lstack_lwip.c | 150 ++++++++------------- src/lstack/core/lstack_protocol_stack.c | 4 - src/lstack/core/lstack_stack_stat.c | 3 - src/lstack/core/lstack_thread_rpc.c | 46 +------ src/lstack/include/lstack_cfg.h | 1 - src/lstack/include/lstack_lwip.h | 1 - src/lstack/include/lstack_protocol_stack.h | 1 - src/lstack/include/lstack_thread_rpc.h | 25 +++- src/lstack/lstack.conf | 2 - src/ltran/ltran_dfx.c | 1 - 12 files changed, 87 insertions(+), 156 deletions(-) diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h index 674f2d7..608a023 100644 --- a/src/common/gazelle_dfx_msg.h +++ b/src/common/gazelle_dfx_msg.h @@ -81,7 +81,6 @@ struct gazelle_stat_pkts { uint16_t conn_num; uint64_t recv_list_cnt; uint64_t call_alloc_fail; - uint64_t send_list_cnt; uint32_t mempool_freecnt; struct gazelle_stack_stat stack_stat; struct gazelle_wakeup_stat wakeup_stat; diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c index 86d0f14..9195f34 100644 --- a/src/lstack/core/lstack_cfg.c +++ b/src/lstack/core/lstack_cfg.c @@ -58,7 +58,6 @@ static int32_t parse_listen_shadow(void); static int32_t parse_app_bind_numa(void); static int32_t parse_main_thread_affinity(void); static int32_t parse_unix_prefix(void); -static int32_t parse_send_connect_number(void); static int32_t parse_read_connect_number(void); static int32_t parse_rpc_number(void); static int32_t parse_nic_read_number(void); @@ -108,7 +107,6 @@ static struct config_vector_t g_config_tbl[] = { { "unix_prefix", parse_unix_prefix }, { "tcp_conn_count", parse_tcp_conn_count }, { "mbuf_count_per_conn", parse_mbuf_count_per_conn }, - { "send_connect_number", parse_send_connect_number }, { 
"read_connect_number", parse_read_connect_number }, { "rpc_number", parse_rpc_number }, { "nic_read_number", parse_nic_read_number }, @@ -734,12 +732,6 @@ static int32_t parse_mbuf_count_per_conn(void) MBUF_COUNT_PER_CONN, 1, INT32_MAX); } -static int32_t parse_send_connect_number(void) -{ - return parse_int(&g_config_params.send_connect_number, "send_connect_number", - STACK_THREAD_DEFAULT, 1, INT32_MAX); -} - static int32_t parse_read_connect_number(void) { return parse_int(&g_config_params.read_connect_number, "read_connect_number", diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index dcd7e05..e83bffa 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -94,7 +94,7 @@ static void reset_sock_data(struct lwip_sock *sock) sock->listen_next = NULL; sock->epoll_events = 0; sock->events = 0; - sock->in_send = 0; + sock->call_num = 0; sock->remain_len = 0; if (sock->recv_lastdata) { @@ -117,9 +117,10 @@ static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, u pbuf->l4_len = 0; pbuf->header_off = 0; pbuf->rexmit = 0; - pbuf->in_write = 0; + pbuf->allow_in = 1; /* 1: gazelle_ring_readlast() may still hand this pbuf out; write_lwip_data() clears it */ pbuf->head = 0; pbuf->last = pbuf; + pthread_spin_init(&pbuf->pbuf_lock, PTHREAD_PROCESS_SHARED); /* guards allow_in; NOTE(review): return value is not checked */ } return pbuf; @@ -193,7 +194,6 @@ void gazelle_init_sock(int32_t fd) sock->stack->conn_num++; init_list_node_null(&sock->recv_list); init_list_node_null(&sock->event_list); - init_list_node_null(&sock->send_list); } void gazelle_clean_sock(int32_t fd) @@ -214,7 +214,6 @@ void gazelle_clean_sock(int32_t fd) reset_sock_data(sock); list_del_node_null(&sock->recv_list); - list_del_node_null(&sock->send_list); } void gazelle_free_pbuf(struct pbuf *pbuf) @@ -264,11 +263,16 @@ struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8 if (unlikely(sock->send_pre_del)) { pbuf = sock->send_pre_del; - if (pbuf->tot_len > remain_size || - (pbuf->head && __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE))) { +
pthread_spin_lock(&pbuf->pbuf_lock); + if (pbuf->tot_len > remain_size) { + pthread_spin_unlock(&pbuf->pbuf_lock); *apiflags &= ~TCP_WRITE_FLAG_MORE; return NULL; } + if (pbuf->allow_in == 1) { + __sync_fetch_and_sub(&pbuf->allow_in, 1); /* stop gazelle_ring_readlast() from returning this pbuf again */ + } + pthread_spin_unlock(&pbuf->pbuf_lock); if (pbuf->next) { sock->send_lastdata = pbuf->next; @@ -295,10 +299,24 @@ struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8 } sock->send_pre_del = pbuf; - if (pbuf->tot_len > remain_size || __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE)) { - *apiflags &= ~TCP_WRITE_FLAG_MORE; - pbuf->head = 1; - return NULL; + if (!gazelle_ring_readover_count(sock->send_ring)) { + pthread_spin_lock(&pbuf->pbuf_lock); + if (pbuf->tot_len > remain_size) { + pthread_spin_unlock(&pbuf->pbuf_lock); + *apiflags &= ~TCP_WRITE_FLAG_MORE; + pbuf->head = 1; + return NULL; + } + if (pbuf->allow_in == 1) { + __sync_fetch_and_sub(&pbuf->allow_in, 1); + } + pthread_spin_unlock(&pbuf->pbuf_lock); + } else { /* NOTE(review): this path neither takes pbuf_lock nor clears allow_in before the pbuf is consumed; confirm gazelle_ring_readlast() cannot still select it */ + if (pbuf->tot_len > remain_size) { + *apiflags &= ~TCP_WRITE_FLAG_MORE; + pbuf->head = 1; + return NULL; + } } sock->send_lastdata = pbuf->next; @@ -442,18 +460,17 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r) struct pbuf *last_pbuf = NULL; volatile uint32_t tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE); uint32_t last = r->prod.tail - 1; - if (last == tail || last - tail > r->capacity) { + if (last + 1 == tail || last + 1 - tail > r->capacity) { return NULL; } __rte_ring_dequeue_elems(r, last, (void **)&last_pbuf, sizeof(void *), 1); - __atomic_store_n(&last_pbuf->in_write, 1, __ATOMIC_RELEASE); - - rte_mb(); - tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE); - if (last == tail || last - tail > r->capacity) { - __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE); + if (pthread_spin_trylock(&last_pbuf->pbuf_lock) != 0) { /* on success the lock is kept; gazelle_ring_lastover() releases it */ + return NULL; + } + if (last_pbuf->allow_in != 1) { + pthread_spin_unlock(&last_pbuf->pbuf_lock); return NULL;
} @@ -462,7 +479,7 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r) static inline void gazelle_ring_lastover(struct pbuf *last_pbuf) { - __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE); + pthread_spin_unlock(&last_pbuf->pbuf_lock); } static inline size_t merge_data_lastpbuf(struct lwip_sock *sock, void *buf, size_t len) @@ -623,56 +640,34 @@ void stack_send(struct rpc_msg *msg) { int32_t fd = msg->args[MSG_ARG_0].i; struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_3].p; + bool replenish_again; struct lwip_sock *sock = get_socket(fd); if (sock == NULL) { msg->result = -1; + LSTACK_LOG(ERR, LSTACK, "stack_send: sock error!\n"); + rpc_msg_free(msg); return; } - __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE); - rte_mb(); - - /* have remain data or replenish again add sendlist */ - if (sock->errevent == 0 && NETCONN_IS_DATAOUT(sock)) { - if (list_is_null(&sock->send_list)) { - list_add_node(&stack->send_list, &sock->send_list); - __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE); - } - } -} - -void send_stack_list(struct protocol_stack *stack, uint32_t send_max) -{ - struct list_node *node, *temp; - struct lwip_sock *sock; - uint32_t read_num = 0; - bool replenish_again; - - list_for_each_safe(node, temp, &stack->send_list) { - sock = container_of(node, struct lwip_sock, send_list); - - if (++read_num > send_max) { - /* list head move to next send */ - list_del_node(&stack->send_list); - list_add_node(&sock->send_list, &stack->send_list); - break; - } - - __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE); - rte_mb(); - - if (sock->conn == NULL || sock->errevent > 0) { - list_del_node_null(&sock->send_list); - continue; - } - - replenish_again = do_lwip_send(stack, sock->conn->socket, sock, 0); - - if (!NETCONN_IS_DATAOUT(sock) && !replenish_again) { - list_del_node_null(&sock->send_list); + /* as send_stack_list did, skip closed or errored sockets. call_num is deliberately not decremented: reset_sock_data() may already have zeroed it */ + if (sock->conn == NULL || sock->errevent > 0) { + rpc_msg_free(msg); + return; + } + + replenish_again = do_lwip_send(stack, sock->conn->socket, sock, 0); + __sync_fetch_and_sub(&sock->call_num,
1); + if (!NETCONN_IS_DATAOUT(sock) && !replenish_again) { + rpc_msg_free(msg); + return; + } else { + if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 0) { /* no other send msg outstanding: re-queue this one */ + __sync_fetch_and_add(&sock->call_num, 1); + rpc_call(&stack->rpc_queue, msg); } else { - __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE); + rpc_msg_free(msg); + return; } } } @@ -686,29 +681,6 @@ static inline void free_recv_ring_readover(struct rte_ring *ring) } } -static inline struct pbuf *gazelle_ring_enqueuelast(struct rte_ring *r) -{ - struct pbuf *last_pbuf = NULL; - volatile uint32_t head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE); - uint32_t last = r->cons.head - 1; - if (last == head || last - head > r->capacity) { - return NULL; - } - - __rte_ring_dequeue_elems(r, last, (void **)&last_pbuf, sizeof(void *), 1); - __atomic_store_n(&last_pbuf->in_write, 1, __ATOMIC_RELEASE); - - rte_mb(); - - head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE); - if (last == head || last - head > r->capacity) { - __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE); - return NULL; - } - - return last_pbuf; -} - static inline struct pbuf *pbuf_last(struct pbuf *pbuf) { while (pbuf->next) { @@ -824,11 +796,14 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags) static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) { - if (__atomic_load_n(&sock->in_send, __ATOMIC_ACQUIRE) == 0) { - __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE); - if (rpc_call_send(fd, NULL, len, flags) != 0) { - __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE); + /* call_num counts send msgs queued for or running on the stack thread; stop queuing once two are outstanding */ + if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) < 2) { + /* count the msg before queuing it: stack_send may run and decrement call_num before rpc_call_send returns */ + __sync_fetch_and_add(&sock->call_num, 1); + while (rpc_call_send(fd, NULL, len, flags) < 0) { + usleep(1000); // wait 1ms to exec again + LSTACK_LOG(INFO, LSTACK, "rpc_call_send failed, try again\n"); } } } @@ -1240,13 +1215,6 @@ void stack_mempool_size(struct rpc_msg *msg) msg->result =
rte_mempool_avail_count(stack->rxtx_pktmbuf_pool); } -void stack_sendlist_count(struct rpc_msg *msg) -{ - struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_0].p; - - msg->result = get_list_count(&stack->send_list); -} - void stack_recvlist_count(struct rpc_msg *msg) { struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_0].p; diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index e13034f..300d7af 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -279,7 +279,6 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id) stack->lwip_stats = &lwip_stats; init_list_node(&stack->recv_list); - init_list_node(&stack->send_list); init_list_node(&stack->wakeup_list); sys_calibrate_tsc(); @@ -417,7 +416,6 @@ static void* gazelle_stack_thread(void *arg) struct cfg_params *cfg = get_global_cfg_params(); bool use_ltran_flag = cfg->use_ltran;; bool kni_switch = cfg->kni_switch; - uint32_t send_connect_number = cfg->send_connect_number; uint32_t read_connect_number = cfg->read_connect_number; uint32_t rpc_number = cfg->rpc_number; uint32_t nic_read_number = cfg->nic_read_number; @@ -441,8 +439,6 @@ static void* gazelle_stack_thread(void *arg) for (;;) { poll_rpc_msg(stack, rpc_number); - send_stack_list(stack, send_connect_number); - gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number); read_recv_list(stack, read_connect_number); diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c index 75322d5..a39e372 100644 --- a/src/lstack/core/lstack_stack_stat.c +++ b/src/lstack/core/lstack_stack_stat.c @@ -152,9 +152,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_ rpc_call_result = rpc_call_recvlistcnt(stack); dfx->data.pkts.recv_list_cnt = (rpc_call_result < 0) ?
0 : rpc_call_result; - rpc_call_result = rpc_call_sendlistcnt(stack); - dfx->data.pkts.send_list_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result; - dfx->data.pkts.conn_num = stack->conn_num; } diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index 08fe20d..fe3b757 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -23,14 +23,6 @@ #include "lstack_dpdk.h" #include "lstack_thread_rpc.h" -#define RPC_MSG_MAX 512 -#define RPC_MSG_MASK (RPC_MSG_MAX - 1) -struct rpc_msg_pool { - struct rpc_msg msgs[RPC_MSG_MAX]; - uint32_t prod __rte_cache_aligned; - uint32_t cons __rte_cache_aligned; -}; - static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL; static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool) @@ -76,21 +68,6 @@ static struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func return msg; } -static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *msg) -{ - pthread_spin_destroy(&msg->lock); - - msg->self_release = 0; - msg->func = NULL; - - atomic_fetch_add((_Atomic uint32_t *)&msg->pool->cons, 1); -} - -static inline __attribute__((always_inline)) void rpc_call(lockless_queue *queue, struct rpc_msg *msg) -{ - lockless_queue_mpsc_push(queue, &msg->queue_node); -} - static inline __attribute__((always_inline)) int32_t rpc_sync_call(lockless_queue *queue, struct rpc_msg *msg) { int32_t ret; @@ -124,10 +101,13 @@ void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num) stack->stats.call_null++; } - if (msg->self_release) { - pthread_spin_unlock(&msg->lock); - } else { - rpc_msg_free(msg); + /* stack_send releases (rpc_msg_free) or re-queues its own msg, so it must not be freed or unlocked here. NOTE(review): msg->func is read after stack_send may already have returned msg to its pool; consider saving func before the call */ + if (msg->func != stack_send) { + if (msg->self_release) { + pthread_spin_unlock(&msg->lock); + } else { + rpc_msg_free(msg); + } } } } @@ -217,18 +197,6 @@ int32_t rpc_call_mempoolsize(struct protocol_stack *stack) return rpc_sync_call(&stack->rpc_queue, msg); }
-int32_t rpc_call_sendlistcnt(struct protocol_stack *stack) -{ - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendlist_count); - if (msg == NULL) { - return -1; - } - - msg->args[MSG_ARG_0].p = stack; - - return rpc_sync_call(&stack->rpc_queue, msg); -} - int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) { struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count); diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h index 5f16c19..2705fee 100644 --- a/src/lstack/include/lstack_cfg.h +++ b/src/lstack/include/lstack_cfg.h @@ -76,7 +76,6 @@ struct cfg_params { uint32_t lpm_pkts_in_detect; uint32_t tcp_conn_count; uint32_t mbuf_count_per_conn; - uint32_t send_connect_number; uint32_t read_connect_number; uint32_t rpc_number; uint32_t nic_read_number; diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h index 6f5b4f4..02110e0 100644 --- a/src/lstack/include/lstack_lwip.h +++ b/src/lstack/include/lstack_lwip.h @@ -35,7 +35,5 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, uint8_t apiflags); void read_recv_list(struct protocol_stack *stack, uint32_t max_num); -void send_stack_list(struct protocol_stack *stack, uint32_t send_max); void add_recv_list(int32_t fd); -void stack_sendlist_count(struct rpc_msg *msg); void get_lwip_conntable(struct rpc_msg *msg); void get_lwip_connnum(struct rpc_msg *msg); void stack_recvlist_count(struct rpc_msg *msg); diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h index 795db39..11b001c 100644 --- a/src/lstack/include/lstack_protocol_stack.h +++ b/src/lstack/include/lstack_protocol_stack.h @@ -68,7 +68,6 @@ struct protocol_stack { struct rte_mbuf *pkts[RTE_TEST_RX_DESC_DEFAULT]; struct list_node recv_list; - struct list_node send_list; struct list_node wakeup_list; volatile uint16_t conn_num; diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index
aff30dc..ed111fb 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -24,6 +24,10 @@ #define MSG_ARG_3 (3) #define MSG_ARG_4 (4) #define RPM_MSG_ARG_SIZE (5) + +#define RPC_MSG_MAX 512 +#define RPC_MSG_MASK (RPC_MSG_MAX - 1) + struct rpc_msg; typedef void (*rpc_msg_func)(struct rpc_msg *msg); union rpc_msg_arg { @@ -48,6 +52,12 @@ struct rpc_msg { union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */ }; +struct rpc_msg_pool { + struct rpc_msg msgs[RPC_MSG_MAX]; + uint32_t prod __rte_cache_aligned; + uint32_t cons __rte_cache_aligned; +}; + struct protocol_stack; struct rte_mbuf; struct wakeup_poll; @@ -57,7 +67,6 @@ void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wake int32_t rpc_call_msgcnt(struct protocol_stack *stack); int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); int32_t rpc_call_recvlistcnt(struct protocol_stack *stack); -int32_t rpc_call_sendlistcnt(struct protocol_stack *stack); int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn); int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn); int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn); @@ -79,4 +88,18 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp); int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock); int32_t rpc_call_mempoolsize(struct protocol_stack *stack); +static inline __attribute__((always_inline)) void rpc_call(lockless_queue *queue, struct rpc_msg *msg) +{ + lockless_queue_mpsc_push(queue, &msg->queue_node); +} + +static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *msg) +{ + pthread_spin_destroy(&msg->lock); + + msg->self_release = 0; /* msg->func is left set on purpose: poll_rpc_msg() compares it with stack_send after the callback */ + + __atomic_fetch_add(&msg->pool->cons, 1, __ATOMIC_SEQ_CST); /* release the slot by advancing the pool's consumer index; cons is a plain uint32_t, no _Atomic cast needed */ +} + #endif diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf index
a4571ff..cf81954 100644 --- a/src/lstack/lstack.conf +++ b/src/lstack/lstack.conf @@ -28,8 +28,6 @@ send_ring_size = 256 expand_send_ring = 0 #protocol stack thread per loop params -#send connect to nic -send_connect_number = 4 #read data form protocol stack into recv_ring read_connect_number = 4 #process rpc msg number diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c index 1c9d4fa..051787e 100644 --- a/src/ltran/ltran_dfx.c +++ b/src/ltran/ltran_dfx.c @@ -572,7 +572,6 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat) printf("write_lwip: %-17"PRIu64" ", lstack_stat->data.pkts.stack_stat.write_lwip_cnt); printf("app_write_rpc: %-14"PRIu64" \n", lstack_stat->data.pkts.wakeup_stat.app_write_rpc); printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list_cnt); - printf("send_list: %-18"PRIu64" ", lstack_stat->data.pkts.send_list_cnt); printf("conn_num: %-19hu \n", lstack_stat->data.pkts.conn_num); printf("wakeup_events: %-14"PRIu64" ", lstack_stat->data.pkts.stack_stat.wakeup_events); printf("app_events: %-17"PRIu64" ", lstack_stat->data.pkts.wakeup_stat.app_events); -- 2.23.0