From fadeb43a653ab5da503b7030b60b4e063f0b3aef Mon Sep 17 00:00:00 2001 From: wuchangsheng Date: Sun, 13 Mar 2022 22:57:44 +0800 Subject: [PATCH 24/34] refactor event --- src/common/gazelle_dfx_msg.h | 10 +++- src/lstack/api/lstack_epoll.c | 73 +++++++++++++++++--------- src/lstack/core/lstack_control_plane.c | 13 +++++ src/lstack/core/lstack_lwip.c | 76 ++++++++++++++++++++------- src/lstack/core/lstack_protocol_stack.c | 36 ++++++++++--- src/lstack/core/lstack_stack_stat.c | 4 ++ src/lstack/core/lstack_thread_rpc.c | 42 +++++++++++++++ src/lstack/include/lstack_lwip.h | 3 ++ src/lstack/include/lstack_protocol_stack.h | 2 + src/lstack/include/lstack_thread_rpc.h | 3 ++ src/lstack/include/lstack_weakup.h | 84 +++++++++++++++++++++--------- src/ltran/ltran_dfx.c | 36 ++++++++----- 12 files changed, 290 insertions(+), 92 deletions(-) diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h index cea4200..ae20436 100644 --- a/src/common/gazelle_dfx_msg.h +++ b/src/common/gazelle_dfx_msg.h @@ -64,10 +64,12 @@ struct gazelle_stat_pkts { uint64_t rx_drop; uint64_t rx_allocmbuf_fail; uint64_t tx_allocmbuf_fail; - uint16_t weakup_ring_cnt; uint64_t call_msg_cnt; + uint16_t weakup_ring_cnt; uint16_t conn_num; uint16_t send_idle_ring_cnt; + uint64_t event_list; + uint64_t wakeup_list; uint64_t read_lwip_drop; uint64_t read_lwip_cnt; uint64_t write_lwip_drop; @@ -89,6 +91,10 @@ struct gazelle_stat_pkts { uint64_t send_self_rpc; uint64_t call_null; uint64_t arp_copy_fail; + uint64_t epoll_pending; + uint64_t epoll_pending_call; + uint64_t epoll_self_call; + uint64_t epoll_self_event; }; /* same as define in lwip/stats.h - struct stats_mib2 */ @@ -162,6 +168,8 @@ struct gazelle_stat_lstack_conn_info { uint32_t send_ring_cnt; uint32_t recv_ring_cnt; uint32_t tcp_sub_state; + uint32_t event_ring_cnt; + uint32_t self_ring_cnt; }; struct gazelle_stat_lstack_conn { diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c index bcbb35e..a686ddb 100644 
--- a/src/lstack/api/lstack_epoll.c +++ b/src/lstack/api/lstack_epoll.c @@ -87,19 +87,27 @@ void add_epoll_event(struct netconn *conn, uint32_t event) return; } - sock->have_event = true; - weakup_enqueue(sock->stack->weakup_ring, sock); - sock->stack->stats.weakup_events++; + if (weakup_enqueue(sock->stack->weakup_ring, sock)) { + if (list_is_empty(&sock->event_list)) { + list_add_node(&sock->stack->event_list, &sock->event_list); + } + } else { + sock->have_event = true; + sock->stack->stats.weakup_events++; + } } static void raise_pending_events(struct lwip_sock *sock) { - if (!sock->conn) { + struct weakup_poll *wakeup = sock->weakup; + struct protocol_stack *stack = sock->stack; + struct netconn *conn = sock->conn; + if (wakeup == NULL || stack == NULL || conn == NULL) { return; } struct lwip_sock *attach_sock = NULL; - if (sock->attach_fd > 0 && sock->attach_fd != sock->conn->socket) { + if (sock->attach_fd > 0 && sock->attach_fd != conn->socket) { attach_sock = get_socket_by_fd(sock->attach_fd); if (attach_sock == NULL) { return; @@ -108,7 +116,10 @@ static void raise_pending_events(struct lwip_sock *sock) attach_sock = sock; } - struct netconn *conn = attach_sock->conn; + conn = attach_sock->conn; + if (conn == NULL) { + return; + } struct tcp_pcb *tcp = conn->pcb.tcp; if ((tcp == NULL) || (tcp->state < ESTABLISHED)) { return; @@ -132,10 +143,17 @@ static void raise_pending_events(struct lwip_sock *sock) event |= POLLERR | POLLIN; } - if (event != 0) { - sock->events |= event; - rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock); - sem_post(&sock->weakup->event_sem); + if (event == 0) { + return; + } + sock->events |= event; + if (rte_ring_mp_enqueue(wakeup->event_ring, (void *)sock) == 0 || + rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) { + sem_post(&wakeup->event_sem); + stack->stats.epoll_pending++; + } else { + rpc_call_addevent(stack, sock); + stack->stats.epoll_pending_call++; } } @@ -168,6 +186,12 @@ int32_t 
lstack_epoll_create(int32_t size) GAZELLE_RETURN(ENOMEM); } + weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd); + if (weakup->self_ring == NULL) { + posix_api->close_fn(fd); + GAZELLE_RETURN(ENOMEM); + } + sock->weakup = weakup; return fd; @@ -247,11 +271,6 @@ static inline int32_t save_poll_event(struct pollfd *fds, uint32_t maxevents, st static bool remove_event(enum POLL_TYPE etype, struct lwip_sock **sock_list, int32_t event_num, struct lwip_sock *sock) { - /* close sock */ - if (sock->stack == NULL) { - return true; - } - /* remove duplicate event */ for (uint32_t i = 0; i < event_num && etype == TYPE_EPOLL; i++) { if (sock_list[i] == sock) { @@ -267,29 +286,26 @@ static int32_t get_lwip_events(struct weakup_poll *weakup, void *out, uint32_t m struct epoll_event *events = (struct epoll_event *)out; struct pollfd *fds = (struct pollfd *)out; - uint32_t events_cnt = rte_ring_count(weakup->event_ring); - if (events_cnt == 0) { - return 0; - } if (etype == TYPE_EPOLL) { maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents); } - events_cnt = LWIP_MIN(events_cnt, maxevents); int32_t event_num = 0; struct lwip_sock *sock = NULL; - while (event_num < events_cnt) { - int32_t ret = rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock); - if (ret != 0) { + while (event_num < maxevents) { + if (rte_ring_sc_dequeue(weakup->self_ring, (void **)&sock) && + rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock)) { break; } + /* skip closed sock; the return value is an event count, not a bool */ + if (sock->stack == NULL) { + continue; + } sock->have_event = false; if (remove_event(etype, weakup->sock_list, event_num, sock)) { - if (sock->stack) { - sock->stack->stats.remove_event++; - } + sock->stack->stats.remove_event++; continue; } @@ -390,6 +406,11 @@ static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *we if (weakup->event_ring == NULL) { GAZELLE_RETURN(ENOMEM); } + + weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, 
rte_gettid()); + if (weakup->self_ring == NULL) { + GAZELLE_RETURN(ENOMEM); + } } for (uint32_t i = 0; i < nfds; i++) { diff --git a/src/lstack/core/lstack_control_plane.c b/src/lstack/core/lstack_control_plane.c index a7e084d..c782d51 100644 --- a/src/lstack/core/lstack_control_plane.c +++ b/src/lstack/core/lstack_control_plane.c @@ -38,6 +38,7 @@ #define RECONNECT_TO_LTRAN_DELAY (1) #define GAZELLE_BADFD (-1) #define GAZELLE_LISTEN_BACKLOG 5 +#define GAZELLE_10MS (10000) static int32_t g_data_fd = -1; static volatile bool g_register_state = true; @@ -701,6 +702,12 @@ void control_server_thread(void *arg) int32_t num, connfd; struct epoll_event evt_array; while (1) { + /* wait init finish */ + if (posix_api->is_chld) { + usleep(GAZELLE_10MS); + continue; + } + num = posix_api->epoll_wait_fn(epfd, &evt_array, 1, -1); if (num <= 0) { continue; @@ -741,6 +748,12 @@ void control_client_thread(void *arg) } while (1) { + /* wait init finish */ + if (posix_api->is_chld) { + usleep(GAZELLE_10MS); + continue; + } + if (sockfd < 0) { set_register_state(false); sockfd = client_reg_proc_reconnect(epfd); diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index b157517..d55f1e6 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -110,6 +110,8 @@ void gazelle_init_sock(int32_t fd) init_list_node(&sock->recv_list); init_list_node(&sock->attach_list); init_list_node(&sock->listen_list); + init_list_node(&sock->event_list); + init_list_node(&sock->wakeup_list); } void gazelle_clean_sock(int32_t fd) @@ -126,6 +128,8 @@ void gazelle_clean_sock(int32_t fd) list_del_node_init(&sock->recv_list); list_del_node_init(&sock->attach_list); list_del_node_init(&sock->listen_list); + list_del_node_init(&sock->event_list); + list_del_node_init(&sock->wakeup_list); } void gazelle_free_pbuf(struct pbuf *pbuf) @@ -266,6 +270,30 @@ ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags) return (send_ret < 0) ? 
send_ret : send_len; } +void add_self_event(struct lwip_sock *sock, uint32_t events) +{ + struct weakup_poll *wakeup = sock->weakup; + struct protocol_stack *stack = sock->stack; + if (wakeup == NULL || stack == NULL) { + return; + } + + sock->events |= events; + + if (sock->have_event) { + return; + } + + if (rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) { + sock->have_event = true; + sem_post(&sock->weakup->event_sem); + stack->stats.epoll_self_event++; + } else { + rpc_call_addevent(stack, sock); + stack->stats.epoll_self_call++; + } +} + ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) { uint32_t free_count = rte_ring_free_count(sock->send_ring); @@ -303,14 +331,10 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) send_pkt++; } - if (!sock->have_event && (sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) { - sock->have_event = true; - sock->events |= EPOLLOUT; - rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock); - sem_post(&sock->weakup->event_sem); + if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) { + add_self_event(sock, EPOLLOUT); sock->stack->stats.write_events++; - } - if (!NETCONN_IS_DATAOUT(sock)) { + } else { sock->events &= ~EPOLLOUT; } @@ -507,14 +531,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags) } } - if (!sock->have_event && (sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) { - sock->have_event = true; - sock->events |= EPOLLIN; - rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock); - sem_post(&sock->weakup->event_sem); + if ((sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) { + add_self_event(sock, EPOLLIN); sock->stack->stats.read_events++; - } - if (!NETCONN_IS_DATAIN(sock)) { + } else { sock->events &= ~EPOLLIN; } @@ -577,9 +597,14 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s conn->recv_cnt = rte_ring_count(netconn->recvmbox->ring); struct 
lwip_sock *sock = get_socket(netconn->socket); - if (sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) { + if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) { conn->recv_ring_cnt = rte_ring_count(sock->recv_ring); conn->send_ring_cnt = rte_ring_count(sock->send_ring); + struct weakup_poll *weakup = sock->weakup; + if (weakup) { + conn->event_ring_cnt = rte_ring_count(weakup->event_ring); + conn->self_ring_cnt = rte_ring_count(weakup->self_ring); + } } } } @@ -696,10 +721,8 @@ void get_lwip_connnum(struct rpc_msg *msg) msg->result = conn_num; } -void stack_recvlist_count(struct rpc_msg *msg) +static uint32_t get_list_count(struct list_node *list) { - struct protocol_stack *stack = get_protocol_stack(); - struct list_node *list = &(stack->recv_list); struct list_node *node, *temp; uint32_t count = 0; @@ -707,5 +730,20 @@ void stack_recvlist_count(struct rpc_msg *msg) count++; } - msg->result = count; + return count; +} + +void stack_wakeuplist_count(struct rpc_msg *msg) +{ + msg->result = get_list_count(get_protocol_stack()->wakeup_list); +} + +void stack_eventlist_count(struct rpc_msg *msg) +{ + msg->result = get_list_count(&get_protocol_stack()->event_list); +} + +void stack_recvlist_count(struct rpc_msg *msg) +{ + msg->result = get_list_count(&get_protocol_stack()->recv_list); } diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index c88f902..45649fe 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -193,6 +193,7 @@ int32_t init_protocol_stack(void) init_list_node(&stack->recv_list); init_list_node(&stack->listen_list); + init_list_node(&stack->event_list); stack_group->stacks[i] = stack; } @@ -261,8 +262,14 @@ static void* gazelle_weakup_thread(void *arg) thread_affinity_init(lcore_id); LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id); + struct list_node wakeup_list; + 
init_list_node(&wakeup_list); + stack->wakeup_list = &wakeup_list; + for (;;) { - weakup_thread(stack->weakup_ring); + wakeup_list_sock(&wakeup_list); + + weakup_thread(stack->weakup_ring, &wakeup_list); } return NULL; @@ -307,6 +314,24 @@ static void stack_thread_init(struct protocol_stack *stack) LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id); } +static void report_stack_event(struct protocol_stack *stack) +{ + struct list_node *list = &(stack->event_list); + struct list_node *node, *temp; + struct lwip_sock *sock; + + list_for_each_safe(node, temp, list) { + sock = container_of(node, struct lwip_sock, event_list); + + if (weakup_enqueue(stack->weakup_ring, sock) == 0) { + list_del_node_init(&sock->event_list); + stack->stats.weakup_events++; + } else { + break; + } + } +} + static void* gazelle_stack_thread(void *arg) { struct protocol_stack *stack = (struct protocol_stack *)arg; @@ -321,6 +346,8 @@ static void* gazelle_stack_thread(void *arg) read_recv_list(); sys_timer_run(); + + report_stack_event(stack); } return NULL; @@ -737,11 +764,8 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add } struct lwip_sock *sock = get_socket(head_fd); - if (!sock->have_event && have_accept_event(head_fd)) { - sock->have_event = true; - sock->events |= EPOLLIN; - rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock); - sem_post(&sock->weakup->event_sem); + if (have_accept_event(head_fd)) { + add_self_event(sock, EPOLLIN); sock->stack->stats.accept_events++; } diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c index 41fe9bf..b7b94e2 100644 --- a/src/lstack/core/lstack_stack_stat.c +++ b/src/lstack/core/lstack_stack_stat.c @@ -109,6 +109,10 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_ dfx->data.pkts.send_idle_ring_cnt = rte_ring_count(stack->send_idle_ring); dfx->data.pkts.call_msg_cnt = rpc_call_msgcnt(stack); dfx->data.pkts.recv_list = 
rpc_call_recvlistcnt(stack); + dfx->data.pkts.event_list = rpc_call_eventlistcnt(stack); + if (stack->wakeup_list) { + dfx->data.pkts.wakeup_list = rpc_call_wakeuplistcnt(stack); + } dfx->data.pkts.conn_num = stack->conn_num; } diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index b3665a7..2fb24b4 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -177,6 +177,26 @@ int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn) return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); } +int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack) +{ + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_wakeuplist_count); + if (msg == NULL) { + return -1; + } + + return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); +} + +int32_t rpc_call_eventlistcnt(struct protocol_stack *stack) +{ + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_eventlist_count); + if (msg == NULL) { + return -1; + } + + return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); +} + int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) { struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count); @@ -187,6 +207,28 @@ int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); } +void add_epoll_event(struct netconn *conn, uint32_t event); +static void rpc_add_event(struct rpc_msg *msg) +{ + struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p; + if (sock->conn) { + add_epoll_event(sock->conn, sock->events); + } +} + +void rpc_call_addevent(struct protocol_stack *stack, void *sock) +{ + struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_add_event); + if (msg == NULL) { + return; + } + + msg->args[MSG_ARG_0].p = sock; + + msg->self_release = 0; + rpc_call(&stack->rpc_queue, msg); +} + static void rpc_replenish_idlembuf(struct rpc_msg *msg) { struct protocol_stack *stack = get_protocol_stack(); diff --git 
a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h index 581b9fe..87442cd 100644 --- a/src/lstack/include/lstack_lwip.h +++ b/src/lstack/include/lstack_lwip.h @@ -33,6 +33,8 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags); ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags); void read_recv_list(void); void add_recv_list(int32_t fd); +void stack_eventlist_count(struct rpc_msg *msg); +void stack_wakeuplist_count(struct rpc_msg *msg); void get_lwip_conntable(struct rpc_msg *msg); void get_lwip_connnum(struct rpc_msg *msg); void stack_recvlist_count(struct rpc_msg *msg); @@ -42,5 +44,6 @@ void gazelle_free_pbuf(struct pbuf *pbuf); ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags); ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags); ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags); +void add_self_event(struct lwip_sock *sock, uint32_t events); #endif diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h index f289465..dd7633b 100644 --- a/src/lstack/include/lstack_protocol_stack.h +++ b/src/lstack/include/lstack_protocol_stack.h @@ -51,6 +51,8 @@ struct protocol_stack { struct list_node recv_list; struct list_node listen_list; + struct list_node event_list; + struct list_node *wakeup_list; struct gazelle_stat_pkts stats; struct gazelle_stack_latency latency; diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index 1365234..cffb273 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -50,9 +50,12 @@ struct rpc_msg { struct protocol_stack; void poll_rpc_msg(struct protocol_stack *stack); void rpc_call_replenish_idlembuf(struct protocol_stack *stack); +void rpc_call_addevent(struct protocol_stack *stack, void *sock); int32_t rpc_call_msgcnt(struct protocol_stack *stack); int32_t 
rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); int32_t rpc_call_recvlistcnt(struct protocol_stack *stack); +int32_t rpc_call_eventlistcnt(struct protocol_stack *stack); +int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack); int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn); int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn); int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn); diff --git a/src/lstack/include/lstack_weakup.h b/src/lstack/include/lstack_weakup.h index f334a0f..8f7fca2 100644 --- a/src/lstack/include/lstack_weakup.h +++ b/src/lstack/include/lstack_weakup.h @@ -22,55 +22,93 @@ struct weakup_poll { sem_t event_sem; struct lwip_sock *sock_list[EPOLL_MAX_EVENTS]; struct rte_ring *event_ring; + struct rte_ring *self_ring; }; #define WEAKUP_MAX (32) -static inline __attribute__((always_inline)) void weakup_attach_sock(struct lwip_sock *sock) +static inline void wakeup_list_sock(struct list_node *wakeup_list) { - struct list_node *list = &(sock->attach_list); struct list_node *node, *temp; - struct lwip_sock *attach_sock; - int32_t ret; - list_for_each_safe(node, temp, list) { - attach_sock = container_of(node, struct lwip_sock, attach_list); - if (attach_sock->weakup == NULL) { + list_for_each_safe(node, temp, wakeup_list) { + struct lwip_sock *sock = container_of(node, struct lwip_sock, wakeup_list); + + struct weakup_poll *weakup = sock->weakup; + struct protocol_stack *stack = sock->stack; + if (weakup == NULL || stack == NULL) { continue; } - ret = rte_ring_mp_enqueue(attach_sock->weakup->event_ring, (void *)attach_sock); + int32_t ret = rte_ring_mp_enqueue(weakup->event_ring, (void *)sock); if (ret == 0) { - sem_post(&attach_sock->weakup->event_sem); - attach_sock->stack->stats.lwip_events++; + list_del_node_init(&sock->wakeup_list); /* unlink from the list being walked, not event_list */ + sem_post(&weakup->event_sem); + 
stack->stats.lwip_events++; + } else { + break; } } } -static inline __attribute__((always_inline)) void weakup_thread(struct rte_ring *weakup_ring) +static inline int32_t weakup_attach_sock(struct list_node *attach_list) +{ + struct list_node *node, *temp; + int32_t wakeuped = -1; + + list_for_each_safe(node, temp, attach_list) { + struct lwip_sock *sock = container_of(node, struct lwip_sock, attach_list); + + struct weakup_poll *weakup = sock->weakup; + struct protocol_stack *stack = sock->stack; + if (weakup == NULL || stack == NULL) { + continue; + } + + int32_t ret = rte_ring_mp_enqueue(weakup->event_ring, (void *)sock); + if (ret == 0) { + sem_post(&weakup->event_sem); + stack->stats.lwip_events++; + wakeuped = 0; + } + } + + return wakeuped; +} + +static inline void weakup_thread(struct rte_ring *weakup_ring, struct list_node *wakeup_list) { struct lwip_sock *sock; - int32_t ret; for (uint32_t i = 0; i < WEAKUP_MAX; ++i) { - ret = rte_ring_sc_dequeue(weakup_ring, (void **)&sock); + int32_t ret = rte_ring_sc_dequeue(weakup_ring, (void **)&sock); if (ret != 0) { break; } - ret = rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock); + struct weakup_poll *weakup = sock->weakup; + struct protocol_stack *stack = sock->stack; + if (weakup == NULL || stack == NULL) { + continue; + } + + ret = rte_ring_mp_enqueue(weakup->event_ring, (void *)sock); if (ret == 0) { - sem_post(&sock->weakup->event_sem); - sock->stack->stats.lwip_events++; + sem_post(&weakup->event_sem); + stack->stats.lwip_events++; } /* listen notice attach sock */ + int32_t wakeuped = -1; if (!list_is_empty(&sock->attach_list)) { - weakup_attach_sock(sock); + wakeuped = weakup_attach_sock(&sock->attach_list); } - /* event_ring of attach sock may have idle elem */ - if (ret != 0) { + /* notice any epoll enough */ + if (ret != 0 && wakeuped != 0) { + if (list_is_empty(&sock->wakeup_list)) { + list_add_node(wakeup_list, &sock->wakeup_list); + } break; } } @@ -79,13 +117,7 @@ static inline 
__attribute__((always_inline)) void weakup_thread(struct rte_ring static inline __attribute__((always_inline)) int weakup_enqueue(struct rte_ring *weakup_ring, struct lwip_sock *sock) { - int ret = rte_ring_sp_enqueue(weakup_ring, (void *)sock); - if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "tid %d, failed\n", gettid()); - return -1; - } - - return 0; + return rte_ring_sp_enqueue(weakup_ring, (void *)sock); } #endif diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c index 451f527..66d6053 100644 --- a/src/ltran/ltran_dfx.c +++ b/src/ltran/ltran_dfx.c @@ -567,14 +567,20 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat) printf("weakup_events: %-14"PRIu64" ", lstack_stat->data.pkts.weakup_events); printf("lwip_events: %-16"PRIu64" ", lstack_stat->data.pkts.lwip_events); printf("app_events: %-17"PRIu64"\n", lstack_stat->data.pkts.app_events); + printf("epoll_pending: %-14"PRIu64" ", lstack_stat->data.pkts.epoll_pending); + printf("epoll_self_event: %-11"PRIu64" ", lstack_stat->data.pkts.epoll_self_event); + printf("remove_event: %-15"PRIu64" \n", lstack_stat->data.pkts.remove_event); printf("read_events: %-16"PRIu64" ", lstack_stat->data.pkts.read_events); printf("write_events: %-15"PRIu64" ", lstack_stat->data.pkts.write_events); printf("accept_events: %-14"PRIu64" \n", lstack_stat->data.pkts.accept_events); - printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt); printf("read_null: %-18"PRIu64" ", lstack_stat->data.pkts.read_null); - printf("call_alloc_fail: %-12"PRIu64" \n", lstack_stat->data.pkts.call_alloc_fail); - printf("remove_event: %-15"PRIu64" ", lstack_stat->data.pkts.remove_event); + printf("wakeup_list: %-16"PRIu64" ", lstack_stat->data.pkts.wakeup_list); + printf("event_list: %-17"PRIu64" \n", lstack_stat->data.pkts.event_list); printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc); + printf("epoll_pending_call: %-9"PRIu64" ", lstack_stat->data.pkts.epoll_pending_call); + 
printf("epoll_self_call: %-12"PRIu64" \n", lstack_stat->data.pkts.epoll_self_call); + printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt); + printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail); printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null); } @@ -866,8 +872,7 @@ static void gazelle_print_lstack_stat_snmp(void *buf, const struct gazelle_stat_ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_msg_request *req_msg) { - int32_t ret; - uint32_t i, unread_pkts; + uint32_t i; struct in_addr rip; struct in_addr lip; char str_ip[GAZELLE_SUBNET_LENGTH_MAX] = {0}; @@ -878,30 +883,33 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_ printf("Active Internet connections (servers and established)\n"); do { printf("\n------ stack tid: %6u ------\n", stat->tid); - printf("No. Proto recv_cnt recv_ring in_send send_ring Local Address" + printf("No. Proto recv_cnt recv_ring in_send send_ring event self_event Local Address" " Foreign Address State\n"); - unread_pkts = 0; + uint32_t unread_pkts = 0; + uint32_t unsend_pkts = 0; for (i = 0; i < conn->conn_num && i < GAZELLE_LSTACK_MAX_CONN; i++) { struct gazelle_stat_lstack_conn_info *conn_info = &conn->conn_list[i]; rip.s_addr = conn_info->rip; lip.s_addr = conn_info->lip; if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) { - printf("%-6utcp %-10u%-11u%-9u%-11u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt, - conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, - inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, inet_ntop(AF_INET, &rip, - str_rip, sizeof(str_rip)), conn_info->r_port, tcp_state_to_str(conn_info->tcp_sub_state)); + printf("%-6utcp %-10u%-11u%-9u%-11u%-7u%-12u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt, + conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->event_ring_cnt, + 
conn_info->self_ring_cnt, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, + inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port, + tcp_state_to_str(conn_info->tcp_sub_state)); } else if (conn_info->state == GAZELLE_LISTEN_LIST) { - printf("%-6utcp %-41u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt, + printf("%-6utcp %-60u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port); } else { printf("Got unknow tcp conn::%s:%5hu, state:%u\n", inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, conn_info->state); } unread_pkts += conn_info->recv_ring_cnt; + unsend_pkts += conn_info->send_ring_cnt; } if (conn->conn_num > 0) { - printf("Total unread pkts: %u \n", unread_pkts); + printf("Total unread pkts:%u unsend pkts:%u\n", unread_pkts, unsend_pkts); } if (i < conn->total_conn_num) { @@ -912,7 +920,7 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_ if (stat->eof != 0) { break; } - ret = dfx_stat_read_from_ltran(buf, sizeof(struct gazelle_stack_dfx_data), req_msg->stat_mode); + int32_t ret = dfx_stat_read_from_ltran(buf, sizeof(struct gazelle_stack_dfx_data), req_msg->stat_mode); if (ret != GAZELLE_OK) { return; } -- 1.8.3.1