gazelle/0100-merger-wakeup.patch
wu-changsheng 9e74a22461 adapt ceph client
(cherry picked from commit 1d918d78398d92f67e4b36f31d64b3531476fcae)
2022-10-20 19:38:13 +08:00

985 lines
34 KiB
Diff

From 045c0ea6fa5a2251a4a205bc9a732e694ddbb5a7 Mon Sep 17 00:00:00 2001
From: wu-changsheng <wuchangsheng2@huawei.com>
Date: Sat, 8 Oct 2022 16:51:11 +0800
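Subject: [PATCH 16/21] merger wakeup

Merge the kernel-event wakeup path into the protocol stack thread:

- Remove the per-stack gazelle_kernel_event companion thread and the
  sem_t event_sem in struct wakeup_poll.
- Give each wakeup_poll an eventfd. For epoll it is added to the epoll
  fd with EPOLLIN | EPOLLET; for poll it occupies last_fds[0].
- add_sock_event() now takes the lwip_sock directly. It sets have_event
  and, for epoll, queues the socket on the wakeup event_list.
- The stack thread calls wakeup_stack_wait() on every loop iteration.
  That calls wakeup_epoll(), which writes the eventfd only while the
  waiter is blocked (in_wait).
- Move the wakeup list and its lock from protocol_stack_group into each
  protocol_stack. register_wakeup()/unregister_wakeup() now take the
  stack.
- lstack_epoll_wait()/lstack_poll() block in the kernel
  epoll_wait()/poll() instead of sem_wait(), so lwip and kernel events
  share a single wait.
- Raise POLL_KERNEL_EVENTS from 32 to 128 and drop
  del_duplicate_event().
- Add gazelle_connected_callback(). It marks the conn as LIBOS type and
  raises EPOLLOUT on its socket.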
---
src/lstack/api/lstack_epoll.c | 366 +++++++++------------
src/lstack/core/lstack_lwip.c | 29 +-
src/lstack/core/lstack_protocol_stack.c | 92 ++----
src/lstack/core/lstack_stack_stat.c | 34 +-
src/lstack/include/lstack_protocol_stack.h | 4 +-
src/lstack/include/lstack_stack_stat.h | 5 +-
src/lstack/include/posix/lstack_epoll.h | 19 +-
7 files changed, 232 insertions(+), 317 deletions(-)
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index 9c44f87..cc1cbf0 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -40,33 +40,35 @@
#define SEC_TO_NSEC 1000000000
#define SEC_TO_MSEC 1000
#define MSEC_TO_NSEC 1000000
-#define POLL_KERNEL_EVENTS 32
+#define POLL_KERNEL_EVENTS 128
-void add_epoll_event(struct netconn *conn, uint32_t event)
+void add_sock_event(struct lwip_sock *sock, uint32_t event)
{
- /* conn sock nerver null, because lwip call this func */
- struct lwip_sock *sock = get_socket_by_fd(conn->socket);
- if (sock->wakeup == NULL || (event & sock->epoll_events) == 0) {
+ struct wakeup_poll *wakeup = sock->wakeup;
+ if (wakeup == NULL || (event & sock->epoll_events) == 0) {
return;
}
- struct wakeup_poll *wakeup = sock->wakeup;
- struct protocol_stack *stack = sock->stack;
- if (wakeup->type == WAKEUP_EPOLL) {
- pthread_spin_lock(&wakeup->event_list_lock);
- sock->events |= (event == EPOLLERR) ? (EPOLLIN | EPOLLERR) : (event & sock->epoll_events);
- if (list_is_null(&sock->event_list)) {
- list_add_node(&wakeup->event_list, &sock->event_list);
- }
- pthread_spin_unlock(&wakeup->event_list_lock);
+ wakeup->have_event = true;
+
+ if (wakeup->type == WAKEUP_POLL) {
+ return;
}
- stack->stats.wakeup_events++;
- sem_t *sem = &wakeup->event_sem;
- if (get_protocol_stack_group()->wakeup_enable) {
- gazelle_light_ring_enqueue_busrt(stack->wakeup_ring, (void **)&sem, 1);
- } else {
- sem_post(sem);
+ pthread_spin_lock(&wakeup->event_list_lock);
+ sock->events |= (event == EPOLLERR) ? (EPOLLIN | EPOLLERR) : (event & sock->epoll_events);
+ if (list_is_null(&sock->event_list)) {
+ list_add_node(&wakeup->event_list, &sock->event_list);
+ }
+ pthread_spin_unlock(&wakeup->event_list_lock);
+}
+
+void wakeup_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup)
+{
+ if (__atomic_load_n(&wakeup->in_wait, __ATOMIC_ACQUIRE)) {
+ uint64_t tmp = 1;
+ posix_api->write_fn(wakeup->eventfd, &tmp, sizeof(tmp));
+ stack->stats.wakeup_events++;
}
}
@@ -98,11 +100,7 @@ static void raise_pending_events(struct wakeup_poll *wakeup, struct lwip_sock *s
{
sock->events = update_events(sock);
if (sock->events) {
- pthread_spin_lock(&wakeup->event_list_lock);
- if (list_is_null(&sock->event_list)) {
- list_add_node(&wakeup->event_list, &sock->event_list);
- }
- pthread_spin_unlock(&wakeup->event_list_lock);
+ add_sock_event(sock, sock->events);
}
}
@@ -120,28 +118,38 @@ int32_t lstack_epoll_create(int32_t size)
GAZELLE_RETURN(EINVAL);
}
- struct wakeup_poll *wakeup = malloc(sizeof(struct wakeup_poll));
+ struct wakeup_poll *wakeup = calloc(1, sizeof(struct wakeup_poll));
if (wakeup == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "calloc null\n");
posix_api->close_fn(fd);
GAZELLE_RETURN(EINVAL);
}
- if (memset_s(wakeup, sizeof(struct wakeup_poll), 0, sizeof(struct wakeup_poll)) != 0) {
- LSTACK_LOG(ERR, LSTACK, "memset_s failed\n");
+
+ wakeup->eventfd = eventfd(0, EFD_NONBLOCK);
+ if (wakeup->eventfd < 0) {
+ LSTACK_LOG(ERR, LSTACK, "eventfd fail=%d errno=%d\n", wakeup->eventfd, errno);
+ posix_api->close_fn(fd);
free(wakeup);
+ GAZELLE_RETURN(EINVAL);
+ }
+
+ struct epoll_event event;
+ event.data.fd = wakeup->eventfd;
+ event.events = EPOLLIN | EPOLLET;
+ if (posix_api->epoll_ctl_fn(fd, EPOLL_CTL_ADD, wakeup->eventfd, &event) < 0) {
+ LSTACK_LOG(ERR, LSTACK, "eventfd errno=%d\n", errno);
posix_api->close_fn(fd);
+ free(wakeup);
GAZELLE_RETURN(EINVAL);
}
init_list_node(&wakeup->event_list);
- sem_init(&wakeup->event_sem, 0, 0);
pthread_spin_init(&wakeup->event_list_lock, PTHREAD_PROCESS_PRIVATE);
wakeup->type = WAKEUP_EPOLL;
wakeup->epollfd = fd;
sock->wakeup = wakeup;
- register_wakeup(wakeup);
-
return fd;
}
@@ -156,8 +164,10 @@ int32_t lstack_epoll_close(int32_t fd)
}
if (sock->wakeup) {
- unregister_wakeup(sock->wakeup);
- sem_destroy(&sock->wakeup->event_sem);
+ if (sock->stack) {
+ unregister_wakeup(sock->stack, sock->wakeup);
+ }
+ posix_api->close_fn(sock->wakeup->eventfd);
pthread_spin_destroy(&sock->wakeup->event_list_lock);
free(sock->wakeup);
}
@@ -217,19 +227,22 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
return posix_api->epoll_ctl_fn(epfd, op, fd, event);
}
+
+ struct wakeup_poll *wakeup = epoll_sock->wakeup;
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
+ wakeup->have_kernel_fd = true;
return posix_api->epoll_ctl_fn(epfd, op, fd, event);
}
if (CONN_TYPE_HAS_HOST(sock->conn)) {
+ wakeup->have_kernel_fd = true;
int32_t ret = posix_api->epoll_ctl_fn(epfd, op, fd, event);
if (ret < 0) {
LSTACK_LOG(ERR, LSTACK, "fd=%d epfd=%d op=%d\n", fd, epfd, op);
}
}
- struct wakeup_poll *wakeup = epoll_sock->wakeup;
do {
switch (op) {
case EPOLL_CTL_ADD:
@@ -258,34 +271,10 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
return 0;
}
-static void del_node_array(struct epoll_event *events, int32_t event_num, int32_t del_index)
-{
- for (int32_t i = del_index; i + 1 < event_num; i++) {
- events[i] = events[i + 1];
- }
-}
-
-static int32_t del_duplicate_event(struct epoll_event *events, int32_t event_num)
-{
- int32_t num = event_num;
-
- for (int32_t i = 0; i < num; i++) {
- for (int32_t j = i + 1; j < num; j++) {
- if (events[i].data.u64 == events[j].data.u64) {
- del_node_array(events, num, j);
- num--;
- }
- }
- }
-
- return num;
-}
-
static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents)
{
int32_t event_num = 0;
struct list_node *node, *temp;
- int32_t accept_num = 0;
pthread_spin_lock(&wakeup->event_list_lock);
@@ -297,10 +286,6 @@ static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *
continue;
}
- if (sock->conn && sock->conn->acceptmbox) {
- accept_num++;
- }
-
if (sock->epoll_events & EPOLLET) {
list_del_node_null(&sock->event_list);
}
@@ -323,10 +308,6 @@ static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *
pthread_spin_unlock(&wakeup->event_list_lock);
- if (accept_num > 1) {
- event_num = del_duplicate_event(events, event_num);
- }
-
wakeup->stat.app_events += event_num;
return event_num;
}
@@ -354,33 +335,6 @@ static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds)
return event_num;
}
-static void ms_to_timespec(struct timespec *timespec, int32_t timeout)
-{
- clock_gettime(CLOCK_REALTIME, timespec);
- timespec->tv_sec += timeout / SEC_TO_MSEC;
- timespec->tv_nsec += (timeout % SEC_TO_MSEC) * MSEC_TO_NSEC;
- timespec->tv_sec += timespec->tv_nsec / SEC_TO_NSEC;
- timespec->tv_nsec = timespec->tv_nsec % SEC_TO_NSEC;
-}
-
-static void change_epollfd_kernel_thread(struct wakeup_poll *wakeup, struct protocol_stack *old_stack,
- struct protocol_stack *new_stack)
-{
- if (old_stack) {
- if (posix_api->epoll_ctl_fn(old_stack->epollfd, EPOLL_CTL_DEL, wakeup->epollfd, NULL) != 0) {
- LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
- }
- }
-
- /* avoid kernel thread post too much, use EPOLLET */
- struct epoll_event event;
- event.data.ptr = wakeup;
- event.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLET;
- if (posix_api->epoll_ctl_fn(new_stack->epollfd, EPOLL_CTL_ADD, wakeup->epollfd, &event) != 0) {
- LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
- }
-}
-
static void epoll_bind_statck(struct wakeup_poll *wakeup)
{
/* all fd is kernel, set rand stack */
@@ -390,11 +344,27 @@ static void epoll_bind_statck(struct wakeup_poll *wakeup)
if (wakeup->bind_stack != wakeup->max_stack && wakeup->max_stack) {
bind_to_stack_numa(wakeup->max_stack);
- change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, wakeup->max_stack);
+ if (wakeup->bind_stack) {
+ unregister_wakeup(wakeup->bind_stack, wakeup);
+ }
wakeup->bind_stack = wakeup->max_stack;
+ register_wakeup(wakeup->bind_stack, wakeup);
}
}
+static bool del_event_fd(struct epoll_event* events, int32_t eventnum, int32_t eventfd)
+{
+ for (int32_t i = 0; i < eventnum; i++) {
+ if (events[i].data.fd == eventfd) {
+ events[i].data.u64 = events[eventnum - 1].data.u64;
+ events[i].events = events[eventnum - 1].events;
+ return true;
+ }
+ }
+
+ return false;
+}
+
int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
{
struct lwip_sock *sock = get_socket_by_fd(epfd);
@@ -402,61 +372,66 @@ int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxe
return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
}
- int32_t event_num = 0;
- int32_t ret;
-
- struct timespec epoll_time;
- if (timeout >= 0) {
- ms_to_timespec(&epoll_time, timeout);
- }
+ struct wakeup_poll *wakeup = sock->wakeup;
+ int32_t kernel_num = 0;
epoll_bind_statck(sock->wakeup);
- do {
- event_num += epoll_lwip_event(sock->wakeup, &events[event_num], maxevents - event_num);
- sock->wakeup->stat.app_events += event_num;
+ __atomic_store_n(&wakeup->in_wait, true, __ATOMIC_RELEASE);
+ rte_mb();
- if (__atomic_load_n(&sock->wakeup->have_kernel_event, __ATOMIC_RELAXED)) {
- event_num += posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0);
- }
+ int32_t lwip_num = epoll_lwip_event(wakeup, events, maxevents);
+ wakeup->stat.app_events += lwip_num;
+ if (!wakeup->have_kernel_fd && lwip_num > 0) {
+ return lwip_num;
+ }
- if (event_num > 0) {
- while (sem_trywait(&sock->wakeup->event_sem) == 0);
- break;
- }
+ if (lwip_num > 0) {
+ __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
+ rte_mb();
+ kernel_num = posix_api->epoll_wait_fn(epfd, &events[lwip_num], maxevents - lwip_num, 0);
+ } else {
+ kernel_num = posix_api->epoll_wait_fn(epfd, &events[lwip_num], maxevents - lwip_num, timeout);
+ rte_mb();
+ __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
+ }
+
+ if (kernel_num <= 0) {
+ return (lwip_num > 0) ? lwip_num : kernel_num;
+ }
- sock->wakeup->have_kernel_event = false;
- if (timeout < 0) {
- ret = sem_wait(&sock->wakeup->event_sem);
- } else {
- ret = sem_timedwait(&sock->wakeup->event_sem, &epoll_time);
+ if (del_event_fd(&events[lwip_num], kernel_num, wakeup->eventfd)) {
+ if (lwip_num == 0) {
+ lwip_num = epoll_lwip_event(wakeup, events, maxevents);
}
- } while (ret == 0);
+ kernel_num--;
+ }
- return event_num;
+ return lwip_num + kernel_num;
}
-static void init_poll_wakeup_data(struct wakeup_poll *wakeup)
+static int32_t init_poll_wakeup_data(struct wakeup_poll *wakeup)
{
- sem_init(&wakeup->event_sem, 0, 0);
wakeup->type = WAKEUP_POLL;
- wakeup->last_fds = calloc(POLL_KERNEL_EVENTS, sizeof(struct pollfd));
- if (wakeup->last_fds == NULL) {
- LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
+ wakeup->eventfd = eventfd(0, EFD_NONBLOCK);
+ if (wakeup->eventfd < 0) {
+ LSTACK_LOG(ERR, LSTACK, "eventfd failed errno=%d\n", errno);
+ GAZELLE_RETURN(EINVAL);
}
- wakeup->events = calloc(POLL_KERNEL_EVENTS, sizeof(struct epoll_event));
- if (wakeup->events == NULL) {
+ wakeup->last_fds = calloc(POLL_KERNEL_EVENTS, sizeof(struct pollfd));
+ if (wakeup->last_fds == NULL) {
LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
+ posix_api->close_fn(wakeup->eventfd);
+ GAZELLE_RETURN(EINVAL);
}
+ wakeup->last_fds[0].fd = wakeup->eventfd;
+ wakeup->last_fds[0].events = POLLIN;
wakeup->last_max_nfds = POLL_KERNEL_EVENTS;
- wakeup->epollfd = posix_api->epoll_create_fn(POLL_KERNEL_EVENTS);
- if (wakeup->epollfd < 0) {
- LSTACK_LOG(ERR, LSTACK, "epoll_create_fn errno=%d\n", errno);
- }
+ return 0;
}
static void resize_kernel_poll(struct wakeup_poll *wakeup, nfds_t nfds)
@@ -464,23 +439,14 @@ static void resize_kernel_poll(struct wakeup_poll *wakeup, nfds_t nfds)
if (wakeup->last_fds) {
free(wakeup->last_fds);
}
- wakeup->last_fds = calloc(nfds, sizeof(struct pollfd));
+ wakeup->last_fds = calloc(nfds + 1, sizeof(struct pollfd));
if (wakeup->last_fds == NULL) {
LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
}
- if (wakeup->events) {
- free(wakeup->events);
- }
- wakeup->events = calloc(nfds, sizeof(struct epoll_event));
- if (wakeup->events == NULL) {
- LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
- }
-
+ wakeup->last_fds[0].fd = wakeup->eventfd;
+ wakeup->last_fds[0].events = POLLIN;
wakeup->last_max_nfds = nfds;
- if (memset_s(wakeup->last_fds, nfds * sizeof(struct pollfd), 0, nfds * sizeof(struct pollfd)) != 0) {
- LSTACK_LOG(ERR, LSTACK, "memset_s faile\n");
- }
}
static void poll_bind_statck(struct wakeup_poll *wakeup, int32_t *stack_count)
@@ -492,35 +458,16 @@ static void poll_bind_statck(struct wakeup_poll *wakeup, int32_t *stack_count)
return;
}
- change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, stack_group->stacks[bind_id]);
+ if (wakeup->bind_stack) {
+ unregister_wakeup(wakeup->bind_stack, wakeup);
+ }
bind_to_stack_numa(stack_group->stacks[bind_id]);
wakeup->bind_stack = stack_group->stacks[bind_id];
-}
-
-static void update_kernel_poll(struct wakeup_poll *wakeup, uint32_t index, struct pollfd *new_fd)
-{
- posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_DEL, wakeup->last_fds[index].fd, NULL);
-
- if (new_fd == NULL) {
- return;
- }
-
- struct epoll_event event;
- event.data.u32 = index;
- event.events = new_fd->events;
- if (posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_ADD, new_fd->fd, &event) != 0) {
- LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
- }
+ register_wakeup(wakeup->bind_stack, wakeup);
}
static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfds)
{
- if (!wakeup->init) {
- wakeup->init = true;
- init_poll_wakeup_data(wakeup);
- register_wakeup(wakeup);
- }
-
int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
int32_t poll_change = 0;
@@ -529,31 +476,22 @@ static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfd
resize_kernel_poll(wakeup, nfds);
poll_change = 1;
}
- /* poll fds num less, del old fd */
- for (uint32_t i = nfds; i < wakeup->last_nfds; i++) {
- update_kernel_poll(wakeup, i, NULL);
- poll_change = 1;
- }
for (uint32_t i = 0; i < nfds; i++) {
int32_t fd = fds[i].fd;
fds[i].revents = 0;
struct lwip_sock *sock = get_socket_by_fd(fd);
- if (fd == wakeup->last_fds[i].fd && fds[i].events == wakeup->last_fds[i].events) {
+ if (fd == wakeup->last_fds[i + 1].fd && fds[i].events == wakeup->last_fds[i + 1].events) {
/* fd close then socket may get same fd. */
if (sock == NULL || sock->wakeup != NULL) {
continue;
}
}
- wakeup->last_fds[i].fd = fd;
- wakeup->last_fds[i].events = fds[i].events;
+ wakeup->last_fds[i + 1].fd = fd;
+ wakeup->last_fds[i + 1].events = fds[i].events;
poll_change = 1;
- if (sock == NULL || sock->conn == NULL || CONN_TYPE_HAS_HOST(sock->conn)) {
- update_kernel_poll(wakeup, i, fds + i);
- }
-
while (sock && sock->conn) {
if (sock->epoll_events != (fds[i].events | POLLERR)) {
sock->epoll_events = fds[i].events | POLLERR;
@@ -571,51 +509,65 @@ static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfd
if (poll_change == 0) {
return;
}
- wakeup->last_nfds = nfds;
+ wakeup->last_nfds = nfds + 1;
poll_bind_statck(wakeup, stack_count);
}
int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
{
- static PER_THREAD struct wakeup_poll wakeup_poll = {0};
+ static PER_THREAD struct wakeup_poll *wakeup = NULL;
+ if (wakeup == NULL) {
+ wakeup = calloc(1, sizeof(struct wakeup_poll));
+ if (wakeup == NULL) {
+ GAZELLE_RETURN(EINVAL);
+ }
- poll_init(&wakeup_poll, fds, nfds);
+ if (init_poll_wakeup_data(wakeup) < 0) {
+ free(wakeup);
+ GAZELLE_RETURN(EINVAL);
+ }
+ }
- int32_t event_num = 0;
- int32_t ret;
+ poll_init(wakeup, fds, nfds);
+
+ __atomic_store_n(&wakeup->in_wait, true, __ATOMIC_RELEASE);
+ rte_mb();
- struct timespec poll_time;
- if (timeout >= 0) {
- ms_to_timespec(&poll_time, timeout);
+ int32_t lwip_num = poll_lwip_event(fds, nfds);
+ wakeup->stat.app_events += lwip_num;
+ if (lwip_num >= nfds) {
+ __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
+ return lwip_num;
}
- /* when epfd > 0 is epoll type */
- do {
- event_num += poll_lwip_event(fds, nfds);
-
- /* reduce syscall epoll_wait */
- if (__atomic_load_n(&wakeup_poll.have_kernel_event, __ATOMIC_RELAXED)) {
- int32_t kernel_num = posix_api->epoll_wait_fn(wakeup_poll.epollfd, wakeup_poll.events, nfds, 0);
- for (int32_t i = 0; i < kernel_num; i++) {
- uint32_t index = wakeup_poll.events[i].data.u32;
- fds[index].revents = wakeup_poll.events[i].events;
- }
- event_num += kernel_num >= 0 ? kernel_num : 0;
- }
+ int32_t kernel_num = 0;
+ if (lwip_num > 0) {
+ __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
+ rte_mb();
+ kernel_num = posix_api->poll_fn(wakeup->last_fds, wakeup->last_nfds, 0);
+ } else {
+ kernel_num = posix_api->poll_fn(wakeup->last_fds, wakeup->last_nfds, timeout);
+ rte_mb();
+ __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
+ }
- if (event_num > 0) {
- while (sem_trywait(&wakeup_poll.event_sem) == 0);
- break;
+ if (kernel_num <= 0) {
+ return (lwip_num > 0) ? lwip_num : kernel_num;
+ }
+
+ for (nfds_t i = 0; i < nfds; i++) {
+ if (fds[i].revents == 0 && wakeup->last_fds[i + 1].revents != 0) {
+ fds[i].revents = wakeup->last_fds[i + 1].revents;
}
+ }
- wakeup_poll.have_kernel_event = false;
- if (timeout < 0) {
- ret = sem_wait(&wakeup_poll.event_sem);
- } else {
- ret = sem_timedwait(&wakeup_poll.event_sem, &poll_time);
+ if (wakeup->last_fds[0].revents) {
+ if (lwip_num == 0) {
+ lwip_num = poll_lwip_event(fds, nfds);
}
- } while (ret == 0);
+ kernel_num--;
+ }
- return event_num;
+ return kernel_num + lwip_num;
}
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 3f21a3a..bb5a7e5 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -309,7 +309,7 @@ static void do_lwip_send(int32_t fd, struct lwip_sock *sock, int32_t flags)
if (len == 0) {
/* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
sock->errevent = 1;
- add_epoll_event(sock->conn, EPOLLERR);
+ add_sock_event(sock, EPOLLERR);
}
if (gazelle_ring_readable_count(sock->send_ring) < SOCK_SEND_REPLENISH_THRES) {
@@ -317,7 +317,7 @@ static void do_lwip_send(int32_t fd, struct lwip_sock *sock, int32_t flags)
}
if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_OUTIDLE(sock)) {
- add_epoll_event(sock->conn, EPOLLOUT);
+ add_sock_event(sock, EPOLLOUT);
}
}
@@ -678,9 +678,9 @@ void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
if (len == 0) {
/* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
sock->errevent = 1;
- add_epoll_event(sock->conn, EPOLLERR);
+ add_sock_event(sock, EPOLLERR);
} else if (len > 0) {
- add_epoll_event(sock->conn, EPOLLIN);
+ add_sock_event(sock, EPOLLIN);
}
/* last_node:recv only once per sock. max_num avoid cost too much time this loop */
@@ -690,6 +690,23 @@ void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
}
}
+void gazelle_connected_callback(struct netconn *conn)
+{
+ if (conn == NULL) {
+ return;
+ }
+
+ int32_t fd = conn->socket;
+ struct lwip_sock *sock = get_socket_by_fd(fd);
+ if (sock == NULL || sock->conn == NULL) {
+ return;
+ }
+
+ SET_CONN_TYPE_LIBOS(conn);
+
+ add_sock_event(sock, EPOLLOUT);
+}
+
static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const struct tcp_pcb *pcb)
{
struct netconn *netconn = (struct netconn *)pcb->callback_arg;
@@ -711,10 +728,6 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s
conn->recv_ring_cnt += (sock->recv_lastdata) ? 1 : 0;
conn->send_ring_cnt = gazelle_ring_readover_count(sock->send_ring);
-
- if (sock->wakeup) {
- sem_getvalue(&sock->wakeup->event_sem, &conn->sem_cnt);
- }
}
}
}
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 1dc6c3f..6119975 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -262,6 +262,8 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
stack->cpu_id = get_global_cfg_params()->cpus[queue_id];
stack->lwip_stats = &lwip_stats;
+ pthread_spin_init(&stack->wakeup_list_lock, PTHREAD_PROCESS_PRIVATE);
+
init_list_node(&stack->recv_list);
init_list_node(&stack->send_list);
@@ -292,70 +294,6 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
return 0;
}
-static void* gazelle_kernel_event(void *arg)
-{
- uint16_t queue_id = *(uint16_t *)arg;
- struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
-
- bind_to_stack_numa(stack);
-
- int32_t epoll_fd = posix_api->epoll_create_fn(GAZELLE_LSTACK_MAX_CONN);
- if (epoll_fd < 0) {
- LSTACK_LOG(ERR, LSTACK, "queue_id=%hu epoll_fd=%d errno=%d\n", queue_id, epoll_fd, errno);
- /* exit in main thread, avoid create mempool and exit at the same time */
- set_init_fail();
- stack->epollfd = -1;
- return NULL;
- }
-
- stack->epollfd = epoll_fd;
-
- LSTACK_LOG(INFO, LSTACK, "kernel_event_%02hu start\n", queue_id);
-
- struct epoll_event events[KERNEL_EPOLL_MAX];
- for (;;) {
- int32_t event_num = posix_api->epoll_wait_fn(epoll_fd, events, KERNEL_EPOLL_MAX, -1);
- if (event_num <= 0) {
- continue;
- }
-
- for (int32_t i = 0; i < event_num; i++) {
- struct wakeup_poll *wakeup = events[i].data.ptr;
- if (wakeup) {
- __atomic_store_n(&wakeup->have_kernel_event, true, __ATOMIC_RELEASE);
- sem_post(&wakeup->event_sem);
- }
- }
- }
-
- return NULL;
-}
-
-static int32_t create_companion_thread(struct protocol_stack_group *stack_group, struct protocol_stack *stack)
-{
- int32_t ret;
-
- ret = create_thread(stack->queue_id, "gazellekernel", gazelle_kernel_event);
- if (ret != 0) {
- LSTACK_LOG(ERR, LSTACK, "gazellekernelEvent ret=%d errno=%d\n", ret, errno);
- return ret;
- }
-
- /* wait gazelle_kernel_event finish use stack.avoid use stack after free when create gazelle_weakup_thread fail */
- while (stack->epollfd == 0) {
- usleep(1);
- }
-
- if (stack_group->wakeup_enable) {
- ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_wakeup_thread);
- if (ret != 0) {
- LSTACK_LOG(ERR, LSTACK, "gazelleweakup ret=%d errno=%d\n", ret, errno);
- }
- }
-
- return ret;
-}
-
void wait_sem_value(sem_t *sem, int32_t wait_value)
{
int32_t sem_val;
@@ -404,14 +342,29 @@ static struct protocol_stack *stack_thread_init(uint16_t queue_id)
return NULL;
}
- if (create_companion_thread(stack_group, stack) != 0) {
- free(stack);
- return NULL;
+ if (stack_group->wakeup_enable) {
+ if (create_thread(stack->queue_id, "gazelleweakup", gazelle_wakeup_thread) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "gazelleweakup errno=%d\n", errno);
+ free(stack);
+ return NULL;
+ }
}
return stack;
}
+static void wakeup_stack_wait(struct protocol_stack *stack)
+{
+ struct wakeup_poll *node = stack->wakeup_list;
+ while (node) {
+ if (node->have_event) {
+ wakeup_epoll(stack, node);
+ node->have_event = false;
+ }
+ node = node->next;
+ }
+}
+
static void* gazelle_stack_thread(void *arg)
{
uint16_t queue_id = *(uint16_t *)arg;
@@ -437,6 +390,8 @@ static void* gazelle_stack_thread(void *arg)
send_stack_list(stack, SEND_LIST_MAX);
+ wakeup_stack_wait(stack);
+
sys_timer_run();
if (get_global_cfg_params()->low_power_mod != 0) {
@@ -452,8 +407,6 @@ static int32_t init_protocol_sem(void)
int32_t ret;
struct protocol_stack_group *stack_group = get_protocol_stack_group();
- pthread_spin_init(&stack_group->wakeup_list_lock, PTHREAD_PROCESS_PRIVATE);
-
if (!use_ltran()) {
ret = sem_init(&stack_group->ethdev_init, 0, 0);
if (ret < 0) {
@@ -484,7 +437,6 @@ int32_t init_protocol_stack(void)
stack_group->stack_num = get_global_cfg_params()->num_cpu;
stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? true : false;
- stack_group->wakeup_list = NULL;
if (init_protocol_sem() != 0) {
return -1;
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index e8c5bc3..245bcd7 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -92,25 +92,21 @@ static void set_latency_start_flag(bool start)
}
}
-void register_wakeup(struct wakeup_poll *wakeup)
+void register_wakeup(struct protocol_stack *stack, struct wakeup_poll *wakeup)
{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
-
- pthread_spin_lock(&stack_group->wakeup_list_lock);
+ pthread_spin_lock(&stack->wakeup_list_lock);
- wakeup->next = stack_group->wakeup_list;
- stack_group->wakeup_list = wakeup;
+ wakeup->next = stack->wakeup_list;
+ stack->wakeup_list = wakeup;
- pthread_spin_unlock(&stack_group->wakeup_list_lock);
+ pthread_spin_unlock(&stack->wakeup_list_lock);
}
-void unregister_wakeup(struct wakeup_poll *wakeup)
+void unregister_wakeup(struct protocol_stack *stack, struct wakeup_poll *wakeup)
{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ pthread_spin_lock(&stack->wakeup_list_lock);
- pthread_spin_lock(&stack_group->wakeup_list_lock);
-
- struct wakeup_poll *node = stack_group->wakeup_list;
+ struct wakeup_poll *node = stack->wakeup_list;
struct wakeup_poll *pre = NULL;
while (node && node != wakeup) {
@@ -119,26 +115,24 @@ void unregister_wakeup(struct wakeup_poll *wakeup)
}
if (node == NULL) {
- pthread_spin_unlock(&stack_group->wakeup_list_lock);
+ pthread_spin_unlock(&stack->wakeup_list_lock);
return;
}
if (pre) {
pre->next = node->next;
} else {
- stack_group->wakeup_list = node->next;
+ stack->wakeup_list = node->next;
}
- pthread_spin_unlock(&stack_group->wakeup_list_lock);
+ pthread_spin_unlock(&stack->wakeup_list_lock);
}
static void get_wakeup_stat(struct protocol_stack *stack, struct gazelle_wakeup_stat *stat)
{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
-
- pthread_spin_lock(&stack_group->wakeup_list_lock);
+ pthread_spin_lock(&stack->wakeup_list_lock);
- struct wakeup_poll *node = stack_group->wakeup_list;
+ struct wakeup_poll *node = stack->wakeup_list;
while (node) {
if (node->bind_stack == stack) {
stat->app_events += node->stat.app_events;
@@ -151,7 +145,7 @@ static void get_wakeup_stat(struct protocol_stack *stack, struct gazelle_wakeup_
node = node->next;
}
- pthread_spin_unlock(&stack_group->wakeup_list_lock);
+ pthread_spin_unlock(&stack->wakeup_list_lock);
}
void lstack_get_low_power_info(struct gazelle_stat_low_power_info *low_power_info)
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 36340ab..0a060b4 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -50,6 +50,8 @@ struct protocol_stack {
struct reg_ring_msg *reg_buf;
volatile bool low_power;
+ struct wakeup_poll *wakeup_list;
+ pthread_spinlock_t wakeup_list_lock;
lockless_queue rpc_queue __rte_cache_aligned;
char pad __rte_cache_aligned;
@@ -84,8 +86,6 @@ struct protocol_stack_group {
/* dfx stats */
bool latency_start;
uint64_t call_alloc_fail;
- pthread_spinlock_t wakeup_list_lock;
- struct wakeup_poll *wakeup_list __rte_cache_aligned;
};
long get_stack_tid(void);
diff --git a/src/lstack/include/lstack_stack_stat.h b/src/lstack/include/lstack_stack_stat.h
index aacade1..98ffe8f 100644
--- a/src/lstack/include/lstack_stack_stat.h
+++ b/src/lstack/include/lstack_stack_stat.h
@@ -17,6 +17,7 @@ struct gazelle_stack_latency;
struct pbuf;
struct gazelle_stat_low_power_info;
struct wakeup_poll;
+struct protocol_stack;
enum GAZELLE_LATENCY_TYPE;
enum GAZELLE_STAT_MODE;
@@ -26,7 +27,7 @@ void stack_stat_init(void);
int32_t handle_stack_cmd(int fd, enum GAZELLE_STAT_MODE stat_mode);
uint64_t get_current_time(void);
void lstack_get_low_power_info(struct gazelle_stat_low_power_info *low_power_info);
-void register_wakeup(struct wakeup_poll *wakeup);
-void unregister_wakeup(struct wakeup_poll *wakeup);
+void register_wakeup(struct protocol_stack *stack, struct wakeup_poll *wakeup);
+void unregister_wakeup(struct protocol_stack *stack, struct wakeup_poll *wakeup);
#endif /* GAZELLE_STACK_STAT_H */
diff --git a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h
index a94b49f..5799028 100644
--- a/src/lstack/include/posix/lstack_epoll.h
+++ b/src/lstack/include/posix/lstack_epoll.h
@@ -17,6 +17,7 @@
#include <stdbool.h>
#include <semaphore.h>
#include <pthread.h>
+#include <sys/eventfd.h>
#include <lwip/list.h>
@@ -35,24 +36,24 @@ enum wakeup_type {
struct protocol_stack;
struct wakeup_poll {
/* stack thread read frequently */
- sem_t event_sem __rte_cache_aligned;
- enum wakeup_type type __rte_cache_aligned;
- volatile bool have_kernel_event __rte_cache_aligned;
- struct gazelle_wakeup_stat stat __rte_cache_aligned;
+ int32_t eventfd;
+ enum wakeup_type type;
+ bool have_event;
+ volatile bool in_wait __rte_cache_aligned;
char pad __rte_cache_aligned;
- bool init;
+ struct gazelle_wakeup_stat stat;
struct protocol_stack *bind_stack;
- int32_t epollfd; /* epoll kernel fd, ctl add into gazelle_kernel_event thread */
struct wakeup_poll *next;
/* poll */
struct pollfd *last_fds;
nfds_t last_nfds;
nfds_t last_max_nfds;
- struct epoll_event *events;
/* epoll */
+ int32_t epollfd; /* epoll kernel fd */
+ bool have_kernel_fd;
int32_t stack_fd_cnt[PROTOCOL_STACK_MAX];
struct protocol_stack *max_stack;
struct list_node event_list;
@@ -60,7 +61,9 @@ struct wakeup_poll {
};
struct netconn;
-void add_epoll_event(struct netconn *conn, uint32_t event);
+struct lwip_sock;
+void add_sock_event(struct lwip_sock *sock, uint32_t event);
+void wakeup_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup);
int32_t lstack_epoll_create(int32_t size);
int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event);
int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event *events, int32_t maxevents, int32_t timeout);
--
2.23.0