From a74d5b38b2021397d13b13aaa30f41f69be6f475 Mon Sep 17 00:00:00 2001 From: wuchangsheng Date: Thu, 21 Apr 2022 17:21:59 +0800 Subject: [PATCH 09/18] refactor kernel event poll/epoll --- src/lstack/api/lstack_epoll.c | 343 +++++++++++++++------ src/lstack/api/lstack_wrap.c | 21 +- src/lstack/core/lstack_dpdk.c | 4 +- src/lstack/core/lstack_init.c | 2 +- src/lstack/core/lstack_lwip.c | 1 + src/lstack/core/lstack_protocol_stack.c | 85 ++++- src/lstack/include/lstack_cfg.h | 1 - src/lstack/include/lstack_protocol_stack.h | 8 +- src/lstack/include/posix/lstack_epoll.h | 24 ++ 9 files changed, 350 insertions(+), 139 deletions(-) diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c index b8d53f6..cba67ea 100644 --- a/src/lstack/api/lstack_epoll.c +++ b/src/lstack/api/lstack_epoll.c @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -32,10 +33,14 @@ #include "gazelle_base_func.h" #include "lstack_lwip.h" #include "lstack_protocol_stack.h" +#include "posix/lstack_epoll.h" #define EPOLL_KERNEL_INTERVAL 10 /* ms */ -#define EPOLL_NSEC_TO_SEC 1000000000 +#define SEC_TO_NSEC 1000000000 +#define SEC_TO_MSEC 1000 +#define MSEC_TO_NSEC 1000000 #define EPOLL_MAX_EVENTS 512 +#define POLL_KERNEL_EVENTS 32 static PER_THREAD struct wakeup_poll g_wakeup_poll = {0}; static bool g_use_epoll = false; /* FIXME: when no epoll close prepare event for performance testing */ @@ -149,12 +154,12 @@ int32_t lstack_epoll_create(int32_t size) posix_api->close_fn(fd); GAZELLE_RETURN(EINVAL); } - memset_s(wakeup, sizeof(struct wakeup_poll), 0, sizeof(struct wakeup_poll)); - sem_init(&wakeup->event_sem, 0, 0); - sock->wakeup = wakeup; init_list_node(&wakeup->event_list); + wakeup->epollfd = fd; + sem_init(&wakeup->event_sem, 0, 0); + sock->wakeup = wakeup; g_use_epoll = true; return fd; @@ -162,6 +167,8 @@ int32_t lstack_epoll_create(int32_t size) int32_t lstack_epoll_close(int32_t fd) { + posix_api->close_fn(fd); + struct lwip_sock *sock = get_socket_by_fd(fd); if (sock == NULL) { LSTACK_LOG(ERR, LSTACK, "fd=%d sock is NULL errno=%d\n", fd, errno); @@ -176,6 +183,43 @@ int32_t lstack_epoll_close(int32_t fd) return 0; } +static uint16_t find_max_cnt_stack(int32_t *stack_count, uint16_t stack_num, struct protocol_stack *last_stack) +{ + uint16_t max_index = 0; + bool all_same_cnt = true; + + for (uint16_t i = 1; i < stack_num; i++) { + if (stack_count[i] != stack_count[0]) { + all_same_cnt = false; + } + + if (stack_count[i] > stack_count[max_index]) { + max_index = i; + } + } + + /* all stack same, don't change */ + if (all_same_cnt && last_stack) { + return last_stack->queue_id; + } + + /* first bind and all stack same. choice tick as queue_id, avoid all bind to statck_0.*/ + static uint16_t tick = 0; + if (all_same_cnt && stack_num) { + max_index = atomic_fetch_add(&tick, 1) % stack_num; + } + + return max_index; +} + +static void update_epoll_max_stack(struct wakeup_poll *wakeup) +{ + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + uint16_t bind_id = find_max_cnt_stack(wakeup->stack_fd_cnt, stack_group->stack_num, wakeup->max_stack); + + wakeup->max_stack = stack_group->stacks[bind_id]; +} + int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event) { LSTACK_LOG(DEBUG, LSTACK, "op=%d events: fd: %d\n", op, fd); @@ -185,35 +229,38 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even GAZELLE_RETURN(EINVAL); } + struct lwip_sock *epoll_sock = get_socket_by_fd(epfd); + if (epoll_sock == NULL || epoll_sock->wakeup == NULL) { + return posix_api->epoll_ctl_fn(epfd, op, fd, event); + } + struct lwip_sock *sock = get_socket(fd); if (sock == NULL) { + epoll_sock->wakeup->have_kernel_fd = true; return posix_api->epoll_ctl_fn(epfd, op, fd, event); } if (CONN_TYPE_HAS_HOST(sock->conn)) { + epoll_sock->wakeup->have_kernel_fd = true; int32_t ret = posix_api->epoll_ctl_fn(epfd, op, fd, event); if (ret < 0) { return ret; } } - struct lwip_sock *epoll_sock = get_socket_by_fd(epfd); - if (epoll_sock == NULL || epoll_sock->wakeup == NULL) { - LSTACK_LOG(ERR, LSTACK, "epfd=%d\n", fd); - GAZELLE_RETURN(EINVAL); - } - - uint32_t events = event->events | EPOLLERR | EPOLLHUP; do { switch (op) { case EPOLL_CTL_ADD: sock->wakeup = epoll_sock->wakeup; + if (sock->stack) { + epoll_sock->wakeup->stack_fd_cnt[sock->stack->queue_id]++; + } if (list_is_empty(&sock->event_list)) { list_add_node(&sock->wakeup->event_list, &sock->event_list); } /* fall through */ case EPOLL_CTL_MOD: - sock->epoll_events = events; + sock->epoll_events = event->events | EPOLLERR | EPOLLHUP; sock->ep_data = event->data; if (sock->conn && NETCONNTYPE_GROUP(netconn_type(sock->conn)) == NETCONN_TCP) { raise_pending_events(sock); @@ -222,6 +269,9 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even case EPOLL_CTL_DEL: list_del_node_init(&sock->event_list); sock->epoll_events = 0; + if (sock->stack) { + epoll_sock->wakeup->stack_fd_cnt[sock->stack->queue_id]--; + } break; default: GAZELLE_RETURN(EINVAL); @@ -230,6 +280,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even sock = get_socket(fd); } while (fd > 0 && sock != NULL); + update_epoll_max_stack(epoll_sock->wakeup); return 0; } @@ -346,129 +397,196 @@ static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds) return event_num; } -static inline bool have_kernel_fd(int32_t epfd, struct pollfd *fds, nfds_t nfds) +static void ms_to_timespec(struct timespec *timespec, int32_t timeout) { - /* when epfd > 0 is epoll type */ - for (uint32_t i = 0; i < nfds && epfd < 0; i++) { - if (get_socket(fds[i].fd) == NULL) { - return true; + clock_gettime(CLOCK_REALTIME, timespec); + timespec->tv_sec += timeout / SEC_TO_MSEC; + timespec->tv_nsec += (timeout % SEC_TO_MSEC) * MSEC_TO_NSEC; + timespec->tv_sec += timespec->tv_nsec / SEC_TO_NSEC; + timespec->tv_nsec = timespec->tv_nsec % SEC_TO_NSEC; +} + +static void change_epollfd_kernel_thread(struct wakeup_poll *wakeup, struct protocol_stack *old_stack, + struct protocol_stack *new_stack) +{ + if (old_stack) { + if (posix_api->epoll_ctl_fn(old_stack->epollfd, EPOLL_CTL_DEL, wakeup->epollfd, NULL) != 0) { + LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno); } } - return false; + /* avoid kernel thread post too much, use EPOLLET */ + struct epoll_event event; + event.data.ptr = &wakeup->event_sem; + event.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLET; + if (posix_api->epoll_ctl_fn(new_stack->epollfd, EPOLL_CTL_ADD, wakeup->epollfd, &event) != 0) { + LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno); + } } -static inline int32_t poll_kernel_event(struct pollfd *fds, nfds_t nfds) +static void epoll_bind_statck(struct wakeup_poll *wakeup) { - int32_t event_num = 0; - - for (uint32_t i = 0; i < nfds; i++) { - /* lwip event */ - if (get_socket(fds[i].fd) != NULL || fds[i].fd < 0) { - continue; - } - - int32_t ret = posix_api->poll_fn(&fds[i], 1, 0); - if (ret < 0) { - if (errno != EINTR) { - return ret; - } - } else { - event_num += ret; - } + /* all fd is kernel, set rand stack */ + if (wakeup->bind_stack == NULL && wakeup->max_stack== NULL) { + update_epoll_max_stack(wakeup); } - return event_num; + if (wakeup->bind_stack != wakeup->max_stack && wakeup->max_stack) { + bind_to_stack_numa(wakeup->max_stack); + change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, wakeup->max_stack); + wakeup->bind_stack = wakeup->max_stack; + } } -static int32_t get_event(struct wakeup_poll *wakeup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout) +int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout) { - struct pollfd *fds = (struct pollfd *)out; - struct epoll_event *events = (struct epoll_event *)out; - bool have_kernel = have_kernel_fd(epfd, fds, maxevents); + struct lwip_sock *sock = get_socket_by_fd(epfd); + if (sock == NULL || sock->wakeup == NULL) { + return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout); + } + int32_t event_num = 0; - int32_t poll_time = 0; int32_t ret; - /* when epfd > 0 is epoll type */ + struct timespec epoll_time; + if (timeout >= 0) { + ms_to_timespec(&epoll_time, timeout); + } + + epoll_bind_statck(sock->wakeup); + do { - event_num += (epfd > 0) ? epoll_lwip_event(wakeup, &events[event_num], maxevents - event_num) : - poll_lwip_event(fds, maxevents); - - if (have_kernel) { - int32_t event_kernel_num = (epfd > 0) ? - posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0) : - poll_kernel_event(fds, maxevents); - if (event_kernel_num < 0) { - return event_kernel_num; - } - event_num += event_kernel_num; - if (timeout >= 0 && poll_time >= timeout) { - break; - } - poll_time += EPOLL_KERNEL_INTERVAL; + event_num += epoll_lwip_event(sock->wakeup, &events[event_num], maxevents - event_num); + + if (sock->wakeup->have_kernel_fd) { + event_num += posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0); } if (event_num > 0) { break; } - int32_t interval = (have_kernel) ? EPOLL_KERNEL_INTERVAL : timeout; - struct timespec epoll_interval; - clock_gettime(CLOCK_REALTIME, &epoll_interval); - epoll_interval.tv_sec += interval / 1000; - epoll_interval.tv_nsec += (interval % 1000) * 1000000; - epoll_interval.tv_sec += epoll_interval.tv_nsec / 1000000000; - epoll_interval.tv_nsec = epoll_interval.tv_nsec % 1000000000; - - if (timeout < 0 && !have_kernel) { - ret = sem_wait(&wakeup->event_sem); + if (timeout < 0) { + ret = sem_wait(&sock->wakeup->event_sem); } else { - ret = sem_timedwait(&wakeup->event_sem, &epoll_interval); + ret = sem_timedwait(&sock->wakeup->event_sem, &epoll_time); } - - if (!have_kernel && ret < 0) { - break; - } - } while (event_num <= maxevents); + } while (ret == 0); return event_num; } -int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout) +static void init_poll_wakeup_data(struct wakeup_poll *wakeup) { - /* avoid the starvation of epoll events from both netstack */ - maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents); + sem_init(&wakeup->event_sem, 0, 0); - struct lwip_sock *sock = get_socket_by_fd(epfd); - if (sock == NULL) { - GAZELLE_RETURN(EINVAL); + wakeup->last_fds = calloc(POLL_KERNEL_EVENTS, sizeof(struct pollfd)); + if (wakeup->last_fds == NULL) { + LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno); } - if (sock->wakeup == NULL) { - return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout); + wakeup->events = calloc(POLL_KERNEL_EVENTS, sizeof(struct epoll_event)); + if (wakeup->events == NULL) { + LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno); } - return get_event(sock->wakeup, epfd, events, maxevents, timeout); + wakeup->last_max_nfds = POLL_KERNEL_EVENTS; + + wakeup->epollfd = posix_api->epoll_create_fn(POLL_KERNEL_EVENTS); + if (wakeup->epollfd < 0) { + LSTACK_LOG(ERR, LSTACK, "epoll_create_fn errno=%d\n", errno); + } } -static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeup) +static void resize_kernel_poll(struct wakeup_poll *wakeup, nfds_t nfds) { - int32_t stack_count[PROTOCOL_STACK_MAX] = {0}; + wakeup->last_fds = realloc(wakeup->last_fds, nfds * sizeof(struct pollfd)); + if (wakeup->last_fds == NULL) { + LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno); + } + + wakeup->events = realloc(wakeup->events, nfds * sizeof(struct epoll_event)); + if (wakeup->events == NULL) { + LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno); + } + + wakeup->last_max_nfds = nfds; + memset_s(wakeup->last_fds, nfds * sizeof(struct pollfd), 0, nfds * sizeof(struct pollfd)); +} + +static void poll_bind_statck(struct wakeup_poll *wakeup, int32_t *stack_count) +{ + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + uint16_t bind_id = find_max_cnt_stack(stack_count, stack_group->stack_num, wakeup->bind_stack); + + if (wakeup->bind_stack && wakeup->bind_stack->queue_id == bind_id) { + return; + } + change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, stack_group->stacks[bind_id]); + bind_to_stack_numa(stack_group->stacks[bind_id]); + wakeup->bind_stack = stack_group->stacks[bind_id]; +} + +static void update_kernel_poll(struct wakeup_poll *wakeup, uint32_t index, struct pollfd *new_fd) +{ + posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_DEL, wakeup->last_fds[index].fd, NULL); + + if (new_fd == NULL) { + return; + } + + struct epoll_event event; + event.data.u32 = index; + event.events = new_fd->events; + if (posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_ADD, new_fd->fd, &event) != 0) { + LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno); + } + + wakeup->last_fds[index].fd = new_fd->fd; + wakeup->last_fds[index].events = new_fd->events; + + wakeup->have_kernel_fd = true; +} + +static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfds) +{ if (!wakeup->init) { wakeup->init = true; - sem_init(&wakeup->event_sem, 0, 0); + init_poll_wakeup_data(wakeup); } else { while (sem_trywait(&wakeup->event_sem) == 0) {} } + if (nfds > wakeup->last_max_nfds) { + resize_kernel_poll(wakeup, nfds); + } + + int32_t stack_count[PROTOCOL_STACK_MAX] = {0}; + int32_t poll_change = 0; + + /* poll fds num less, del old fd */ + for (uint32_t i = nfds; i < wakeup->last_nfds; i++) { + update_kernel_poll(wakeup, i, NULL); + poll_change = 1; + } + for (uint32_t i = 0; i < nfds; i++) { - int32_t fd = fds[i].fd; fds[i].revents = 0; + if (fds[i].fd == wakeup->last_fds[i].fd && fds[i].events == wakeup->last_fds[i].events) { + continue; + } + poll_change = 1; + + int32_t fd = fds[i].fd; + struct lwip_sock *sock = get_socket(fd); + if (sock == NULL || CONN_TYPE_HAS_HOST(sock->conn)) { + update_kernel_poll(wakeup, i, fds + i); + } + do { - struct lwip_sock *sock = get_socket(fd); + sock = get_socket(fd); if (sock == NULL || sock->conn == NULL) { break; } @@ -481,25 +599,50 @@ static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeu } while (fd > 0); } - if (wakeup->bind_stack) { + wakeup->last_nfds = nfds; + if (poll_change == 0) { return; } - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - uint32_t bind_id = 0; - for (uint32_t i = 0; i < stack_group->stack_num; i++) { - if (stack_count[i] > stack_count[bind_id]) { - bind_id = i; - } - } - - bind_to_stack_numa(stack_group->stacks[bind_id]); - wakeup->bind_stack = stack_group->stacks[bind_id]; + poll_bind_statck(wakeup, stack_count); } int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout) { - poll_init(fds, nfds, &g_wakeup_poll); + poll_init(&g_wakeup_poll, fds, nfds); - return get_event(&g_wakeup_poll, -1, fds, nfds, timeout); + int32_t event_num = 0; + int32_t ret; + + struct timespec poll_time; + if (timeout >= 0) { + ms_to_timespec(&poll_time, timeout); + } + + /* when epfd > 0 is epoll type */ + do { + event_num += poll_lwip_event(fds, nfds); + + /* reduce syscall epoll_wait */ + if (g_wakeup_poll.have_kernel_fd) { + int32_t kernel_num = posix_api->epoll_wait_fn(g_wakeup_poll.epollfd, g_wakeup_poll.events, nfds, 0); + for (int32_t i = 0; i < kernel_num; i++) { + uint32_t index = g_wakeup_poll.events[i].data.u32; + fds[index].revents = g_wakeup_poll.events[i].events; + } + event_num += kernel_num >= 0 ? kernel_num : 0; + } + + if (event_num > 0) { + break; + } + + if (timeout < 0) { + ret = sem_wait(&g_wakeup_poll.event_sem); + } else { + ret = sem_timedwait(&g_wakeup_poll.event_sem, &poll_time); + } + } while (ret == 0); + + return event_num; } diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c index f623da3..bf5dcb4 100644 --- a/src/lstack/api/lstack_wrap.c +++ b/src/lstack/api/lstack_wrap.c @@ -45,8 +45,7 @@ enum KERNEL_LWIP_PATH { static inline enum KERNEL_LWIP_PATH select_path(int fd) { if (posix_api == NULL) { - /* link liblstack.so using LD_PRELOAD mode will read liblstack.so, - poisx_api need to be initialized here */ + /* posix api maybe call before gazelle init */ if (posix_api_init() != 0) { LSTACK_PRE_LOG(LSTACK_ERR, "posix_api_init failed\n"); } @@ -78,8 +77,7 @@ static inline enum KERNEL_LWIP_PATH select_path(int fd) static inline int32_t do_epoll_create(int32_t size) { if (posix_api == NULL) { - /* link liblstack.so using LD_PRELOAD mode will read liblstack.so, - poisx_api need to be initialized here */ + /* posix api maybe call before gazelle init */ if (posix_api_init() != 0) { LSTACK_PRE_LOG(LSTACK_ERR, "posix_api_init failed\n"); } @@ -99,11 +97,6 @@ static inline int32_t do_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct return posix_api->epoll_ctl_fn(epfd, op, fd, event); } - struct lwip_sock *sock = get_socket_by_fd(epfd); - if (sock == NULL || sock->wakeup == NULL) { - return posix_api->epoll_ctl_fn(epfd, op, fd, event); - } - return lstack_epoll_ctl(epfd, op, fd, event); } @@ -113,11 +106,6 @@ static inline int32_t do_epoll_wait(int32_t epfd, struct epoll_event* events, in return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout); } - struct lwip_sock *sock = get_socket_by_fd(epfd); - if (sock == NULL || sock->wakeup == NULL) { - return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout); - } - if (epfd < 0) { GAZELLE_RETURN(EBADF); } @@ -362,6 +350,11 @@ static inline ssize_t do_sendmsg(int32_t s, const struct msghdr *message, int32_ static inline int32_t do_close(int32_t s) { + struct lwip_sock *sock = get_socket_by_fd(s); + if (sock && sock->wakeup && sock->wakeup->epollfd == s) { + return lstack_epoll_close(s); + } + if (select_path(s) == PATH_KERNEL) { return posix_api->close_fn(s); } diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c index aa91201..cdd2c05 100644 --- a/src/lstack/core/lstack_dpdk.c +++ b/src/lstack/core/lstack_dpdk.c @@ -95,8 +95,8 @@ int32_t dpdk_eal_init(void) if (ret < 0) { if (rte_errno == EALREADY) { LSTACK_PRE_LOG(LSTACK_INFO, "rte_eal_init aleady init\n"); - /* maybe other program inited, merge init param share init */ - ret = 0; + /* maybe other program inited, merge init param share init */ + ret = 0; } else { LSTACK_PRE_LOG(LSTACK_ERR, "rte_eal_init failed init, rte_errno %d\n", rte_errno); diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c index 037b8fd..f8e96bf 100644 --- a/src/lstack/core/lstack_init.c +++ b/src/lstack/core/lstack_init.c @@ -270,7 +270,7 @@ __attribute__((constructor)) void gazelle_network_init(void) lwip_sock_init(); /* wait stack thread and kernel_event thread init finish */ - wait_sem_value(&get_protocol_stack_group()->all_init, get_protocol_stack_group()->stack_num); + wait_sem_value(&get_protocol_stack_group()->all_init, get_protocol_stack_group()->stack_num * 2); if (g_init_fail) { LSTACK_EXIT(1, "stack thread or kernel_event thread failed\n"); } diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index 00a82fb..8544ef7 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -29,6 +29,7 @@ #include "lstack_log.h" #include "lstack_dpdk.h" #include "lstack_stack_stat.h" +#include "posix/lstack_epoll.h" #include "lstack_lwip.h" #define HALF_DIVISOR (2) diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index 565d19b..eb975c0 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -35,6 +35,7 @@ #define READ_LIST_MAX 32 #define SEND_LIST_MAX 32 #define HANDLE_RPC_MSG_MAX 32 +#define KERNEL_EPOLL_MAX 256 static PER_THREAD uint16_t g_stack_idx = PROTOCOL_STACK_MAX; static struct protocol_stack_group g_stack_group = {0}; @@ -43,9 +44,6 @@ static PER_THREAD long g_stack_tid = 0; void set_init_fail(void); typedef void *(*stack_thread_func)(void *arg); -#ifdef GAZELLE_USE_EPOLL_EVENT_STACK -void update_stack_events(struct protocol_stack *stack); -#endif int32_t bind_to_stack_numa(struct protocol_stack *stack) { @@ -206,6 +204,10 @@ static void* gazelle_weakup_thread(void *arg) LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id); for (;;) { + if (rte_ring_count(stack->wakeup_ring) == 0) { + continue; + } + sem_t *event_sem; if (rte_ring_sc_dequeue(stack->wakeup_ring, (void **)&event_sem)) { continue; @@ -268,6 +270,61 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id) return 0; } +static void* gazelle_kernel_event(void *arg) +{ + uint16_t queue_id = *(uint16_t *)arg; + + int32_t epoll_fd = posix_api->epoll_create_fn(GAZELLE_LSTACK_MAX_CONN); + if (epoll_fd < 0) { + LSTACK_LOG(ERR, LSTACK, "queue_id=%d epoll_fd=%d errno=%d\n", queue_id, epoll_fd, errno); + /* exit in main thread, avoid create mempool and exit at the same time */ + set_init_fail(); + sem_post(&get_protocol_stack_group()->all_init); + return NULL; + } + + struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id]; + stack->epollfd = epoll_fd; + + sem_post(&get_protocol_stack_group()->all_init); + LSTACK_LOG(INFO, LSTACK, "kernel_event_%02d start\n", stack->queue_id); + + struct epoll_event events[KERNEL_EPOLL_MAX]; + for (;;) { + int32_t event_num = posix_api->epoll_wait_fn(epoll_fd, events, KERNEL_EPOLL_MAX, -1); + if (event_num <= 0) { + continue; + } + + for (int32_t i = 0; i < event_num; i++) { + if (events[i].data.ptr) { + sem_post((sem_t *)events[i].data.ptr); + } + } + } + + return NULL; +} + +static int32_t create_companion_thread(struct protocol_stack_group *stack_group, struct protocol_stack *stack) +{ + int32_t ret; + + if (stack_group->wakeup_enable) { + ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread); + if (ret != 0) { + LSTACK_LOG(ERR, LSTACK, "gazelleweakup ret=%d errno=%d\n", ret, errno); + return ret; + } + } + + ret = create_thread(stack->queue_id, "gazellekernel", gazelle_kernel_event); + if (ret != 0) { + LSTACK_LOG(ERR, LSTACK, "gazellekernelEvent ret=%d errno=%d\n", ret, errno); + } + return ret; +} + void wait_sem_value(sem_t *sem, int32_t wait_value) { int32_t sem_val; @@ -315,12 +372,9 @@ static struct protocol_stack * stack_thread_init(uint16_t queue_id) return NULL; } - if (stack_group->wakeup_enable) { - int32_t ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread); - if (ret != 0) { - free(stack); - return NULL; - } + if (create_companion_thread(stack_group, stack) != 0) { + free(stack); + return NULL; } return stack; @@ -338,6 +392,7 @@ static void* gazelle_stack_thread(void *arg) LSTACK_LOG(ERR, LSTACK, "stack_thread_init failed queue_id=%d\n", queue_id); return NULL; } + sem_post(&get_protocol_stack_group()->all_init); LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id); @@ -351,10 +406,6 @@ static void* gazelle_stack_thread(void *arg) send_stack_list(stack, SEND_LIST_MAX); sys_timer_run(); - -#ifdef GAZELLE_USE_EPOLL_EVENT_STACK - update_stack_events(stack); -#endif } return NULL; @@ -378,7 +429,13 @@ static int32_t init_protocol_sem(void) LSTACK_LOG(ERR, PORT, "sem_init failed ret=%d errno=%d\n", ret, errno); return -1; } - + + ret = sem_init(&stack_group->all_init, 0, 0); + if (ret < 0) { + LSTACK_LOG(ERR, PORT, "sem_init failed ret=%d errno=%d\n", ret, errno); + return -1; + } + return 0; } diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h index 345a373..987828d 100644 --- a/src/lstack/include/lstack_cfg.h +++ b/src/lstack/include/lstack_cfg.h @@ -33,7 +33,6 @@ #define LOG_DIR_PATH PATH_MAX #define LOG_LEVEL_LEN 16 #define GAZELLE_MAX_NUMA_NODES 8 -#define LWIP_EPOOL_MAX_EVENTS 512 /* Default value of low power mode parameters */ #define LSTACK_LPM_DETECT_MS_MIN (5 * 1000) diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h index 9852878..bc4e4bd 100644 --- a/src/lstack/include/lstack_protocol_stack.h +++ b/src/lstack/include/lstack_protocol_stack.h @@ -51,6 +51,7 @@ struct protocol_stack { struct list_node send_list; struct list_node event_list; pthread_spinlock_t event_lock; + int32_t epollfd; /* kernel event thread epoll fd */ struct gazelle_stat_pkts stats; struct gazelle_stack_latency latency; @@ -75,13 +76,6 @@ struct protocol_stack_group { uint64_t call_alloc_fail; }; -struct wakeup_poll { - bool init; - struct protocol_stack *bind_stack; - struct list_node event_list; /* epoll temp use poll */ - sem_t event_sem; -}; - long get_stack_tid(void); struct protocol_stack *get_protocol_stack(void); struct protocol_stack *get_protocol_stack_by_fd(int32_t fd); diff --git a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h index cac640b..a83f41f 100644 --- a/src/lstack/include/posix/lstack_epoll.h +++ b/src/lstack/include/posix/lstack_epoll.h @@ -18,6 +18,30 @@ extern "C" { #endif #include +#include +#include + +#include "lstack_protocol_stack.h" + +struct wakeup_poll { + bool init; + struct protocol_stack *bind_stack; + sem_t event_sem; + + int32_t epollfd; + bool have_kernel_fd; + + /* poll */ + struct pollfd *last_fds; + nfds_t last_nfds; + nfds_t last_max_nfds; + struct epoll_event *events; + + /* epoll */ + int32_t stack_fd_cnt[PROTOCOL_STACK_MAX]; + struct protocol_stack *max_stack; + struct list_node event_list; /* epoll temp use */ +}; int32_t lstack_epoll_create(int32_t size); int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event); -- 2.23.0