diff --git a/0037-add-gazellectl-lstack-constraint.patch b/0037-add-gazellectl-lstack-constraint.patch new file mode 100644 index 0000000..7537c5f --- /dev/null +++ b/0037-add-gazellectl-lstack-constraint.patch @@ -0,0 +1,25 @@ +From 0a62cc1547009c6e382c08d7e99c8727201dd56a Mon Sep 17 00:00:00 2001 +From: wuchangsheng +Date: Mon, 21 Mar 2022 09:50:49 +0800 +Subject: [PATCH] add gazellectl lstack constraint + +--- + README.md | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/README.md b/README.md +index e914b26..eef6362 100644 +--- a/README.md ++++ b/README.md +@@ -199,7 +199,7 @@ redis-server redis.conf + liblstack.so编译进应用程序后wrap网络编程标准接口,应用程序无需修改代码。 + + ### 9. gazellectl +-- 不使用ltran模式时不支持gazellectl ltran xxx 命令 ++- 不使用ltran模式时不支持gazellectl ltran xxx命令,以及-r, rate命令 + ``` + Usage: gazellectl [-h | help] + or: gazellectl ltran {quit | show} [LTRAN_OPTIONS] [time] +-- +1.8.3.1 + diff --git a/0038-refactor-event.patch b/0038-refactor-event.patch new file mode 100644 index 0000000..85d43d9 --- /dev/null +++ b/0038-refactor-event.patch @@ -0,0 +1,3098 @@ +From 9b4379914e97d4c0c267033559bf86d20c7381b6 Mon Sep 17 00:00:00 2001 +From: jiangheng +Date: Wed, 30 Mar 2022 00:04:46 +0800 +Subject: [PATCH] refactor event + +--- + README.md | 3 +- + src/common/gazelle_dfx_msg.h | 18 +- + src/lstack/api/lstack_epoll.c | 509 +++++++++++++++-------------- + src/lstack/api/lstack_signal.c | 1 + + src/lstack/core/lstack_cfg.c | 27 +- + src/lstack/core/lstack_dpdk.c | 99 ++++-- + src/lstack/core/lstack_init.c | 13 +- + src/lstack/core/lstack_lwip.c | 352 +++++++++++--------- + src/lstack/core/lstack_protocol_stack.c | 345 +++++++++---------- + src/lstack/core/lstack_stack_stat.c | 6 - + src/lstack/core/lstack_thread_rpc.c | 106 +++--- + src/lstack/include/lstack_cfg.h | 2 +- + src/lstack/include/lstack_dpdk.h | 7 +- + src/lstack/include/lstack_log.h | 9 +- + src/lstack/include/lstack_lwip.h | 11 +- + src/lstack/include/lstack_protocol_stack.h | 35 +- + 
src/lstack/include/lstack_thread_rpc.h | 5 +- + src/lstack/include/lstack_vdev.h | 2 +- + src/lstack/include/posix/lstack_epoll.h | 2 + + src/lstack/lstack.conf | 1 - + src/lstack/netif/lstack_ethdev.c | 25 +- + src/lstack/netif/lstack_vdev.c | 2 +- + src/ltran/ltran_dfx.c | 31 +- + src/ltran/ltran_opt.h | 6 +- + 24 files changed, 822 insertions(+), 795 deletions(-) + +diff --git a/README.md b/README.md +index e914b26..24079c7 100644 +--- a/README.md ++++ b/README.md +@@ -236,7 +236,7 @@ Usage: gazellectl [-h | help] + - 提供的命令行、配置文件以及配置大页内存需要root权限执行或修改。非root用户使用,需先提权以及修改文件权限。 + - 若要把用户态网卡绑回内核驱动,必须先将Gazelle退出。 + - 不支持accept阻塞模式或者connect阻塞模式。 +-- 最多只支持20000个链接(需要保证进程内,非网络连接的fd个数小于2000个)。 ++- 最多只支持1500个连接。 + - 协议栈当前只支持tcp、icmp、arp、ipv4。 + - 大页内存不支持在挂载点里创建子目录重新挂载。 + - 在对端ping时,要求指定报文长度小于等于14000。 +@@ -253,6 +253,7 @@ Usage: gazellectl [-h | help] + - 不使用ltran模式,kni网口只支持本地通讯使用,且需要启动前配置NetworkManager不管理kni网卡 + - 虚拟kni网口的ip及mac地址,需要与lstack配置文件保持一致 + - gazelle运行过程中,不允许删除运行文件,如果删除,需要重启gazelle ++- lstack配置的ip需要与应用程序的ip保持一致 + + ## Security risk note + gazelle有如下安全风险,用户需要评估使用场景风险 +diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h +index de669f5..6db67ee 100644 +--- a/src/common/gazelle_dfx_msg.h ++++ b/src/common/gazelle_dfx_msg.h +@@ -65,11 +65,9 @@ struct gazelle_stat_pkts { + uint64_t rx_allocmbuf_fail; + uint64_t tx_allocmbuf_fail; + uint64_t call_msg_cnt; +- uint16_t weakup_ring_cnt; + uint16_t conn_num; + uint16_t send_idle_ring_cnt; + uint64_t event_list; +- uint64_t wakeup_list; + uint64_t read_lwip_drop; + uint64_t read_lwip_cnt; + uint64_t write_lwip_drop; +@@ -79,22 +77,13 @@ struct gazelle_stat_pkts { + uint64_t app_write_idlefail; + uint64_t app_write_drop; + uint64_t recv_list; +- uint64_t lwip_events; +- uint64_t weakup_events; ++ uint64_t wakeup_events; + uint64_t app_events; + uint64_t call_alloc_fail; +- uint64_t read_events; +- uint64_t write_events; +- uint64_t accept_events; + uint64_t read_null; +- uint64_t remove_event; +- uint64_t 
send_self_rpc; + uint64_t call_null; + uint64_t arp_copy_fail; +- uint64_t epoll_pending; +- uint64_t epoll_pending_call; +- uint64_t epoll_self_call; +- uint64_t epoll_self_event; ++ uint64_t send_self_rpc; + uint64_t send_list; + }; + +@@ -169,8 +158,7 @@ struct gazelle_stat_lstack_conn_info { + uint32_t send_ring_cnt; + uint32_t recv_ring_cnt; + uint32_t tcp_sub_state; +- uint32_t event_ring_cnt; +- uint32_t self_ring_cnt; ++ int32_t sem_cnt; + }; + + struct gazelle_stat_lstack_conn { +diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c +index e54d496..b8d53f6 100644 +--- a/src/lstack/api/lstack_epoll.c ++++ b/src/lstack/api/lstack_epoll.c +@@ -14,6 +14,7 @@ + #include + #include + #include ++#include + + #include + #include +@@ -21,6 +22,7 @@ + #include + #include + #include ++#include + + #include "lstack_compiler.h" + #include "lstack_ethdev.h" +@@ -28,127 +30,103 @@ + #include "lstack_cfg.h" + #include "lstack_log.h" + #include "gazelle_base_func.h" +-#include "lstack_weakup.h" + #include "lstack_lwip.h" + #include "lstack_protocol_stack.h" + +-#define EPOLL_INTERVAL_10MS 10000000 ++#define EPOLL_KERNEL_INTERVAL 10 /* ms */ ++#define EPOLL_NSEC_TO_SEC 1000000000 ++#define EPOLL_MAX_EVENTS 512 + +-static PER_THREAD struct weakup_poll g_weakup_poll = {0}; +- +-enum POLL_TYPE { +- TYPE_POLL, +- TYPE_EPOLL, +-}; +- +-static inline bool check_event_vaild(struct lwip_sock *sock, uint32_t event) +-{ +- if ((event & EPOLLIN) && !NETCONN_IS_ACCEPTIN(sock) && !NETCONN_IS_DATAIN(sock)) { +- event &= ~EPOLLIN; +- } +- +- if ((event & EPOLLOUT) && !NETCONN_IS_DATAOUT(sock)) { +- event &= ~EPOLLOUT; +- } +- +- return (event) ? 
true : false; +-} +- +-static inline bool report_events(struct lwip_sock *sock, uint32_t event) +-{ +- /* error event */ +- if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) { +- return true; +- } +- +- if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) { +- return false; +- } +- +- return check_event_vaild(sock, event); +-} ++static PER_THREAD struct wakeup_poll g_wakeup_poll = {0}; ++static bool g_use_epoll = false; /* FIXME: when no epoll close prepare event for performance testing */ + + void add_epoll_event(struct netconn *conn, uint32_t event) + { + /* conn sock nerver null, because lwip call this func */ + struct lwip_sock *sock = get_socket(conn->socket); + +- /* close_wait event should be (EPOLLRDHUP | EPOLLIN), but lwip is EPOLLERR */ +- if (event == EPOLLERR && conn->pcb.tcp && conn->pcb.tcp->state == CLOSE_WAIT) { +- event = EPOLLRDHUP | EPOLLIN | EPOLLERR; +- } +- + if ((event & sock->epoll_events) == 0) { + return; + } ++ + sock->events |= event & sock->epoll_events; + +- if (!sock->weakup || !report_events(sock, event)) { +- return; ++#ifdef GAZELLE_USE_EPOLL_EVENT_STACK ++ if (g_use_epoll && list_is_empty(&sock->event_list)) { ++ list_add_node(&sock->stack->event_list, &sock->event_list); + } ++#endif + +- if (weakup_enqueue(sock->stack->weakup_ring, sock)) { +- if (list_is_empty(&sock->event_list)) { +- list_add_node(&sock->stack->event_list, &sock->event_list); ++ if (sock->wakeup) { ++ sock->stack->stats.wakeup_events++; ++ if (get_protocol_stack_group()->wakeup_enable) { ++ rte_ring_sp_enqueue(sock->stack->wakeup_ring, &sock->wakeup->event_sem); ++ } else { ++ sem_post(&sock->wakeup->event_sem); + } +- } else { +- __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE); +- sock->stack->stats.weakup_events++; + } + } + +-static void raise_pending_events(struct lwip_sock *sock) ++static inline uint32_t update_events(struct lwip_sock *sock) + { +- struct weakup_poll *wakeup = sock->weakup; +- struct protocol_stack 
*stack = sock->stack; +- struct netconn *conn = sock->conn; +- if (wakeup == NULL || stack == NULL || conn == NULL) { +- return; ++ uint32_t event = 0; ++ ++ if (sock->epoll_events & EPOLLIN) { ++ if (sock->attach_fd > 0 && NETCONN_IS_ACCEPTIN(sock)) { ++ event |= EPOLLIN; ++ } ++ ++ if (sock->attach_fd < 0 && NETCONN_IS_DATAIN(sock)) { ++ event |= EPOLLIN; ++ } + } + +- struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket_by_fd(sock->attach_fd) : sock; +- if (attach_sock == NULL) { +- return; ++ if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_OUTIDLE(sock)) { ++ event |= EPOLLOUT; + } + +- conn = attach_sock->conn; +- if (conn == NULL) { +- return; ++ if ((sock->epoll_events & EPOLLERR) && (sock->events & EPOLLERR)) { ++ event |= EPOLLERR | EPOLLIN; + } +- struct tcp_pcb *tcp = conn->pcb.tcp; +- if ((tcp == NULL) || (tcp->state < ESTABLISHED)) { ++ ++ return event; ++} ++ ++#ifdef GAZELLE_USE_EPOLL_EVENT_STACK ++void update_stack_events(struct protocol_stack *stack) ++{ ++ if (!g_use_epoll) { + return; + } + +- uint32_t event = 0; +- if (sock->epoll_events & EPOLLIN) { +- if (attach_sock->recv_lastdata || rte_ring_count(attach_sock->recv_ring) || NETCONN_IS_ACCEPTIN(attach_sock)) { +- event |= EPOLLIN; +- } +- } ++ struct list_node *node, *temp; ++ list_for_each_safe(node, temp, &stack->event_list) { ++ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list); + +- if (sock->epoll_events & EPOLLOUT) { +- if ((attach_sock->sendevent > 0) || +- ((tcp_sndbuf(conn->pcb.tcp) > TCP_SNDLOWAT) && (tcp_sndqueuelen(conn->pcb.tcp) < TCP_SNDQUEUELOWAT))) { +- event |= EPOLLOUT; ++ sock->events = update_events(sock); ++ if (sock->events != 0) { ++ continue; + } +- } + +- if (attach_sock->errevent > 0) { +- event |= POLLERR | POLLIN; ++ if (pthread_spin_trylock(&stack->event_lock)) { ++ continue; ++ } ++ list_del_node_init(&sock->event_list); ++ pthread_spin_unlock(&stack->event_lock); + } ++} ++#endif + +- if (event == 0) { ++static void 
raise_pending_events(struct lwip_sock *sock) ++{ ++ struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket_by_fd(sock->attach_fd) : sock; ++ if (attach_sock == NULL) { + return; + } +- attach_sock->events |= event; +- if (rte_ring_mp_enqueue(wakeup->event_ring, (void *)sock) == 0 || +- rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) { +- sem_post(&wakeup->event_sem); +- stack->stats.epoll_pending++; +- } else { +- rpc_call_addevent(stack, attach_sock); +- stack->stats.epoll_pending_call++; ++ ++ attach_sock->events = update_events(attach_sock); ++ if (attach_sock->events & attach_sock->epoll_events) { ++ rpc_call_addevent(attach_sock->stack, attach_sock); + } + } + +@@ -166,34 +144,35 @@ int32_t lstack_epoll_create(int32_t size) + GAZELLE_RETURN(EINVAL); + } + +- struct weakup_poll *weakup = malloc(sizeof(struct weakup_poll)); +- if (weakup == NULL) { ++ struct wakeup_poll *wakeup = malloc(sizeof(struct wakeup_poll)); ++ if (wakeup == NULL) { + posix_api->close_fn(fd); + GAZELLE_RETURN(EINVAL); + } + +- memset_s(weakup, sizeof(struct weakup_poll), 0, sizeof(struct weakup_poll)); +- sem_init(&weakup->event_sem, 0, 0); +- +- weakup->event_ring = create_ring("RING_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd); +- if (weakup->event_ring == NULL) { +- posix_api->close_fn(fd); +- GAZELLE_RETURN(ENOMEM); +- } +- +- weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd); +- if (weakup->self_ring == NULL) { +- posix_api->close_fn(fd); +- GAZELLE_RETURN(ENOMEM); +- } ++ memset_s(wakeup, sizeof(struct wakeup_poll), 0, sizeof(struct wakeup_poll)); ++ sem_init(&wakeup->event_sem, 0, 0); + +- sock->weakup = weakup; ++ sock->wakeup = wakeup; ++ init_list_node(&wakeup->event_list); + ++ g_use_epoll = true; + return fd; + } + + int32_t lstack_epoll_close(int32_t fd) + { ++ struct lwip_sock *sock = get_socket_by_fd(fd); ++ if (sock == NULL) { ++ LSTACK_LOG(ERR, LSTACK, "fd=%d sock is NULL errno=%d\n", fd, errno); ++ 
GAZELLE_RETURN(EINVAL); ++ } ++ ++ if (sock->wakeup) { ++ free(sock->wakeup); ++ } ++ sock->wakeup = NULL; ++ + return 0; + } + +@@ -219,7 +198,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even + } + + struct lwip_sock *epoll_sock = get_socket_by_fd(epfd); +- if (epoll_sock == NULL || epoll_sock->weakup == NULL) { ++ if (epoll_sock == NULL || epoll_sock->wakeup == NULL) { + LSTACK_LOG(ERR, LSTACK, "epfd=%d\n", fd); + GAZELLE_RETURN(EINVAL); + } +@@ -228,7 +207,10 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even + do { + switch (op) { + case EPOLL_CTL_ADD: +- sock->weakup = epoll_sock->weakup; ++ sock->wakeup = epoll_sock->wakeup; ++ if (list_is_empty(&sock->event_list)) { ++ list_add_node(&sock->wakeup->event_list, &sock->event_list); ++ } + /* fall through */ + case EPOLL_CTL_MOD: + sock->epoll_events = events; +@@ -238,6 +220,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even + } + break; + case EPOLL_CTL_DEL: ++ list_del_node_init(&sock->event_list); + sock->epoll_events = 0; + break; + default: +@@ -250,176 +233,234 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even + return 0; + } + +-static inline int32_t save_poll_event(struct pollfd *fds, uint32_t maxevents, int32_t fd, uint32_t events) ++#ifdef GAZELLE_USE_EPOLL_EVENT_STACK ++static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents) + { + int32_t event_num = 0; +- for (uint32_t i = 0; i < maxevents; i++) { +- /* fds[i].revents != 0, the events is kernel events */ +- if (fds[i].fd == fd && fds[i].revents == 0) { +- fds[i].revents = events; +- event_num = 1; +- break; ++ struct protocol_stack_group *stack_group = get_protocol_stack_group(); ++ ++ maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents); ++ for (uint32_t i = 0; i < stack_group->stack_num && event_num < maxevents; i++) { ++ struct protocol_stack *stack = 
stack_group->stacks[i]; ++ int32_t start_event_num = event_num; ++ ++ if (pthread_spin_trylock(&stack->event_lock)) { ++ continue; ++ } ++ ++ struct list_node *node, *temp; ++ list_for_each_safe(node, temp, &stack->event_list) { ++ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list); ++ ++ uint32_t event = sock->events & sock->epoll_events; ++ if (event == 0 || sock->wait_close) { ++ continue; ++ } ++ ++ events[event_num].events = event; ++ events[event_num].data = sock->ep_data; ++ event_num++; ++ ++ if (event_num >= maxevents) { ++ break; ++ } + } ++ ++ pthread_spin_unlock(&stack->event_lock); ++ ++ __sync_fetch_and_add(&stack->stats.app_events, event_num - start_event_num); + } + + return event_num; + } +- +-static bool remove_event(enum POLL_TYPE etype, struct lwip_sock **sock_list, int32_t event_num, struct lwip_sock *sock, +- struct lwip_sock *attach_sock) ++#else ++static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents) + { +- /* remove duplicate event */ +- for (uint32_t i = 0; i < event_num && etype == TYPE_EPOLL; i++) { +- if (sock_list[i] == sock) { +- return true; ++ int32_t event_num = 0; ++ struct list_node *node, *temp; ++ list_for_each_safe(node, temp, &wakeup->event_list) { ++ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list); ++ if (sock->conn == NULL) { ++ list_del_node_init(&sock->event_list); ++ continue; + } ++ ++ struct lwip_sock *temp_sock = sock; ++ do { ++ struct lwip_sock *attach_sock = (temp_sock->attach_fd > 0) ? get_socket(temp_sock->attach_fd) : temp_sock; ++ if (attach_sock == NULL || temp_sock->wait_close) { ++ temp_sock = (temp_sock->nextfd > 0) ? 
get_socket(temp_sock->nextfd) : NULL; ++ continue; ++ } ++ ++ uint32_t event = update_events(attach_sock); ++ if (event != 0) { ++ events[event_num].events = event; ++ events[event_num].data = temp_sock->ep_data; ++ event_num++; ++ if (event_num >= maxevents) { ++ break; ++ } ++ } ++ ++ temp_sock = (temp_sock->nextfd > 0) ? get_socket(temp_sock->nextfd) : NULL; ++ } while (temp_sock); + } + +- return !check_event_vaild(attach_sock, attach_sock->events); ++ return event_num; + } ++#endif + +-static int32_t get_lwip_events(struct weakup_poll *weakup, void *out, uint32_t maxevents, enum POLL_TYPE etype) ++static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds) + { +- struct epoll_event *events = (struct epoll_event *)out; +- struct pollfd *fds = (struct pollfd *)out; +- +- if (etype == TYPE_EPOLL) { +- maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents); +- } + int32_t event_num = 0; +- struct lwip_sock *sock = NULL; + +- while (event_num < maxevents) { +- if (rte_ring_sc_dequeue(weakup->self_ring, (void **)&sock) && +- rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock)) { +- break; +- } +- __atomic_store_n(&sock->have_event, false, __ATOMIC_RELEASE); ++ for (uint32_t i = 0; i < nfds; i++) { ++ /* listenfd nextfd pointerto next stack listen, others nextfd=-1 */ ++ int32_t fd = fds[i].fd; ++ while (fd > 0) { ++ struct lwip_sock *sock = get_socket(fd); ++ if (sock == NULL) { ++ break; ++ } + +- /* sock->stack == NULL mean close sock */ +- if (sock->stack == NULL) { +- continue; +- } ++ /* attach listen is empty, all event in attached listen. attached listen attach_fd is self */ ++ struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket(sock->attach_fd) : sock; ++ if (attach_sock == NULL || sock->wait_close) { ++ fd = sock->nextfd; ++ continue; ++ } + +- /* attach listen is empty, all event in attached listen. attached listen attach_fd is self */ +- struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? 
get_socket(sock->attach_fd) : sock; +- if (attach_sock == NULL) { +- continue; +- } ++ uint32_t events = update_events(attach_sock); ++ if (events) { ++ fds[i].revents = events; ++ __sync_fetch_and_add(&sock->stack->stats.app_events, 1); ++ event_num++; ++ break; ++ } + +- if (remove_event(etype, weakup->sock_list, event_num, sock, attach_sock)) { +- sock->stack->stats.remove_event++; +- continue; ++ fd = sock->nextfd; + } ++ } + +- if (etype == TYPE_EPOLL) { +- events[event_num].events = attach_sock->events; +- events[event_num].data = sock->ep_data; +- weakup->sock_list[event_num] = sock; +- event_num++; +- } else { +- /* shadow_fd event notice listen_fd */ +- if (attach_sock->shadowed_sock) { +- attach_sock = attach_sock->shadowed_sock; +- } ++ return event_num; ++} + +- if (sock->conn) { +- event_num += save_poll_event(fds, maxevents, sock->conn->socket, attach_sock->events); +- } ++static inline bool have_kernel_fd(int32_t epfd, struct pollfd *fds, nfds_t nfds) ++{ ++ /* when epfd > 0 is epoll type */ ++ for (uint32_t i = 0; i < nfds && epfd < 0; i++) { ++ if (get_socket(fds[i].fd) == NULL) { ++ return true; + } +- +- sock->stack->stats.app_events++; +- sem_trywait(&weakup->event_sem); /* each event post sem, so every read down sem */ + } + +- return event_num; ++ return false; + } + +-static inline int32_t remove_kernel_invaild_events(struct pollfd *fds, int32_t nfds, int32_t event_count) ++static inline int32_t poll_kernel_event(struct pollfd *fds, nfds_t nfds) + { +- int32_t real_count = 0; ++ int32_t event_num = 0; + +- for (int i = 0; i < nfds && real_count < event_count; i++) { +- if (fds[i].fd < 0 || fds[i].revents == 0) { ++ for (uint32_t i = 0; i < nfds; i++) { ++ /* lwip event */ ++ if (get_socket(fds[i].fd) != NULL || fds[i].fd < 0) { + continue; + } + +- struct lwip_sock *sock = get_socket(fds[i].fd); +- if (sock && CONN_TYPE_IS_LIBOS(sock->conn)) { +- fds[i].revents = 0; ++ int32_t ret = posix_api->poll_fn(&fds[i], 1, 0); ++ if (ret < 0) { ++ if 
(errno != EINTR) { ++ return ret; ++ } + } else { +- real_count++; ++ event_num += ret; + } + } + +- return real_count; ++ return event_num; + } + +-static int32_t poll_event(struct weakup_poll *weakup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout) ++static int32_t get_event(struct wakeup_poll *wakeup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout) + { +- struct epoll_event *events = (struct epoll_event *)out; + struct pollfd *fds = (struct pollfd *)out; ++ struct epoll_event *events = (struct epoll_event *)out; ++ bool have_kernel = have_kernel_fd(epfd, fds, maxevents); + int32_t event_num = 0; +- int32_t event_kernel_num = 0; +- struct timespec epoll_interval = { +- .tv_sec = 0, +- .tv_nsec = EPOLL_INTERVAL_10MS, +- }; +- uint32_t start_time = sys_now(); ++ int32_t poll_time = 0; ++ int32_t ret; + ++ /* when epfd > 0 is epoll type */ + do { +- /* epoll_wait type */ +- if (epfd > 0) { +- event_num += get_lwip_events(weakup, &events[event_num], maxevents - event_num, TYPE_EPOLL); +- if (event_num >= maxevents) { +- break; +- } ++ event_num += (epfd > 0) ? epoll_lwip_event(wakeup, &events[event_num], maxevents - event_num) : ++ poll_lwip_event(fds, maxevents); + +- event_kernel_num = posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0); ++ if (have_kernel) { ++ int32_t event_kernel_num = (epfd > 0) ? 
++ posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0) : ++ poll_kernel_event(fds, maxevents); + if (event_kernel_num < 0) { +- break; ++ return event_kernel_num; + } + event_num += event_kernel_num; +- } else { +- /* for poll events, we need to distiguish kernel events and gazelle events */ +- event_kernel_num = posix_api->poll_fn(fds, maxevents, 0); +- if (event_kernel_num < 0) { ++ if (timeout >= 0 && poll_time >= timeout) { + break; + } +- event_kernel_num = remove_kernel_invaild_events(fds, maxevents, event_kernel_num); +- event_num += event_kernel_num; +- +- event_num += get_lwip_events(weakup, fds, maxevents, TYPE_POLL); ++ poll_time += EPOLL_KERNEL_INTERVAL; + } + + if (event_num > 0) { + break; + } + +- sem_timedwait(&weakup->event_sem, &epoll_interval); +- if (timeout > 0) { +- timeout = update_timeout(timeout, start_time); ++ int32_t interval = (have_kernel) ? EPOLL_KERNEL_INTERVAL : timeout; ++ struct timespec epoll_interval; ++ clock_gettime(CLOCK_REALTIME, &epoll_interval); ++ epoll_interval.tv_sec += interval / 1000; ++ epoll_interval.tv_nsec += (interval % 1000) * 1000000; ++ epoll_interval.tv_sec += epoll_interval.tv_nsec / 1000000000; ++ epoll_interval.tv_nsec = epoll_interval.tv_nsec % 1000000000; ++ ++ if (timeout < 0 && !have_kernel) { ++ ret = sem_wait(&wakeup->event_sem); ++ } else { ++ ret = sem_timedwait(&wakeup->event_sem, &epoll_interval); ++ } ++ ++ if (!have_kernel && ret < 0) { ++ break; + } +- } while (timeout != 0); ++ } while (event_num <= maxevents); + +- return (event_kernel_num < 0) ? 
event_kernel_num : event_num; ++ return event_num; + } + +-static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *weakup) ++int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout) + { +- int32_t stack_id = 0; +- int32_t stack_count[PROTOCOL_STACK_MAX] = {0}; ++ /* avoid the starvation of epoll events from both netstack */ ++ maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents); + +- if (weakup->event_ring == NULL) { +- weakup->event_ring = create_ring("POLL_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid()); +- if (weakup->event_ring == NULL) { +- GAZELLE_RETURN(ENOMEM); +- } ++ struct lwip_sock *sock = get_socket_by_fd(epfd); ++ if (sock == NULL) { ++ GAZELLE_RETURN(EINVAL); ++ } + +- weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid()); +- if (weakup->self_ring == NULL) { +- GAZELLE_RETURN(ENOMEM); +- } ++ if (sock->wakeup == NULL) { ++ return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout); ++ } ++ ++ return get_event(sock->wakeup, epfd, events, maxevents, timeout); ++} ++ ++static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeup) ++{ ++ int32_t stack_count[PROTOCOL_STACK_MAX] = {0}; ++ ++ if (!wakeup->init) { ++ wakeup->init = true; ++ sem_init(&wakeup->event_sem, 0, 0); ++ } else { ++ while (sem_trywait(&wakeup->event_sem) == 0) {} + } + + for (uint32_t i = 0; i < nfds; i++) { +@@ -432,51 +473,33 @@ static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *we + break; + } + sock->epoll_events = fds[i].events | POLLERR; +- sock->weakup = weakup; +- +- raise_pending_events(sock); +- +- stack_count[sock->stack->queue_id]++; ++ sock->wakeup = wakeup; + + /* listenfd list */ + fd = sock->nextfd; ++ stack_count[sock->stack->queue_id]++; + } while (fd > 0); + } + +- for (uint32_t i = 0; i < get_protocol_stack_group()->stack_num; i++) { +- if (stack_count[i] > stack_count[stack_id]) { +- 
stack_id = i; +- } +- } +- +- bind_to_stack_numa(stack_id); +- +- return 0; +-} +- +-int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout) +-{ +- /* avoid the starvation of epoll events from both netstack */ +- maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents); +- +- struct lwip_sock *sock = get_socket_by_fd(epfd); +- if (sock == NULL) { +- GAZELLE_RETURN(EINVAL); ++ if (wakeup->bind_stack) { ++ return; + } + +- if (sock->weakup == NULL) { +- return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout); ++ struct protocol_stack_group *stack_group = get_protocol_stack_group(); ++ uint32_t bind_id = 0; ++ for (uint32_t i = 0; i < stack_group->stack_num; i++) { ++ if (stack_count[i] > stack_count[bind_id]) { ++ bind_id = i; ++ } + } + +- return poll_event(sock->weakup, epfd, events, maxevents, timeout); ++ bind_to_stack_numa(stack_group->stacks[bind_id]); ++ wakeup->bind_stack = stack_group->stacks[bind_id]; + } + + int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout) + { +- int32_t ret = poll_init(fds, nfds, &g_weakup_poll); +- if (ret != 0) { +- return -1; +- } ++ poll_init(fds, nfds, &g_wakeup_poll); + +- return poll_event(&g_weakup_poll, -1, fds, nfds, timeout); ++ return get_event(&g_wakeup_poll, -1, fds, nfds, timeout); + } +diff --git a/src/lstack/api/lstack_signal.c b/src/lstack/api/lstack_signal.c +index 5e4af56..f4763e8 100644 +--- a/src/lstack/api/lstack_signal.c ++++ b/src/lstack/api/lstack_signal.c +@@ -16,6 +16,7 @@ + #include + #include + #include ++#include + + #include "lstack_log.h" + +diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c +index 53712a8..13086a3 100644 +--- a/src/lstack/core/lstack_cfg.c ++++ b/src/lstack/core/lstack_cfg.c +@@ -33,14 +33,14 @@ + #include "gazelle_reg_msg.h" + #include "lstack_log.h" + #include "gazelle_base_func.h" +-#include "gazelle_parse_config.h" + #include "lstack_protocol_stack.h" ++#include "gazelle_parse_config.h" 
+ + #define DEFAULT_CONF_FILE "/etc/gazelle/lstack.conf" + #define LSTACK_CONF_ENV "LSTACK_CONF_PATH" + #define NUMA_CPULIST_PATH "/sys/devices/system/node/node%u/cpulist" + #define DEV_MAC_LEN 17 +-#define CPUS_RANGE_NUM 32 ++#define CPUS_MAX_NUM 256 + + static struct cfg_params g_config_params; + +@@ -50,7 +50,7 @@ static int32_t parse_host_addr(void); + static int32_t parse_low_power_mode(void); + static int32_t parse_stack_cpu_number(void); + static int32_t parse_use_ltran(void); +-static int32_t parse_weakup_cpu_number(void); ++static int32_t parse_wakeup_cpu_number(void); + static int32_t parse_mask_addr(void); + static int32_t parse_devices(void); + static int32_t parse_dpdk_args(void); +@@ -70,7 +70,7 @@ static struct config_vector_t g_config_tbl[] = { + { "devices", parse_devices }, + { "dpdk_args", parse_dpdk_args }, + { "num_cpus", parse_stack_cpu_number }, +- { "num_wakeup", parse_weakup_cpu_number }, ++ { "num_wakeup", parse_wakeup_cpu_number }, + { "low_power_mode", parse_low_power_mode }, + { "kni_switch", parse_kni_switch }, + { NULL, NULL } +@@ -240,7 +240,6 @@ static int32_t parse_stack_cpu_number(void) + } + + g_config_params.num_cpu = cnt; +- get_protocol_stack_group()->stack_num = g_config_params.num_cpu; + + return 0; + } +@@ -275,10 +274,10 @@ static int32_t numa_to_cpusnum(unsigned socket_id, uint32_t *cpulist, int32_t nu + + static int32_t stack_idle_cpuset(struct protocol_stack *stack, cpu_set_t *exclude) + { +- uint32_t cpulist[CPUS_RANGE_NUM]; ++ uint32_t cpulist[CPUS_MAX_NUM]; + +- int32_t cpunum = numa_to_cpusnum(stack->socket_id, cpulist, CPUS_RANGE_NUM); +- if (cpunum <= 0 ) { ++ int32_t cpunum = numa_to_cpusnum(stack->socket_id, cpulist, CPUS_MAX_NUM); ++ if (cpunum <= 0) { + LSTACK_LOG(ERR, LSTACK, "numa_to_cpusnum failed\n"); + return -1; + } +@@ -308,7 +307,7 @@ int32_t init_stack_numa_cpuset(void) + CPU_SET(cfg->cpus[idx], &stack_cpuset); + } + for (int32_t idx = 0; idx < cfg->num_wakeup; ++idx) { +- CPU_SET(cfg->weakup[idx], 
&stack_cpuset); ++ CPU_SET(cfg->wakeup[idx], &stack_cpuset); + } + + for (int32_t idx = 0; idx < stack_group->stack_num; ++idx) { +@@ -621,7 +620,7 @@ static int32_t parse_low_power_mode(void) + return 0; + } + +-static int32_t parse_weakup_cpu_number(void) ++static int32_t parse_wakeup_cpu_number(void) + { + const config_setting_t *cfg_args = NULL; + const char *args = NULL; +@@ -639,13 +638,19 @@ static int32_t parse_weakup_cpu_number(void) + } + + char *tmp_arg = strdup(args); +- int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.weakup, CFG_MAX_CPUS); ++ int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.wakeup, CFG_MAX_CPUS); + free(tmp_arg); + if (cnt <= 0 || cnt > CFG_MAX_CPUS) { + return -EINVAL; + } + g_config_params.num_wakeup = cnt; + ++ if (g_config_params.num_wakeup < g_config_params.num_cpu) { ++ LSTACK_PRE_LOG(LSTACK_ERR, "num_wakeup=%d less than num_stack_cpu=%d.\n", g_config_params.num_wakeup, ++ g_config_params.num_cpu); ++ return -EINVAL; ++ } ++ + return 0; + } + +diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c +index 430c6e5..3f446ea 100644 +--- a/src/lstack/core/lstack_dpdk.c ++++ b/src/lstack/core/lstack_dpdk.c +@@ -28,6 +28,7 @@ + #include + #include + #include ++#include + + #include "lstack_log.h" + #include "dpdk_common.h" +@@ -109,35 +110,39 @@ static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_ + char pool_name[PATH_MAX]; + struct rte_mempool *pool; + +- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id); ++ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id); + if (ret < 0) { + return NULL; + } + + /* time stamp before pbuf_custom as priv_data */ ++ pthread_mutex_lock(get_mem_mutex()); + pool = rte_pktmbuf_pool_create(pool_name, nb_mbuf, mbuf_cache_size, + sizeof(struct pbuf_custom) + GAZELLE_MBUFF_PRIV_SIZE, MBUF_SZ, rte_socket_id()); + if (pool == NULL) { + LSTACK_LOG(ERR, LSTACK, "cannot 
create %s pool rte_err=%d\n", pool_name, rte_errno); + } ++ pthread_mutex_unlock(get_mem_mutex()); + return pool; + } + +-static struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id) ++struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id) + { + char pool_name[PATH_MAX]; + struct rte_mempool *pool; + int32_t ret; + +- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id); ++ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id); + if (ret < 0) { + return NULL; + } ++ pthread_mutex_lock(get_mem_mutex()); + pool = rte_mempool_create(pool_name, CALL_POOL_SZ, sizeof(struct rpc_msg), 0, 0, NULL, NULL, NULL, + NULL, rte_socket_id(), 0); + if (pool == NULL) { + LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno); + } ++ pthread_mutex_unlock(get_mem_mutex()); + return pool; + } + +@@ -147,7 +152,7 @@ static struct reg_ring_msg *create_reg_mempool(const char *name, uint16_t queue_ + char pool_name[PATH_MAX]; + struct reg_ring_msg *reg_buf; + +- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id); ++ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id); + if (ret < 0) { + return NULL; + } +@@ -167,21 +172,18 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num) + return -1; + } + +- stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF / stack_num, 0, stack->queue_id); ++ stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF / stack_num, RX_MBUF_CACHE_SZ, ++ stack->queue_id); + if (stack->rx_pktmbuf_pool == NULL) { + return -1; + } + +- stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF / stack_num, 0, stack->queue_id); ++ stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF / stack_num, TX_MBUF_CACHE_SZ, ++ stack->queue_id); + if (stack->tx_pktmbuf_pool == NULL) { + return -1; + 
} + +- stack->rpc_pool = create_rpc_mempool("rpc_msg", stack->queue_id); +- if (stack->rpc_pool == NULL) { +- return -1; +- } +- + if (use_ltran()) { + stack->reg_buf = create_reg_mempool("reg_ring_msg", stack->queue_id); + if (stack->reg_buf == NULL) { +@@ -214,16 +216,12 @@ int32_t create_shared_ring(struct protocol_stack *stack) + { + lockless_queue_init(&stack->rpc_queue); + +- stack->weakup_ring = create_ring("SHARED_WEAKUP_RING", VDEV_WEAKUP_QUEUE_SZ, 0, stack->queue_id); +- if (stack->weakup_ring == NULL) { +- return -1; +- } +- +- stack->send_idle_ring = create_ring("SEND_IDLE_RING", VDEV_IDLE_QUEUE_SZ, 0, stack->queue_id); +- if (stack->send_idle_ring == NULL) { +- return -1; ++ if (get_protocol_stack_group()->wakeup_enable) { ++ stack->wakeup_ring = create_ring("WAKEUP_RING", VDEV_WAKEUP_QUEUE_SZ, 0, stack->queue_id); ++ if (stack->wakeup_ring == NULL) { ++ return -1; ++ } + } +- stack->in_replenish = 0; + + if (use_ltran()) { + stack->rx_ring = create_ring("RING_RX", VDEV_RX_QUEUE_SZ, RING_F_SP_ENQ | RING_F_SC_DEQ, stack->queue_id); +@@ -328,8 +326,19 @@ static struct eth_params *alloc_eth_params(uint16_t port_id, uint16_t nb_queues) + return eth_params; + } + ++uint64_t get_eth_params_rx_ol(void) ++{ ++ return use_ltran() ? 0 : get_protocol_stack_group()->eth_params->conf.rxmode.offloads; ++} ++ ++uint64_t get_eth_params_tx_ol(void) ++{ ++ return use_ltran() ? 
0 : get_protocol_stack_group()->eth_params->conf.txmode.offloads; ++} ++ + static int eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_info *dev_info) + { ++#if CHECKSUM_OFFLOAD_ALL + uint64_t rx_ol = 0; + uint64_t tx_ol = 0; + +@@ -337,43 +346,48 @@ static int eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_inf + uint64_t tx_ol_capa = dev_info->tx_offload_capa; + + // rx ip +- if (rx_ol_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) { + #if CHECKSUM_CHECK_IP_HW ++ if (rx_ol_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) { + rx_ol |= DEV_RX_OFFLOAD_IPV4_CKSUM; + LSTACK_LOG(INFO, LSTACK, "DEV_RX_OFFLOAD_IPV4_CKSUM\n"); +-#endif + } ++#endif + + // rx tcp +- if (rx_ol_capa & DEV_RX_OFFLOAD_TCP_CKSUM) { + #if CHECKSUM_CHECK_TCP_HW ++ if (rx_ol_capa & DEV_RX_OFFLOAD_TCP_CKSUM) { + rx_ol |= DEV_RX_OFFLOAD_TCP_CKSUM; + LSTACK_LOG(INFO, LSTACK, "DEV_RX_OFFLOAD_TCP_CKSUM\n"); +-#endif + } ++#endif + + // tx ip +- if (tx_ol_capa & DEV_TX_OFFLOAD_IPV4_CKSUM) { + #if CHECKSUM_GEN_IP_HW ++ if (tx_ol_capa & DEV_TX_OFFLOAD_IPV4_CKSUM) { + tx_ol |= DEV_TX_OFFLOAD_IPV4_CKSUM; + LSTACK_LOG(INFO, LSTACK, "DEV_TX_OFFLOAD_IPV4_CKSUM\n"); +-#endif + } ++#endif + + // tx tcp +- if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_CKSUM) { + #if CHECKSUM_GEN_TCP_HW ++ if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_CKSUM) { + tx_ol |= DEV_TX_OFFLOAD_TCP_CKSUM; + LSTACK_LOG(INFO, LSTACK, "DEV_TX_OFFLOAD_TCP_CKSUM\n"); ++ } + #endif ++ if (!(rx_ol & DEV_RX_OFFLOAD_TCP_CKSUM) || !(rx_ol & DEV_RX_OFFLOAD_IPV4_CKSUM)) { ++ rx_ol = 0; ++ } ++ if (!(tx_ol & DEV_TX_OFFLOAD_TCP_CKSUM) || !(tx_ol & DEV_TX_OFFLOAD_IPV4_CKSUM)) { ++ tx_ol = 0; + } + + conf->rxmode.offloads = rx_ol; + conf->txmode.offloads = tx_ol; + +-#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW || CHECKSUM_GEN_IP_HW || CHECKSUM_GEN_TCP_HW + LSTACK_LOG(INFO, LSTACK, "set checksum offloads\n"); +-#endif ++#endif /* CHECKSUM_OFFLOAD_ALL */ + + return 0; + } +@@ -580,3 +594,30 @@ void dpdk_skip_nic_init(void) + } + } + ++int32_t 
init_dpdk_ethdev(void) ++{ ++ int32_t ret; ++ ++ ret = dpdk_ethdev_init(); ++ if (ret != 0) { ++ LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_init failed\n"); ++ return -1; ++ } ++ ++ ret = dpdk_ethdev_start(); ++ if (ret < 0) { ++ LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n"); ++ return -1; ++ } ++ ++ if (get_global_cfg_params()->kni_switch) { ++ ret = dpdk_init_lstack_kni(); ++ if (ret < 0) { ++ return -1; ++ } ++ } ++ ++ struct protocol_stack_group *stack_group = get_protocol_stack_group(); ++ sem_post(&stack_group->ethdev_init); ++ return 0; ++} +diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c +index 17195c8..774d0f3 100644 +--- a/src/lstack/core/lstack_init.c ++++ b/src/lstack/core/lstack_init.c +@@ -30,6 +30,7 @@ + #include + #include + #include ++#include + + #include "lstack_cfg.h" + #include "lstack_control_plane.h" +@@ -225,16 +226,18 @@ __attribute__((constructor)) void gazelle_network_init(void) + + lstack_log_level_init(); + +- /* +- * Phase 8: memory and nic */ + ret = init_protocol_stack(); + if (ret != 0) { + LSTACK_EXIT(1, "init_protocol_stack failed\n"); + } + +- ret = create_stack_thread(); +- if (ret != 0) { +- LSTACK_EXIT(1, "create_stack_thread failed\n"); ++ /* ++ * Phase 8: nic */ ++ if (!use_ltran()) { ++ ret = init_dpdk_ethdev(); ++ if (ret != 0) { ++ LSTACK_EXIT(1, "init_dpdk_ethdev failed\n"); ++ } + } + + /* +diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c +index b4d75d2..887464d 100644 +--- a/src/lstack/core/lstack_lwip.c ++++ b/src/lstack/core/lstack_lwip.c +@@ -17,6 +17,7 @@ + #include + #include + #include ++#include + #include + #include + #include +@@ -25,7 +26,6 @@ + #include "lstack_ethdev.h" + #include "lstack_protocol_stack.h" + #include "lstack_log.h" +-#include "lstack_weakup.h" + #include "lstack_dpdk.h" + #include "lstack_stack_stat.h" + #include "lstack_lwip.h" +@@ -49,37 +49,82 @@ void listen_list_add_node(int32_t head_fd, int32_t add_fd) + sock->nextfd = add_fd; + } 
+ ++static void free_ring_pbuf(struct rte_ring *ring) ++{ ++ while (1) { ++ struct pbuf *pbuf = NULL; ++ int32_t ret = rte_ring_sc_dequeue(ring, (void **)&pbuf); ++ if (ret != 0) { ++ break; ++ } ++ ++ pbuf_free(pbuf); ++ } ++} ++ + static void reset_sock_data(struct lwip_sock *sock) + { + /* check null pointer in ring_free func */ + if (sock->recv_ring) { ++ free_ring_pbuf(sock->recv_ring); + rte_ring_free(sock->recv_ring); + } + sock->recv_ring = NULL; + ++ if (sock->recv_wait_free) { ++ free_ring_pbuf(sock->recv_wait_free); ++ rte_ring_free(sock->recv_wait_free); ++ } ++ sock->recv_wait_free = NULL; ++ + if (sock->send_ring) { ++ free_ring_pbuf(sock->send_ring); + rte_ring_free(sock->send_ring); + } + sock->send_ring = NULL; + ++ if (sock->send_idle_ring) { ++ free_ring_pbuf(sock->send_idle_ring); ++ rte_ring_free(sock->send_idle_ring); ++ } ++ sock->send_idle_ring = NULL; ++ + sock->stack = NULL; +- sock->weakup = NULL; ++ sock->wakeup = NULL; + sock->events = 0; + sock->nextfd = -1; + sock->attach_fd = -1; + sock->wait_close = false; +- sock->have_event = false; +- sock->have_rpc_send = false; + sock->shadowed_sock = NULL; ++ sock->epoll_events = 0; ++ sock->events = 0; + + if (sock->recv_lastdata) { + pbuf_free(sock->recv_lastdata); +- sock->recv_lastdata = NULL; + } ++ sock->recv_lastdata = NULL; + + if (sock->send_lastdata) { + pbuf_free(sock->send_lastdata); +- sock->send_lastdata = NULL; ++ } ++ sock->send_lastdata = NULL; ++} ++ ++static void replenish_send_idlembuf(struct rte_ring *ring) ++{ ++ uint32_t replenish_cnt = rte_ring_free_count(ring); ++ ++ for (uint32_t i = 0; i < replenish_cnt; i++) { ++ struct pbuf *pbuf = lwip_alloc_pbuf(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM); ++ if (pbuf == NULL) { ++ break; ++ } ++ ++ int32_t ret = rte_ring_sp_enqueue(ring, (void *)pbuf); ++ if (ret < 0) { ++ pbuf_free(pbuf); ++ break; ++ } + } + } + +@@ -99,19 +144,31 @@ void gazelle_init_sock(int32_t fd) + return; + } + ++ sock->recv_wait_free = create_ring("wait_free", 
SOCK_RECV_RING_SIZE, 0, name_tick++); ++ if (sock->recv_wait_free == NULL) { ++ LSTACK_LOG(ERR, LSTACK, "wait_free create failed. errno: %d.\n", rte_errno); ++ return; ++ } ++ + sock->send_ring = create_ring("sock_send", SOCK_SEND_RING_SIZE, 0, name_tick++); + if (sock->send_ring == NULL) { + LSTACK_LOG(ERR, LSTACK, "sock_send create failed. errno: %d.\n", rte_errno); + return; + } + ++ sock->send_idle_ring = create_ring("idle_send", SOCK_SEND_RING_SIZE, 0, name_tick++); ++ if (sock->send_idle_ring == NULL) { ++ LSTACK_LOG(ERR, LSTACK, "idle_send create failed. errno: %d.\n", rte_errno); ++ return; ++ } ++ replenish_send_idlembuf(sock->send_idle_ring); ++ + sock->stack = get_protocol_stack(); + sock->stack->conn_num++; + init_list_node(&sock->recv_list); + init_list_node(&sock->attach_list); + init_list_node(&sock->listen_list); + init_list_node(&sock->event_list); +- init_list_node(&sock->wakeup_list); + init_list_node(&sock->send_list); + } + +@@ -129,7 +186,9 @@ void gazelle_clean_sock(int32_t fd) + list_del_node_init(&sock->recv_list); + list_del_node_init(&sock->attach_list); + list_del_node_init(&sock->listen_list); ++#ifdef GAZELLE_USE_EPOLL_EVENT_STACK + list_del_node_init(&sock->event_list); ++#endif + list_del_node_init(&sock->send_list); + } + +@@ -206,101 +265,60 @@ struct pbuf *lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type) + void *data = rte_pktmbuf_mtod(mbuf, void *); + struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ); + +- return pbuf; +-} +- +-void stack_replenish_send_idlembuf(struct protocol_stack *stack) +-{ +- uint32_t replenish_cnt = rte_ring_free_count(stack->send_idle_ring); +- +- for (uint32_t i = 0; i < replenish_cnt; i++) { +- struct pbuf *pbuf = lwip_alloc_pbuf(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM); +- if (pbuf == NULL) { +- break; +- } +- +- int32_t ret = rte_ring_sp_enqueue(stack->send_idle_ring, (void *)pbuf); +- if (ret < 0) { +- gazelle_free_pbuf(pbuf); +- break; +- } 
++#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW ++ if (pbuf) { ++ pbuf->ol_flags = 0; ++ pbuf->l2_len = 0; ++ pbuf->l3_len = 0; + } ++#endif ++ ++ return pbuf; + } + +-ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags) ++struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags) + { + struct pbuf *pbuf = NULL; +- ssize_t send_ret = 0; +- ssize_t send_len = 0; + +- do { +- if (sock->send_lastdata) { +- pbuf = sock->send_lastdata; +- sock->send_lastdata = NULL; +- } else { +- int32_t ret = rte_ring_sc_dequeue(sock->send_ring, (void **)&pbuf); +- if (ret != 0) { +- break; +- } +- } +- +- if (sock->conn == NULL || sock->conn->pcb.tcp == NULL) { +- GAZELLE_RETURN(ENOENT); +- } +- +- uint16_t available = tcp_sndbuf(sock->conn->pcb.tcp); +- if (available < pbuf->tot_len) { +- sock->send_lastdata = pbuf; +- break; +- } +- +- ssize_t pbuf_len = pbuf->tot_len; +- send_ret = lwip_send(fd, pbuf, pbuf->tot_len, flags); +- if (send_ret > 0) { +- send_len += send_ret; +- } +- if (send_ret != pbuf_len) { +- sock->stack->stats.write_lwip_drop++; +- break; ++ if (sock->send_lastdata) { ++ pbuf = sock->send_lastdata; ++ sock->send_lastdata = NULL; ++ } else { ++ int32_t ret = rte_ring_sc_dequeue(sock->send_ring, (void **)&pbuf); ++ if (ret != 0) { ++ *apiflags &= ~TCP_WRITE_FLAG_MORE; ++ return NULL; + } ++ } + +- sock->stack->stats.write_lwip_cnt++; +- } while (1); +- +- return (send_ret < 0) ? 
send_ret : send_len; +-} +- +-void add_self_event(struct lwip_sock *sock, uint32_t events) +-{ +- struct weakup_poll *wakeup = sock->weakup; +- struct protocol_stack *stack = sock->stack; +- if (wakeup == NULL || stack == NULL) { +- return; ++ if (pbuf->tot_len >= remain_size) { ++ sock->send_lastdata = pbuf; ++ *apiflags |= TCP_WRITE_FLAG_MORE; /* set TCP_PSH flag */ ++ return NULL; + } + +- sock->events |= events; ++ replenish_send_idlembuf(sock->send_idle_ring); + +- if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) { +- return; ++ if ((sock->epoll_events & EPOLLOUT) && rte_ring_free_count(sock->send_ring)) { ++ add_epoll_event(sock->conn, EPOLLOUT); + } + +- if (rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) { +- __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE); +- sem_post(&sock->weakup->event_sem); +- stack->stats.epoll_self_event++; +- } else { +- rpc_call_addevent(stack, sock); +- stack->stats.epoll_self_call++; +- } ++ sock->stack->stats.write_lwip_cnt++; ++ return pbuf; + } + + ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) + { ++ if (sock->events & EPOLLERR) { ++ return 0; ++ } ++ + uint32_t free_count = rte_ring_free_count(sock->send_ring); + if (free_count == 0) { +- GAZELLE_RETURN(EAGAIN); ++ return -1; + } +- uint32_t avaible_cont = rte_ring_count(sock->stack->send_idle_ring); ++ ++ uint32_t avaible_cont = rte_ring_count(sock->send_idle_ring); + avaible_cont = LWIP_MIN(free_count, avaible_cont); + + struct pbuf *pbuf = NULL; +@@ -309,7 +327,7 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) + uint32_t send_pkt = 0; + + while (send_len < len && send_pkt < avaible_cont) { +- int32_t ret = rte_ring_sc_dequeue(sock->stack->send_idle_ring, (void **)&pbuf); ++ int32_t ret = rte_ring_sc_dequeue(sock->send_idle_ring, (void **)&pbuf); + if (ret < 0) { + sock->stack->stats.app_write_idlefail++; + break; +@@ -322,28 +340,16 @@ ssize_t write_stack_data(struct lwip_sock 
*sock, const void *buf, size_t len) + ret = rte_ring_sp_enqueue(sock->send_ring, pbuf); + if (ret != 0) { + sock->stack->stats.app_write_drop++; +- gazelle_free_pbuf(pbuf); ++ pbuf_free(pbuf); + break; + } + +- sock->stack->stats.app_write_cnt++; + send_len += copy_len; + send_pkt++; + } ++ __sync_fetch_and_add(&sock->stack->stats.app_write_cnt, send_pkt); + +- if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) { +- add_self_event(sock, EPOLLOUT); +- sock->stack->stats.write_events++; +- } else { +- sock->events &= ~EPOLLOUT; +- } +- +- if (rte_ring_free_count(sock->stack->send_idle_ring) > USED_IDLE_WATERMARK && !sock->stack->in_replenish) { +- sock->stack->in_replenish = true; +- rpc_call_replenish_idlembuf(sock->stack); +- } +- +- return send_len; ++ return (send_len <= 0) ? -1 : send_len; + } + + void stack_send(struct rpc_msg *msg) +@@ -351,27 +357,62 @@ void stack_send(struct rpc_msg *msg) + int32_t fd = msg->args[MSG_ARG_0].i; + int32_t flags = msg->args[MSG_ARG_2].i; + +- struct protocol_stack *stack = get_protocol_stack(); + struct lwip_sock *sock = get_socket(fd); + if (sock == NULL) { + msg->result = -1; + return; + } + +- msg->result = write_lwip_data(sock, fd, flags); +- __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE); ++ if (!NETCONN_IS_DATAOUT(sock)) { ++ return; ++ } + +- if (msg->result >= 0 && +- (rte_ring_count(sock->send_ring) || sock->send_lastdata)) { ++ /* send all send_ring, so len set lwip send max. */ ++ ssize_t len = lwip_send(fd, sock, UINT16_MAX, flags); ++ if (len == 0) { ++ /* FIXME: should use POLLRDHUP, when connection be closed. 
lwip event-callback no POLLRDHUP */ ++ add_epoll_event(sock->conn, EPOLLERR); ++ } ++ ++ /* have remain data add sendlist */ ++ if (NETCONN_IS_DATAOUT(sock)) { + if (list_is_empty(&sock->send_list)) { +- __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE); +- list_add_node(&stack->send_list, &sock->send_list); +- sock->stack->stats.send_self_rpc++; ++ sock->send_flags = flags; ++ list_add_node(&sock->stack->send_list, &sock->send_list); + } ++ sock->stack->stats.send_self_rpc++; + } ++} + +- if (rte_ring_free_count(sock->send_ring)) { +- add_epoll_event(sock->conn, EPOLLOUT); ++void send_stack_list(struct protocol_stack *stack, uint32_t send_max) ++{ ++ struct list_node *node, *temp; ++ struct lwip_sock *sock; ++ uint32_t read_num = 0; ++ ++ list_for_each_safe(node, temp, &stack->send_list) { ++ sock = container_of(node, struct lwip_sock, send_list); ++ ++ if (sock->conn == NULL || !NETCONN_IS_DATAOUT(sock)) { ++ list_del_node_init(&sock->send_list); ++ continue; ++ } ++ ++ /* send all send_ring, so len set lwip send max. */ ++ ssize_t len = lwip_send(sock->conn->socket, sock, UINT16_MAX, sock->send_flags); ++ if (len == 0) { ++ /* FIXME: should use POLLRDHUP, when connection be closed. 
lwip event-callback no POLLRDHUP */ ++ add_epoll_event(sock->conn, EPOLLERR); ++ list_del_node_init(&sock->send_list); ++ } ++ ++ if (!NETCONN_IS_DATAOUT(sock)) { ++ list_del_node_init(&sock->send_list); ++ } ++ ++ if (++read_num >= send_max) { ++ break; ++ } + } + } + +@@ -381,6 +422,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) + return 0; + } + ++ if (rte_ring_count(sock->recv_wait_free)) { ++ free_ring_pbuf(sock->recv_wait_free); ++ } ++ + uint32_t free_count = rte_ring_free_count(sock->recv_ring); + uint32_t data_count = rte_ring_count(sock->conn->recvmbox->ring); + uint32_t read_max = LWIP_MIN(free_count, data_count); +@@ -411,6 +456,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) + read_count++; + } + ++ if (get_protocol_stack_group()->latency_start) { ++ calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_LWIP); ++ } ++ + recv_len += pbuf->len; + + /* once we have some data to return, only add more if we don't need to wait */ +@@ -425,6 +474,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) + add_epoll_event(sock->conn, EPOLLIN); + } + sock->stack->stats.read_lwip_cnt += read_count; ++ ++ if (recv_len == 0) { ++ GAZELLE_RETURN(EAGAIN); ++ } + return recv_len; + } + +@@ -440,7 +493,7 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags) + if ((message->msg_iov[i].iov_base == NULL) || ((ssize_t)message->msg_iov[i].iov_len <= 0) || + ((size_t)(ssize_t)message->msg_iov[i].iov_len != message->msg_iov[i].iov_len) || + ((ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len) <= 0)) { +- GAZELLE_RETURN(EINVAL); ++ GAZELLE_RETURN(EINVAL); + } + buflen = (ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len); + } +@@ -479,16 +532,14 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags) + + sock->send_flags = flags; + ssize_t send = write_stack_data(sock, buf, len); +- +- ssize_t ret = 0; +- 
if (!__atomic_load_n(&sock->have_rpc_send, __ATOMIC_ACQUIRE)) { +- __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE); +- ret = rpc_call_send(fd, buf, len, flags); +- } +- +- if (send <= 0 || ret < 0) { ++ if (send < 0) { + GAZELLE_RETURN(EAGAIN); ++ } else if (send == 0) { ++ return 0; + } ++ rte_smp_mb(); ++ ++ rpc_call_send(fd, NULL, send, flags); + return send; + } + +@@ -505,7 +556,7 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags) + if ((message->msg_iov[i].iov_base == NULL) || ((ssize_t)message->msg_iov[i].iov_len <= 0) || + ((size_t)(ssize_t)message->msg_iov[i].iov_len != message->msg_iov[i].iov_len) || + ((ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len) <= 0)) { +- GAZELLE_RETURN(EINVAL); ++ GAZELLE_RETURN(EINVAL); + } + buflen = (ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len); + } +@@ -513,7 +564,7 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags) + for (i = 0; i < message->msg_iovlen; i++) { + ret = gazelle_send(s, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, flags); + if (ret < 0) { +- return buflen == 0 ? ret : buflen; ++ return buflen == 0 ? 
ret : buflen; + } + buflen += ret; + } +@@ -536,6 +587,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags) + } + sock->recv_flags = flags; + ++ if ((sock->events & EPOLLERR) && !NETCONN_IS_DATAIN(sock)) { ++ return 0; ++ } ++ + while (recv_left > 0) { + if (sock->recv_lastdata) { + pbuf = sock->recv_lastdata; +@@ -556,22 +611,18 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags) + if (pbuf->tot_len > copy_len) { + sock->recv_lastdata = pbuf_free_header(pbuf, copy_len); + } else { +- pbuf_free(pbuf); +- sock->recv_lastdata = NULL; +- sock->stack->stats.app_read_cnt++; + if (get_protocol_stack_group()->latency_start) { + calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_READ); + } ++ ret = rte_ring_sp_enqueue(sock->recv_wait_free, pbuf); ++ if (ret != 0) { ++ pbuf_free(pbuf); ++ } ++ sock->recv_lastdata = NULL; ++ __sync_fetch_and_add(&sock->stack->stats.app_read_cnt, 1); + } + } + +- if ((sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) { +- add_self_event(sock, EPOLLIN); +- sock->stack->stats.read_events++; +- } else { +- sock->events &= ~EPOLLIN; +- } +- + if (recvd == 0) { + sock->stack->stats.read_null++; + GAZELLE_RETURN(EAGAIN); +@@ -588,30 +639,32 @@ void add_recv_list(int32_t fd) + } + } + +-void read_recv_list(void) ++void read_recv_list(struct protocol_stack *stack, uint32_t max_num) + { +- struct protocol_stack *stack = get_protocol_stack(); + struct list_node *list = &(stack->recv_list); + struct list_node *node, *temp; + struct lwip_sock *sock; +- struct lwip_sock *first_sock = NULL; ++ uint32_t read_num = 0; + + list_for_each_safe(node, temp, list) { + sock = container_of(node, struct lwip_sock, recv_list); + +- /* when read_lwip_data have data wait to read, add sock into recv_list. read_recv_list read this sock again. +- this is dead loop. 
so every sock just read one time */ +- if (sock == first_sock) { +- break; +- } +- if (first_sock == NULL) { +- first_sock = sock; ++ if (sock->conn == NULL || sock->recv_ring == NULL || sock->send_ring == NULL || sock->conn->pcb.tcp == NULL) { ++ list_del_node_init(&sock->recv_list); ++ continue; + } + +- /* recv_ring and send_ring maybe create fail, so check here */ +- if (sock->conn && sock->recv_ring && sock->send_ring && rte_ring_free_count(sock->recv_ring)) { ++ if (rte_ring_free_count(sock->recv_ring)) { + list_del_node_init(&sock->recv_list); +- lwip_recv(sock->conn->socket, NULL, 0, sock->recv_flags); ++ ssize_t len = lwip_recv(sock->conn->socket, NULL, 0, sock->recv_flags); ++ if (len == 0) { ++ /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */ ++ add_epoll_event(sock->conn, EPOLLERR); ++ } ++ } ++ ++ if (++read_num >= max_num) { ++ break; + } + } + } +@@ -633,11 +686,13 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s + struct lwip_sock *sock = get_socket(netconn->socket); + if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) { + conn->recv_ring_cnt = rte_ring_count(sock->recv_ring); ++ conn->recv_ring_cnt += (sock->recv_lastdata) ? 1 : 0; ++ + conn->send_ring_cnt = rte_ring_count(sock->send_ring); +- struct weakup_poll *weakup = sock->weakup; +- if (weakup) { +- conn->event_ring_cnt = rte_ring_count(weakup->event_ring); +- conn->self_ring_cnt = rte_ring_count(weakup->self_ring); ++ conn->send_ring_cnt += (sock->send_lastdata) ? 
1 : 0; ++ ++ if (sock->wakeup) { ++ sem_getvalue(&sock->wakeup->event_sem, &conn->sem_cnt); + } + } + } +@@ -786,11 +841,6 @@ static uint32_t get_list_count(struct list_node *list) + return count; + } + +-void stack_wakeuplist_count(struct rpc_msg *msg) +-{ +- msg->result = get_list_count(get_protocol_stack()->wakeup_list); +-} +- + void stack_eventlist_count(struct rpc_msg *msg) + { + msg->result = get_list_count(&get_protocol_stack()->event_list); +diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c +index e5761a4..da320e2 100644 +--- a/src/lstack/core/lstack_protocol_stack.c ++++ b/src/lstack/core/lstack_protocol_stack.c +@@ -17,6 +17,7 @@ + #include + #include + #include ++#include + #include + #include + #include +@@ -28,29 +29,34 @@ + #include "lstack_lwip.h" + #include "lstack_protocol_stack.h" + #include "lstack_cfg.h" +-#include "lstack_weakup.h" + #include "lstack_control_plane.h" + #include "lstack_stack_stat.h" + ++#define READ_LIST_MAX 32 ++#define SEND_LIST_MAX 32 ++#define HANDLE_RPC_MSG_MAX 32 ++ + static PER_THREAD uint16_t g_stack_idx = PROTOCOL_STACK_MAX; + static struct protocol_stack_group g_stack_group = {0}; + static PER_THREAD long g_stack_tid = 0; ++static pthread_mutex_t g_mem_mutex = PTHREAD_MUTEX_INITIALIZER; + + typedef void *(*stack_thread_func)(void *arg); + +-int32_t bind_to_stack_numa(int32_t stack_id) ++#ifdef GAZELLE_USE_EPOLL_EVENT_STACK ++void update_stack_events(struct protocol_stack *stack); ++#endif ++ ++pthread_mutex_t *get_mem_mutex(void) + { +- static PER_THREAD int32_t last_stack_id = -1; ++ return &g_mem_mutex; ++} + ++int32_t bind_to_stack_numa(struct protocol_stack *stack) ++{ + int32_t ret; +- struct protocol_stack *stack = get_protocol_stack_group()->stacks[stack_id]; + pthread_t tid = pthread_self(); + +- if (last_stack_id == stack_id) { +- return 0; +- } +- last_stack_id = stack_id; +- + ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset); + 
if (ret != 0) { + LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %d failed\n", rte_gettid(), stack->queue_id); +@@ -159,88 +165,27 @@ void lstack_low_power_idling(void) + } + } + +-int32_t init_protocol_stack(void) ++static int32_t create_thread(uint16_t queue_id, char *thread_name, stack_thread_func func) + { +- struct protocol_stack_group *stack_group = get_protocol_stack_group(); +- struct protocol_stack *stack = NULL; ++ /* thread may run slow, if arg is temp var maybe have relese */ ++ static uint16_t queue[PROTOCOL_STACK_MAX]; ++ char name[PATH_MAX]; ++ pthread_t tid; + int32_t ret; + +- for (uint32_t i = 0; i < stack_group->stack_num; i++) { +- stack = malloc(sizeof(*stack)); +- if (stack == NULL) { +- return -ENOMEM; +- } +- memset_s(stack, sizeof(*stack), 0, sizeof(*stack)); +- +- stack->queue_id = i; +- stack->port_id = stack_group->port_id; +- stack->cpu_id = get_global_cfg_params()->cpus[i]; +- stack->socket_id = numa_node_of_cpu(stack->cpu_id); +- if (stack->socket_id < 0) { +- LSTACK_LOG(ERR, PORT, "numa_node_of_cpu failed\n"); +- return -EINVAL; +- } +- +- ret = pktmbuf_pool_init(stack, stack_group->stack_num); +- if (ret != 0) { +- return ret; +- } +- +- ret = create_shared_ring(stack); +- if (ret != 0) { +- return ret; +- } +- +- init_list_node(&stack->recv_list); +- init_list_node(&stack->listen_list); +- init_list_node(&stack->event_list); +- init_list_node(&stack->send_list); +- +- stack_group->stacks[i] = stack; +- } +- +- ret = init_stack_numa_cpuset(); +- if (ret < 0) { ++ if (queue_id >= PROTOCOL_STACK_MAX) { ++ LSTACK_LOG(ERR, LSTACK, "queue_id is %d exceed max=%d\n", queue_id, PROTOCOL_STACK_MAX); + return -1; + } ++ queue[queue_id] = queue_id; + +- if (!use_ltran()) { +- ret = dpdk_ethdev_init(); +- if (ret != 0) { +- LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_init failed\n"); +- return -1; +- } +- +- ret = dpdk_ethdev_start(); +- if (ret < 0) { +- LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n"); +- return -1; +- } +- +- if 
(get_global_cfg_params()->kni_switch) { +- ret = dpdk_init_lstack_kni(); +- if (ret < 0) { +- return -1; +- } +- } +- } +- +- return 0; +-} +- +-static int32_t create_thread(struct protocol_stack *stack, char *thread_name, stack_thread_func func) +-{ +- char name[PATH_MAX]; +- pthread_t tid; +- int32_t ret; +- +- ret = sprintf_s(name, sizeof(name), "%s%02d", thread_name, stack->queue_id); ++ ret = sprintf_s(name, sizeof(name), "%s%02d", thread_name, queue[queue_id]); + if (ret < 0) { + LSTACK_LOG(ERR, LSTACK, "set name failed\n"); + return -1; + } + +- ret = pthread_create(&tid, NULL, func, stack); ++ ret = pthread_create(&tid, NULL, func, &queue[queue_id]); + if (ret != 0) { + LSTACK_LOG(ERR, LSTACK, "pthread_create ret=%d\n", ret); + return -1; +@@ -257,148 +202,185 @@ static int32_t create_thread(struct protocol_stack *stack, char *thread_name, st + + static void* gazelle_weakup_thread(void *arg) + { +- struct protocol_stack *stack = (struct protocol_stack *)arg; +- int32_t lcore_id = get_global_cfg_params()->weakup[stack->queue_id]; ++ uint16_t queue_id = *(uint16_t *)arg; ++ struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id]; + ++ int32_t lcore_id = get_global_cfg_params()->wakeup[stack->queue_id]; + thread_affinity_init(lcore_id); +- LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id); + +- struct list_node wakeup_list; +- init_list_node(&wakeup_list); +- stack->wakeup_list = &wakeup_list; ++ LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id); + + for (;;) { +- wakeup_list_sock(&wakeup_list); ++ sem_t *event_sem; ++ if (rte_ring_sc_dequeue(stack->wakeup_ring, (void **)&event_sem)) { ++ continue; ++ } + +- weakup_thread(stack->weakup_ring, &wakeup_list); ++ sem_post(event_sem); + } + + return NULL; + } + +-static void stack_thread_init(struct protocol_stack *stack) ++static void init_stack_value(struct protocol_stack *stack, uint16_t queue_id) + { +- uint16_t queue_id = stack->queue_id; +- int32_t ret; ++ 
struct protocol_stack_group *stack_group = get_protocol_stack_group(); ++ ++ memset_s(stack, sizeof(*stack), 0, sizeof(*stack)); + + set_stack_idx(queue_id); + stack->tid = gettid(); ++ stack->queue_id = queue_id; ++ stack->port_id = stack_group->port_id; ++ stack->cpu_id = get_global_cfg_params()->cpus[queue_id]; + stack->lwip_stats = &lwip_stats; +- RTE_PER_LCORE(_lcore_id) = stack->cpu_id; + +- thread_affinity_init(stack->cpu_id); ++ init_list_node(&stack->recv_list); ++ init_list_node(&stack->listen_list); ++ init_list_node(&stack->event_list); ++ init_list_node(&stack->send_list); ++ ++ pthread_spin_init(&stack->event_lock, PTHREAD_PROCESS_SHARED); + + sys_calibrate_tsc(); ++ stack_stat_init(); + +- hugepage_init(); ++ stack_group->stacks[queue_id] = stack; ++} + +- stack_replenish_send_idlembuf(stack); ++static struct protocol_stack * stack_thread_init(uint16_t queue_id) ++{ ++ struct protocol_stack_group *stack_group = get_protocol_stack_group(); + +- tcpip_init(NULL, NULL); ++ struct protocol_stack *stack = malloc(sizeof(*stack)); ++ if (stack == NULL) { ++ LSTACK_LOG(ERR, LSTACK, "malloc stack failed\n"); ++ return NULL; ++ } ++ init_stack_value(stack, queue_id); + +- if (use_ltran()) { +- ret = client_reg_thrd_ring(); +- if (ret != 0) { +- LSTACK_EXIT(1, "failed reg thread ret=%d\n", ret); +- } ++ cpu_set_t cpuset; ++ CPU_ZERO(&cpuset); ++ CPU_SET(stack->cpu_id, &cpuset); ++ if (rte_thread_set_affinity(&cpuset) != 0) { ++ LSTACK_LOG(ERR, LSTACK, "rte_thread_set_affinity failed\n"); ++ free(stack); ++ return NULL; + } ++ RTE_PER_LCORE(_lcore_id) = stack->cpu_id; + +- ret = ethdev_init(stack); ++ stack->socket_id = numa_node_of_cpu(stack->cpu_id); ++ if (stack->socket_id < 0) { ++ LSTACK_LOG(ERR, LSTACK, "numa_node_of_cpu failed\n"); ++ free(stack); ++ return NULL; ++ } ++ ++ int32_t ret = pktmbuf_pool_init(stack, stack_group->stack_num); + if (ret != 0) { +- LSTACK_EXIT(1, "failed reg thread ret=%d\n", ret); ++ free(stack); ++ return NULL; + } + +- 
stack_stat_init(); ++ ret = create_shared_ring(stack); ++ if (ret != 0) { ++ free(stack); ++ return NULL; ++ } + +- struct protocol_stack_group *stack_group = get_protocol_stack_group(); +- sem_post(&stack_group->thread_inited); +- LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id); +-} ++ thread_affinity_init(stack->cpu_id); + +-static void report_stack_event(struct protocol_stack *stack) +-{ +- struct list_node *list = &(stack->event_list); +- struct list_node *node, *temp; +- struct lwip_sock *sock; ++ hugepage_init(); + +- list_for_each_safe(node, temp, list) { +- sock = container_of(node, struct lwip_sock, event_list); ++ tcpip_init(NULL, NULL); + +- if (weakup_enqueue(stack->weakup_ring, sock) == 0) { +- __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE); +- list_del_node_init(&sock->event_list); +- stack->stats.weakup_events++; +- } else { +- break; ++ if (use_ltran()) { ++ ret = client_reg_thrd_ring(); ++ if (ret != 0) { ++ free(stack); ++ return NULL; + } + } +-} +- +-static void send_stack_list(struct protocol_stack *stack) +-{ +- struct list_node *list = &(stack->send_list); +- struct list_node *node, *temp; +- struct lwip_sock *sock; + +- list_for_each_safe(node, temp, list) { +- sock = container_of(node, struct lwip_sock, send_list); ++ sem_post(&stack_group->thread_phase1); + +- if (sock->conn == NULL || sock->stack == NULL) { +- list_del_node_init(&sock->send_list); +- continue; +- } ++ int32_t sem_val; ++ do { ++ sem_getvalue(&stack_group->ethdev_init, &sem_val); ++ } while (!sem_val && !use_ltran()); + +- ssize_t ret = write_lwip_data(sock, sock->conn->socket, sock->send_flags); +- __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE); +- if (ret >= 0 && +- (rte_ring_count(sock->send_ring) || sock->send_lastdata)) { +- __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE); +- } else { +- list_del_node_init(&sock->send_list); +- } ++ ret = ethdev_init(stack); ++ if (ret != 0) { ++ free(stack); ++ return NULL; 
++ } + +- if (rte_ring_free_count(sock->send_ring)) { +- add_epoll_event(sock->conn, EPOLLOUT); ++ if (stack_group->wakeup_enable) { ++ ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread); ++ if (ret != 0) { ++ free(stack); ++ return NULL; + } + } ++ ++ return stack; + } + + static void* gazelle_stack_thread(void *arg) + { +- struct protocol_stack *stack = (struct protocol_stack *)arg; ++ uint16_t queue_id = *(uint16_t *)arg; + +- stack_thread_init(stack); ++ struct protocol_stack *stack = stack_thread_init(queue_id); ++ if (stack == NULL) { ++ pthread_mutex_lock(&g_mem_mutex); ++ LSTACK_EXIT(1, "stack_thread_init failed\n"); ++ pthread_mutex_unlock(&g_mem_mutex); ++ } ++ LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id); + + for (;;) { +- poll_rpc_msg(stack); ++ poll_rpc_msg(stack, HANDLE_RPC_MSG_MAX); + + eth_dev_poll(); + +- read_recv_list(); ++ read_recv_list(stack, READ_LIST_MAX); + +- sys_timer_run(); ++ send_stack_list(stack, SEND_LIST_MAX); + +- report_stack_event(stack); ++ sys_timer_run(); + +- send_stack_list(stack); ++#ifdef GAZELLE_USE_EPOLL_EVENT_STACK ++ update_stack_events(stack); ++#endif + } + + return NULL; + } + +-int32_t create_stack_thread(void) ++int32_t init_protocol_stack(void) + { + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + int32_t ret; + +- ret = sem_init(&stack_group->thread_inited, 0, 0); ++ stack_group->stack_num = get_global_cfg_params()->num_cpu; ++ stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? 
true : false; ++ ++ if (!use_ltran()) { ++ ret = sem_init(&stack_group->ethdev_init, 0, 0); ++ if (ret < 0) { ++ LSTACK_LOG(ERR, PORT, "sem_init failed\n"); ++ return -1; ++ } ++ } ++ ++ ret = sem_init(&stack_group->thread_phase1, 0, 0); + if (ret < 0) { + LSTACK_LOG(ERR, PORT, "sem_init failed\n"); + return -1; + } + + for (uint32_t i = 0; i < stack_group->stack_num; i++) { +- ret = create_thread(stack_group->stacks[i], "gazellestack", gazelle_stack_thread); ++ ret = create_thread(i, "gazellestack", gazelle_stack_thread); + if (ret != 0) { + return ret; + } +@@ -406,14 +388,12 @@ int32_t create_stack_thread(void) + + int32_t thread_inited_num; + do { +- sem_getvalue(&stack_group->thread_inited, &thread_inited_num); ++ sem_getvalue(&stack_group->thread_phase1, &thread_inited_num); + } while (thread_inited_num < stack_group->stack_num); + +- for (uint32_t i = 0; i < stack_group->stack_num; i++) { +- ret = create_thread(stack_group->stacks[i], "gazelleweakup", gazelle_weakup_thread); +- if (ret != 0) { +- return ret; +- } ++ ret = init_stack_numa_cpuset(); ++ if (ret < 0) { ++ return -1; + } + + return 0; +@@ -440,7 +420,6 @@ static inline bool is_real_close(int32_t fd) + + /* last sock */ + if (list_is_empty(&sock->attach_list)) { +- list_del_node_init(&sock->attach_list); + return true; + } + +@@ -557,24 +536,6 @@ void stack_listen(struct rpc_msg *msg) + } + } + +-static bool have_accept_event(int32_t fd) +-{ +- do { +- struct lwip_sock *sock = get_socket(fd); +- if (sock == NULL) { +- break; +- } +- +- if (NETCONN_IS_ACCEPTIN(sock)) { +- return true; +- } +- +- fd = sock->nextfd; +- } while (fd > 0); +- +- return false; +-} +- + void stack_accept(struct rpc_msg *msg) + { + int32_t fd = msg->args[MSG_ARG_0].i; +@@ -593,7 +554,7 @@ void stack_accept(struct rpc_msg *msg) + } + + LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d attach_fd %d failed %d\n", get_stack_tid(), msg->args[MSG_ARG_0].i, +- fd, accept_fd); ++ fd, accept_fd); + msg->result = -1; + } + +@@ -768,13 +729,11 
@@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog) + int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen) + { + struct lwip_sock *sock = get_socket(fd); +- if (sock == NULL) { ++ if (sock == NULL || sock->attach_fd < 0) { + errno = EINVAL; + return -1; + } +- + fd = sock->attach_fd; +- int32_t head_fd = fd; + + struct lwip_sock *min_sock = NULL; + int32_t min_fd = fd; +@@ -783,15 +742,19 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add + if (sock == NULL) { + GAZELLE_RETURN(EINVAL); + } ++ struct lwip_sock *attach_sock = get_socket(sock->attach_fd); ++ if (attach_sock == NULL) { ++ GAZELLE_RETURN(EINVAL); ++ } + +- if (!NETCONN_IS_ACCEPTIN(sock)) { ++ if (!NETCONN_IS_ACCEPTIN(attach_sock)) { + fd = sock->nextfd; + continue; + } + +- if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) { +- min_sock = sock; +- min_fd = fd; ++ if (min_sock == NULL || min_sock->stack->conn_num > attach_sock->stack->conn_num) { ++ min_sock = attach_sock; ++ min_fd = sock->attach_fd; + } + + fd = sock->nextfd; +@@ -802,13 +765,7 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add + ret = rpc_call_accept(min_fd, addr, addrlen); + } + +- if (have_accept_event(head_fd)) { +- add_self_event(sock, EPOLLIN); +- sock = get_socket(head_fd); +- sock->stack->stats.accept_events++; +- } +- +- if(ret < 0) { ++ if (ret < 0) { + errno = EAGAIN; + } + return ret; +diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c +index 1813906..743857f 100644 +--- a/src/lstack/core/lstack_stack_stat.c ++++ b/src/lstack/core/lstack_stack_stat.c +@@ -105,8 +105,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_ + lstack_get_low_power_info(&dfx->low_power_info); + memcpy_s(&dfx->data.pkts, sizeof(dfx->data.pkts), &stack->stats, sizeof(dfx->data.pkts)); + dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail; 
+- dfx->data.pkts.weakup_ring_cnt = rte_ring_count(stack->weakup_ring); +- dfx->data.pkts.send_idle_ring_cnt = rte_ring_count(stack->send_idle_ring); + + int32_t rpc_call_result = rpc_call_msgcnt(stack); + dfx->data.pkts.call_msg_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result; +@@ -120,10 +118,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_ + rpc_call_result = rpc_call_sendlistcnt(stack); + dfx->data.pkts.send_list = (rpc_call_result < 0) ? 0 : rpc_call_result; + +- if (stack->wakeup_list) { +- rpc_call_result = rpc_call_eventlistcnt(stack); +- dfx->data.pkts.wakeup_list = (rpc_call_result < 0) ? 0 : rpc_call_result; +- } + dfx->data.pkts.conn_num = stack->conn_num; + } + +diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c +index 2a67333..26725f7 100644 +--- a/src/lstack/core/lstack_thread_rpc.c ++++ b/src/lstack/core/lstack_thread_rpc.c +@@ -19,9 +19,10 @@ + #include "lstack_protocol_stack.h" + #include "lstack_control_plane.h" + #include "gazelle_base_func.h" ++#include "lstack_dpdk.h" + #include "lstack_thread_rpc.h" + +-#define HANDLE_RPC_MSG_MAX (8) ++static PER_THREAD struct rte_mempool *rpc_pool = NULL; + + static inline __attribute__((always_inline)) + struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func) +@@ -33,11 +34,20 @@ struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func) + return NULL; + } + +- ret = rte_mempool_get(stack->rpc_pool, (void **)&msg); ++ static uint16_t pool_index = 0; ++ if (rpc_pool == NULL) { ++ rpc_pool = create_rpc_mempool("rpc_msg", pool_index++); ++ if (rpc_pool == NULL) { ++ return NULL; ++ } ++ } ++ ++ ret = rte_mempool_get(rpc_pool, (void **)&msg); + if (ret < 0) { + get_protocol_stack_group()->call_alloc_fail++; + return NULL; + } ++ msg->pool = rpc_pool; + + pthread_spin_init(&msg->lock, PTHREAD_PROCESS_SHARED); + msg->func = func; +@@ -47,13 +57,13 @@ struct rpc_msg *rpc_msg_alloc(struct 
protocol_stack *stack, rpc_msg_func func) + } + + static inline __attribute__((always_inline)) +-void rpc_msg_free(struct rte_mempool *pool, struct rpc_msg *msg) ++void rpc_msg_free(struct rpc_msg *msg) + { + pthread_spin_destroy(&msg->lock); + + msg->self_release = 0; + msg->func = NULL; +- rte_mempool_put(pool, (void *)msg); ++ rte_mempool_put(msg->pool, (void *)msg); + } + + static inline __attribute__((always_inline)) +@@ -64,7 +74,7 @@ void rpc_call(lockless_queue *queue, struct rpc_msg *msg) + } + + static inline __attribute__((always_inline)) +-int32_t rpc_sync_call(lockless_queue *queue, struct rte_mempool *pool, struct rpc_msg *msg) ++int32_t rpc_sync_call(lockless_queue *queue, struct rpc_msg *msg) + { + int32_t ret; + +@@ -74,20 +84,20 @@ int32_t rpc_sync_call(lockless_queue *queue, struct rte_mempool *pool, struct rp + pthread_spin_lock(&msg->lock); + + ret = msg->result; +- rpc_msg_free(pool, msg); ++ rpc_msg_free(msg); + return ret; + } + +-void poll_rpc_msg(struct protocol_stack *stack) ++void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num) + { +- int32_t num; ++ uint32_t num; + struct rpc_msg *msg = NULL; + + num = 0; +- while (num++ < HANDLE_RPC_MSG_MAX) { ++ while (num++ < max_num) { + lockless_queue_node *node = lockless_queue_mpsc_pop(&stack->rpc_queue); + if (node == NULL) { +- return; ++ break; + } + + msg = container_of(node, struct rpc_msg, queue_node); +@@ -103,7 +113,7 @@ void poll_rpc_msg(struct protocol_stack *stack) + if (msg->self_release) { + pthread_spin_unlock(&msg->lock); + } else { +- rpc_msg_free(stack->rpc_pool, msg); ++ rpc_msg_free(msg); + } + } + } +@@ -118,7 +128,7 @@ int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint3 + msg->args[MSG_ARG_0].p = conn_table; + msg->args[MSG_ARG_1].u = max_conn; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_connnum(struct protocol_stack *stack) +@@ -128,7 
+138,7 @@ int32_t rpc_call_connnum(struct protocol_stack *stack) + return -1; + } + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen) +@@ -142,7 +152,7 @@ int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struc + msg->args[MSG_ARG_1].cp = addr; + msg->args[MSG_ARG_2].socklen = addrlen; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + static void rpc_msgcnt(struct rpc_msg *msg) +@@ -158,7 +168,7 @@ int32_t rpc_call_msgcnt(struct protocol_stack *stack) + return -1; + } + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn) +@@ -168,7 +178,7 @@ int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn) + return -1; + } + msg->args[MSG_ARG_0].p = conn; +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn) +@@ -178,17 +188,7 @@ int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn) + return -1; + } + msg->args[MSG_ARG_0].p = conn; +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); +-} +- +-int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack) +-{ +- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_wakeuplist_count); +- if (msg == NULL) { +- return -1; +- } +- +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_eventlistcnt(struct protocol_stack *stack) +@@ -198,7 +198,7 @@ int32_t rpc_call_eventlistcnt(struct protocol_stack *stack) + return -1; + } + +- 
return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_sendlistcnt(struct protocol_stack *stack) +@@ -208,7 +208,7 @@ int32_t rpc_call_sendlistcnt(struct protocol_stack *stack) + return -1; + } + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) +@@ -218,7 +218,7 @@ int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) + return -1; + } + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + void add_epoll_event(struct netconn *conn, uint32_t event); +@@ -243,24 +243,6 @@ void rpc_call_addevent(struct protocol_stack *stack, void *sock) + rpc_call(&stack->rpc_queue, msg); + } + +-static void rpc_replenish_idlembuf(struct rpc_msg *msg) +-{ +- struct protocol_stack *stack = get_protocol_stack(); +- stack_replenish_send_idlembuf(stack); +- stack->in_replenish = 0; +-} +- +-void rpc_call_replenish_idlembuf(struct protocol_stack *stack) +-{ +- struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_replenish_idlembuf); +- if (msg == NULL) { +- return; +- } +- +- msg->self_release = 0; +- rpc_call(&stack->rpc_queue, msg); +-} +- + int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf) + { + struct rpc_msg *msg = rpc_msg_alloc(stack, stack_arp); +@@ -287,7 +269,7 @@ int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol) + msg->args[MSG_ARG_1].i = type; + msg->args[MSG_ARG_2].i = protocol; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_close(int fd) +@@ -300,7 +282,7 @@ int32_t rpc_call_close(int fd) + + msg->args[MSG_ARG_0].i = fd; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t 
rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen) +@@ -315,7 +297,7 @@ int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen + msg->args[MSG_ARG_1].cp = addr; + msg->args[MSG_ARG_2].socklen = addrlen; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_listen(int s, int backlog) +@@ -329,7 +311,7 @@ int32_t rpc_call_listen(int s, int backlog) + msg->args[MSG_ARG_0].i = s; + msg->args[MSG_ARG_1].i = backlog; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen) +@@ -344,7 +326,7 @@ int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen) + msg->args[MSG_ARG_1].p = addr; + msg->args[MSG_ARG_2].p = addrlen; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen) +@@ -359,7 +341,7 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen) + msg->args[MSG_ARG_1].cp = addr; + msg->args[MSG_ARG_2].socklen = addrlen; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen) +@@ -374,7 +356,7 @@ int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen) + msg->args[MSG_ARG_1].p = addr; + msg->args[MSG_ARG_2].p = addrlen; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen) +@@ -389,7 +371,7 @@ int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen) + 
msg->args[MSG_ARG_1].p = addr; + msg->args[MSG_ARG_2].p = addrlen; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen) +@@ -406,7 +388,7 @@ int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, sockle + msg->args[MSG_ARG_3].p = optval; + msg->args[MSG_ARG_4].p = optlen; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen) +@@ -423,7 +405,7 @@ int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, + msg->args[MSG_ARG_3].cp = optval; + msg->args[MSG_ARG_4].socklen = optlen; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_fcntl(int fd, int cmd, long val) +@@ -438,7 +420,7 @@ int32_t rpc_call_fcntl(int fd, int cmd, long val) + msg->args[MSG_ARG_1].i = cmd; + msg->args[MSG_ARG_2].l = val; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_ioctl(int fd, long cmd, void *argp) +@@ -453,7 +435,7 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp) + msg->args[MSG_ARG_1].l = cmd; + msg->args[MSG_ARG_2].p = argp; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + ssize_t rpc_call_send(int fd, const void *buf, size_t len, int flags) +@@ -486,7 +468,7 @@ int32_t rpc_call_sendmsg(int fd, const struct msghdr *msghdr, int flags) + msg->args[MSG_ARG_1].cp = msghdr; + msg->args[MSG_ARG_2].i = flags; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } + + int32_t rpc_call_recvmsg(int 
fd, struct msghdr *msghdr, int flags) +@@ -501,5 +483,5 @@ int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags) + msg->args[MSG_ARG_1].p = msghdr; + msg->args[MSG_ARG_2].i = flags; + +- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); ++ return rpc_sync_call(&stack->rpc_queue, msg); + } +diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h +index 48b7e44..345a373 100644 +--- a/src/lstack/include/lstack_cfg.h ++++ b/src/lstack/include/lstack_cfg.h +@@ -66,7 +66,7 @@ struct cfg_params { + uint16_t num_cpu; + uint32_t cpus[CFG_MAX_CPUS]; + uint16_t num_wakeup; +- uint32_t weakup[CFG_MAX_CPUS]; ++ uint32_t wakeup[CFG_MAX_CPUS]; + uint8_t num_ports; + uint16_t ports[CFG_MAX_PORTS]; + char log_file[PATH_MAX]; +diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h +index e76a9a6..e8080e1 100644 +--- a/src/lstack/include/lstack_dpdk.h ++++ b/src/lstack/include/lstack_dpdk.h +@@ -37,10 +37,10 @@ struct protocol_stack; + + #define MBUF_SZ (MAX_PACKET_SZ + RTE_PKTMBUF_HEADROOM) + +-#define MAX_CORE_NUM 256 ++#define MAX_CORE_NUM 256 + #define CALL_MSG_RING_SIZE (unsigned long long)32 +-#define CALL_CACHE_SZ 64 +-#define CALL_POOL_SZ ((VDEV_CALL_QUEUE_SZ << 1) + (2 * CALL_CACHE_SZ * MAX_CORE_NUM)) ++#define CALL_CACHE_SZ 0 ++#define CALL_POOL_SZ 128 + + /* Layout: + * | rte_mbuf | pbuf | custom_free_function | payload | +@@ -62,6 +62,7 @@ void dpdk_eal_init(void); + int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num); + struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, int32_t queue_id); + int32_t create_shared_ring(struct protocol_stack *stack); ++struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id); + void lstack_log_level_init(void); + int dpdk_ethdev_init(void); + int dpdk_ethdev_start(void); +diff --git a/src/lstack/include/lstack_log.h b/src/lstack/include/lstack_log.h +index 383495d..8b4209a 100644 +--- 
a/src/lstack/include/lstack_log.h ++++ b/src/lstack/include/lstack_log.h +@@ -15,17 +15,14 @@ + + #include + #include +- + #include + +-#include "lwipopts.h" +- +-#define RTE_LOGTYPE_LSTACK RTE_LOGTYPE_USER1 ++#define RTE_LOGTYPE_LSTACK RTE_LOGTYPE_USER1 + #define LSTACK_EXIT(a, fmt, ...) rte_exit(a, "%s:%d "fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__) + #define LSTACK_LOG(a, b, fmt, ...) (void)RTE_LOG(a, b, "%s:%d "fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__) + +-#define LSTACK_INFO LOG_INFO +-#define LSTACK_ERR LOG_ERR ++#define LSTACK_INFO LOG_INFO ++#define LSTACK_ERR LOG_ERR + + /* before rte_eal_init */ + #define LSTACK_PRE_LOG(level, fmt, ...) \ +diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h +index ffd3b80..c73e3a7 100644 +--- a/src/lstack/include/lstack_lwip.h ++++ b/src/lstack/include/lstack_lwip.h +@@ -21,32 +21,31 @@ + + #define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox)) + #define NETCONN_IS_DATAIN(sock) ((rte_ring_count((sock)->recv_ring) || (sock)->recv_lastdata)) +-#define NETCONN_IS_DATAOUT(sock) rte_ring_free_count((sock)->send_ring) ++#define NETCONN_IS_DATAOUT(sock) ((rte_ring_count((sock)->send_ring) || (sock)->send_lastdata)) ++#define NETCONN_IS_OUTIDLE(sock) rte_ring_free_count((sock)->send_ring) + + void create_shadow_fd(struct rpc_msg *msg); + void listen_list_add_node(int32_t head_fd, int32_t add_fd); + void gazelle_init_sock(int32_t fd); + int32_t gazelle_socket(int domain, int type, int protocol); + void gazelle_clean_sock(int32_t fd); +-ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags); ++struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags); + ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len); + ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags); + ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags); 
+-void read_recv_list(void); ++void read_recv_list(struct protocol_stack *stack, uint32_t max_num); ++void send_stack_list(struct protocol_stack *stack, uint32_t send_max); + void add_recv_list(int32_t fd); + void stack_sendlist_count(struct rpc_msg *msg); + void stack_eventlist_count(struct rpc_msg *msg); +-void stack_wakeuplist_count(struct rpc_msg *msg); + void get_lwip_conntable(struct rpc_msg *msg); + void get_lwip_connnum(struct rpc_msg *msg); + void stack_recvlist_count(struct rpc_msg *msg); + void stack_send(struct rpc_msg *msg); +-void stack_replenish_send_idlembuf(struct protocol_stack *stack); + int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num); + void gazelle_free_pbuf(struct pbuf *pbuf); + ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags); + ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags); + ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags); +-void add_self_event(struct lwip_sock *sock, uint32_t events); + + #endif +diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h +index 39052e1..9753385 100644 +--- a/src/lstack/include/lstack_protocol_stack.h ++++ b/src/lstack/include/lstack_protocol_stack.h +@@ -14,6 +14,7 @@ + #define __GAZELLE_PROTOCOL_STACK_H__ + + #include ++#include + #include + #include + #include "dpdk_common.h" +@@ -28,38 +29,32 @@ struct protocol_stack { + uint16_t socket_id; + uint16_t cpu_id; + volatile uint16_t conn_num; +- volatile bool in_replenish; +- +- // for dispatcher thread +- cpu_set_t idle_cpuset; ++ cpu_set_t idle_cpuset; /* idle cpu in numa of stack, app thread bind to it */ + + lockless_queue rpc_queue; +- struct rte_ring *weakup_ring; +- + struct rte_mempool *rx_pktmbuf_pool; + struct rte_mempool *tx_pktmbuf_pool; +- struct rte_mempool *rpc_pool; + struct rte_ring *rx_ring; + struct rte_ring *tx_ring; + struct rte_ring *reg_ring; +- struct 
rte_ring *send_idle_ring; ++ struct rte_ring *wakeup_ring; ++ + struct reg_ring_msg *reg_buf; + + struct netif netif; + uint32_t rx_ring_used; + uint32_t tx_ring_used; ++ struct eth_dev_ops *dev_ops; + + struct list_node recv_list; + struct list_node listen_list; +- struct list_node event_list; + struct list_node send_list; +- struct list_node *wakeup_list; ++ struct list_node event_list; ++ pthread_spinlock_t event_lock; + + struct gazelle_stat_pkts stats; + struct gazelle_stack_latency latency; + struct stats_ *lwip_stats; +- +- struct eth_dev_ops *dev_ops; + }; + + struct eth_params; +@@ -67,25 +62,35 @@ struct eth_params; + struct protocol_stack_group { + uint16_t stack_num; + uint16_t port_id; +- sem_t thread_inited; ++ sem_t thread_phase1; ++ sem_t ethdev_init; + struct rte_mempool *kni_pktmbuf_pool; + struct eth_params *eth_params; + struct protocol_stack *stacks[PROTOCOL_STACK_MAX]; ++ bool wakeup_enable; + + /* dfx stats */ + bool latency_start; + uint64_t call_alloc_fail; + }; + ++struct wakeup_poll { ++ bool init; ++ struct protocol_stack *bind_stack; ++ struct list_node event_list; /* epoll temp use poll */ ++ sem_t event_sem; ++}; ++ + long get_stack_tid(void); ++pthread_mutex_t *get_mem_mutex(void); + struct protocol_stack *get_protocol_stack(void); + struct protocol_stack *get_protocol_stack_by_fd(int32_t fd); + struct protocol_stack *get_minconn_protocol_stack(void); + struct protocol_stack_group *get_protocol_stack_group(void); + + int32_t init_protocol_stack(void); +-int32_t create_stack_thread(void); +-int bind_to_stack_numa(int stack_id); ++int32_t bind_to_stack_numa(struct protocol_stack *stack); ++int32_t init_dpdk_ethdev(void); + + /* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */ + void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack); +diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h +index 
20539d9..61bcd38 100644 +--- a/src/lstack/include/lstack_thread_rpc.h ++++ b/src/lstack/include/lstack_thread_rpc.h +@@ -42,21 +42,20 @@ struct rpc_msg { + int32_t self_release; /* 0:msg handler release msg 1:msg sender release msg */ + int64_t result; /* func return val */ + lockless_queue_node queue_node; ++ struct rte_mempool *pool; + + rpc_msg_func func; /* msg handle func hook */ + union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */ + }; + + struct protocol_stack; +-void poll_rpc_msg(struct protocol_stack *stack); +-void rpc_call_replenish_idlembuf(struct protocol_stack *stack); ++void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num); + void rpc_call_addevent(struct protocol_stack *stack, void *sock); + int32_t rpc_call_msgcnt(struct protocol_stack *stack); + int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); + int32_t rpc_call_recvlistcnt(struct protocol_stack *stack); + int32_t rpc_call_eventlistcnt(struct protocol_stack *stack); + int32_t rpc_call_sendlistcnt(struct protocol_stack *stack); +-int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack); + int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn); + int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn); + int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn); +diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h +index 916b1e2..31a997d 100644 +--- a/src/lstack/include/lstack_vdev.h ++++ b/src/lstack/include/lstack_vdev.h +@@ -23,7 +23,7 @@ + #define VDEV_EVENT_QUEUE_SZ (DEFAULT_RING_SIZE) + #define VDEV_REG_QUEUE_SZ (DEFAULT_RING_SIZE) + #define VDEV_CALL_QUEUE_SZ (DEFAULT_RING_SIZE) +-#define VDEV_WEAKUP_QUEUE_SZ (DEFAULT_RING_SIZE) ++#define VDEV_WAKEUP_QUEUE_SZ (DEFAULT_RING_SIZE) + #define VDEV_IDLE_QUEUE_SZ (DEFAULT_RING_SIZE) + + #define VDEV_TX_QUEUE_SZ (DEFAULT_RING_SIZE) +diff --git 
a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h +index 2b3cff4..cac640b 100644 +--- a/src/lstack/include/posix/lstack_epoll.h ++++ b/src/lstack/include/posix/lstack_epoll.h +@@ -17,6 +17,8 @@ + extern "C" { + #endif + ++#include ++ + int32_t lstack_epoll_create(int32_t size); + int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event); + int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event *events, int32_t maxevents, int32_t timeout); +diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf +index fdca602..696dfb9 100644 +--- a/src/lstack/lstack.conf ++++ b/src/lstack/lstack.conf +@@ -16,7 +16,6 @@ kni_switch=0 + low_power_mode=0 + + num_cpus="2" +-num_wakeup="3" + + host_addr="192.168.1.10" + mask_addr="255.255.255.0" +diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c +index 8b2193f..ae39403 100644 +--- a/src/lstack/netif/lstack_ethdev.c ++++ b/src/lstack/netif/lstack_ethdev.c +@@ -51,6 +51,9 @@ void eth_dev_recv(struct rte_mbuf *mbuf) + stack->stats.rx_allocmbuf_fail++; + break; + } ++#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW ++ next->ol_flags = m->ol_flags; ++#endif + + if (head == NULL) { + head = next; +@@ -73,18 +76,19 @@ void eth_dev_recv(struct rte_mbuf *mbuf) + } + } + ++#define READ_PKTS_MAX 32 + int32_t eth_dev_poll(void) + { + uint32_t nr_pkts; +- struct rte_mbuf *pkts[DPDK_PKT_BURST_SIZE]; ++ struct rte_mbuf *pkts[READ_PKTS_MAX]; + struct protocol_stack *stack = get_protocol_stack(); + +- nr_pkts = stack->dev_ops->rx_poll(stack, pkts, DPDK_PKT_BURST_SIZE); ++ nr_pkts = stack->dev_ops->rx_poll(stack, pkts, READ_PKTS_MAX); + if (nr_pkts == 0) { + return nr_pkts; + } + +- if (get_protocol_stack_group()->latency_start) { ++ if (!use_ltran() && get_protocol_stack_group()->latency_start) { + uint64_t time_stamp = get_current_time(); + time_stamp_into_mbuf(nr_pkts, pkts, time_stamp); + } +@@ -146,19 +150,6 @@ static err_t eth_dev_output(struct 
netif *netif, struct pbuf *pbuf) + return ERR_OK; + } + +-static err_t eth_dev_input(struct pbuf *p, struct netif *netif) +-{ +- err_t ret = ethernet_input(p, netif); +- if (ret != ERR_OK) { +- return ret; +- } +- +- if (get_protocol_stack_group()->latency_start) { +- calculate_lstack_latency(&get_protocol_stack()->latency, p, GAZELLE_LATENCY_LWIP); +- } +- return ret; +-} +- + static err_t eth_dev_init(struct netif *netif) + { + struct cfg_params *cfg = get_global_cfg_params(); +@@ -200,7 +191,7 @@ int32_t ethdev_init(struct protocol_stack *stack) + netif_set_default(&stack->netif); + + struct netif *netif = netif_add(&stack->netif, &cfg->host_addr, &cfg->netmask, &cfg->gateway_addr, NULL, +- eth_dev_init, eth_dev_input); ++ eth_dev_init, ethernet_input); + if (netif == NULL) { + LSTACK_LOG(ERR, LSTACK, "netif_add failed\n"); + return ERR_IF; +diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c +index 57d3bce..5a4e86a 100644 +--- a/src/lstack/netif/lstack_vdev.c ++++ b/src/lstack/netif/lstack_vdev.c +@@ -91,7 +91,7 @@ static uint32_t vdev_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pkt + uint32_t sent_pkts = 0; + + do { +- sent_pkts += rte_eth_tx_burst(stack->port_id, stack->queue_id, &pkts[sent_pkts], nr_pkts); ++ sent_pkts += rte_eth_tx_burst(stack->port_id, stack->queue_id, &pkts[sent_pkts], nr_pkts - sent_pkts); + } while (sent_pkts < nr_pkts); + + return sent_pkts; +diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c +index 8db5791..8d71966 100644 +--- a/src/ltran/ltran_dfx.c ++++ b/src/ltran/ltran_dfx.c +@@ -561,27 +561,16 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat) + printf("app_write_drop: %-13"PRIu64" ", lstack_stat->data.pkts.app_write_drop); + printf("write_lwip_drop: %-12"PRIu64" ", lstack_stat->data.pkts.write_lwip_drop); + printf("app_write_idlebuf: %-10"PRIu16" \n", lstack_stat->data.pkts.send_idle_ring_cnt); ++ printf("event_list: %-17"PRIu64" ", 
lstack_stat->data.pkts.event_list); + printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list); +- printf("weakup_ring_cnt: %-12"PRIu16" ", lstack_stat->data.pkts.weakup_ring_cnt); + printf("conn_num: %-19"PRIu16" \n", lstack_stat->data.pkts.conn_num); +- printf("weakup_events: %-14"PRIu64" ", lstack_stat->data.pkts.weakup_events); +- printf("lwip_events: %-16"PRIu64" ", lstack_stat->data.pkts.lwip_events); +- printf("app_events: %-17"PRIu64"\n", lstack_stat->data.pkts.app_events); +- printf("epoll_pending: %-14"PRIu64" ", lstack_stat->data.pkts.epoll_pending); +- printf("epoll_self_event: %-11"PRIu64" ", lstack_stat->data.pkts.epoll_self_event); +- printf("remove_event: %-15"PRIu64" \n", lstack_stat->data.pkts.remove_event); +- printf("read_events: %-16"PRIu64" ", lstack_stat->data.pkts.read_events); +- printf("write_events: %-15"PRIu64" ", lstack_stat->data.pkts.write_events); +- printf("accept_events: %-14"PRIu64" \n", lstack_stat->data.pkts.accept_events); +- printf("read_null: %-18"PRIu64" ", lstack_stat->data.pkts.read_null); +- printf("wakeup_list: %-16"PRIu64" ", lstack_stat->data.pkts.wakeup_list); +- printf("event_list: %-17"PRIu64" \n", lstack_stat->data.pkts.event_list); +- printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc); +- printf("epoll_pending_call: %-9"PRIu64" ", lstack_stat->data.pkts.epoll_pending_call); +- printf("epoll_self_call: %-12"PRIu64" \n", lstack_stat->data.pkts.epoll_self_call); ++ printf("wakeup_events: %-14"PRIu64" ", lstack_stat->data.pkts.wakeup_events); ++ printf("app_events: %-17"PRIu64" ", lstack_stat->data.pkts.app_events); ++ printf("read_null: %-18"PRIu64" \n", lstack_stat->data.pkts.read_null); + printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt); + printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail); + printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null); ++ printf("send_self_rpc: %-14"PRIu64" ", 
lstack_stat->data.pkts.send_self_rpc); + printf("send_list: %-18"PRIu64" \n", lstack_stat->data.pkts.send_list); + } + +@@ -884,7 +873,7 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_ + printf("Active Internet connections (servers and established)\n"); + do { + printf("\n------ stack tid: %6u ------\n", stat->tid); +- printf("No. Proto recv_cnt recv_ring in_send send_ring event self_event Local Address" ++ printf("No. Proto recv_cnt recv_ring in_send send_ring sem_cnt Local Address " + " Foreign Address State\n"); + uint32_t unread_pkts = 0; + uint32_t unsend_pkts = 0; +@@ -894,13 +883,13 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_ + rip.s_addr = conn_info->rip; + lip.s_addr = conn_info->lip; + if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) { +- printf("%-6utcp %-10u%-11u%-9u%-11u%-7u%-12u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt, +- conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->event_ring_cnt, +- conn_info->self_ring_cnt, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, ++ printf("%-6utcp %-10u%-11u%-9u%-11u%-9d%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt, ++ conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->sem_cnt, ++ inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, + inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port, + tcp_state_to_str(conn_info->tcp_sub_state)); + } else if (conn_info->state == GAZELLE_LISTEN_LIST) { +- printf("%-6utcp %-60u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt, ++ printf("%-6utcp %-50u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt, + inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port); + } else { + printf("Got unknow tcp conn::%s:%5hu, state:%u\n", +diff --git a/src/ltran/ltran_opt.h b/src/ltran/ltran_opt.h +index e4e085d..1117898 100644 +--- 
a/src/ltran/ltran_opt.h ++++ b/src/ltran/ltran_opt.h +@@ -34,12 +34,12 @@ + #define GAZELLE_KNI_ETHERNET_HEADER_SIZE 14 + #define GAZELLE_KNI_ETHERNET_FCS_SIZE 4 + +-#define GAZELLE_PKT_MBUF_RX_POOL_NAME_FMT "rx_pool%d" +-#define GAZELLE_PKT_MBUF_TX_POOL_NAME_FMT "tx_pool%d" ++#define GAZELLE_PKT_MBUF_RX_POOL_NAME_FMT "rx_pool%u" ++#define GAZELLE_PKT_MBUF_TX_POOL_NAME_FMT "tx_pool%u" + #define GAZELLE_PKT_MBUF_POOL_NAME_LENGTH 64 + + #define GAZELLE_BOND_NAME_LENGTH 64 +-#define GAZELLE_BOND_DEV_NAME_FMT "net_bonding%d" ++#define GAZELLE_BOND_DEV_NAME_FMT "net_bonding%hu" + #define GAZELLE_BOND_QUEUE_MIN 1 + #define GAZELLE_BOND_QUEUE_MAX 64 + +-- +1.8.3.1 + diff --git a/gazelle.spec b/gazelle.spec index a9d1961..af83209 100644 --- a/gazelle.spec +++ b/gazelle.spec @@ -2,7 +2,7 @@ Name: gazelle Version: 1.0.1 -Release: 5 +Release: 6 Summary: gazelle is a high performance user-mode stack License: Mulan PSL v2 URL: https://gitee.com/openeuler/gazelle @@ -51,6 +51,8 @@ Patch9033: 0033-fix-accept-check-remain-conn.patch Patch9034: 0034-fix-wakeup-list-dead-loop.patch Patch9035: 0035-add-check-for-stack-params.patch Patch9036: 0036-the-sending-of-sock-last-data-is-triggered-by-lstack.patch +Patch9037: 0037-add-gazellectl-lstack-constraint.patch +Patch9038: 0038-refactor-event.patch %description %{name} is a high performance user-mode stack. @@ -91,6 +93,10 @@ install -Dpm 0640 %{_builddir}/%{name}-%{version}/src/ltran/ltran.conf %{b %config(noreplace) %{conf_path}/ltran.conf %changelog +* Tue Mar 29 2022 jiangheng - 1.0.1-6 +- refactor event +- add gazellectl lstack constraint + * Fri Mar 18 2022 jiangheng - 1.0.1-5 - limit lwip_alloc_pbuf size to TCP_MSS - sending of sock last data is triggered by lstack iteself