From aa422a75961523de411ec849fd1f4e45da4477ac Mon Sep 17 00:00:00 2001 From: wuchangsheng Date: Mon, 14 Mar 2022 20:32:39 +0800 Subject: [PATCH 25/34] fix event miss --- src/common/gazelle_dfx_msg.h | 1 + src/lstack/api/lstack_epoll.c | 6 ++-- src/lstack/core/lstack_lwip.c | 54 ++++++++++++++++++++++++++---- src/lstack/core/lstack_protocol_stack.c | 31 +++++++++++++++++ src/lstack/core/lstack_stack_stat.c | 27 +++++++++++---- src/lstack/core/lstack_thread_rpc.c | 38 ++++++--------------- src/lstack/include/lstack_lwip.h | 2 ++ src/lstack/include/lstack_protocol_stack.h | 1 + src/lstack/include/lstack_thread_rpc.h | 1 + src/lstack/include/lstack_weakup.h | 2 +- src/ltran/ltran_dfx.c | 3 +- 11 files changed, 120 insertions(+), 46 deletions(-) diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h index ae20436..de669f5 100644 --- a/src/common/gazelle_dfx_msg.h +++ b/src/common/gazelle_dfx_msg.h @@ -95,6 +95,7 @@ struct gazelle_stat_pkts { uint64_t epoll_pending_call; uint64_t epoll_self_call; uint64_t epoll_self_event; + uint64_t send_list; }; /* same as define in lwip/stats.h - struct stats_mib2 */ diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c index a686ddb..cf072b0 100644 --- a/src/lstack/api/lstack_epoll.c +++ b/src/lstack/api/lstack_epoll.c @@ -61,7 +61,7 @@ static inline bool report_events(struct lwip_sock *sock, uint32_t event) return true; } - if (sock->have_event) { + if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) { return false; } @@ -92,7 +92,7 @@ void add_epoll_event(struct netconn *conn, uint32_t event) list_add_node(&sock->stack->event_list, &sock->event_list); } } else { - sock->have_event = true; + __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE); sock->stack->stats.weakup_events++; } } @@ -302,7 +302,7 @@ static int32_t get_lwip_events(struct weakup_poll *weakup, void *out, uint32_t m if (sock->stack == NULL) { return true; } - sock->have_event = false; + __atomic_store_n(&sock->have_event, false, __ATOMIC_RELEASE); if (remove_event(etype, weakup->sock_list, event_num, sock)) { sock->stack->stats.remove_event++; diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index d55f1e6..d35a217 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -112,6 +112,7 @@ void gazelle_init_sock(int32_t fd) init_list_node(&sock->listen_list); init_list_node(&sock->event_list); init_list_node(&sock->wakeup_list); + init_list_node(&sock->send_list); } void gazelle_clean_sock(int32_t fd) @@ -130,6 +131,7 @@ void gazelle_clean_sock(int32_t fd) list_del_node_init(&sock->listen_list); list_del_node_init(&sock->event_list); list_del_node_init(&sock->wakeup_list); + list_del_node_init(&sock->send_list); } void gazelle_free_pbuf(struct pbuf *pbuf) @@ -280,12 +282,12 @@ void add_self_event(struct lwip_sock *sock, uint32_t events) sock->events |= events; - if (sock->have_event) { + if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) { return; } if (rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) { - sock->have_event = true; + __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE); sem_post(&sock->weakup->event_sem); stack->stats.epoll_self_event++; } else { @@ -346,6 +348,34 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) return send_len; } +void stack_send(struct rpc_msg *msg) +{ + int32_t fd = msg->args[MSG_ARG_0].i; + int32_t flags = msg->args[MSG_ARG_2].i; + + struct protocol_stack *stack = get_protocol_stack(); + struct lwip_sock *sock = get_socket(fd); + if (sock == NULL) { + msg->result = -1; + return; + } + + msg->result = write_lwip_data(sock, fd, flags); + __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE); + + if (msg->result >= 0 && rte_ring_count(sock->send_ring)) { + if (list_is_empty(&sock->send_list)) { + __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE); + list_add_node(&stack->send_list, &sock->send_list); + sock->stack->stats.send_self_rpc++; + } + } + + if (rte_ring_free_count(sock->send_ring)) { + add_epoll_event(sock->conn, EPOLLOUT); + } +} + ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) { if (sock->conn->recvmbox == NULL) { @@ -448,14 +478,19 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags) GAZELLE_RETURN(EINVAL); } + sock->send_flags = flags; ssize_t send = write_stack_data(sock, buf, len); - if (send < 0 || sock->have_rpc_send) { - return send; + + ssize_t ret = 0; + if (!__atomic_load_n(&sock->have_rpc_send, __ATOMIC_ACQUIRE)) { + __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE); + ret = rpc_call_send(fd, buf, len, flags); } - sock->have_rpc_send = true; - ssize_t ret = rpc_call_send(fd, buf, len, flags); - return (ret < 0) ? ret : send; + if (send <= 0 || ret < 0) { + GAZELLE_RETURN(EAGAIN); + } + return send; } ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags) @@ -743,6 +778,11 @@ void stack_eventlist_count(struct rpc_msg *msg) msg->result = get_list_count(&get_protocol_stack()->event_list); } +void stack_sendlist_count(struct rpc_msg *msg) +{ + msg->result = get_list_count(&get_protocol_stack()->send_list); +} + void stack_recvlist_count(struct rpc_msg *msg) { msg->result = get_list_count(&get_protocol_stack()->recv_list); diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index 45649fe..4ba851a 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -194,6 +194,7 @@ int32_t init_protocol_stack(void) init_list_node(&stack->recv_list); init_list_node(&stack->listen_list); init_list_node(&stack->event_list); + init_list_node(&stack->send_list); stack_group->stacks[i] = stack; } @@ -324,6 +325,7 @@ static void report_stack_event(struct protocol_stack *stack) sock = container_of(node, struct lwip_sock, event_list); if (weakup_enqueue(stack->weakup_ring, sock) == 0) { + __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE); list_del_node_init(&sock->event_list); stack->stats.weakup_events++; } else { @@ -332,6 +334,33 @@ static void report_stack_event(struct protocol_stack *stack) } } +static void send_stack_list(struct protocol_stack *stack) +{ + struct list_node *list = &(stack->send_list); + struct list_node *node, *temp; + struct lwip_sock *sock; + + list_for_each_safe(node, temp, list) { + sock = container_of(node, struct lwip_sock, send_list); + + if (sock->conn == NULL) { + continue; + } + + ssize_t ret = write_lwip_data(sock, sock->conn->socket, sock->send_flags); + __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE); + if (ret >= 0 && rte_ring_count(sock->send_ring)) { + __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE); + } else { + list_del_node_init(&sock->send_list); + } + + if (rte_ring_free_count(sock->send_ring)) { + add_epoll_event(sock->conn, EPOLLOUT); + } + } +} + static void* gazelle_stack_thread(void *arg) { struct protocol_stack *stack = (struct protocol_stack *)arg; @@ -348,6 +377,8 @@ static void* gazelle_stack_thread(void *arg) sys_timer_run(); report_stack_event(stack); + + send_stack_list(stack); } return NULL; diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c index b7b94e2..9a8fd08 100644 --- a/src/lstack/core/lstack_stack_stat.c +++ b/src/lstack/core/lstack_stack_stat.c @@ -107,11 +107,22 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_ dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail; dfx->data.pkts.weakup_ring_cnt = rte_ring_count(stack->weakup_ring); dfx->data.pkts.send_idle_ring_cnt = rte_ring_count(stack->send_idle_ring); - dfx->data.pkts.call_msg_cnt = rpc_call_msgcnt(stack); - dfx->data.pkts.recv_list = rpc_call_recvlistcnt(stack); - dfx->data.pkts.event_list = rpc_call_eventlistcnt(stack); + + int32_t rpc_call_result = rpc_call_msgcnt(stack); + dfx->data.pkts.call_msg_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result; + + rpc_call_result = rpc_call_recvlistcnt(stack); + dfx->data.pkts.recv_list = (rpc_call_result < 0) ? 0 : rpc_call_result; + + rpc_call_result = rpc_call_eventlistcnt(stack); + dfx->data.pkts.event_list = (rpc_call_result < 0) ? 0 : rpc_call_result; + + rpc_call_result = rpc_call_sendlistcnt(stack); + dfx->data.pkts.send_list = (rpc_call_result < 0) ? 0 : rpc_call_result; + if (stack->wakeup_list) { - dfx->data.pkts.wakeup_list = rpc_call_eventlistcnt(stack); + rpc_call_result = rpc_call_eventlistcnt(stack); + dfx->data.pkts.wakeup_list = (rpc_call_result < 0) ? 0 : rpc_call_result; } dfx->data.pkts.conn_num = stack->conn_num; } @@ -119,6 +130,8 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_ static void get_stack_dfx_data(struct gazelle_stack_dfx_data *dfx, struct protocol_stack *stack, enum GAZELLE_STAT_MODE stat_mode) { + int32_t rpc_call_result; + switch (stat_mode) { case GAZELLE_STAT_LSTACK_SHOW: case GAZELLE_STAT_LSTACK_SHOW_RATE: @@ -129,8 +142,10 @@ static void get_stack_dfx_data(struct gazelle_stack_dfx_data *dfx, struct protoc sizeof(stack->lwip_stats->mib2)); break; case GAZELLE_STAT_LSTACK_SHOW_CONN: - dfx->data.conn.conn_num = rpc_call_conntable(stack, dfx->data.conn.conn_list, GAZELLE_LSTACK_MAX_CONN); - dfx->data.conn.total_conn_num = rpc_call_connnum(stack); + rpc_call_result = rpc_call_conntable(stack, dfx->data.conn.conn_list, GAZELLE_LSTACK_MAX_CONN); + dfx->data.conn.conn_num = (rpc_call_result < 0) ? 0 : rpc_call_result; + rpc_call_result = rpc_call_connnum(stack); + dfx->data.conn.total_conn_num = (rpc_call_result < 0) ? 0 : rpc_call_result; break; case GAZELLE_STAT_LSTACK_SHOW_LATENCY: memcpy_s(&dfx->data.latency, sizeof(dfx->data.latency), &stack->latency, sizeof(stack->latency)); diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index 2fb24b4..c95f2c0 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -197,6 +197,16 @@ int32_t rpc_call_eventlistcnt(struct protocol_stack *stack) return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); } +int32_t rpc_call_sendlistcnt(struct protocol_stack *stack) +{ + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendlist_count); + if (msg == NULL) { + return -1; + } + + return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); +} + int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) { struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count); @@ -442,34 +452,6 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp) return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); } -static void stack_send(struct rpc_msg *msg) -{ - int32_t fd = msg->args[MSG_ARG_0].i; - int32_t flags = msg->args[MSG_ARG_2].i; - - struct protocol_stack *stack = get_protocol_stack(); - struct lwip_sock *sock = get_socket(fd); - if (sock == NULL) { - msg->result = -1; - msg->self_release = 0; - return; - } - - msg->result = write_lwip_data(sock, fd, flags); - sock->have_rpc_send = false; - - if (msg->result >= 0 && rte_ring_count(sock->send_ring)) { - sock->have_rpc_send = true; - sock->stack->stats.send_self_rpc++; - msg->self_release = 1; - rpc_call(&stack->rpc_queue, msg); - } - - if (rte_ring_free_count(sock->send_ring)) { - add_epoll_event(sock->conn, EPOLLOUT); - } -} - ssize_t rpc_call_send(int fd, const void *buf, size_t len, int flags) { struct protocol_stack *stack = get_protocol_stack_by_fd(fd); diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h index 87442cd..cfd454d 100644 --- a/src/lstack/include/lstack_lwip.h +++ b/src/lstack/include/lstack_lwip.h @@ -33,11 +33,13 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags); ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags); void read_recv_list(void); void add_recv_list(int32_t fd); +void stack_sendlist_count(struct rpc_msg *msg); void stack_eventlist_count(struct rpc_msg *msg); void stack_wakeuplist_count(struct rpc_msg *msg); void get_lwip_conntable(struct rpc_msg *msg); void get_lwip_connnum(struct rpc_msg *msg); void stack_recvlist_count(struct rpc_msg *msg); +void stack_send(struct rpc_msg *msg); void stack_replenish_send_idlembuf(struct protocol_stack *stack); int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num); void gazelle_free_pbuf(struct pbuf *pbuf); diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h index dd7633b..5b95dc9 100644 --- a/src/lstack/include/lstack_protocol_stack.h +++ b/src/lstack/include/lstack_protocol_stack.h @@ -52,6 +52,7 @@ struct protocol_stack { struct list_node recv_list; struct list_node listen_list; struct list_node event_list; + struct list_node send_list; struct list_node *wakeup_list; struct gazelle_stat_pkts stats; diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index cffb273..76ba36a 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -55,6 +55,7 @@ int32_t rpc_call_msgcnt(struct protocol_stack *stack); int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); int32_t rpc_call_recvlistcnt(struct protocol_stack *stack); int32_t rpc_call_eventlistcnt(struct protocol_stack *stack); +int32_t rpc_call_sendlistcnt(struct protocol_stack *stack); int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack); int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn); int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn); diff --git a/src/lstack/include/lstack_weakup.h b/src/lstack/include/lstack_weakup.h index 8f7fca2..4f6321e 100644 --- a/src/lstack/include/lstack_weakup.h +++ b/src/lstack/include/lstack_weakup.h @@ -16,7 +16,7 @@ #include #include "lstack_dpdk.h" -#define EPOLL_MAX_EVENTS 256 +#define EPOLL_MAX_EVENTS 512 struct weakup_poll { sem_t event_sem; diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c index 66d6053..a575c33 100644 --- a/src/ltran/ltran_dfx.c +++ b/src/ltran/ltran_dfx.c @@ -582,6 +582,7 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat) printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt); printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail); printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null); + printf("send_list: %-18"PRIu64" \n", lstack_stat->data.pkts.send_list); } static void gazelle_print_lstack_stat_detail(struct gazelle_stack_dfx_data *lstack_stat, @@ -884,7 +885,7 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_ do { printf("\n------ stack tid: %6u ------\n", stat->tid); printf("No. Proto recv_cnt recv_ring in_send send_ring event self_event Local Address" - " Foreign Address State\n"); + " Foreign Address State\n"); uint32_t unread_pkts = 0; uint32_t unsend_pkts = 0; for (i = 0; i < conn->conn_num && i < GAZELLE_LSTACK_MAX_CONN; i++) { -- 1.8.3.1