From 18a911bfd87c4b558740a586728e2bbec9813a6f Mon Sep 17 00:00:00 2001 From: wuchangsheng Date: Fri, 11 Mar 2022 16:21:43 +0800 Subject: [PATCH 19/34] fix repeate msg --- src/common/gazelle_dfx_msg.h | 1 + src/lstack/core/lstack_dpdk.c | 2 +- src/lstack/core/lstack_lwip.c | 25 ++++--------- src/lstack/core/lstack_protocol_stack.c | 2 +- src/lstack/core/lstack_thread_rpc.c | 63 ++++++++++++++++++--------------- src/lstack/include/lstack_thread_rpc.h | 3 +- src/ltran/ltran_dfx.c | 3 +- 7 files changed, 46 insertions(+), 53 deletions(-) diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h index e681424..41cbefa 100644 --- a/src/common/gazelle_dfx_msg.h +++ b/src/common/gazelle_dfx_msg.h @@ -89,6 +89,7 @@ struct gazelle_stat_pkts { uint64_t event_null; uint64_t remove_event; uint64_t send_self_rpc; + uint64_t call_null; }; /* same as define in lwip/stats.h - struct stats_mib2 */ diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c index 0544943..b8320db 100644 --- a/src/lstack/core/lstack_dpdk.c +++ b/src/lstack/core/lstack_dpdk.c @@ -133,7 +133,7 @@ static struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_i if (ret < 0) { return NULL; } - pool = rte_mempool_create(pool_name, CALL_POOL_SZ, sizeof(struct rpc_msg), CALL_CACHE_SZ, 0, NULL, NULL, NULL, + pool = rte_mempool_create(pool_name, CALL_POOL_SZ, sizeof(struct rpc_msg), 0, 0, NULL, NULL, NULL, NULL, rte_socket_id(), 0); if (pool == NULL) { LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno); diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index 90ddecc..1fc8446 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -26,6 +26,7 @@ #include "lstack_protocol_stack.h" #include "lstack_log.h" #include "lstack_weakup.h" +#include "lstack_dpdk.h" #include "lstack_stack_stat.h" #include "lstack_lwip.h" @@ -82,9 +83,9 @@ static void reset_sock_data(struct lwip_sock *sock) } } - void gazelle_init_sock(int32_t fd) { + static uint32_t name_tick = 0; struct lwip_sock *sock = get_socket(fd); if (sock == NULL) { return; @@ -92,29 +93,15 @@ void gazelle_init_sock(int32_t fd) reset_sock_data(sock); - int32_t ret; - char name[RTE_RING_NAMESIZE] = {0}; - static uint32_t name_tick = 0; - - ret = snprintf_s(name, sizeof(name), RTE_RING_NAMESIZE - 1, "%s_%d", "sock_recv", name_tick++); - if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "%s create failed.\n", name); - return; - } - sock->recv_ring = rte_ring_create(name, SOCK_RECV_RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + sock->recv_ring = create_ring("sock_recv", SOCK_RECV_RING_SIZE, 0, name_tick++); if (sock->recv_ring == NULL) { - LSTACK_LOG(ERR, LSTACK, "%s create failed. errno: %d.\n", name, rte_errno); + LSTACK_LOG(ERR, LSTACK, "sock_recv create failed. errno: %d.\n", rte_errno); return; } - ret = snprintf_s(name, sizeof(name), RTE_RING_NAMESIZE - 1, "%s_%d", "sock_send", name_tick++); - if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "%s create failed. errno: %d.\n", name, rte_errno); - return; - } - sock->send_ring = rte_ring_create(name, SOCK_SEND_RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); + sock->send_ring = create_ring("sock_send", SOCK_SEND_RING_SIZE, 0, name_tick++); if (sock->send_ring == NULL) { - LSTACK_LOG(ERR, LSTACK, "%s create failed. errno: %d.\n", name, rte_errno); + LSTACK_LOG(ERR, LSTACK, "sock_send create failed. errno: %d.\n", rte_errno); return; } diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index 939543b..db8a20a 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -318,7 +318,7 @@ static void* gazelle_stack_thread(void *arg) stack_thread_init(stack); for (;;) { - poll_rpc_msg(&stack->rpc_queue, stack->rpc_pool); + poll_rpc_msg(stack); eth_dev_poll(); diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index af5fad3..b4d57d3 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -24,12 +24,12 @@ #define HANDLE_RPC_MSG_MAX (8) static inline __attribute__((always_inline)) -struct rpc_msg *rpc_msg_alloc(struct rte_mempool *pool, rpc_msg_func func) +struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func) { int32_t ret; struct rpc_msg *msg = NULL; - ret = rte_mempool_get(pool, (void **)&msg); + ret = rte_mempool_get(stack->rpc_pool, (void **)&msg); if (ret < 0) { get_protocol_stack_group()->call_alloc_fail++; return NULL; @@ -46,6 +46,9 @@ static inline __attribute__((always_inline)) void rpc_msg_free(struct rte_mempool *pool, struct rpc_msg *msg) { pthread_spin_destroy(&msg->lock); + + msg->self_release = 0; + msg->func = NULL; rte_mempool_put(pool, (void *)msg); } @@ -71,7 +74,7 @@ int32_t rpc_sync_call(lockless_queue *queue, struct rte_mempool *pool, struct rp return ret; } -void poll_rpc_msg(lockless_queue *rpc_queue, struct rte_mempool *rpc_pool) +void poll_rpc_msg(struct protocol_stack *stack) { int32_t num; struct rpc_msg *msg = NULL; @@ -79,7 +82,7 @@ void poll_rpc_msg(lockless_queue *rpc_queue, struct rte_mempool *rpc_pool) num = 0; lockless_queue_node *first_node = NULL; while (num++ < HANDLE_RPC_MSG_MAX) { - lockless_queue_node *node = lockless_queue_mpsc_pop(rpc_queue); + lockless_queue_node *node = lockless_queue_mpsc_pop(&stack->rpc_queue); if (node == NULL) { return; } @@ -91,6 +94,8 @@ void poll_rpc_msg(lockless_queue *rpc_queue, struct rte_mempool *rpc_pool) if (msg->func) { msg->func(msg); + } else { + stack->stats.call_null++; } rte_mb(); @@ -98,7 +103,7 @@ void poll_rpc_msg(lockless_queue *rpc_queue, struct rte_mempool *rpc_pool) if (msg->self_release) { pthread_spin_unlock(&msg->lock); } else { - rpc_msg_free(rpc_pool, msg); + rpc_msg_free(stack->rpc_pool, msg); } if (first_node == node) { @@ -109,7 +114,7 @@ void poll_rpc_msg(lockless_queue *rpc_queue, struct rte_mempool *rpc_pool) int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn) { - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, get_lwip_conntable); + struct rpc_msg *msg = rpc_msg_alloc(stack, get_lwip_conntable); if (msg == NULL) { return -1; } @@ -122,7 +127,7 @@ int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint3 int32_t rpc_call_connnum(struct protocol_stack *stack) { - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, get_lwip_connnum); + struct rpc_msg *msg = rpc_msg_alloc(stack, get_lwip_connnum); if (msg == NULL) { return -1; } @@ -132,7 +137,7 @@ int32_t rpc_call_connnum(struct protocol_stack *stack) int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen) { - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, create_shadow_fd); + struct rpc_msg *msg = rpc_msg_alloc(stack, create_shadow_fd); if (msg == NULL) { return -1; } @@ -152,7 +157,7 @@ static void rpc_msgcnt(struct rpc_msg *msg) int32_t rpc_call_msgcnt(struct protocol_stack *stack) { - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, rpc_msgcnt); + struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_msgcnt); if (msg == NULL) { return -1; } @@ -162,7 +167,7 @@ int32_t rpc_call_msgcnt(struct protocol_stack *stack) int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn) { - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, thread_register_phase1); + struct rpc_msg *msg = rpc_msg_alloc(stack, thread_register_phase1); if (msg == NULL) { return -1; } @@ -172,7 +177,7 @@ int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn) int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn) { - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, thread_register_phase2); + struct rpc_msg *msg = rpc_msg_alloc(stack, thread_register_phase2); if (msg == NULL) { return -1; } @@ -182,7 +187,7 @@ int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn) int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) { - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_recvlist_count); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count); if (msg == NULL) { return -1; } @@ -199,7 +204,7 @@ static void rpc_replenish_idlembuf(struct rpc_msg *msg) void rpc_call_replenish_idlembuf(struct protocol_stack *stack) { - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, rpc_replenish_idlembuf); + struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_replenish_idlembuf); if (msg == NULL) { return; } @@ -210,7 +215,7 @@ void rpc_call_replenish_idlembuf(struct protocol_stack *stack) int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf) { - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_arp); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_arp); if (msg == NULL) { return -1; } @@ -225,7 +230,7 @@ int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf) int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol) { struct protocol_stack *stack = get_minconn_protocol_stack(); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_socket); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_socket); if (msg == NULL) { return -1; } @@ -240,7 +245,7 @@ int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol) int32_t rpc_call_close(int fd) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_close); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_close); if (msg == NULL) { return -1; } @@ -253,7 +258,7 @@ int32_t rpc_call_close(int fd) int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_bind); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_bind); if (msg == NULL) { return -1; } @@ -268,7 +273,7 @@ int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen int32_t rpc_call_listen(int s, int backlog) { struct protocol_stack *stack = get_protocol_stack_by_fd(s); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_listen); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_listen); if (msg == NULL) { return -1; } @@ -282,7 +287,7 @@ int32_t rpc_call_listen(int s, int backlog) int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_accept); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_accept); if (msg == NULL) { return -1; } @@ -297,7 +302,7 @@ int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen) int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_connect); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_connect); if (msg == NULL) { return -1; } @@ -312,7 +317,7 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen) int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_getpeername); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_getpeername); if (msg == NULL) { return -1; } @@ -327,7 +332,7 @@ int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen) int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_getsockname); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_getsockname); if (msg == NULL) { return -1; } @@ -342,7 +347,7 @@ int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen) int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_getsockopt); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_getsockopt); if (msg == NULL) { return -1; } @@ -359,7 +364,7 @@ int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, sockle int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_setsockopt); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_setsockopt); if (msg == NULL) { return -1; } @@ -376,7 +381,7 @@ int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, int32_t rpc_call_fcntl(int fd, int cmd, long val) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_fcntl); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_fcntl); if (msg == NULL) { return -1; } @@ -391,7 +396,7 @@ int32_t rpc_call_fcntl(int fd, int cmd, long val) int32_t rpc_call_ioctl(int fd, long cmd, void *argp) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_ioctl); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_ioctl); if (msg == NULL) { return -1; } @@ -437,7 +442,7 @@ static void stack_send(struct rpc_msg *msg) ssize_t rpc_call_send(int fd, const void *buf, size_t len, int flags) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_send); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_send); if (msg == NULL) { return -1; } @@ -454,7 +459,7 @@ ssize_t rpc_call_send(int fd, const void *buf, size_t len, int flags) int32_t rpc_call_sendmsg(int fd, const struct msghdr *msghdr, int flags) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_sendmsg); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendmsg); if (msg == NULL) { return -1; } @@ -469,7 +474,7 @@ int32_t rpc_call_sendmsg(int fd, const struct msghdr *msghdr, int flags) int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack->rpc_pool, stack_recvmsg); + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvmsg); if (msg == NULL) { return -1; } diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index 8a05d6c..1365234 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -47,9 +47,8 @@ struct rpc_msg { union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */ }; -void poll_rpc_msg(lockless_queue *rpc_queue, struct rte_mempool *rpc_pool); - struct protocol_stack; +void poll_rpc_msg(struct protocol_stack *stack); void rpc_call_replenish_idlembuf(struct protocol_stack *stack); int32_t rpc_call_msgcnt(struct protocol_stack *stack); int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c index a01d91f..4b46ac9 100644 --- a/src/ltran/ltran_dfx.c +++ b/src/ltran/ltran_dfx.c @@ -576,7 +576,8 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat) printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail); printf("event_null: %-17"PRIu64" ", lstack_stat->data.pkts.event_null); printf("remove_event: %-15"PRIu64" \n", lstack_stat->data.pkts.remove_event); - printf("send_self_rpc: %-14"PRIu64" \n", lstack_stat->data.pkts.send_self_rpc); + printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc); + printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null); } static void gazelle_print_lstack_stat_detail(struct gazelle_stack_dfx_data *lstack_stat, -- 1.8.3.1