update README
fix some bugs
refactor packet read/send to improve performance
refactor kernel event to improve performance
(cherry picked from commit a8c66704608ca83c799adab88be6214bccdcfa44)
837 lines
28 KiB
Diff
837 lines
28 KiB
Diff
From a74d5b38b2021397d13b13aaa30f41f69be6f475 Mon Sep 17 00:00:00 2001
|
|
From: wuchangsheng <wuchangsheng2@huawei.com>
|
|
Date: Thu, 21 Apr 2022 17:21:59 +0800
|
|
Subject: [PATCH 09/18] refactor kernel event poll/epoll
|
|
|
|
---
|
|
src/lstack/api/lstack_epoll.c | 343 +++++++++++++++------
|
|
src/lstack/api/lstack_wrap.c | 21 +-
|
|
src/lstack/core/lstack_dpdk.c | 4 +-
|
|
src/lstack/core/lstack_init.c | 2 +-
|
|
src/lstack/core/lstack_lwip.c | 1 +
|
|
src/lstack/core/lstack_protocol_stack.c | 85 ++++-
|
|
src/lstack/include/lstack_cfg.h | 1 -
|
|
src/lstack/include/lstack_protocol_stack.h | 8 +-
|
|
src/lstack/include/posix/lstack_epoll.h | 24 ++
|
|
9 files changed, 350 insertions(+), 139 deletions(-)
|
|
|
|
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
|
|
index b8d53f6..cba67ea 100644
|
|
--- a/src/lstack/api/lstack_epoll.c
|
|
+++ b/src/lstack/api/lstack_epoll.c
|
|
@@ -15,6 +15,7 @@
|
|
#include <sys/epoll.h>
|
|
#include <time.h>
|
|
#include <poll.h>
|
|
+#include <stdatomic.h>
|
|
|
|
#include <lwip/lwipsock.h>
|
|
#include <lwip/sockets.h>
|
|
@@ -32,10 +33,14 @@
|
|
#include "gazelle_base_func.h"
|
|
#include "lstack_lwip.h"
|
|
#include "lstack_protocol_stack.h"
|
|
+#include "posix/lstack_epoll.h"
|
|
|
|
#define EPOLL_KERNEL_INTERVAL 10 /* ms */
|
|
-#define EPOLL_NSEC_TO_SEC 1000000000
|
|
+#define SEC_TO_NSEC 1000000000
|
|
+#define SEC_TO_MSEC 1000
|
|
+#define MSEC_TO_NSEC 1000000
|
|
#define EPOLL_MAX_EVENTS 512
|
|
+#define POLL_KERNEL_EVENTS 32
|
|
|
|
static PER_THREAD struct wakeup_poll g_wakeup_poll = {0};
|
|
static bool g_use_epoll = false; /* FIXME: when no epoll close prepare event for performance testing */
|
|
@@ -149,12 +154,12 @@ int32_t lstack_epoll_create(int32_t size)
|
|
posix_api->close_fn(fd);
|
|
GAZELLE_RETURN(EINVAL);
|
|
}
|
|
-
|
|
memset_s(wakeup, sizeof(struct wakeup_poll), 0, sizeof(struct wakeup_poll));
|
|
- sem_init(&wakeup->event_sem, 0, 0);
|
|
|
|
- sock->wakeup = wakeup;
|
|
init_list_node(&wakeup->event_list);
|
|
+ wakeup->epollfd = fd;
|
|
+ sem_init(&wakeup->event_sem, 0, 0);
|
|
+ sock->wakeup = wakeup;
|
|
|
|
g_use_epoll = true;
|
|
return fd;
|
|
@@ -162,6 +167,8 @@ int32_t lstack_epoll_create(int32_t size)
|
|
|
|
int32_t lstack_epoll_close(int32_t fd)
|
|
{
|
|
+ posix_api->close_fn(fd);
|
|
+
|
|
struct lwip_sock *sock = get_socket_by_fd(fd);
|
|
if (sock == NULL) {
|
|
LSTACK_LOG(ERR, LSTACK, "fd=%d sock is NULL errno=%d\n", fd, errno);
|
|
@@ -176,6 +183,43 @@ int32_t lstack_epoll_close(int32_t fd)
|
|
return 0;
|
|
}
|
|
|
|
+static uint16_t find_max_cnt_stack(int32_t *stack_count, uint16_t stack_num, struct protocol_stack *last_stack)
|
|
+{
|
|
+ uint16_t max_index = 0;
|
|
+ bool all_same_cnt = true;
|
|
+
|
|
+ for (uint16_t i = 1; i < stack_num; i++) {
|
|
+ if (stack_count[i] != stack_count[0]) {
|
|
+ all_same_cnt = false;
|
|
+ }
|
|
+
|
|
+ if (stack_count[i] > stack_count[max_index]) {
|
|
+ max_index = i;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /* all stack same, don't change */
|
|
+ if (all_same_cnt && last_stack) {
|
|
+ return last_stack->queue_id;
|
|
+ }
|
|
+
|
|
+ /* first bind and all stack same. choice tick as queue_id, avoid all bind to statck_0.*/
|
|
+ static uint16_t tick = 0;
|
|
+ if (all_same_cnt && stack_num) {
|
|
+ max_index = atomic_fetch_add(&tick, 1) % stack_num;
|
|
+ }
|
|
+
|
|
+ return max_index;
|
|
+}
|
|
+
|
|
+static void update_epoll_max_stack(struct wakeup_poll *wakeup)
|
|
+{
|
|
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
|
|
+ uint16_t bind_id = find_max_cnt_stack(wakeup->stack_fd_cnt, stack_group->stack_num, wakeup->max_stack);
|
|
+
|
|
+ wakeup->max_stack = stack_group->stacks[bind_id];
|
|
+}
|
|
+
|
|
int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event)
|
|
{
|
|
LSTACK_LOG(DEBUG, LSTACK, "op=%d events: fd: %d\n", op, fd);
|
|
@@ -185,35 +229,38 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
|
|
GAZELLE_RETURN(EINVAL);
|
|
}
|
|
|
|
+ struct lwip_sock *epoll_sock = get_socket_by_fd(epfd);
|
|
+ if (epoll_sock == NULL || epoll_sock->wakeup == NULL) {
|
|
+ return posix_api->epoll_ctl_fn(epfd, op, fd, event);
|
|
+ }
|
|
+
|
|
struct lwip_sock *sock = get_socket(fd);
|
|
if (sock == NULL) {
|
|
+ epoll_sock->wakeup->have_kernel_fd = true;
|
|
return posix_api->epoll_ctl_fn(epfd, op, fd, event);
|
|
}
|
|
|
|
if (CONN_TYPE_HAS_HOST(sock->conn)) {
|
|
+ epoll_sock->wakeup->have_kernel_fd = true;
|
|
int32_t ret = posix_api->epoll_ctl_fn(epfd, op, fd, event);
|
|
if (ret < 0) {
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
- struct lwip_sock *epoll_sock = get_socket_by_fd(epfd);
|
|
- if (epoll_sock == NULL || epoll_sock->wakeup == NULL) {
|
|
- LSTACK_LOG(ERR, LSTACK, "epfd=%d\n", fd);
|
|
- GAZELLE_RETURN(EINVAL);
|
|
- }
|
|
-
|
|
- uint32_t events = event->events | EPOLLERR | EPOLLHUP;
|
|
do {
|
|
switch (op) {
|
|
case EPOLL_CTL_ADD:
|
|
sock->wakeup = epoll_sock->wakeup;
|
|
+ if (sock->stack) {
|
|
+ epoll_sock->wakeup->stack_fd_cnt[sock->stack->queue_id]++;
|
|
+ }
|
|
if (list_is_empty(&sock->event_list)) {
|
|
list_add_node(&sock->wakeup->event_list, &sock->event_list);
|
|
}
|
|
/* fall through */
|
|
case EPOLL_CTL_MOD:
|
|
- sock->epoll_events = events;
|
|
+ sock->epoll_events = event->events | EPOLLERR | EPOLLHUP;
|
|
sock->ep_data = event->data;
|
|
if (sock->conn && NETCONNTYPE_GROUP(netconn_type(sock->conn)) == NETCONN_TCP) {
|
|
raise_pending_events(sock);
|
|
@@ -222,6 +269,9 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
|
|
case EPOLL_CTL_DEL:
|
|
list_del_node_init(&sock->event_list);
|
|
sock->epoll_events = 0;
|
|
+ if (sock->stack) {
|
|
+ epoll_sock->wakeup->stack_fd_cnt[sock->stack->queue_id]--;
|
|
+ }
|
|
break;
|
|
default:
|
|
GAZELLE_RETURN(EINVAL);
|
|
@@ -230,6 +280,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
|
|
sock = get_socket(fd);
|
|
} while (fd > 0 && sock != NULL);
|
|
|
|
+ update_epoll_max_stack(epoll_sock->wakeup);
|
|
return 0;
|
|
}
|
|
|
|
@@ -346,129 +397,196 @@ static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds)
|
|
return event_num;
|
|
}
|
|
|
|
-static inline bool have_kernel_fd(int32_t epfd, struct pollfd *fds, nfds_t nfds)
|
|
+static void ms_to_timespec(struct timespec *timespec, int32_t timeout)
|
|
{
|
|
- /* when epfd > 0 is epoll type */
|
|
- for (uint32_t i = 0; i < nfds && epfd < 0; i++) {
|
|
- if (get_socket(fds[i].fd) == NULL) {
|
|
- return true;
|
|
+ clock_gettime(CLOCK_REALTIME, timespec);
|
|
+ timespec->tv_sec += timeout / SEC_TO_MSEC;
|
|
+ timespec->tv_nsec += (timeout % SEC_TO_MSEC) * MSEC_TO_NSEC;
|
|
+ timespec->tv_sec += timespec->tv_nsec / SEC_TO_NSEC;
|
|
+ timespec->tv_nsec = timespec->tv_nsec % SEC_TO_NSEC;
|
|
+}
|
|
+
|
|
+static void change_epollfd_kernel_thread(struct wakeup_poll *wakeup, struct protocol_stack *old_stack,
|
|
+ struct protocol_stack *new_stack)
|
|
+{
|
|
+ if (old_stack) {
|
|
+ if (posix_api->epoll_ctl_fn(old_stack->epollfd, EPOLL_CTL_DEL, wakeup->epollfd, NULL) != 0) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
|
|
}
|
|
}
|
|
|
|
- return false;
|
|
+ /* avoid kernel thread post too much, use EPOLLET */
|
|
+ struct epoll_event event;
|
|
+ event.data.ptr = &wakeup->event_sem;
|
|
+ event.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLET;
|
|
+ if (posix_api->epoll_ctl_fn(new_stack->epollfd, EPOLL_CTL_ADD, wakeup->epollfd, &event) != 0) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
|
|
+ }
|
|
}
|
|
|
|
-static inline int32_t poll_kernel_event(struct pollfd *fds, nfds_t nfds)
|
|
+static void epoll_bind_statck(struct wakeup_poll *wakeup)
|
|
{
|
|
- int32_t event_num = 0;
|
|
-
|
|
- for (uint32_t i = 0; i < nfds; i++) {
|
|
- /* lwip event */
|
|
- if (get_socket(fds[i].fd) != NULL || fds[i].fd < 0) {
|
|
- continue;
|
|
- }
|
|
-
|
|
- int32_t ret = posix_api->poll_fn(&fds[i], 1, 0);
|
|
- if (ret < 0) {
|
|
- if (errno != EINTR) {
|
|
- return ret;
|
|
- }
|
|
- } else {
|
|
- event_num += ret;
|
|
- }
|
|
+ /* all fd is kernel, set rand stack */
|
|
+ if (wakeup->bind_stack == NULL && wakeup->max_stack== NULL) {
|
|
+ update_epoll_max_stack(wakeup);
|
|
}
|
|
|
|
- return event_num;
|
|
+ if (wakeup->bind_stack != wakeup->max_stack && wakeup->max_stack) {
|
|
+ bind_to_stack_numa(wakeup->max_stack);
|
|
+ change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, wakeup->max_stack);
|
|
+ wakeup->bind_stack = wakeup->max_stack;
|
|
+ }
|
|
}
|
|
|
|
-static int32_t get_event(struct wakeup_poll *wakeup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout)
|
|
+int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
|
|
{
|
|
- struct pollfd *fds = (struct pollfd *)out;
|
|
- struct epoll_event *events = (struct epoll_event *)out;
|
|
- bool have_kernel = have_kernel_fd(epfd, fds, maxevents);
|
|
+ struct lwip_sock *sock = get_socket_by_fd(epfd);
|
|
+ if (sock == NULL || sock->wakeup == NULL) {
|
|
+ return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
|
|
+ }
|
|
+
|
|
int32_t event_num = 0;
|
|
- int32_t poll_time = 0;
|
|
int32_t ret;
|
|
|
|
- /* when epfd > 0 is epoll type */
|
|
+ struct timespec epoll_time;
|
|
+ if (timeout >= 0) {
|
|
+ ms_to_timespec(&epoll_time, timeout);
|
|
+ }
|
|
+
|
|
+ epoll_bind_statck(sock->wakeup);
|
|
+
|
|
do {
|
|
- event_num += (epfd > 0) ? epoll_lwip_event(wakeup, &events[event_num], maxevents - event_num) :
|
|
- poll_lwip_event(fds, maxevents);
|
|
-
|
|
- if (have_kernel) {
|
|
- int32_t event_kernel_num = (epfd > 0) ?
|
|
- posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0) :
|
|
- poll_kernel_event(fds, maxevents);
|
|
- if (event_kernel_num < 0) {
|
|
- return event_kernel_num;
|
|
- }
|
|
- event_num += event_kernel_num;
|
|
- if (timeout >= 0 && poll_time >= timeout) {
|
|
- break;
|
|
- }
|
|
- poll_time += EPOLL_KERNEL_INTERVAL;
|
|
+ event_num += epoll_lwip_event(sock->wakeup, &events[event_num], maxevents - event_num);
|
|
+
|
|
+ if (sock->wakeup->have_kernel_fd) {
|
|
+ event_num += posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0);
|
|
}
|
|
|
|
if (event_num > 0) {
|
|
break;
|
|
}
|
|
|
|
- int32_t interval = (have_kernel) ? EPOLL_KERNEL_INTERVAL : timeout;
|
|
- struct timespec epoll_interval;
|
|
- clock_gettime(CLOCK_REALTIME, &epoll_interval);
|
|
- epoll_interval.tv_sec += interval / 1000;
|
|
- epoll_interval.tv_nsec += (interval % 1000) * 1000000;
|
|
- epoll_interval.tv_sec += epoll_interval.tv_nsec / 1000000000;
|
|
- epoll_interval.tv_nsec = epoll_interval.tv_nsec % 1000000000;
|
|
-
|
|
- if (timeout < 0 && !have_kernel) {
|
|
- ret = sem_wait(&wakeup->event_sem);
|
|
+ if (timeout < 0) {
|
|
+ ret = sem_wait(&sock->wakeup->event_sem);
|
|
} else {
|
|
- ret = sem_timedwait(&wakeup->event_sem, &epoll_interval);
|
|
+ ret = sem_timedwait(&sock->wakeup->event_sem, &epoll_time);
|
|
}
|
|
-
|
|
- if (!have_kernel && ret < 0) {
|
|
- break;
|
|
- }
|
|
- } while (event_num <= maxevents);
|
|
+ } while (ret == 0);
|
|
|
|
return event_num;
|
|
}
|
|
|
|
-int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
|
|
+static void init_poll_wakeup_data(struct wakeup_poll *wakeup)
|
|
{
|
|
- /* avoid the starvation of epoll events from both netstack */
|
|
- maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents);
|
|
+ sem_init(&wakeup->event_sem, 0, 0);
|
|
|
|
- struct lwip_sock *sock = get_socket_by_fd(epfd);
|
|
- if (sock == NULL) {
|
|
- GAZELLE_RETURN(EINVAL);
|
|
+ wakeup->last_fds = calloc(POLL_KERNEL_EVENTS, sizeof(struct pollfd));
|
|
+ if (wakeup->last_fds == NULL) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
|
|
}
|
|
|
|
- if (sock->wakeup == NULL) {
|
|
- return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
|
|
+ wakeup->events = calloc(POLL_KERNEL_EVENTS, sizeof(struct epoll_event));
|
|
+ if (wakeup->events == NULL) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
|
|
}
|
|
|
|
- return get_event(sock->wakeup, epfd, events, maxevents, timeout);
|
|
+ wakeup->last_max_nfds = POLL_KERNEL_EVENTS;
|
|
+
|
|
+ wakeup->epollfd = posix_api->epoll_create_fn(POLL_KERNEL_EVENTS);
|
|
+ if (wakeup->epollfd < 0) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "epoll_create_fn errno=%d\n", errno);
|
|
+ }
|
|
}
|
|
|
|
-static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeup)
|
|
+static void resize_kernel_poll(struct wakeup_poll *wakeup, nfds_t nfds)
|
|
{
|
|
- int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
|
|
+ wakeup->last_fds = realloc(wakeup->last_fds, nfds * sizeof(struct pollfd));
|
|
+ if (wakeup->last_fds == NULL) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
|
|
+ }
|
|
+
|
|
+ wakeup->events = realloc(wakeup->events, nfds * sizeof(struct epoll_event));
|
|
+ if (wakeup->events == NULL) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
|
|
+ }
|
|
+
|
|
+ wakeup->last_max_nfds = nfds;
|
|
+ memset_s(wakeup->last_fds, nfds * sizeof(struct pollfd), 0, nfds * sizeof(struct pollfd));
|
|
+}
|
|
+
|
|
+static void poll_bind_statck(struct wakeup_poll *wakeup, int32_t *stack_count)
|
|
+{
|
|
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
|
|
+ uint16_t bind_id = find_max_cnt_stack(stack_count, stack_group->stack_num, wakeup->bind_stack);
|
|
+
|
|
+ if (wakeup->bind_stack && wakeup->bind_stack->queue_id == bind_id) {
|
|
+ return;
|
|
+ }
|
|
|
|
+ change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, stack_group->stacks[bind_id]);
|
|
+ bind_to_stack_numa(stack_group->stacks[bind_id]);
|
|
+ wakeup->bind_stack = stack_group->stacks[bind_id];
|
|
+}
|
|
+
|
|
+static void update_kernel_poll(struct wakeup_poll *wakeup, uint32_t index, struct pollfd *new_fd)
|
|
+{
|
|
+ posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_DEL, wakeup->last_fds[index].fd, NULL);
|
|
+
|
|
+ if (new_fd == NULL) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ struct epoll_event event;
|
|
+ event.data.u32 = index;
|
|
+ event.events = new_fd->events;
|
|
+ if (posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_ADD, new_fd->fd, &event) != 0) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
|
|
+ }
|
|
+
|
|
+ wakeup->last_fds[index].fd = new_fd->fd;
|
|
+ wakeup->last_fds[index].events = new_fd->events;
|
|
+
|
|
+ wakeup->have_kernel_fd = true;
|
|
+}
|
|
+
|
|
+static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfds)
|
|
+{
|
|
if (!wakeup->init) {
|
|
wakeup->init = true;
|
|
- sem_init(&wakeup->event_sem, 0, 0);
|
|
+ init_poll_wakeup_data(wakeup);
|
|
} else {
|
|
while (sem_trywait(&wakeup->event_sem) == 0) {}
|
|
}
|
|
|
|
+ if (nfds > wakeup->last_max_nfds) {
|
|
+ resize_kernel_poll(wakeup, nfds);
|
|
+ }
|
|
+
|
|
+ int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
|
|
+ int32_t poll_change = 0;
|
|
+
|
|
+ /* poll fds num less, del old fd */
|
|
+ for (uint32_t i = nfds; i < wakeup->last_nfds; i++) {
|
|
+ update_kernel_poll(wakeup, i, NULL);
|
|
+ poll_change = 1;
|
|
+ }
|
|
+
|
|
for (uint32_t i = 0; i < nfds; i++) {
|
|
- int32_t fd = fds[i].fd;
|
|
fds[i].revents = 0;
|
|
|
|
+ if (fds[i].fd == wakeup->last_fds[i].fd && fds[i].events == wakeup->last_fds[i].events) {
|
|
+ continue;
|
|
+ }
|
|
+ poll_change = 1;
|
|
+
|
|
+ int32_t fd = fds[i].fd;
|
|
+ struct lwip_sock *sock = get_socket(fd);
|
|
+ if (sock == NULL || CONN_TYPE_HAS_HOST(sock->conn)) {
|
|
+ update_kernel_poll(wakeup, i, fds + i);
|
|
+ }
|
|
+
|
|
do {
|
|
- struct lwip_sock *sock = get_socket(fd);
|
|
+ sock = get_socket(fd);
|
|
if (sock == NULL || sock->conn == NULL) {
|
|
break;
|
|
}
|
|
@@ -481,25 +599,50 @@ static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeu
|
|
} while (fd > 0);
|
|
}
|
|
|
|
- if (wakeup->bind_stack) {
|
|
+ wakeup->last_nfds = nfds;
|
|
+ if (poll_change == 0) {
|
|
return;
|
|
}
|
|
|
|
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
|
|
- uint32_t bind_id = 0;
|
|
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
|
|
- if (stack_count[i] > stack_count[bind_id]) {
|
|
- bind_id = i;
|
|
- }
|
|
- }
|
|
-
|
|
- bind_to_stack_numa(stack_group->stacks[bind_id]);
|
|
- wakeup->bind_stack = stack_group->stacks[bind_id];
|
|
+ poll_bind_statck(wakeup, stack_count);
|
|
}
|
|
|
|
int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
|
|
{
|
|
- poll_init(fds, nfds, &g_wakeup_poll);
|
|
+ poll_init(&g_wakeup_poll, fds, nfds);
|
|
|
|
- return get_event(&g_wakeup_poll, -1, fds, nfds, timeout);
|
|
+ int32_t event_num = 0;
|
|
+ int32_t ret;
|
|
+
|
|
+ struct timespec poll_time;
|
|
+ if (timeout >= 0) {
|
|
+ ms_to_timespec(&poll_time, timeout);
|
|
+ }
|
|
+
|
|
+ /* when epfd > 0 is epoll type */
|
|
+ do {
|
|
+ event_num += poll_lwip_event(fds, nfds);
|
|
+
|
|
+ /* reduce syscall epoll_wait */
|
|
+ if (g_wakeup_poll.have_kernel_fd) {
|
|
+ int32_t kernel_num = posix_api->epoll_wait_fn(g_wakeup_poll.epollfd, g_wakeup_poll.events, nfds, 0);
|
|
+ for (int32_t i = 0; i < kernel_num; i++) {
|
|
+ uint32_t index = g_wakeup_poll.events[i].data.u32;
|
|
+ fds[index].revents = g_wakeup_poll.events[i].events;
|
|
+ }
|
|
+ event_num += kernel_num >= 0 ? kernel_num : 0;
|
|
+ }
|
|
+
|
|
+ if (event_num > 0) {
|
|
+ break;
|
|
+ }
|
|
+
|
|
+ if (timeout < 0) {
|
|
+ ret = sem_wait(&g_wakeup_poll.event_sem);
|
|
+ } else {
|
|
+ ret = sem_timedwait(&g_wakeup_poll.event_sem, &poll_time);
|
|
+ }
|
|
+ } while (ret == 0);
|
|
+
|
|
+ return event_num;
|
|
}
|
|
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
|
|
index f623da3..bf5dcb4 100644
|
|
--- a/src/lstack/api/lstack_wrap.c
|
|
+++ b/src/lstack/api/lstack_wrap.c
|
|
@@ -45,8 +45,7 @@ enum KERNEL_LWIP_PATH {
|
|
static inline enum KERNEL_LWIP_PATH select_path(int fd)
|
|
{
|
|
if (posix_api == NULL) {
|
|
- /* link liblstack.so using LD_PRELOAD mode will read liblstack.so,
|
|
- poisx_api need to be initialized here */
|
|
+ /* posix api maybe call before gazelle init */
|
|
if (posix_api_init() != 0) {
|
|
LSTACK_PRE_LOG(LSTACK_ERR, "posix_api_init failed\n");
|
|
}
|
|
@@ -78,8 +77,7 @@ static inline enum KERNEL_LWIP_PATH select_path(int fd)
|
|
static inline int32_t do_epoll_create(int32_t size)
|
|
{
|
|
if (posix_api == NULL) {
|
|
- /* link liblstack.so using LD_PRELOAD mode will read liblstack.so,
|
|
- poisx_api need to be initialized here */
|
|
+ /* posix api maybe call before gazelle init */
|
|
if (posix_api_init() != 0) {
|
|
LSTACK_PRE_LOG(LSTACK_ERR, "posix_api_init failed\n");
|
|
}
|
|
@@ -99,11 +97,6 @@ static inline int32_t do_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct
|
|
return posix_api->epoll_ctl_fn(epfd, op, fd, event);
|
|
}
|
|
|
|
- struct lwip_sock *sock = get_socket_by_fd(epfd);
|
|
- if (sock == NULL || sock->wakeup == NULL) {
|
|
- return posix_api->epoll_ctl_fn(epfd, op, fd, event);
|
|
- }
|
|
-
|
|
return lstack_epoll_ctl(epfd, op, fd, event);
|
|
}
|
|
|
|
@@ -113,11 +106,6 @@ static inline int32_t do_epoll_wait(int32_t epfd, struct epoll_event* events, in
|
|
return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
|
|
}
|
|
|
|
- struct lwip_sock *sock = get_socket_by_fd(epfd);
|
|
- if (sock == NULL || sock->wakeup == NULL) {
|
|
- return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
|
|
- }
|
|
-
|
|
if (epfd < 0) {
|
|
GAZELLE_RETURN(EBADF);
|
|
}
|
|
@@ -362,6 +350,11 @@ static inline ssize_t do_sendmsg(int32_t s, const struct msghdr *message, int32_
|
|
|
|
static inline int32_t do_close(int32_t s)
|
|
{
|
|
+ struct lwip_sock *sock = get_socket_by_fd(s);
|
|
+ if (sock && sock->wakeup && sock->wakeup->epollfd == s) {
|
|
+ return lstack_epoll_close(s);
|
|
+ }
|
|
+
|
|
if (select_path(s) == PATH_KERNEL) {
|
|
return posix_api->close_fn(s);
|
|
}
|
|
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
|
|
index aa91201..cdd2c05 100644
|
|
--- a/src/lstack/core/lstack_dpdk.c
|
|
+++ b/src/lstack/core/lstack_dpdk.c
|
|
@@ -95,8 +95,8 @@ int32_t dpdk_eal_init(void)
|
|
if (ret < 0) {
|
|
if (rte_errno == EALREADY) {
|
|
LSTACK_PRE_LOG(LSTACK_INFO, "rte_eal_init aleady init\n");
|
|
- /* maybe other program inited, merge init param share init */
|
|
- ret = 0;
|
|
+ /* maybe other program inited, merge init param share init */
|
|
+ ret = 0;
|
|
}
|
|
else {
|
|
LSTACK_PRE_LOG(LSTACK_ERR, "rte_eal_init failed init, rte_errno %d\n", rte_errno);
|
|
diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c
|
|
index 037b8fd..f8e96bf 100644
|
|
--- a/src/lstack/core/lstack_init.c
|
|
+++ b/src/lstack/core/lstack_init.c
|
|
@@ -270,7 +270,7 @@ __attribute__((constructor)) void gazelle_network_init(void)
|
|
lwip_sock_init();
|
|
|
|
/* wait stack thread and kernel_event thread init finish */
|
|
- wait_sem_value(&get_protocol_stack_group()->all_init, get_protocol_stack_group()->stack_num);
|
|
+ wait_sem_value(&get_protocol_stack_group()->all_init, get_protocol_stack_group()->stack_num * 2);
|
|
if (g_init_fail) {
|
|
LSTACK_EXIT(1, "stack thread or kernel_event thread failed\n");
|
|
}
|
|
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
|
|
index 00a82fb..8544ef7 100644
|
|
--- a/src/lstack/core/lstack_lwip.c
|
|
+++ b/src/lstack/core/lstack_lwip.c
|
|
@@ -29,6 +29,7 @@
|
|
#include "lstack_log.h"
|
|
#include "lstack_dpdk.h"
|
|
#include "lstack_stack_stat.h"
|
|
+#include "posix/lstack_epoll.h"
|
|
#include "lstack_lwip.h"
|
|
|
|
#define HALF_DIVISOR (2)
|
|
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
|
|
index 565d19b..eb975c0 100644
|
|
--- a/src/lstack/core/lstack_protocol_stack.c
|
|
+++ b/src/lstack/core/lstack_protocol_stack.c
|
|
@@ -35,6 +35,7 @@
|
|
#define READ_LIST_MAX 32
|
|
#define SEND_LIST_MAX 32
|
|
#define HANDLE_RPC_MSG_MAX 32
|
|
+#define KERNEL_EPOLL_MAX 256
|
|
|
|
static PER_THREAD uint16_t g_stack_idx = PROTOCOL_STACK_MAX;
|
|
static struct protocol_stack_group g_stack_group = {0};
|
|
@@ -43,9 +44,6 @@ static PER_THREAD long g_stack_tid = 0;
|
|
void set_init_fail(void);
|
|
typedef void *(*stack_thread_func)(void *arg);
|
|
|
|
-#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
|
|
-void update_stack_events(struct protocol_stack *stack);
|
|
-#endif
|
|
|
|
int32_t bind_to_stack_numa(struct protocol_stack *stack)
|
|
{
|
|
@@ -206,6 +204,10 @@ static void* gazelle_weakup_thread(void *arg)
|
|
LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id);
|
|
|
|
for (;;) {
|
|
+ if (rte_ring_count(stack->wakeup_ring) == 0) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
sem_t *event_sem;
|
|
if (rte_ring_sc_dequeue(stack->wakeup_ring, (void **)&event_sem)) {
|
|
continue;
|
|
@@ -268,6 +270,61 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
|
|
return 0;
|
|
}
|
|
|
|
+static void* gazelle_kernel_event(void *arg)
|
|
+{
|
|
+ uint16_t queue_id = *(uint16_t *)arg;
|
|
+
|
|
+ int32_t epoll_fd = posix_api->epoll_create_fn(GAZELLE_LSTACK_MAX_CONN);
|
|
+ if (epoll_fd < 0) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "queue_id=%d epoll_fd=%d errno=%d\n", queue_id, epoll_fd, errno);
|
|
+ /* exit in main thread, avoid create mempool and exit at the same time */
|
|
+ set_init_fail();
|
|
+ sem_post(&get_protocol_stack_group()->all_init);
|
|
+ return NULL;
|
|
+ }
|
|
+
|
|
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
|
|
+ stack->epollfd = epoll_fd;
|
|
+
|
|
+ sem_post(&get_protocol_stack_group()->all_init);
|
|
+ LSTACK_LOG(INFO, LSTACK, "kernel_event_%02d start\n", stack->queue_id);
|
|
+
|
|
+ struct epoll_event events[KERNEL_EPOLL_MAX];
|
|
+ for (;;) {
|
|
+ int32_t event_num = posix_api->epoll_wait_fn(epoll_fd, events, KERNEL_EPOLL_MAX, -1);
|
|
+ if (event_num <= 0) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ for (int32_t i = 0; i < event_num; i++) {
|
|
+ if (events[i].data.ptr) {
|
|
+ sem_post((sem_t *)events[i].data.ptr);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return NULL;
|
|
+}
|
|
+
|
|
+static int32_t create_companion_thread(struct protocol_stack_group *stack_group, struct protocol_stack *stack)
|
|
+{
|
|
+ int32_t ret;
|
|
+
|
|
+ if (stack_group->wakeup_enable) {
|
|
+ ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread);
|
|
+ if (ret != 0) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "gazelleweakup ret=%d errno=%d\n", ret, errno);
|
|
+ return ret;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ ret = create_thread(stack->queue_id, "gazellekernel", gazelle_kernel_event);
|
|
+ if (ret != 0) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "gazellekernelEvent ret=%d errno=%d\n", ret, errno);
|
|
+ }
|
|
+ return ret;
|
|
+}
|
|
+
|
|
void wait_sem_value(sem_t *sem, int32_t wait_value)
|
|
{
|
|
int32_t sem_val;
|
|
@@ -315,12 +372,9 @@ static struct protocol_stack * stack_thread_init(uint16_t queue_id)
|
|
return NULL;
|
|
}
|
|
|
|
- if (stack_group->wakeup_enable) {
|
|
- int32_t ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread);
|
|
- if (ret != 0) {
|
|
- free(stack);
|
|
- return NULL;
|
|
- }
|
|
+ if (create_companion_thread(stack_group, stack) != 0) {
|
|
+ free(stack);
|
|
+ return NULL;
|
|
}
|
|
|
|
return stack;
|
|
@@ -338,6 +392,7 @@ static void* gazelle_stack_thread(void *arg)
|
|
LSTACK_LOG(ERR, LSTACK, "stack_thread_init failed queue_id=%d\n", queue_id);
|
|
return NULL;
|
|
}
|
|
+
|
|
sem_post(&get_protocol_stack_group()->all_init);
|
|
LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id);
|
|
|
|
@@ -351,10 +406,6 @@ static void* gazelle_stack_thread(void *arg)
|
|
send_stack_list(stack, SEND_LIST_MAX);
|
|
|
|
sys_timer_run();
|
|
-
|
|
-#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
|
|
- update_stack_events(stack);
|
|
-#endif
|
|
}
|
|
|
|
return NULL;
|
|
@@ -378,7 +429,13 @@ static int32_t init_protocol_sem(void)
|
|
LSTACK_LOG(ERR, PORT, "sem_init failed ret=%d errno=%d\n", ret, errno);
|
|
return -1;
|
|
}
|
|
-
|
|
+
|
|
+ ret = sem_init(&stack_group->all_init, 0, 0);
|
|
+ if (ret < 0) {
|
|
+ LSTACK_LOG(ERR, PORT, "sem_init failed ret=%d errno=%d\n", ret, errno);
|
|
+ return -1;
|
|
+ }
|
|
+
|
|
return 0;
|
|
}
|
|
|
|
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
|
|
index 345a373..987828d 100644
|
|
--- a/src/lstack/include/lstack_cfg.h
|
|
+++ b/src/lstack/include/lstack_cfg.h
|
|
@@ -33,7 +33,6 @@
|
|
#define LOG_DIR_PATH PATH_MAX
|
|
#define LOG_LEVEL_LEN 16
|
|
#define GAZELLE_MAX_NUMA_NODES 8
|
|
-#define LWIP_EPOOL_MAX_EVENTS 512
|
|
|
|
/* Default value of low power mode parameters */
|
|
#define LSTACK_LPM_DETECT_MS_MIN (5 * 1000)
|
|
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
|
|
index 9852878..bc4e4bd 100644
|
|
--- a/src/lstack/include/lstack_protocol_stack.h
|
|
+++ b/src/lstack/include/lstack_protocol_stack.h
|
|
@@ -51,6 +51,7 @@ struct protocol_stack {
|
|
struct list_node send_list;
|
|
struct list_node event_list;
|
|
pthread_spinlock_t event_lock;
|
|
+ int32_t epollfd; /* kernel event thread epoll fd */
|
|
|
|
struct gazelle_stat_pkts stats;
|
|
struct gazelle_stack_latency latency;
|
|
@@ -75,13 +76,6 @@ struct protocol_stack_group {
|
|
uint64_t call_alloc_fail;
|
|
};
|
|
|
|
-struct wakeup_poll {
|
|
- bool init;
|
|
- struct protocol_stack *bind_stack;
|
|
- struct list_node event_list; /* epoll temp use poll */
|
|
- sem_t event_sem;
|
|
-};
|
|
-
|
|
long get_stack_tid(void);
|
|
struct protocol_stack *get_protocol_stack(void);
|
|
struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
|
|
diff --git a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h
|
|
index cac640b..a83f41f 100644
|
|
--- a/src/lstack/include/posix/lstack_epoll.h
|
|
+++ b/src/lstack/include/posix/lstack_epoll.h
|
|
@@ -18,6 +18,30 @@ extern "C" {
|
|
#endif
|
|
|
|
#include <poll.h>
|
|
+#include <stdbool.h>
|
|
+#include <semaphore.h>
|
|
+
|
|
+#include "lstack_protocol_stack.h"
|
|
+
|
|
+struct wakeup_poll {
|
|
+ bool init;
|
|
+ struct protocol_stack *bind_stack;
|
|
+ sem_t event_sem;
|
|
+
|
|
+ int32_t epollfd;
|
|
+ bool have_kernel_fd;
|
|
+
|
|
+ /* poll */
|
|
+ struct pollfd *last_fds;
|
|
+ nfds_t last_nfds;
|
|
+ nfds_t last_max_nfds;
|
|
+ struct epoll_event *events;
|
|
+
|
|
+ /* epoll */
|
|
+ int32_t stack_fd_cnt[PROTOCOL_STACK_MAX];
|
|
+ struct protocol_stack *max_stack;
|
|
+ struct list_node event_list; /* epoll temp use */
|
|
+};
|
|
|
|
int32_t lstack_epoll_create(int32_t size);
|
|
int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event);
|
|
--
|
|
2.23.0
|
|
|