gazelle/0038-refactor-event.patch
jiangheng f2f0d2a6e5 refactor event
(cherry picked from commit 318eb1a6b93f850213a48422c1f0423715b6d41f)
2022-03-31 14:16:14 +08:00

3099 lines
107 KiB
Diff
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

From 9b4379914e97d4c0c267033559bf86d20c7381b6 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng12@huawei.com>
Date: Wed, 30 Mar 2022 00:04:46 +0800
Subject: [PATCH] refactor event
---
README.md | 3 +-
src/common/gazelle_dfx_msg.h | 18 +-
src/lstack/api/lstack_epoll.c | 509 +++++++++++++++--------------
src/lstack/api/lstack_signal.c | 1 +
src/lstack/core/lstack_cfg.c | 27 +-
src/lstack/core/lstack_dpdk.c | 99 ++++--
src/lstack/core/lstack_init.c | 13 +-
src/lstack/core/lstack_lwip.c | 352 +++++++++++---------
src/lstack/core/lstack_protocol_stack.c | 345 +++++++++----------
src/lstack/core/lstack_stack_stat.c | 6 -
src/lstack/core/lstack_thread_rpc.c | 106 +++---
src/lstack/include/lstack_cfg.h | 2 +-
src/lstack/include/lstack_dpdk.h | 7 +-
src/lstack/include/lstack_log.h | 9 +-
src/lstack/include/lstack_lwip.h | 11 +-
src/lstack/include/lstack_protocol_stack.h | 35 +-
src/lstack/include/lstack_thread_rpc.h | 5 +-
src/lstack/include/lstack_vdev.h | 2 +-
src/lstack/include/posix/lstack_epoll.h | 2 +
src/lstack/lstack.conf | 1 -
src/lstack/netif/lstack_ethdev.c | 25 +-
src/lstack/netif/lstack_vdev.c | 2 +-
src/ltran/ltran_dfx.c | 31 +-
src/ltran/ltran_opt.h | 6 +-
24 files changed, 822 insertions(+), 795 deletions(-)
diff --git a/README.md b/README.md
index e914b26..24079c7 100644
--- a/README.md
+++ b/README.md
@@ -236,7 +236,7 @@ Usage: gazellectl [-h | help]
- 提供的命令行、配置文件以及配置大页内存需要root权限执行或修改。非root用户使用需先提权以及修改文件权限。
- 若要把用户态网卡绑回内核驱动必须先将Gazelle退出。
- 不支持accept阻塞模式或者connect阻塞模式。
-- 最多只支持20000个链接需要保证进程内非网络连接的fd个数小于2000个
+- 最多只支持1500个连接。
- 协议栈当前只支持tcp、icmp、arp、ipv4。
- 大页内存不支持在挂载点里创建子目录重新挂载。
- 在对端ping时要求指定报文长度小于等于14000。
@@ -253,6 +253,7 @@ Usage: gazellectl [-h | help]
- 不使用ltran模式kni网口只支持本地通讯使用且需要启动前配置NetworkManager不管理kni网卡
- 虚拟kni网口的ip及mac地址需要与lstack配置文件保持一致
- gazelle运行过程中不允许删除运行文件如果删除需要重启gazelle
+- lstack配置的ip需要与应用程序的ip保持一致
## Security risk note
gazelle有如下安全风险用户需要评估使用场景风险
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index de669f5..6db67ee 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -65,11 +65,9 @@ struct gazelle_stat_pkts {
uint64_t rx_allocmbuf_fail;
uint64_t tx_allocmbuf_fail;
uint64_t call_msg_cnt;
- uint16_t weakup_ring_cnt;
uint16_t conn_num;
uint16_t send_idle_ring_cnt;
uint64_t event_list;
- uint64_t wakeup_list;
uint64_t read_lwip_drop;
uint64_t read_lwip_cnt;
uint64_t write_lwip_drop;
@@ -79,22 +77,13 @@ struct gazelle_stat_pkts {
uint64_t app_write_idlefail;
uint64_t app_write_drop;
uint64_t recv_list;
- uint64_t lwip_events;
- uint64_t weakup_events;
+ uint64_t wakeup_events;
uint64_t app_events;
uint64_t call_alloc_fail;
- uint64_t read_events;
- uint64_t write_events;
- uint64_t accept_events;
uint64_t read_null;
- uint64_t remove_event;
- uint64_t send_self_rpc;
uint64_t call_null;
uint64_t arp_copy_fail;
- uint64_t epoll_pending;
- uint64_t epoll_pending_call;
- uint64_t epoll_self_call;
- uint64_t epoll_self_event;
+ uint64_t send_self_rpc;
uint64_t send_list;
};
@@ -169,8 +158,7 @@ struct gazelle_stat_lstack_conn_info {
uint32_t send_ring_cnt;
uint32_t recv_ring_cnt;
uint32_t tcp_sub_state;
- uint32_t event_ring_cnt;
- uint32_t self_ring_cnt;
+ int32_t sem_cnt;
};
struct gazelle_stat_lstack_conn {
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index e54d496..b8d53f6 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -14,6 +14,7 @@
#include <securec.h>
#include <sys/epoll.h>
#include <time.h>
+#include <poll.h>
#include <lwip/lwipsock.h>
#include <lwip/sockets.h>
@@ -21,6 +22,7 @@
#include <lwip/api.h>
#include <lwip/tcp.h>
#include <lwip/timeouts.h>
+#include <lwip/posix_api.h>
#include "lstack_compiler.h"
#include "lstack_ethdev.h"
@@ -28,127 +30,103 @@
#include "lstack_cfg.h"
#include "lstack_log.h"
#include "gazelle_base_func.h"
-#include "lstack_weakup.h"
#include "lstack_lwip.h"
#include "lstack_protocol_stack.h"
-#define EPOLL_INTERVAL_10MS 10000000
+#define EPOLL_KERNEL_INTERVAL 10 /* ms */
+#define EPOLL_NSEC_TO_SEC 1000000000
+#define EPOLL_MAX_EVENTS 512
-static PER_THREAD struct weakup_poll g_weakup_poll = {0};
-
-enum POLL_TYPE {
- TYPE_POLL,
- TYPE_EPOLL,
-};
-
-static inline bool check_event_vaild(struct lwip_sock *sock, uint32_t event)
-{
- if ((event & EPOLLIN) && !NETCONN_IS_ACCEPTIN(sock) && !NETCONN_IS_DATAIN(sock)) {
- event &= ~EPOLLIN;
- }
-
- if ((event & EPOLLOUT) && !NETCONN_IS_DATAOUT(sock)) {
- event &= ~EPOLLOUT;
- }
-
- return (event) ? true : false;
-}
-
-static inline bool report_events(struct lwip_sock *sock, uint32_t event)
-{
- /* error event */
- if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) {
- return true;
- }
-
- if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) {
- return false;
- }
-
- return check_event_vaild(sock, event);
-}
+static PER_THREAD struct wakeup_poll g_wakeup_poll = {0};
+static bool g_use_epoll = false; /* FIXME: when no epoll close prepare event for performance testing */
void add_epoll_event(struct netconn *conn, uint32_t event)
{
/* conn sock nerver null, because lwip call this func */
struct lwip_sock *sock = get_socket(conn->socket);
- /* close_wait event should be (EPOLLRDHUP | EPOLLIN), but lwip is EPOLLERR */
- if (event == EPOLLERR && conn->pcb.tcp && conn->pcb.tcp->state == CLOSE_WAIT) {
- event = EPOLLRDHUP | EPOLLIN | EPOLLERR;
- }
-
if ((event & sock->epoll_events) == 0) {
return;
}
+
sock->events |= event & sock->epoll_events;
- if (!sock->weakup || !report_events(sock, event)) {
- return;
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+ if (g_use_epoll && list_is_empty(&sock->event_list)) {
+ list_add_node(&sock->stack->event_list, &sock->event_list);
}
+#endif
- if (weakup_enqueue(sock->stack->weakup_ring, sock)) {
- if (list_is_empty(&sock->event_list)) {
- list_add_node(&sock->stack->event_list, &sock->event_list);
+ if (sock->wakeup) {
+ sock->stack->stats.wakeup_events++;
+ if (get_protocol_stack_group()->wakeup_enable) {
+ rte_ring_sp_enqueue(sock->stack->wakeup_ring, &sock->wakeup->event_sem);
+ } else {
+ sem_post(&sock->wakeup->event_sem);
}
- } else {
- __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE);
- sock->stack->stats.weakup_events++;
}
}
-static void raise_pending_events(struct lwip_sock *sock)
+static inline uint32_t update_events(struct lwip_sock *sock)
{
- struct weakup_poll *wakeup = sock->weakup;
- struct protocol_stack *stack = sock->stack;
- struct netconn *conn = sock->conn;
- if (wakeup == NULL || stack == NULL || conn == NULL) {
- return;
+ uint32_t event = 0;
+
+ if (sock->epoll_events & EPOLLIN) {
+ if (sock->attach_fd > 0 && NETCONN_IS_ACCEPTIN(sock)) {
+ event |= EPOLLIN;
+ }
+
+ if (sock->attach_fd < 0 && NETCONN_IS_DATAIN(sock)) {
+ event |= EPOLLIN;
+ }
}
- struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket_by_fd(sock->attach_fd) : sock;
- if (attach_sock == NULL) {
- return;
+ if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_OUTIDLE(sock)) {
+ event |= EPOLLOUT;
}
- conn = attach_sock->conn;
- if (conn == NULL) {
- return;
+ if ((sock->epoll_events & EPOLLERR) && (sock->events & EPOLLERR)) {
+ event |= EPOLLERR | EPOLLIN;
}
- struct tcp_pcb *tcp = conn->pcb.tcp;
- if ((tcp == NULL) || (tcp->state < ESTABLISHED)) {
+
+ return event;
+}
+
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+void update_stack_events(struct protocol_stack *stack)
+{
+ if (!g_use_epoll) {
return;
}
- uint32_t event = 0;
- if (sock->epoll_events & EPOLLIN) {
- if (attach_sock->recv_lastdata || rte_ring_count(attach_sock->recv_ring) || NETCONN_IS_ACCEPTIN(attach_sock)) {
- event |= EPOLLIN;
- }
- }
+ struct list_node *node, *temp;
+ list_for_each_safe(node, temp, &stack->event_list) {
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
- if (sock->epoll_events & EPOLLOUT) {
- if ((attach_sock->sendevent > 0) ||
- ((tcp_sndbuf(conn->pcb.tcp) > TCP_SNDLOWAT) && (tcp_sndqueuelen(conn->pcb.tcp) < TCP_SNDQUEUELOWAT))) {
- event |= EPOLLOUT;
+ sock->events = update_events(sock);
+ if (sock->events != 0) {
+ continue;
}
- }
- if (attach_sock->errevent > 0) {
- event |= POLLERR | POLLIN;
+ if (pthread_spin_trylock(&stack->event_lock)) {
+ continue;
+ }
+ list_del_node_init(&sock->event_list);
+ pthread_spin_unlock(&stack->event_lock);
}
+}
+#endif
- if (event == 0) {
+static void raise_pending_events(struct lwip_sock *sock)
+{
+ struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket_by_fd(sock->attach_fd) : sock;
+ if (attach_sock == NULL) {
return;
}
- attach_sock->events |= event;
- if (rte_ring_mp_enqueue(wakeup->event_ring, (void *)sock) == 0 ||
- rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) {
- sem_post(&wakeup->event_sem);
- stack->stats.epoll_pending++;
- } else {
- rpc_call_addevent(stack, attach_sock);
- stack->stats.epoll_pending_call++;
+
+ attach_sock->events = update_events(attach_sock);
+ if (attach_sock->events & attach_sock->epoll_events) {
+ rpc_call_addevent(attach_sock->stack, attach_sock);
}
}
@@ -166,34 +144,35 @@ int32_t lstack_epoll_create(int32_t size)
GAZELLE_RETURN(EINVAL);
}
- struct weakup_poll *weakup = malloc(sizeof(struct weakup_poll));
- if (weakup == NULL) {
+ struct wakeup_poll *wakeup = malloc(sizeof(struct wakeup_poll));
+ if (wakeup == NULL) {
posix_api->close_fn(fd);
GAZELLE_RETURN(EINVAL);
}
- memset_s(weakup, sizeof(struct weakup_poll), 0, sizeof(struct weakup_poll));
- sem_init(&weakup->event_sem, 0, 0);
-
- weakup->event_ring = create_ring("RING_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd);
- if (weakup->event_ring == NULL) {
- posix_api->close_fn(fd);
- GAZELLE_RETURN(ENOMEM);
- }
-
- weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd);
- if (weakup->self_ring == NULL) {
- posix_api->close_fn(fd);
- GAZELLE_RETURN(ENOMEM);
- }
+ memset_s(wakeup, sizeof(struct wakeup_poll), 0, sizeof(struct wakeup_poll));
+ sem_init(&wakeup->event_sem, 0, 0);
- sock->weakup = weakup;
+ sock->wakeup = wakeup;
+ init_list_node(&wakeup->event_list);
+ g_use_epoll = true;
return fd;
}
int32_t lstack_epoll_close(int32_t fd)
{
+ struct lwip_sock *sock = get_socket_by_fd(fd);
+ if (sock == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "fd=%d sock is NULL errno=%d\n", fd, errno);
+ GAZELLE_RETURN(EINVAL);
+ }
+
+ if (sock->wakeup) {
+ free(sock->wakeup);
+ }
+ sock->wakeup = NULL;
+
return 0;
}
@@ -219,7 +198,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
}
struct lwip_sock *epoll_sock = get_socket_by_fd(epfd);
- if (epoll_sock == NULL || epoll_sock->weakup == NULL) {
+ if (epoll_sock == NULL || epoll_sock->wakeup == NULL) {
LSTACK_LOG(ERR, LSTACK, "epfd=%d\n", fd);
GAZELLE_RETURN(EINVAL);
}
@@ -228,7 +207,10 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
do {
switch (op) {
case EPOLL_CTL_ADD:
- sock->weakup = epoll_sock->weakup;
+ sock->wakeup = epoll_sock->wakeup;
+ if (list_is_empty(&sock->event_list)) {
+ list_add_node(&sock->wakeup->event_list, &sock->event_list);
+ }
/* fall through */
case EPOLL_CTL_MOD:
sock->epoll_events = events;
@@ -238,6 +220,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
}
break;
case EPOLL_CTL_DEL:
+ list_del_node_init(&sock->event_list);
sock->epoll_events = 0;
break;
default:
@@ -250,176 +233,234 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
return 0;
}
-static inline int32_t save_poll_event(struct pollfd *fds, uint32_t maxevents, int32_t fd, uint32_t events)
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents)
{
int32_t event_num = 0;
- for (uint32_t i = 0; i < maxevents; i++) {
- /* fds[i].revents != 0, the events is kernel events */
- if (fds[i].fd == fd && fds[i].revents == 0) {
- fds[i].revents = events;
- event_num = 1;
- break;
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+
+ maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents);
+ for (uint32_t i = 0; i < stack_group->stack_num && event_num < maxevents; i++) {
+ struct protocol_stack *stack = stack_group->stacks[i];
+ int32_t start_event_num = event_num;
+
+ if (pthread_spin_trylock(&stack->event_lock)) {
+ continue;
+ }
+
+ struct list_node *node, *temp;
+ list_for_each_safe(node, temp, &stack->event_list) {
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
+
+ uint32_t event = sock->events & sock->epoll_events;
+ if (event == 0 || sock->wait_close) {
+ continue;
+ }
+
+ events[event_num].events = event;
+ events[event_num].data = sock->ep_data;
+ event_num++;
+
+ if (event_num >= maxevents) {
+ break;
+ }
}
+
+ pthread_spin_unlock(&stack->event_lock);
+
+ __sync_fetch_and_add(&stack->stats.app_events, event_num - start_event_num);
}
return event_num;
}
-
-static bool remove_event(enum POLL_TYPE etype, struct lwip_sock **sock_list, int32_t event_num, struct lwip_sock *sock,
- struct lwip_sock *attach_sock)
+#else
+static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents)
{
- /* remove duplicate event */
- for (uint32_t i = 0; i < event_num && etype == TYPE_EPOLL; i++) {
- if (sock_list[i] == sock) {
- return true;
+ int32_t event_num = 0;
+ struct list_node *node, *temp;
+ list_for_each_safe(node, temp, &wakeup->event_list) {
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
+ if (sock->conn == NULL) {
+ list_del_node_init(&sock->event_list);
+ continue;
}
+
+ struct lwip_sock *temp_sock = sock;
+ do {
+ struct lwip_sock *attach_sock = (temp_sock->attach_fd > 0) ? get_socket(temp_sock->attach_fd) : temp_sock;
+ if (attach_sock == NULL || temp_sock->wait_close) {
+ temp_sock = (temp_sock->nextfd > 0) ? get_socket(temp_sock->nextfd) : NULL;
+ continue;
+ }
+
+ uint32_t event = update_events(attach_sock);
+ if (event != 0) {
+ events[event_num].events = event;
+ events[event_num].data = temp_sock->ep_data;
+ event_num++;
+ if (event_num >= maxevents) {
+ break;
+ }
+ }
+
+ temp_sock = (temp_sock->nextfd > 0) ? get_socket(temp_sock->nextfd) : NULL;
+ } while (temp_sock);
}
- return !check_event_vaild(attach_sock, attach_sock->events);
+ return event_num;
}
+#endif
-static int32_t get_lwip_events(struct weakup_poll *weakup, void *out, uint32_t maxevents, enum POLL_TYPE etype)
+static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds)
{
- struct epoll_event *events = (struct epoll_event *)out;
- struct pollfd *fds = (struct pollfd *)out;
-
- if (etype == TYPE_EPOLL) {
- maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents);
- }
int32_t event_num = 0;
- struct lwip_sock *sock = NULL;
- while (event_num < maxevents) {
- if (rte_ring_sc_dequeue(weakup->self_ring, (void **)&sock) &&
- rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock)) {
- break;
- }
- __atomic_store_n(&sock->have_event, false, __ATOMIC_RELEASE);
+ for (uint32_t i = 0; i < nfds; i++) {
+ /* listenfd nextfd points to next stack listen, others nextfd=-1 */
+ int32_t fd = fds[i].fd;
+ while (fd > 0) {
+ struct lwip_sock *sock = get_socket(fd);
+ if (sock == NULL) {
+ break;
+ }
- /* sock->stack == NULL mean close sock */
- if (sock->stack == NULL) {
- continue;
- }
+ /* attach listen is empty, all event in attached listen. attached listen attach_fd is self */
+ struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket(sock->attach_fd) : sock;
+ if (attach_sock == NULL || sock->wait_close) {
+ fd = sock->nextfd;
+ continue;
+ }
- /* attach listen is empty, all event in attached listen. attached listen attach_fd is self */
- struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket(sock->attach_fd) : sock;
- if (attach_sock == NULL) {
- continue;
- }
+ uint32_t events = update_events(attach_sock);
+ if (events) {
+ fds[i].revents = events;
+ __sync_fetch_and_add(&sock->stack->stats.app_events, 1);
+ event_num++;
+ break;
+ }
- if (remove_event(etype, weakup->sock_list, event_num, sock, attach_sock)) {
- sock->stack->stats.remove_event++;
- continue;
+ fd = sock->nextfd;
}
+ }
- if (etype == TYPE_EPOLL) {
- events[event_num].events = attach_sock->events;
- events[event_num].data = sock->ep_data;
- weakup->sock_list[event_num] = sock;
- event_num++;
- } else {
- /* shadow_fd event notice listen_fd */
- if (attach_sock->shadowed_sock) {
- attach_sock = attach_sock->shadowed_sock;
- }
+ return event_num;
+}
- if (sock->conn) {
- event_num += save_poll_event(fds, maxevents, sock->conn->socket, attach_sock->events);
- }
+static inline bool have_kernel_fd(int32_t epfd, struct pollfd *fds, nfds_t nfds)
+{
+ /* when epfd > 0 is epoll type */
+ for (uint32_t i = 0; i < nfds && epfd < 0; i++) {
+ if (get_socket(fds[i].fd) == NULL) {
+ return true;
}
-
- sock->stack->stats.app_events++;
- sem_trywait(&weakup->event_sem); /* each event post sem, so every read down sem */
}
- return event_num;
+ return false;
}
-static inline int32_t remove_kernel_invaild_events(struct pollfd *fds, int32_t nfds, int32_t event_count)
+static inline int32_t poll_kernel_event(struct pollfd *fds, nfds_t nfds)
{
- int32_t real_count = 0;
+ int32_t event_num = 0;
- for (int i = 0; i < nfds && real_count < event_count; i++) {
- if (fds[i].fd < 0 || fds[i].revents == 0) {
+ for (uint32_t i = 0; i < nfds; i++) {
+ /* lwip event */
+ if (get_socket(fds[i].fd) != NULL || fds[i].fd < 0) {
continue;
}
- struct lwip_sock *sock = get_socket(fds[i].fd);
- if (sock && CONN_TYPE_IS_LIBOS(sock->conn)) {
- fds[i].revents = 0;
+ int32_t ret = posix_api->poll_fn(&fds[i], 1, 0);
+ if (ret < 0) {
+ if (errno != EINTR) {
+ return ret;
+ }
} else {
- real_count++;
+ event_num += ret;
}
}
- return real_count;
+ return event_num;
}
-static int32_t poll_event(struct weakup_poll *weakup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout)
+static int32_t get_event(struct wakeup_poll *wakeup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout)
{
- struct epoll_event *events = (struct epoll_event *)out;
struct pollfd *fds = (struct pollfd *)out;
+ struct epoll_event *events = (struct epoll_event *)out;
+ bool have_kernel = have_kernel_fd(epfd, fds, maxevents);
int32_t event_num = 0;
- int32_t event_kernel_num = 0;
- struct timespec epoll_interval = {
- .tv_sec = 0,
- .tv_nsec = EPOLL_INTERVAL_10MS,
- };
- uint32_t start_time = sys_now();
+ int32_t poll_time = 0;
+ int32_t ret;
+ /* when epfd > 0 is epoll type */
do {
- /* epoll_wait type */
- if (epfd > 0) {
- event_num += get_lwip_events(weakup, &events[event_num], maxevents - event_num, TYPE_EPOLL);
- if (event_num >= maxevents) {
- break;
- }
+ event_num += (epfd > 0) ? epoll_lwip_event(wakeup, &events[event_num], maxevents - event_num) :
+ poll_lwip_event(fds, maxevents);
- event_kernel_num = posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0);
+ if (have_kernel) {
+ int32_t event_kernel_num = (epfd > 0) ?
+ posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0) :
+ poll_kernel_event(fds, maxevents);
if (event_kernel_num < 0) {
- break;
+ return event_kernel_num;
}
event_num += event_kernel_num;
- } else {
- /* for poll events, we need to distiguish kernel events and gazelle events */
- event_kernel_num = posix_api->poll_fn(fds, maxevents, 0);
- if (event_kernel_num < 0) {
+ if (timeout >= 0 && poll_time >= timeout) {
break;
}
- event_kernel_num = remove_kernel_invaild_events(fds, maxevents, event_kernel_num);
- event_num += event_kernel_num;
-
- event_num += get_lwip_events(weakup, fds, maxevents, TYPE_POLL);
+ poll_time += EPOLL_KERNEL_INTERVAL;
}
if (event_num > 0) {
break;
}
- sem_timedwait(&weakup->event_sem, &epoll_interval);
- if (timeout > 0) {
- timeout = update_timeout(timeout, start_time);
+ int32_t interval = (have_kernel) ? EPOLL_KERNEL_INTERVAL : timeout;
+ struct timespec epoll_interval;
+ clock_gettime(CLOCK_REALTIME, &epoll_interval);
+ epoll_interval.tv_sec += interval / 1000;
+ epoll_interval.tv_nsec += (interval % 1000) * 1000000;
+ epoll_interval.tv_sec += epoll_interval.tv_nsec / 1000000000;
+ epoll_interval.tv_nsec = epoll_interval.tv_nsec % 1000000000;
+
+ if (timeout < 0 && !have_kernel) {
+ ret = sem_wait(&wakeup->event_sem);
+ } else {
+ ret = sem_timedwait(&wakeup->event_sem, &epoll_interval);
+ }
+
+ if (!have_kernel && ret < 0) {
+ break;
}
- } while (timeout != 0);
+ } while (event_num <= maxevents);
- return (event_kernel_num < 0) ? event_kernel_num : event_num;
+ return event_num;
}
-static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *weakup)
+int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
{
- int32_t stack_id = 0;
- int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
+ /* avoid the starvation of epoll events from both netstack */
+ maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents);
- if (weakup->event_ring == NULL) {
- weakup->event_ring = create_ring("POLL_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid());
- if (weakup->event_ring == NULL) {
- GAZELLE_RETURN(ENOMEM);
- }
+ struct lwip_sock *sock = get_socket_by_fd(epfd);
+ if (sock == NULL) {
+ GAZELLE_RETURN(EINVAL);
+ }
- weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid());
- if (weakup->self_ring == NULL) {
- GAZELLE_RETURN(ENOMEM);
- }
+ if (sock->wakeup == NULL) {
+ return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
+ }
+
+ return get_event(sock->wakeup, epfd, events, maxevents, timeout);
+}
+
+static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeup)
+{
+ int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
+
+ if (!wakeup->init) {
+ wakeup->init = true;
+ sem_init(&wakeup->event_sem, 0, 0);
+ } else {
+ while (sem_trywait(&wakeup->event_sem) == 0) {}
}
for (uint32_t i = 0; i < nfds; i++) {
@@ -432,51 +473,33 @@ static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *we
break;
}
sock->epoll_events = fds[i].events | POLLERR;
- sock->weakup = weakup;
-
- raise_pending_events(sock);
-
- stack_count[sock->stack->queue_id]++;
+ sock->wakeup = wakeup;
/* listenfd list */
fd = sock->nextfd;
+ stack_count[sock->stack->queue_id]++;
} while (fd > 0);
}
- for (uint32_t i = 0; i < get_protocol_stack_group()->stack_num; i++) {
- if (stack_count[i] > stack_count[stack_id]) {
- stack_id = i;
- }
- }
-
- bind_to_stack_numa(stack_id);
-
- return 0;
-}
-
-int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
-{
- /* avoid the starvation of epoll events from both netstack */
- maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents);
-
- struct lwip_sock *sock = get_socket_by_fd(epfd);
- if (sock == NULL) {
- GAZELLE_RETURN(EINVAL);
+ if (wakeup->bind_stack) {
+ return;
}
- if (sock->weakup == NULL) {
- return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ uint32_t bind_id = 0;
+ for (uint32_t i = 0; i < stack_group->stack_num; i++) {
+ if (stack_count[i] > stack_count[bind_id]) {
+ bind_id = i;
+ }
}
- return poll_event(sock->weakup, epfd, events, maxevents, timeout);
+ bind_to_stack_numa(stack_group->stacks[bind_id]);
+ wakeup->bind_stack = stack_group->stacks[bind_id];
}
int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
{
- int32_t ret = poll_init(fds, nfds, &g_weakup_poll);
- if (ret != 0) {
- return -1;
- }
+ poll_init(fds, nfds, &g_wakeup_poll);
- return poll_event(&g_weakup_poll, -1, fds, nfds, timeout);
+ return get_event(&g_wakeup_poll, -1, fds, nfds, timeout);
}
diff --git a/src/lstack/api/lstack_signal.c b/src/lstack/api/lstack_signal.c
index 5e4af56..f4763e8 100644
--- a/src/lstack/api/lstack_signal.c
+++ b/src/lstack/api/lstack_signal.c
@@ -16,6 +16,7 @@
#include <execinfo.h>
#include <unistd.h>
#include <lwip/lwipsock.h>
+#include <lwip/posix_api.h>
#include "lstack_log.h"
diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c
index 53712a8..13086a3 100644
--- a/src/lstack/core/lstack_cfg.c
+++ b/src/lstack/core/lstack_cfg.c
@@ -33,14 +33,14 @@
#include "gazelle_reg_msg.h"
#include "lstack_log.h"
#include "gazelle_base_func.h"
-#include "gazelle_parse_config.h"
#include "lstack_protocol_stack.h"
+#include "gazelle_parse_config.h"
#define DEFAULT_CONF_FILE "/etc/gazelle/lstack.conf"
#define LSTACK_CONF_ENV "LSTACK_CONF_PATH"
#define NUMA_CPULIST_PATH "/sys/devices/system/node/node%u/cpulist"
#define DEV_MAC_LEN 17
-#define CPUS_RANGE_NUM 32
+#define CPUS_MAX_NUM 256
static struct cfg_params g_config_params;
@@ -50,7 +50,7 @@ static int32_t parse_host_addr(void);
static int32_t parse_low_power_mode(void);
static int32_t parse_stack_cpu_number(void);
static int32_t parse_use_ltran(void);
-static int32_t parse_weakup_cpu_number(void);
+static int32_t parse_wakeup_cpu_number(void);
static int32_t parse_mask_addr(void);
static int32_t parse_devices(void);
static int32_t parse_dpdk_args(void);
@@ -70,7 +70,7 @@ static struct config_vector_t g_config_tbl[] = {
{ "devices", parse_devices },
{ "dpdk_args", parse_dpdk_args },
{ "num_cpus", parse_stack_cpu_number },
- { "num_wakeup", parse_weakup_cpu_number },
+ { "num_wakeup", parse_wakeup_cpu_number },
{ "low_power_mode", parse_low_power_mode },
{ "kni_switch", parse_kni_switch },
{ NULL, NULL }
@@ -240,7 +240,6 @@ static int32_t parse_stack_cpu_number(void)
}
g_config_params.num_cpu = cnt;
- get_protocol_stack_group()->stack_num = g_config_params.num_cpu;
return 0;
}
@@ -275,10 +274,10 @@ static int32_t numa_to_cpusnum(unsigned socket_id, uint32_t *cpulist, int32_t nu
static int32_t stack_idle_cpuset(struct protocol_stack *stack, cpu_set_t *exclude)
{
- uint32_t cpulist[CPUS_RANGE_NUM];
+ uint32_t cpulist[CPUS_MAX_NUM];
- int32_t cpunum = numa_to_cpusnum(stack->socket_id, cpulist, CPUS_RANGE_NUM);
- if (cpunum <= 0 ) {
+ int32_t cpunum = numa_to_cpusnum(stack->socket_id, cpulist, CPUS_MAX_NUM);
+ if (cpunum <= 0) {
LSTACK_LOG(ERR, LSTACK, "numa_to_cpusnum failed\n");
return -1;
}
@@ -308,7 +307,7 @@ int32_t init_stack_numa_cpuset(void)
CPU_SET(cfg->cpus[idx], &stack_cpuset);
}
for (int32_t idx = 0; idx < cfg->num_wakeup; ++idx) {
- CPU_SET(cfg->weakup[idx], &stack_cpuset);
+ CPU_SET(cfg->wakeup[idx], &stack_cpuset);
}
for (int32_t idx = 0; idx < stack_group->stack_num; ++idx) {
@@ -621,7 +620,7 @@ static int32_t parse_low_power_mode(void)
return 0;
}
-static int32_t parse_weakup_cpu_number(void)
+static int32_t parse_wakeup_cpu_number(void)
{
const config_setting_t *cfg_args = NULL;
const char *args = NULL;
@@ -639,13 +638,19 @@ static int32_t parse_weakup_cpu_number(void)
}
char *tmp_arg = strdup(args);
- int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.weakup, CFG_MAX_CPUS);
+ int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.wakeup, CFG_MAX_CPUS);
free(tmp_arg);
if (cnt <= 0 || cnt > CFG_MAX_CPUS) {
return -EINVAL;
}
g_config_params.num_wakeup = cnt;
+ if (g_config_params.num_wakeup < g_config_params.num_cpu) {
+ LSTACK_PRE_LOG(LSTACK_ERR, "num_wakeup=%d less than num_stack_cpu=%d.\n", g_config_params.num_wakeup,
+ g_config_params.num_cpu);
+ return -EINVAL;
+ }
+
return 0;
}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index 430c6e5..3f446ea 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -28,6 +28,7 @@
#include <rte_errno.h>
#include <rte_kni.h>
#include <lwip/posix_api.h>
+#include <lwipopts.h>
#include "lstack_log.h"
#include "dpdk_common.h"
@@ -109,35 +110,39 @@ static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_
char pool_name[PATH_MAX];
struct rte_mempool *pool;
- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id);
+ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id);
if (ret < 0) {
return NULL;
}
/* time stamp before pbuf_custom as priv_data */
+ pthread_mutex_lock(get_mem_mutex());
pool = rte_pktmbuf_pool_create(pool_name, nb_mbuf, mbuf_cache_size,
sizeof(struct pbuf_custom) + GAZELLE_MBUFF_PRIV_SIZE, MBUF_SZ, rte_socket_id());
if (pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno);
}
+ pthread_mutex_unlock(get_mem_mutex());
return pool;
}
-static struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id)
+struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id)
{
char pool_name[PATH_MAX];
struct rte_mempool *pool;
int32_t ret;
- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id);
+ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id);
if (ret < 0) {
return NULL;
}
+ pthread_mutex_lock(get_mem_mutex());
pool = rte_mempool_create(pool_name, CALL_POOL_SZ, sizeof(struct rpc_msg), 0, 0, NULL, NULL, NULL,
NULL, rte_socket_id(), 0);
if (pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno);
}
+ pthread_mutex_unlock(get_mem_mutex());
return pool;
}
@@ -147,7 +152,7 @@ static struct reg_ring_msg *create_reg_mempool(const char *name, uint16_t queue_
char pool_name[PATH_MAX];
struct reg_ring_msg *reg_buf;
- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id);
+ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id);
if (ret < 0) {
return NULL;
}
@@ -167,21 +172,18 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num)
return -1;
}
- stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF / stack_num, 0, stack->queue_id);
+ stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF / stack_num, RX_MBUF_CACHE_SZ,
+ stack->queue_id);
if (stack->rx_pktmbuf_pool == NULL) {
return -1;
}
- stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF / stack_num, 0, stack->queue_id);
+ stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF / stack_num, TX_MBUF_CACHE_SZ,
+ stack->queue_id);
if (stack->tx_pktmbuf_pool == NULL) {
return -1;
}
- stack->rpc_pool = create_rpc_mempool("rpc_msg", stack->queue_id);
- if (stack->rpc_pool == NULL) {
- return -1;
- }
-
if (use_ltran()) {
stack->reg_buf = create_reg_mempool("reg_ring_msg", stack->queue_id);
if (stack->reg_buf == NULL) {
@@ -214,16 +216,12 @@ int32_t create_shared_ring(struct protocol_stack *stack)
{
lockless_queue_init(&stack->rpc_queue);
- stack->weakup_ring = create_ring("SHARED_WEAKUP_RING", VDEV_WEAKUP_QUEUE_SZ, 0, stack->queue_id);
- if (stack->weakup_ring == NULL) {
- return -1;
- }
-
- stack->send_idle_ring = create_ring("SEND_IDLE_RING", VDEV_IDLE_QUEUE_SZ, 0, stack->queue_id);
- if (stack->send_idle_ring == NULL) {
- return -1;
+ if (get_protocol_stack_group()->wakeup_enable) {
+ stack->wakeup_ring = create_ring("WAKEUP_RING", VDEV_WAKEUP_QUEUE_SZ, 0, stack->queue_id);
+ if (stack->wakeup_ring == NULL) {
+ return -1;
+ }
}
- stack->in_replenish = 0;
if (use_ltran()) {
stack->rx_ring = create_ring("RING_RX", VDEV_RX_QUEUE_SZ, RING_F_SP_ENQ | RING_F_SC_DEQ, stack->queue_id);
@@ -328,8 +326,19 @@ static struct eth_params *alloc_eth_params(uint16_t port_id, uint16_t nb_queues)
return eth_params;
}
+uint64_t get_eth_params_rx_ol(void)
+{
+ return use_ltran() ? 0 : get_protocol_stack_group()->eth_params->conf.rxmode.offloads;
+}
+
+uint64_t get_eth_params_tx_ol(void)
+{
+ return use_ltran() ? 0 : get_protocol_stack_group()->eth_params->conf.txmode.offloads;
+}
+
static int eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_info *dev_info)
{
+#if CHECKSUM_OFFLOAD_ALL
uint64_t rx_ol = 0;
uint64_t tx_ol = 0;
@@ -337,43 +346,48 @@ static int eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_inf
uint64_t tx_ol_capa = dev_info->tx_offload_capa;
// rx ip
- if (rx_ol_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) {
#if CHECKSUM_CHECK_IP_HW
+ if (rx_ol_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) {
rx_ol |= DEV_RX_OFFLOAD_IPV4_CKSUM;
LSTACK_LOG(INFO, LSTACK, "DEV_RX_OFFLOAD_IPV4_CKSUM\n");
-#endif
}
+#endif
// rx tcp
- if (rx_ol_capa & DEV_RX_OFFLOAD_TCP_CKSUM) {
#if CHECKSUM_CHECK_TCP_HW
+ if (rx_ol_capa & DEV_RX_OFFLOAD_TCP_CKSUM) {
rx_ol |= DEV_RX_OFFLOAD_TCP_CKSUM;
LSTACK_LOG(INFO, LSTACK, "DEV_RX_OFFLOAD_TCP_CKSUM\n");
-#endif
}
+#endif
// tx ip
- if (tx_ol_capa & DEV_TX_OFFLOAD_IPV4_CKSUM) {
#if CHECKSUM_GEN_IP_HW
+ if (tx_ol_capa & DEV_TX_OFFLOAD_IPV4_CKSUM) {
tx_ol |= DEV_TX_OFFLOAD_IPV4_CKSUM;
LSTACK_LOG(INFO, LSTACK, "DEV_TX_OFFLOAD_IPV4_CKSUM\n");
-#endif
}
+#endif
// tx tcp
- if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_CKSUM) {
#if CHECKSUM_GEN_TCP_HW
+ if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_CKSUM) {
tx_ol |= DEV_TX_OFFLOAD_TCP_CKSUM;
LSTACK_LOG(INFO, LSTACK, "DEV_TX_OFFLOAD_TCP_CKSUM\n");
+ }
#endif
+ if (!(rx_ol & DEV_RX_OFFLOAD_TCP_CKSUM) || !(rx_ol & DEV_RX_OFFLOAD_IPV4_CKSUM)) {
+ rx_ol = 0;
+ }
+ if (!(tx_ol & DEV_TX_OFFLOAD_TCP_CKSUM) || !(tx_ol & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
+ tx_ol = 0;
}
conf->rxmode.offloads = rx_ol;
conf->txmode.offloads = tx_ol;
-#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW || CHECKSUM_GEN_IP_HW || CHECKSUM_GEN_TCP_HW
LSTACK_LOG(INFO, LSTACK, "set checksum offloads\n");
-#endif
+#endif /* CHECKSUM_OFFLOAD_ALL */
return 0;
}
@@ -580,3 +594,30 @@ void dpdk_skip_nic_init(void)
}
}
+int32_t init_dpdk_ethdev(void)
+{
+ int32_t ret;
+
+ ret = dpdk_ethdev_init();
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_init failed\n");
+ return -1;
+ }
+
+ ret = dpdk_ethdev_start();
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n");
+ return -1;
+ }
+
+ if (get_global_cfg_params()->kni_switch) {
+ ret = dpdk_init_lstack_kni();
+ if (ret < 0) {
+ return -1;
+ }
+ }
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ sem_post(&stack_group->ethdev_init);
+ return 0;
+}
diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c
index 17195c8..774d0f3 100644
--- a/src/lstack/core/lstack_init.c
+++ b/src/lstack/core/lstack_init.c
@@ -30,6 +30,7 @@
#include <lwip/tcpip.h>
#include <lwip/memp_def.h>
#include <lwip/lwipopts.h>
+#include <lwip/posix_api.h>
#include "lstack_cfg.h"
#include "lstack_control_plane.h"
@@ -225,16 +226,18 @@ __attribute__((constructor)) void gazelle_network_init(void)
lstack_log_level_init();
- /*
- * Phase 8: memory and nic */
ret = init_protocol_stack();
if (ret != 0) {
LSTACK_EXIT(1, "init_protocol_stack failed\n");
}
- ret = create_stack_thread();
- if (ret != 0) {
- LSTACK_EXIT(1, "create_stack_thread failed\n");
+ /*
+ * Phase 8: nic */
+ if (!use_ltran()) {
+ ret = init_dpdk_ethdev();
+ if (ret != 0) {
+ LSTACK_EXIT(1, "init_dpdk_ethdev failed\n");
+ }
}
/*
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index b4d75d2..887464d 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -17,6 +17,7 @@
#include <arch/sys_arch.h>
#include <lwip/pbuf.h>
#include <lwip/priv/tcp_priv.h>
+#include <lwip/posix_api.h>
#include <securec.h>
#include <rte_errno.h>
#include <rte_malloc.h>
@@ -25,7 +26,6 @@
#include "lstack_ethdev.h"
#include "lstack_protocol_stack.h"
#include "lstack_log.h"
-#include "lstack_weakup.h"
#include "lstack_dpdk.h"
#include "lstack_stack_stat.h"
#include "lstack_lwip.h"
@@ -49,37 +49,82 @@ void listen_list_add_node(int32_t head_fd, int32_t add_fd)
sock->nextfd = add_fd;
}
+static void free_ring_pbuf(struct rte_ring *ring)
+{
+ while (1) {
+ struct pbuf *pbuf = NULL;
+ int32_t ret = rte_ring_sc_dequeue(ring, (void **)&pbuf);
+ if (ret != 0) {
+ break;
+ }
+
+ pbuf_free(pbuf);
+ }
+}
+
static void reset_sock_data(struct lwip_sock *sock)
{
/* check null pointer in ring_free func */
if (sock->recv_ring) {
+ free_ring_pbuf(sock->recv_ring);
rte_ring_free(sock->recv_ring);
}
sock->recv_ring = NULL;
+ if (sock->recv_wait_free) {
+ free_ring_pbuf(sock->recv_wait_free);
+ rte_ring_free(sock->recv_wait_free);
+ }
+ sock->recv_wait_free = NULL;
+
if (sock->send_ring) {
+ free_ring_pbuf(sock->send_ring);
rte_ring_free(sock->send_ring);
}
sock->send_ring = NULL;
+ if (sock->send_idle_ring) {
+ free_ring_pbuf(sock->send_idle_ring);
+ rte_ring_free(sock->send_idle_ring);
+ }
+ sock->send_idle_ring = NULL;
+
sock->stack = NULL;
- sock->weakup = NULL;
+ sock->wakeup = NULL;
sock->events = 0;
sock->nextfd = -1;
sock->attach_fd = -1;
sock->wait_close = false;
- sock->have_event = false;
- sock->have_rpc_send = false;
sock->shadowed_sock = NULL;
+ sock->epoll_events = 0;
+ sock->events = 0;
if (sock->recv_lastdata) {
pbuf_free(sock->recv_lastdata);
- sock->recv_lastdata = NULL;
}
+ sock->recv_lastdata = NULL;
if (sock->send_lastdata) {
pbuf_free(sock->send_lastdata);
- sock->send_lastdata = NULL;
+ }
+ sock->send_lastdata = NULL;
+}
+
+static void replenish_send_idlembuf(struct rte_ring *ring)
+{
+ uint32_t replenish_cnt = rte_ring_free_count(ring);
+
+ for (uint32_t i = 0; i < replenish_cnt; i++) {
+ struct pbuf *pbuf = lwip_alloc_pbuf(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM);
+ if (pbuf == NULL) {
+ break;
+ }
+
+ int32_t ret = rte_ring_sp_enqueue(ring, (void *)pbuf);
+ if (ret < 0) {
+ pbuf_free(pbuf);
+ break;
+ }
}
}
@@ -99,19 +144,31 @@ void gazelle_init_sock(int32_t fd)
return;
}
+ sock->recv_wait_free = create_ring("wait_free", SOCK_RECV_RING_SIZE, 0, name_tick++);
+ if (sock->recv_wait_free == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "wait_free create failed. errno: %d.\n", rte_errno);
+ return;
+ }
+
sock->send_ring = create_ring("sock_send", SOCK_SEND_RING_SIZE, 0, name_tick++);
if (sock->send_ring == NULL) {
LSTACK_LOG(ERR, LSTACK, "sock_send create failed. errno: %d.\n", rte_errno);
return;
}
+ sock->send_idle_ring = create_ring("idle_send", SOCK_SEND_RING_SIZE, 0, name_tick++);
+ if (sock->send_idle_ring == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "idle_send create failed. errno: %d.\n", rte_errno);
+ return;
+ }
+ replenish_send_idlembuf(sock->send_idle_ring);
+
sock->stack = get_protocol_stack();
sock->stack->conn_num++;
init_list_node(&sock->recv_list);
init_list_node(&sock->attach_list);
init_list_node(&sock->listen_list);
init_list_node(&sock->event_list);
- init_list_node(&sock->wakeup_list);
init_list_node(&sock->send_list);
}
@@ -129,7 +186,9 @@ void gazelle_clean_sock(int32_t fd)
list_del_node_init(&sock->recv_list);
list_del_node_init(&sock->attach_list);
list_del_node_init(&sock->listen_list);
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
list_del_node_init(&sock->event_list);
+#endif
list_del_node_init(&sock->send_list);
}
@@ -206,101 +265,60 @@ struct pbuf *lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type)
void *data = rte_pktmbuf_mtod(mbuf, void *);
struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ);
- return pbuf;
-}
-
-void stack_replenish_send_idlembuf(struct protocol_stack *stack)
-{
- uint32_t replenish_cnt = rte_ring_free_count(stack->send_idle_ring);
-
- for (uint32_t i = 0; i < replenish_cnt; i++) {
- struct pbuf *pbuf = lwip_alloc_pbuf(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM);
- if (pbuf == NULL) {
- break;
- }
-
- int32_t ret = rte_ring_sp_enqueue(stack->send_idle_ring, (void *)pbuf);
- if (ret < 0) {
- gazelle_free_pbuf(pbuf);
- break;
- }
+#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW
+ if (pbuf) {
+ pbuf->ol_flags = 0;
+ pbuf->l2_len = 0;
+ pbuf->l3_len = 0;
}
+#endif
+
+ return pbuf;
}
-ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags)
+struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags)
{
struct pbuf *pbuf = NULL;
- ssize_t send_ret = 0;
- ssize_t send_len = 0;
- do {
- if (sock->send_lastdata) {
- pbuf = sock->send_lastdata;
- sock->send_lastdata = NULL;
- } else {
- int32_t ret = rte_ring_sc_dequeue(sock->send_ring, (void **)&pbuf);
- if (ret != 0) {
- break;
- }
- }
-
- if (sock->conn == NULL || sock->conn->pcb.tcp == NULL) {
- GAZELLE_RETURN(ENOENT);
- }
-
- uint16_t available = tcp_sndbuf(sock->conn->pcb.tcp);
- if (available < pbuf->tot_len) {
- sock->send_lastdata = pbuf;
- break;
- }
-
- ssize_t pbuf_len = pbuf->tot_len;
- send_ret = lwip_send(fd, pbuf, pbuf->tot_len, flags);
- if (send_ret > 0) {
- send_len += send_ret;
- }
- if (send_ret != pbuf_len) {
- sock->stack->stats.write_lwip_drop++;
- break;
+ if (sock->send_lastdata) {
+ pbuf = sock->send_lastdata;
+ sock->send_lastdata = NULL;
+ } else {
+ int32_t ret = rte_ring_sc_dequeue(sock->send_ring, (void **)&pbuf);
+ if (ret != 0) {
+ *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ return NULL;
}
+ }
- sock->stack->stats.write_lwip_cnt++;
- } while (1);
-
- return (send_ret < 0) ? send_ret : send_len;
-}
-
-void add_self_event(struct lwip_sock *sock, uint32_t events)
-{
- struct weakup_poll *wakeup = sock->weakup;
- struct protocol_stack *stack = sock->stack;
- if (wakeup == NULL || stack == NULL) {
- return;
+ if (pbuf->tot_len >= remain_size) {
+ sock->send_lastdata = pbuf;
+ *apiflags |= TCP_WRITE_FLAG_MORE; /* set TCP_PSH flag */
+ return NULL;
}
- sock->events |= events;
+ replenish_send_idlembuf(sock->send_idle_ring);
- if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) {
- return;
+ if ((sock->epoll_events & EPOLLOUT) && rte_ring_free_count(sock->send_ring)) {
+ add_epoll_event(sock->conn, EPOLLOUT);
}
- if (rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) {
- __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE);
- sem_post(&sock->weakup->event_sem);
- stack->stats.epoll_self_event++;
- } else {
- rpc_call_addevent(stack, sock);
- stack->stats.epoll_self_call++;
- }
+ sock->stack->stats.write_lwip_cnt++;
+ return pbuf;
}
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
{
+ if (sock->events & EPOLLERR) {
+ return 0;
+ }
+
uint32_t free_count = rte_ring_free_count(sock->send_ring);
if (free_count == 0) {
- GAZELLE_RETURN(EAGAIN);
+ return -1;
}
- uint32_t avaible_cont = rte_ring_count(sock->stack->send_idle_ring);
+
+ uint32_t avaible_cont = rte_ring_count(sock->send_idle_ring);
avaible_cont = LWIP_MIN(free_count, avaible_cont);
struct pbuf *pbuf = NULL;
@@ -309,7 +327,7 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
uint32_t send_pkt = 0;
while (send_len < len && send_pkt < avaible_cont) {
- int32_t ret = rte_ring_sc_dequeue(sock->stack->send_idle_ring, (void **)&pbuf);
+ int32_t ret = rte_ring_sc_dequeue(sock->send_idle_ring, (void **)&pbuf);
if (ret < 0) {
sock->stack->stats.app_write_idlefail++;
break;
@@ -322,28 +340,16 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
ret = rte_ring_sp_enqueue(sock->send_ring, pbuf);
if (ret != 0) {
sock->stack->stats.app_write_drop++;
- gazelle_free_pbuf(pbuf);
+ pbuf_free(pbuf);
break;
}
- sock->stack->stats.app_write_cnt++;
send_len += copy_len;
send_pkt++;
}
+ __sync_fetch_and_add(&sock->stack->stats.app_write_cnt, send_pkt);
- if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) {
- add_self_event(sock, EPOLLOUT);
- sock->stack->stats.write_events++;
- } else {
- sock->events &= ~EPOLLOUT;
- }
-
- if (rte_ring_free_count(sock->stack->send_idle_ring) > USED_IDLE_WATERMARK && !sock->stack->in_replenish) {
- sock->stack->in_replenish = true;
- rpc_call_replenish_idlembuf(sock->stack);
- }
-
- return send_len;
+ return (send_len <= 0) ? -1 : send_len;
}
void stack_send(struct rpc_msg *msg)
@@ -351,27 +357,62 @@ void stack_send(struct rpc_msg *msg)
int32_t fd = msg->args[MSG_ARG_0].i;
int32_t flags = msg->args[MSG_ARG_2].i;
- struct protocol_stack *stack = get_protocol_stack();
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
msg->result = -1;
return;
}
- msg->result = write_lwip_data(sock, fd, flags);
- __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE);
+ if (!NETCONN_IS_DATAOUT(sock)) {
+ return;
+ }
- if (msg->result >= 0 &&
- (rte_ring_count(sock->send_ring) || sock->send_lastdata)) {
+ /* send all send_ring, so len set lwip send max. */
+ ssize_t len = lwip_send(fd, sock, UINT16_MAX, flags);
+ if (len == 0) {
+ /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
+ add_epoll_event(sock->conn, EPOLLERR);
+ }
+
+ /* have remain data add sendlist */
+ if (NETCONN_IS_DATAOUT(sock)) {
if (list_is_empty(&sock->send_list)) {
- __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE);
- list_add_node(&stack->send_list, &sock->send_list);
- sock->stack->stats.send_self_rpc++;
+ sock->send_flags = flags;
+ list_add_node(&sock->stack->send_list, &sock->send_list);
}
+ sock->stack->stats.send_self_rpc++;
}
+}
- if (rte_ring_free_count(sock->send_ring)) {
- add_epoll_event(sock->conn, EPOLLOUT);
+void send_stack_list(struct protocol_stack *stack, uint32_t send_max)
+{
+ struct list_node *node, *temp;
+ struct lwip_sock *sock;
+ uint32_t read_num = 0;
+
+ list_for_each_safe(node, temp, &stack->send_list) {
+ sock = container_of(node, struct lwip_sock, send_list);
+
+ if (sock->conn == NULL || !NETCONN_IS_DATAOUT(sock)) {
+ list_del_node_init(&sock->send_list);
+ continue;
+ }
+
+ /* send all send_ring, so len set lwip send max. */
+ ssize_t len = lwip_send(sock->conn->socket, sock, UINT16_MAX, sock->send_flags);
+ if (len == 0) {
+ /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
+ add_epoll_event(sock->conn, EPOLLERR);
+ list_del_node_init(&sock->send_list);
+ }
+
+ if (!NETCONN_IS_DATAOUT(sock)) {
+ list_del_node_init(&sock->send_list);
+ }
+
+ if (++read_num >= send_max) {
+ break;
+ }
}
}
@@ -381,6 +422,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
return 0;
}
+ if (rte_ring_count(sock->recv_wait_free)) {
+ free_ring_pbuf(sock->recv_wait_free);
+ }
+
uint32_t free_count = rte_ring_free_count(sock->recv_ring);
uint32_t data_count = rte_ring_count(sock->conn->recvmbox->ring);
uint32_t read_max = LWIP_MIN(free_count, data_count);
@@ -411,6 +456,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
read_count++;
}
+ if (get_protocol_stack_group()->latency_start) {
+ calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_LWIP);
+ }
+
recv_len += pbuf->len;
/* once we have some data to return, only add more if we don't need to wait */
@@ -425,6 +474,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
add_epoll_event(sock->conn, EPOLLIN);
}
sock->stack->stats.read_lwip_cnt += read_count;
+
+ if (recv_len == 0) {
+ GAZELLE_RETURN(EAGAIN);
+ }
return recv_len;
}
@@ -440,7 +493,7 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags)
if ((message->msg_iov[i].iov_base == NULL) || ((ssize_t)message->msg_iov[i].iov_len <= 0) ||
((size_t)(ssize_t)message->msg_iov[i].iov_len != message->msg_iov[i].iov_len) ||
((ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len) <= 0)) {
- GAZELLE_RETURN(EINVAL);
+ GAZELLE_RETURN(EINVAL);
}
buflen = (ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len);
}
@@ -479,16 +532,14 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
sock->send_flags = flags;
ssize_t send = write_stack_data(sock, buf, len);
-
- ssize_t ret = 0;
- if (!__atomic_load_n(&sock->have_rpc_send, __ATOMIC_ACQUIRE)) {
- __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE);
- ret = rpc_call_send(fd, buf, len, flags);
- }
-
- if (send <= 0 || ret < 0) {
+ if (send < 0) {
GAZELLE_RETURN(EAGAIN);
+ } else if (send == 0) {
+ return 0;
}
+ rte_smp_mb();
+
+ rpc_call_send(fd, NULL, send, flags);
return send;
}
@@ -505,7 +556,7 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
if ((message->msg_iov[i].iov_base == NULL) || ((ssize_t)message->msg_iov[i].iov_len <= 0) ||
((size_t)(ssize_t)message->msg_iov[i].iov_len != message->msg_iov[i].iov_len) ||
((ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len) <= 0)) {
- GAZELLE_RETURN(EINVAL);
+ GAZELLE_RETURN(EINVAL);
}
buflen = (ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len);
}
@@ -513,7 +564,7 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
for (i = 0; i < message->msg_iovlen; i++) {
ret = gazelle_send(s, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, flags);
if (ret < 0) {
- return buflen == 0 ? ret : buflen;
+ return buflen == 0 ? ret : buflen;
}
buflen += ret;
}
@@ -536,6 +587,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
}
sock->recv_flags = flags;
+ if ((sock->events & EPOLLERR) && !NETCONN_IS_DATAIN(sock)) {
+ return 0;
+ }
+
while (recv_left > 0) {
if (sock->recv_lastdata) {
pbuf = sock->recv_lastdata;
@@ -556,22 +611,18 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
if (pbuf->tot_len > copy_len) {
sock->recv_lastdata = pbuf_free_header(pbuf, copy_len);
} else {
- pbuf_free(pbuf);
- sock->recv_lastdata = NULL;
- sock->stack->stats.app_read_cnt++;
if (get_protocol_stack_group()->latency_start) {
calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_READ);
}
+ ret = rte_ring_sp_enqueue(sock->recv_wait_free, pbuf);
+ if (ret != 0) {
+ pbuf_free(pbuf);
+ }
+ sock->recv_lastdata = NULL;
+ __sync_fetch_and_add(&sock->stack->stats.app_read_cnt, 1);
}
}
- if ((sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) {
- add_self_event(sock, EPOLLIN);
- sock->stack->stats.read_events++;
- } else {
- sock->events &= ~EPOLLIN;
- }
-
if (recvd == 0) {
sock->stack->stats.read_null++;
GAZELLE_RETURN(EAGAIN);
@@ -588,30 +639,32 @@ void add_recv_list(int32_t fd)
}
}
-void read_recv_list(void)
+void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
{
- struct protocol_stack *stack = get_protocol_stack();
struct list_node *list = &(stack->recv_list);
struct list_node *node, *temp;
struct lwip_sock *sock;
- struct lwip_sock *first_sock = NULL;
+ uint32_t read_num = 0;
list_for_each_safe(node, temp, list) {
sock = container_of(node, struct lwip_sock, recv_list);
- /* when read_lwip_data have data wait to read, add sock into recv_list. read_recv_list read this sock again.
- this is dead loop. so every sock just read one time */
- if (sock == first_sock) {
- break;
- }
- if (first_sock == NULL) {
- first_sock = sock;
+ if (sock->conn == NULL || sock->recv_ring == NULL || sock->send_ring == NULL || sock->conn->pcb.tcp == NULL) {
+ list_del_node_init(&sock->recv_list);
+ continue;
}
- /* recv_ring and send_ring maybe create fail, so check here */
- if (sock->conn && sock->recv_ring && sock->send_ring && rte_ring_free_count(sock->recv_ring)) {
+ if (rte_ring_free_count(sock->recv_ring)) {
list_del_node_init(&sock->recv_list);
- lwip_recv(sock->conn->socket, NULL, 0, sock->recv_flags);
+ ssize_t len = lwip_recv(sock->conn->socket, NULL, 0, sock->recv_flags);
+ if (len == 0) {
+ /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
+ add_epoll_event(sock->conn, EPOLLERR);
+ }
+ }
+
+ if (++read_num >= max_num) {
+ break;
}
}
}
@@ -633,11 +686,13 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s
struct lwip_sock *sock = get_socket(netconn->socket);
if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) {
conn->recv_ring_cnt = rte_ring_count(sock->recv_ring);
+ conn->recv_ring_cnt += (sock->recv_lastdata) ? 1 : 0;
+
conn->send_ring_cnt = rte_ring_count(sock->send_ring);
- struct weakup_poll *weakup = sock->weakup;
- if (weakup) {
- conn->event_ring_cnt = rte_ring_count(weakup->event_ring);
- conn->self_ring_cnt = rte_ring_count(weakup->self_ring);
+ conn->send_ring_cnt += (sock->send_lastdata) ? 1 : 0;
+
+ if (sock->wakeup) {
+ sem_getvalue(&sock->wakeup->event_sem, &conn->sem_cnt);
}
}
}
@@ -786,11 +841,6 @@ static uint32_t get_list_count(struct list_node *list)
return count;
}
-void stack_wakeuplist_count(struct rpc_msg *msg)
-{
- msg->result = get_list_count(get_protocol_stack()->wakeup_list);
-}
-
void stack_eventlist_count(struct rpc_msg *msg)
{
msg->result = get_list_count(&get_protocol_stack()->event_list);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index e5761a4..da320e2 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -17,6 +17,7 @@
#include <lwip/tcp.h>
#include <lwip/memp_def.h>
#include <lwipsock.h>
+#include <lwip/posix_api.h>
#include <rte_kni.h>
#include <securec.h>
#include <numa.h>
@@ -28,29 +29,34 @@
#include "lstack_lwip.h"
#include "lstack_protocol_stack.h"
#include "lstack_cfg.h"
-#include "lstack_weakup.h"
#include "lstack_control_plane.h"
#include "lstack_stack_stat.h"
+#define READ_LIST_MAX 32
+#define SEND_LIST_MAX 32
+#define HANDLE_RPC_MSG_MAX 32
+
static PER_THREAD uint16_t g_stack_idx = PROTOCOL_STACK_MAX;
static struct protocol_stack_group g_stack_group = {0};
static PER_THREAD long g_stack_tid = 0;
+static pthread_mutex_t g_mem_mutex = PTHREAD_MUTEX_INITIALIZER;
typedef void *(*stack_thread_func)(void *arg);
-int32_t bind_to_stack_numa(int32_t stack_id)
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+void update_stack_events(struct protocol_stack *stack);
+#endif
+
+pthread_mutex_t *get_mem_mutex(void)
{
- static PER_THREAD int32_t last_stack_id = -1;
+ return &g_mem_mutex;
+}
+int32_t bind_to_stack_numa(struct protocol_stack *stack)
+{
int32_t ret;
- struct protocol_stack *stack = get_protocol_stack_group()->stacks[stack_id];
pthread_t tid = pthread_self();
- if (last_stack_id == stack_id) {
- return 0;
- }
- last_stack_id = stack_id;
-
ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %d failed\n", rte_gettid(), stack->queue_id);
@@ -159,88 +165,27 @@ void lstack_low_power_idling(void)
}
}
-int32_t init_protocol_stack(void)
+static int32_t create_thread(uint16_t queue_id, char *thread_name, stack_thread_func func)
{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- struct protocol_stack *stack = NULL;
+ /* thread may run slow, if arg is temp var maybe have relese */
+ static uint16_t queue[PROTOCOL_STACK_MAX];
+ char name[PATH_MAX];
+ pthread_t tid;
int32_t ret;
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- stack = malloc(sizeof(*stack));
- if (stack == NULL) {
- return -ENOMEM;
- }
- memset_s(stack, sizeof(*stack), 0, sizeof(*stack));
-
- stack->queue_id = i;
- stack->port_id = stack_group->port_id;
- stack->cpu_id = get_global_cfg_params()->cpus[i];
- stack->socket_id = numa_node_of_cpu(stack->cpu_id);
- if (stack->socket_id < 0) {
- LSTACK_LOG(ERR, PORT, "numa_node_of_cpu failed\n");
- return -EINVAL;
- }
-
- ret = pktmbuf_pool_init(stack, stack_group->stack_num);
- if (ret != 0) {
- return ret;
- }
-
- ret = create_shared_ring(stack);
- if (ret != 0) {
- return ret;
- }
-
- init_list_node(&stack->recv_list);
- init_list_node(&stack->listen_list);
- init_list_node(&stack->event_list);
- init_list_node(&stack->send_list);
-
- stack_group->stacks[i] = stack;
- }
-
- ret = init_stack_numa_cpuset();
- if (ret < 0) {
+ if (queue_id >= PROTOCOL_STACK_MAX) {
+ LSTACK_LOG(ERR, LSTACK, "queue_id is %d exceed max=%d\n", queue_id, PROTOCOL_STACK_MAX);
return -1;
}
+ queue[queue_id] = queue_id;
- if (!use_ltran()) {
- ret = dpdk_ethdev_init();
- if (ret != 0) {
- LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_init failed\n");
- return -1;
- }
-
- ret = dpdk_ethdev_start();
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n");
- return -1;
- }
-
- if (get_global_cfg_params()->kni_switch) {
- ret = dpdk_init_lstack_kni();
- if (ret < 0) {
- return -1;
- }
- }
- }
-
- return 0;
-}
-
-static int32_t create_thread(struct protocol_stack *stack, char *thread_name, stack_thread_func func)
-{
- char name[PATH_MAX];
- pthread_t tid;
- int32_t ret;
-
- ret = sprintf_s(name, sizeof(name), "%s%02d", thread_name, stack->queue_id);
+ ret = sprintf_s(name, sizeof(name), "%s%02d", thread_name, queue[queue_id]);
if (ret < 0) {
LSTACK_LOG(ERR, LSTACK, "set name failed\n");
return -1;
}
- ret = pthread_create(&tid, NULL, func, stack);
+ ret = pthread_create(&tid, NULL, func, &queue[queue_id]);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "pthread_create ret=%d\n", ret);
return -1;
@@ -257,148 +202,185 @@ static int32_t create_thread(struct protocol_stack *stack, char *thread_name, st
static void* gazelle_weakup_thread(void *arg)
{
- struct protocol_stack *stack = (struct protocol_stack *)arg;
- int32_t lcore_id = get_global_cfg_params()->weakup[stack->queue_id];
+ uint16_t queue_id = *(uint16_t *)arg;
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
+ int32_t lcore_id = get_global_cfg_params()->wakeup[stack->queue_id];
thread_affinity_init(lcore_id);
- LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id);
- struct list_node wakeup_list;
- init_list_node(&wakeup_list);
- stack->wakeup_list = &wakeup_list;
+ LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id);
for (;;) {
- wakeup_list_sock(&wakeup_list);
+ sem_t *event_sem;
+ if (rte_ring_sc_dequeue(stack->wakeup_ring, (void **)&event_sem)) {
+ continue;
+ }
- weakup_thread(stack->weakup_ring, &wakeup_list);
+ sem_post(event_sem);
}
return NULL;
}
-static void stack_thread_init(struct protocol_stack *stack)
+static void init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
{
- uint16_t queue_id = stack->queue_id;
- int32_t ret;
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+
+ memset_s(stack, sizeof(*stack), 0, sizeof(*stack));
set_stack_idx(queue_id);
stack->tid = gettid();
+ stack->queue_id = queue_id;
+ stack->port_id = stack_group->port_id;
+ stack->cpu_id = get_global_cfg_params()->cpus[queue_id];
stack->lwip_stats = &lwip_stats;
- RTE_PER_LCORE(_lcore_id) = stack->cpu_id;
- thread_affinity_init(stack->cpu_id);
+ init_list_node(&stack->recv_list);
+ init_list_node(&stack->listen_list);
+ init_list_node(&stack->event_list);
+ init_list_node(&stack->send_list);
+
+ pthread_spin_init(&stack->event_lock, PTHREAD_PROCESS_SHARED);
sys_calibrate_tsc();
+ stack_stat_init();
- hugepage_init();
+ stack_group->stacks[queue_id] = stack;
+}
- stack_replenish_send_idlembuf(stack);
+static struct protocol_stack * stack_thread_init(uint16_t queue_id)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
- tcpip_init(NULL, NULL);
+ struct protocol_stack *stack = malloc(sizeof(*stack));
+ if (stack == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "malloc stack failed\n");
+ return NULL;
+ }
+ init_stack_value(stack, queue_id);
- if (use_ltran()) {
- ret = client_reg_thrd_ring();
- if (ret != 0) {
- LSTACK_EXIT(1, "failed reg thread ret=%d\n", ret);
- }
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+ CPU_SET(stack->cpu_id, &cpuset);
+ if (rte_thread_set_affinity(&cpuset) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "rte_thread_set_affinity failed\n");
+ free(stack);
+ return NULL;
}
+ RTE_PER_LCORE(_lcore_id) = stack->cpu_id;
- ret = ethdev_init(stack);
+ stack->socket_id = numa_node_of_cpu(stack->cpu_id);
+ if (stack->socket_id < 0) {
+ LSTACK_LOG(ERR, LSTACK, "numa_node_of_cpu failed\n");
+ free(stack);
+ return NULL;
+ }
+
+ int32_t ret = pktmbuf_pool_init(stack, stack_group->stack_num);
if (ret != 0) {
- LSTACK_EXIT(1, "failed reg thread ret=%d\n", ret);
+ free(stack);
+ return NULL;
}
- stack_stat_init();
+ ret = create_shared_ring(stack);
+ if (ret != 0) {
+ free(stack);
+ return NULL;
+ }
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- sem_post(&stack_group->thread_inited);
- LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id);
-}
+ thread_affinity_init(stack->cpu_id);
-static void report_stack_event(struct protocol_stack *stack)
-{
- struct list_node *list = &(stack->event_list);
- struct list_node *node, *temp;
- struct lwip_sock *sock;
+ hugepage_init();
- list_for_each_safe(node, temp, list) {
- sock = container_of(node, struct lwip_sock, event_list);
+ tcpip_init(NULL, NULL);
- if (weakup_enqueue(stack->weakup_ring, sock) == 0) {
- __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE);
- list_del_node_init(&sock->event_list);
- stack->stats.weakup_events++;
- } else {
- break;
+ if (use_ltran()) {
+ ret = client_reg_thrd_ring();
+ if (ret != 0) {
+ free(stack);
+ return NULL;
}
}
-}
-
-static void send_stack_list(struct protocol_stack *stack)
-{
- struct list_node *list = &(stack->send_list);
- struct list_node *node, *temp;
- struct lwip_sock *sock;
- list_for_each_safe(node, temp, list) {
- sock = container_of(node, struct lwip_sock, send_list);
+ sem_post(&stack_group->thread_phase1);
- if (sock->conn == NULL || sock->stack == NULL) {
- list_del_node_init(&sock->send_list);
- continue;
- }
+ int32_t sem_val;
+ do {
+ sem_getvalue(&stack_group->ethdev_init, &sem_val);
+ } while (!sem_val && !use_ltran());
- ssize_t ret = write_lwip_data(sock, sock->conn->socket, sock->send_flags);
- __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE);
- if (ret >= 0 &&
- (rte_ring_count(sock->send_ring) || sock->send_lastdata)) {
- __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE);
- } else {
- list_del_node_init(&sock->send_list);
- }
+ ret = ethdev_init(stack);
+ if (ret != 0) {
+ free(stack);
+ return NULL;
+ }
- if (rte_ring_free_count(sock->send_ring)) {
- add_epoll_event(sock->conn, EPOLLOUT);
+ if (stack_group->wakeup_enable) {
+ ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread);
+ if (ret != 0) {
+ free(stack);
+ return NULL;
}
}
+
+ return stack;
}
static void* gazelle_stack_thread(void *arg)
{
- struct protocol_stack *stack = (struct protocol_stack *)arg;
+ uint16_t queue_id = *(uint16_t *)arg;
- stack_thread_init(stack);
+ struct protocol_stack *stack = stack_thread_init(queue_id);
+ if (stack == NULL) {
+ pthread_mutex_lock(&g_mem_mutex);
+ LSTACK_EXIT(1, "stack_thread_init failed\n");
+ pthread_mutex_unlock(&g_mem_mutex);
+ }
+ LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id);
for (;;) {
- poll_rpc_msg(stack);
+ poll_rpc_msg(stack, HANDLE_RPC_MSG_MAX);
eth_dev_poll();
- read_recv_list();
+ read_recv_list(stack, READ_LIST_MAX);
- sys_timer_run();
+ send_stack_list(stack, SEND_LIST_MAX);
- report_stack_event(stack);
+ sys_timer_run();
- send_stack_list(stack);
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+ update_stack_events(stack);
+#endif
}
return NULL;
}
-int32_t create_stack_thread(void)
+int32_t init_protocol_stack(void)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
int32_t ret;
- ret = sem_init(&stack_group->thread_inited, 0, 0);
+ stack_group->stack_num = get_global_cfg_params()->num_cpu;
+ stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? true : false;
+
+ if (!use_ltran()) {
+ ret = sem_init(&stack_group->ethdev_init, 0, 0);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, PORT, "sem_init failed\n");
+ return -1;
+ }
+ }
+
+ ret = sem_init(&stack_group->thread_phase1, 0, 0);
if (ret < 0) {
LSTACK_LOG(ERR, PORT, "sem_init failed\n");
return -1;
}
for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- ret = create_thread(stack_group->stacks[i], "gazellestack", gazelle_stack_thread);
+ ret = create_thread(i, "gazellestack", gazelle_stack_thread);
if (ret != 0) {
return ret;
}
@@ -406,14 +388,12 @@ int32_t create_stack_thread(void)
int32_t thread_inited_num;
do {
- sem_getvalue(&stack_group->thread_inited, &thread_inited_num);
+ sem_getvalue(&stack_group->thread_phase1, &thread_inited_num);
} while (thread_inited_num < stack_group->stack_num);
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- ret = create_thread(stack_group->stacks[i], "gazelleweakup", gazelle_weakup_thread);
- if (ret != 0) {
- return ret;
- }
+ ret = init_stack_numa_cpuset();
+ if (ret < 0) {
+ return -1;
}
return 0;
@@ -440,7 +420,6 @@ static inline bool is_real_close(int32_t fd)
/* last sock */
if (list_is_empty(&sock->attach_list)) {
- list_del_node_init(&sock->attach_list);
return true;
}
@@ -557,24 +536,6 @@ void stack_listen(struct rpc_msg *msg)
}
}
-static bool have_accept_event(int32_t fd)
-{
- do {
- struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL) {
- break;
- }
-
- if (NETCONN_IS_ACCEPTIN(sock)) {
- return true;
- }
-
- fd = sock->nextfd;
- } while (fd > 0);
-
- return false;
-}
-
void stack_accept(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
@@ -593,7 +554,7 @@ void stack_accept(struct rpc_msg *msg)
}
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d attach_fd %d failed %d\n", get_stack_tid(), msg->args[MSG_ARG_0].i,
- fd, accept_fd);
+ fd, accept_fd);
msg->result = -1;
}
@@ -768,13 +729,11 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen)
{
struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL) {
+ if (sock == NULL || sock->attach_fd < 0) {
errno = EINVAL;
return -1;
}
-
fd = sock->attach_fd;
- int32_t head_fd = fd;
struct lwip_sock *min_sock = NULL;
int32_t min_fd = fd;
@@ -783,15 +742,19 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add
if (sock == NULL) {
GAZELLE_RETURN(EINVAL);
}
+ struct lwip_sock *attach_sock = get_socket(sock->attach_fd);
+ if (attach_sock == NULL) {
+ GAZELLE_RETURN(EINVAL);
+ }
- if (!NETCONN_IS_ACCEPTIN(sock)) {
+ if (!NETCONN_IS_ACCEPTIN(attach_sock)) {
fd = sock->nextfd;
continue;
}
- if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) {
- min_sock = sock;
- min_fd = fd;
+ if (min_sock == NULL || min_sock->stack->conn_num > attach_sock->stack->conn_num) {
+ min_sock = attach_sock;
+ min_fd = sock->attach_fd;
}
fd = sock->nextfd;
@@ -802,13 +765,7 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add
ret = rpc_call_accept(min_fd, addr, addrlen);
}
- if (have_accept_event(head_fd)) {
- add_self_event(sock, EPOLLIN);
- sock = get_socket(head_fd);
- sock->stack->stats.accept_events++;
- }
-
- if(ret < 0) {
+ if (ret < 0) {
errno = EAGAIN;
}
return ret;
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 1813906..743857f 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -105,8 +105,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
lstack_get_low_power_info(&dfx->low_power_info);
memcpy_s(&dfx->data.pkts, sizeof(dfx->data.pkts), &stack->stats, sizeof(dfx->data.pkts));
dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail;
- dfx->data.pkts.weakup_ring_cnt = rte_ring_count(stack->weakup_ring);
- dfx->data.pkts.send_idle_ring_cnt = rte_ring_count(stack->send_idle_ring);
int32_t rpc_call_result = rpc_call_msgcnt(stack);
dfx->data.pkts.call_msg_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
@@ -120,10 +118,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
rpc_call_result = rpc_call_sendlistcnt(stack);
dfx->data.pkts.send_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
- if (stack->wakeup_list) {
- rpc_call_result = rpc_call_eventlistcnt(stack);
- dfx->data.pkts.wakeup_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
- }
dfx->data.pkts.conn_num = stack->conn_num;
}
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 2a67333..26725f7 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -19,9 +19,10 @@
#include "lstack_protocol_stack.h"
#include "lstack_control_plane.h"
#include "gazelle_base_func.h"
+#include "lstack_dpdk.h"
#include "lstack_thread_rpc.h"
-#define HANDLE_RPC_MSG_MAX (8)
+static PER_THREAD struct rte_mempool *rpc_pool = NULL;
static inline __attribute__((always_inline))
struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func)
@@ -33,11 +34,20 @@ struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func)
return NULL;
}
- ret = rte_mempool_get(stack->rpc_pool, (void **)&msg);
+ static uint16_t pool_index = 0;
+ if (rpc_pool == NULL) {
+ rpc_pool = create_rpc_mempool("rpc_msg", pool_index++);
+ if (rpc_pool == NULL) {
+ return NULL;
+ }
+ }
+
+ ret = rte_mempool_get(rpc_pool, (void **)&msg);
if (ret < 0) {
get_protocol_stack_group()->call_alloc_fail++;
return NULL;
}
+ msg->pool = rpc_pool;
pthread_spin_init(&msg->lock, PTHREAD_PROCESS_SHARED);
msg->func = func;
@@ -47,13 +57,13 @@ struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func)
}
static inline __attribute__((always_inline))
-void rpc_msg_free(struct rte_mempool *pool, struct rpc_msg *msg)
+void rpc_msg_free(struct rpc_msg *msg)
{
pthread_spin_destroy(&msg->lock);
msg->self_release = 0;
msg->func = NULL;
- rte_mempool_put(pool, (void *)msg);
+ rte_mempool_put(msg->pool, (void *)msg);
}
static inline __attribute__((always_inline))
@@ -64,7 +74,7 @@ void rpc_call(lockless_queue *queue, struct rpc_msg *msg)
}
static inline __attribute__((always_inline))
-int32_t rpc_sync_call(lockless_queue *queue, struct rte_mempool *pool, struct rpc_msg *msg)
+int32_t rpc_sync_call(lockless_queue *queue, struct rpc_msg *msg)
{
int32_t ret;
@@ -74,20 +84,20 @@ int32_t rpc_sync_call(lockless_queue *queue, struct rte_mempool *pool, struct rp
pthread_spin_lock(&msg->lock);
ret = msg->result;
- rpc_msg_free(pool, msg);
+ rpc_msg_free(msg);
return ret;
}
-void poll_rpc_msg(struct protocol_stack *stack)
+void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num)
{
- int32_t num;
+ uint32_t num;
struct rpc_msg *msg = NULL;
num = 0;
- while (num++ < HANDLE_RPC_MSG_MAX) {
+ while (num++ < max_num) {
lockless_queue_node *node = lockless_queue_mpsc_pop(&stack->rpc_queue);
if (node == NULL) {
- return;
+ break;
}
msg = container_of(node, struct rpc_msg, queue_node);
@@ -103,7 +113,7 @@ void poll_rpc_msg(struct protocol_stack *stack)
if (msg->self_release) {
pthread_spin_unlock(&msg->lock);
} else {
- rpc_msg_free(stack->rpc_pool, msg);
+ rpc_msg_free(msg);
}
}
}
@@ -118,7 +128,7 @@ int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint3
msg->args[MSG_ARG_0].p = conn_table;
msg->args[MSG_ARG_1].u = max_conn;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_connnum(struct protocol_stack *stack)
@@ -128,7 +138,7 @@ int32_t rpc_call_connnum(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
@@ -142,7 +152,7 @@ int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struc
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
static void rpc_msgcnt(struct rpc_msg *msg)
@@ -158,7 +168,7 @@ int32_t rpc_call_msgcnt(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn)
@@ -168,7 +178,7 @@ int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn)
return -1;
}
msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn)
@@ -178,17 +188,7 @@ int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn)
return -1;
}
msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
-}
-
-int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_wakeuplist_count);
- if (msg == NULL) {
- return -1;
- }
-
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_eventlistcnt(struct protocol_stack *stack)
@@ -198,7 +198,7 @@ int32_t rpc_call_eventlistcnt(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_sendlistcnt(struct protocol_stack *stack)
@@ -208,7 +208,7 @@ int32_t rpc_call_sendlistcnt(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
@@ -218,7 +218,7 @@ int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
void add_epoll_event(struct netconn *conn, uint32_t event);
@@ -243,24 +243,6 @@ void rpc_call_addevent(struct protocol_stack *stack, void *sock)
rpc_call(&stack->rpc_queue, msg);
}
-static void rpc_replenish_idlembuf(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
- stack_replenish_send_idlembuf(stack);
- stack->in_replenish = 0;
-}
-
-void rpc_call_replenish_idlembuf(struct protocol_stack *stack)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_replenish_idlembuf);
- if (msg == NULL) {
- return;
- }
-
- msg->self_release = 0;
- rpc_call(&stack->rpc_queue, msg);
-}
-
int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf)
{
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_arp);
@@ -287,7 +269,7 @@ int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol)
msg->args[MSG_ARG_1].i = type;
msg->args[MSG_ARG_2].i = protocol;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_close(int fd)
@@ -300,7 +282,7 @@ int32_t rpc_call_close(int fd)
msg->args[MSG_ARG_0].i = fd;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
@@ -315,7 +297,7 @@ int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_listen(int s, int backlog)
@@ -329,7 +311,7 @@ int32_t rpc_call_listen(int s, int backlog)
msg->args[MSG_ARG_0].i = s;
msg->args[MSG_ARG_1].i = backlog;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen)
@@ -344,7 +326,7 @@ int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen)
msg->args[MSG_ARG_1].p = addr;
msg->args[MSG_ARG_2].p = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen)
@@ -359,7 +341,7 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen)
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen)
@@ -374,7 +356,7 @@ int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen)
msg->args[MSG_ARG_1].p = addr;
msg->args[MSG_ARG_2].p = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen)
@@ -389,7 +371,7 @@ int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen)
msg->args[MSG_ARG_1].p = addr;
msg->args[MSG_ARG_2].p = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen)
@@ -406,7 +388,7 @@ int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, sockle
msg->args[MSG_ARG_3].p = optval;
msg->args[MSG_ARG_4].p = optlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen)
@@ -423,7 +405,7 @@ int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval,
msg->args[MSG_ARG_3].cp = optval;
msg->args[MSG_ARG_4].socklen = optlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_fcntl(int fd, int cmd, long val)
@@ -438,7 +420,7 @@ int32_t rpc_call_fcntl(int fd, int cmd, long val)
msg->args[MSG_ARG_1].i = cmd;
msg->args[MSG_ARG_2].l = val;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_ioctl(int fd, long cmd, void *argp)
@@ -453,7 +435,7 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp)
msg->args[MSG_ARG_1].l = cmd;
msg->args[MSG_ARG_2].p = argp;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
ssize_t rpc_call_send(int fd, const void *buf, size_t len, int flags)
@@ -486,7 +468,7 @@ int32_t rpc_call_sendmsg(int fd, const struct msghdr *msghdr, int flags)
msg->args[MSG_ARG_1].cp = msghdr;
msg->args[MSG_ARG_2].i = flags;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags)
@@ -501,5 +483,5 @@ int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags)
msg->args[MSG_ARG_1].p = msghdr;
msg->args[MSG_ARG_2].i = flags;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 48b7e44..345a373 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -66,7 +66,7 @@ struct cfg_params {
uint16_t num_cpu;
uint32_t cpus[CFG_MAX_CPUS];
uint16_t num_wakeup;
- uint32_t weakup[CFG_MAX_CPUS];
+ uint32_t wakeup[CFG_MAX_CPUS];
uint8_t num_ports;
uint16_t ports[CFG_MAX_PORTS];
char log_file[PATH_MAX];
diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h
index e76a9a6..e8080e1 100644
--- a/src/lstack/include/lstack_dpdk.h
+++ b/src/lstack/include/lstack_dpdk.h
@@ -37,10 +37,10 @@ struct protocol_stack;
#define MBUF_SZ (MAX_PACKET_SZ + RTE_PKTMBUF_HEADROOM)
-#define MAX_CORE_NUM 256
+#define MAX_CORE_NUM 256
#define CALL_MSG_RING_SIZE (unsigned long long)32
-#define CALL_CACHE_SZ 64
-#define CALL_POOL_SZ ((VDEV_CALL_QUEUE_SZ << 1) + (2 * CALL_CACHE_SZ * MAX_CORE_NUM))
+#define CALL_CACHE_SZ 0
+#define CALL_POOL_SZ 128
/* Layout:
* | rte_mbuf | pbuf | custom_free_function | payload |
@@ -62,6 +62,7 @@ void dpdk_eal_init(void);
int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num);
struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, int32_t queue_id);
int32_t create_shared_ring(struct protocol_stack *stack);
+struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id);
void lstack_log_level_init(void);
int dpdk_ethdev_init(void);
int dpdk_ethdev_start(void);
diff --git a/src/lstack/include/lstack_log.h b/src/lstack/include/lstack_log.h
index 383495d..8b4209a 100644
--- a/src/lstack/include/lstack_log.h
+++ b/src/lstack/include/lstack_log.h
@@ -15,17 +15,14 @@
#include <stdio.h>
#include <syslog.h>
-
#include <rte_log.h>
-#include "lwipopts.h"
-
-#define RTE_LOGTYPE_LSTACK RTE_LOGTYPE_USER1
+#define RTE_LOGTYPE_LSTACK RTE_LOGTYPE_USER1
#define LSTACK_EXIT(a, fmt, ...) rte_exit(a, "%s:%d "fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__)
#define LSTACK_LOG(a, b, fmt, ...) (void)RTE_LOG(a, b, "%s:%d "fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__)
-#define LSTACK_INFO LOG_INFO
-#define LSTACK_ERR LOG_ERR
+#define LSTACK_INFO LOG_INFO
+#define LSTACK_ERR LOG_ERR
/* before rte_eal_init */
#define LSTACK_PRE_LOG(level, fmt, ...) \
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index ffd3b80..c73e3a7 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -21,32 +21,31 @@
#define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox))
#define NETCONN_IS_DATAIN(sock) ((rte_ring_count((sock)->recv_ring) || (sock)->recv_lastdata))
-#define NETCONN_IS_DATAOUT(sock) rte_ring_free_count((sock)->send_ring)
+#define NETCONN_IS_DATAOUT(sock) ((rte_ring_count((sock)->send_ring) || (sock)->send_lastdata))
+#define NETCONN_IS_OUTIDLE(sock) rte_ring_free_count((sock)->send_ring)
void create_shadow_fd(struct rpc_msg *msg);
void listen_list_add_node(int32_t head_fd, int32_t add_fd);
void gazelle_init_sock(int32_t fd);
int32_t gazelle_socket(int domain, int type, int protocol);
void gazelle_clean_sock(int32_t fd);
-ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags);
+struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags);
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len);
ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags);
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags);
-void read_recv_list(void);
+void read_recv_list(struct protocol_stack *stack, uint32_t max_num);
+void send_stack_list(struct protocol_stack *stack, uint32_t send_max);
void add_recv_list(int32_t fd);
void stack_sendlist_count(struct rpc_msg *msg);
void stack_eventlist_count(struct rpc_msg *msg);
-void stack_wakeuplist_count(struct rpc_msg *msg);
void get_lwip_conntable(struct rpc_msg *msg);
void get_lwip_connnum(struct rpc_msg *msg);
void stack_recvlist_count(struct rpc_msg *msg);
void stack_send(struct rpc_msg *msg);
-void stack_replenish_send_idlembuf(struct protocol_stack *stack);
int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num);
void gazelle_free_pbuf(struct pbuf *pbuf);
ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags);
ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags);
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags);
-void add_self_event(struct lwip_sock *sock, uint32_t events);
#endif
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 39052e1..9753385 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -14,6 +14,7 @@
#define __GAZELLE_PROTOCOL_STACK_H__
#include <semaphore.h>
+#include <pthread.h>
#include <lwip/list.h>
#include <lwip/netif.h>
#include "dpdk_common.h"
@@ -28,38 +29,32 @@ struct protocol_stack {
uint16_t socket_id;
uint16_t cpu_id;
volatile uint16_t conn_num;
- volatile bool in_replenish;
-
- // for dispatcher thread
- cpu_set_t idle_cpuset;
+ cpu_set_t idle_cpuset; /* idle cpu in numa of stack, app thread bind to it */
lockless_queue rpc_queue;
- struct rte_ring *weakup_ring;
-
struct rte_mempool *rx_pktmbuf_pool;
struct rte_mempool *tx_pktmbuf_pool;
- struct rte_mempool *rpc_pool;
struct rte_ring *rx_ring;
struct rte_ring *tx_ring;
struct rte_ring *reg_ring;
- struct rte_ring *send_idle_ring;
+ struct rte_ring *wakeup_ring;
+
struct reg_ring_msg *reg_buf;
struct netif netif;
uint32_t rx_ring_used;
uint32_t tx_ring_used;
+ struct eth_dev_ops *dev_ops;
struct list_node recv_list;
struct list_node listen_list;
- struct list_node event_list;
struct list_node send_list;
- struct list_node *wakeup_list;
+ struct list_node event_list;
+ pthread_spinlock_t event_lock;
struct gazelle_stat_pkts stats;
struct gazelle_stack_latency latency;
struct stats_ *lwip_stats;
-
- struct eth_dev_ops *dev_ops;
};
struct eth_params;
@@ -67,25 +62,35 @@ struct eth_params;
struct protocol_stack_group {
uint16_t stack_num;
uint16_t port_id;
- sem_t thread_inited;
+ sem_t thread_phase1;
+ sem_t ethdev_init;
struct rte_mempool *kni_pktmbuf_pool;
struct eth_params *eth_params;
struct protocol_stack *stacks[PROTOCOL_STACK_MAX];
+ bool wakeup_enable;
/* dfx stats */
bool latency_start;
uint64_t call_alloc_fail;
};
+struct wakeup_poll {
+ bool init;
+ struct protocol_stack *bind_stack;
+ struct list_node event_list; /* epoll temp use poll */
+ sem_t event_sem;
+};
+
long get_stack_tid(void);
+pthread_mutex_t *get_mem_mutex(void);
struct protocol_stack *get_protocol_stack(void);
struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
struct protocol_stack *get_minconn_protocol_stack(void);
struct protocol_stack_group *get_protocol_stack_group(void);
int32_t init_protocol_stack(void);
-int32_t create_stack_thread(void);
-int bind_to_stack_numa(int stack_id);
+int32_t bind_to_stack_numa(struct protocol_stack *stack);
+int32_t init_dpdk_ethdev(void);
/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack);
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index 20539d9..61bcd38 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -42,21 +42,20 @@ struct rpc_msg {
int32_t self_release; /* 0:msg handler release msg 1:msg sender release msg */
int64_t result; /* func return val */
lockless_queue_node queue_node;
+ struct rte_mempool *pool;
rpc_msg_func func; /* msg handle func hook */
union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */
};
struct protocol_stack;
-void poll_rpc_msg(struct protocol_stack *stack);
-void rpc_call_replenish_idlembuf(struct protocol_stack *stack);
+void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num);
void rpc_call_addevent(struct protocol_stack *stack, void *sock);
int32_t rpc_call_msgcnt(struct protocol_stack *stack);
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
int32_t rpc_call_eventlistcnt(struct protocol_stack *stack);
int32_t rpc_call_sendlistcnt(struct protocol_stack *stack);
-int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack);
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn);
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn);
int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn);
diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h
index 916b1e2..31a997d 100644
--- a/src/lstack/include/lstack_vdev.h
+++ b/src/lstack/include/lstack_vdev.h
@@ -23,7 +23,7 @@
#define VDEV_EVENT_QUEUE_SZ (DEFAULT_RING_SIZE)
#define VDEV_REG_QUEUE_SZ (DEFAULT_RING_SIZE)
#define VDEV_CALL_QUEUE_SZ (DEFAULT_RING_SIZE)
-#define VDEV_WEAKUP_QUEUE_SZ (DEFAULT_RING_SIZE)
+#define VDEV_WAKEUP_QUEUE_SZ (DEFAULT_RING_SIZE)
#define VDEV_IDLE_QUEUE_SZ (DEFAULT_RING_SIZE)
#define VDEV_TX_QUEUE_SZ (DEFAULT_RING_SIZE)
diff --git a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h
index 2b3cff4..cac640b 100644
--- a/src/lstack/include/posix/lstack_epoll.h
+++ b/src/lstack/include/posix/lstack_epoll.h
@@ -17,6 +17,8 @@
extern "C" {
#endif
+#include <poll.h>
+
int32_t lstack_epoll_create(int32_t size);
int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event);
int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event *events, int32_t maxevents, int32_t timeout);
diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf
index fdca602..696dfb9 100644
--- a/src/lstack/lstack.conf
+++ b/src/lstack/lstack.conf
@@ -16,7 +16,6 @@ kni_switch=0
low_power_mode=0
num_cpus="2"
-num_wakeup="3"
host_addr="192.168.1.10"
mask_addr="255.255.255.0"
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 8b2193f..ae39403 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -51,6 +51,9 @@ void eth_dev_recv(struct rte_mbuf *mbuf)
stack->stats.rx_allocmbuf_fail++;
break;
}
+#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW
+ next->ol_flags = m->ol_flags;
+#endif
if (head == NULL) {
head = next;
@@ -73,18 +76,19 @@ void eth_dev_recv(struct rte_mbuf *mbuf)
}
}
+#define READ_PKTS_MAX 32
int32_t eth_dev_poll(void)
{
uint32_t nr_pkts;
- struct rte_mbuf *pkts[DPDK_PKT_BURST_SIZE];
+ struct rte_mbuf *pkts[READ_PKTS_MAX];
struct protocol_stack *stack = get_protocol_stack();
- nr_pkts = stack->dev_ops->rx_poll(stack, pkts, DPDK_PKT_BURST_SIZE);
+ nr_pkts = stack->dev_ops->rx_poll(stack, pkts, READ_PKTS_MAX);
if (nr_pkts == 0) {
return nr_pkts;
}
- if (get_protocol_stack_group()->latency_start) {
+ if (!use_ltran() && get_protocol_stack_group()->latency_start) {
uint64_t time_stamp = get_current_time();
time_stamp_into_mbuf(nr_pkts, pkts, time_stamp);
}
@@ -146,19 +150,6 @@ static err_t eth_dev_output(struct netif *netif, struct pbuf *pbuf)
return ERR_OK;
}
-static err_t eth_dev_input(struct pbuf *p, struct netif *netif)
-{
- err_t ret = ethernet_input(p, netif);
- if (ret != ERR_OK) {
- return ret;
- }
-
- if (get_protocol_stack_group()->latency_start) {
- calculate_lstack_latency(&get_protocol_stack()->latency, p, GAZELLE_LATENCY_LWIP);
- }
- return ret;
-}
-
static err_t eth_dev_init(struct netif *netif)
{
struct cfg_params *cfg = get_global_cfg_params();
@@ -200,7 +191,7 @@ int32_t ethdev_init(struct protocol_stack *stack)
netif_set_default(&stack->netif);
struct netif *netif = netif_add(&stack->netif, &cfg->host_addr, &cfg->netmask, &cfg->gateway_addr, NULL,
- eth_dev_init, eth_dev_input);
+ eth_dev_init, ethernet_input);
if (netif == NULL) {
LSTACK_LOG(ERR, LSTACK, "netif_add failed\n");
return ERR_IF;
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
index 57d3bce..5a4e86a 100644
--- a/src/lstack/netif/lstack_vdev.c
+++ b/src/lstack/netif/lstack_vdev.c
@@ -91,7 +91,7 @@ static uint32_t vdev_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pkt
uint32_t sent_pkts = 0;
do {
- sent_pkts += rte_eth_tx_burst(stack->port_id, stack->queue_id, &pkts[sent_pkts], nr_pkts);
+ sent_pkts += rte_eth_tx_burst(stack->port_id, stack->queue_id, &pkts[sent_pkts], nr_pkts - sent_pkts);
} while (sent_pkts < nr_pkts);
return sent_pkts;
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 8db5791..8d71966 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -561,27 +561,16 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
printf("app_write_drop: %-13"PRIu64" ", lstack_stat->data.pkts.app_write_drop);
printf("write_lwip_drop: %-12"PRIu64" ", lstack_stat->data.pkts.write_lwip_drop);
printf("app_write_idlebuf: %-10"PRIu16" \n", lstack_stat->data.pkts.send_idle_ring_cnt);
+ printf("event_list: %-17"PRIu64" ", lstack_stat->data.pkts.event_list);
printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list);
- printf("weakup_ring_cnt: %-12"PRIu16" ", lstack_stat->data.pkts.weakup_ring_cnt);
printf("conn_num: %-19"PRIu16" \n", lstack_stat->data.pkts.conn_num);
- printf("weakup_events: %-14"PRIu64" ", lstack_stat->data.pkts.weakup_events);
- printf("lwip_events: %-16"PRIu64" ", lstack_stat->data.pkts.lwip_events);
- printf("app_events: %-17"PRIu64"\n", lstack_stat->data.pkts.app_events);
- printf("epoll_pending: %-14"PRIu64" ", lstack_stat->data.pkts.epoll_pending);
- printf("epoll_self_event: %-11"PRIu64" ", lstack_stat->data.pkts.epoll_self_event);
- printf("remove_event: %-15"PRIu64" \n", lstack_stat->data.pkts.remove_event);
- printf("read_events: %-16"PRIu64" ", lstack_stat->data.pkts.read_events);
- printf("write_events: %-15"PRIu64" ", lstack_stat->data.pkts.write_events);
- printf("accept_events: %-14"PRIu64" \n", lstack_stat->data.pkts.accept_events);
- printf("read_null: %-18"PRIu64" ", lstack_stat->data.pkts.read_null);
- printf("wakeup_list: %-16"PRIu64" ", lstack_stat->data.pkts.wakeup_list);
- printf("event_list: %-17"PRIu64" \n", lstack_stat->data.pkts.event_list);
- printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc);
- printf("epoll_pending_call: %-9"PRIu64" ", lstack_stat->data.pkts.epoll_pending_call);
- printf("epoll_self_call: %-12"PRIu64" \n", lstack_stat->data.pkts.epoll_self_call);
+ printf("wakeup_events: %-14"PRIu64" ", lstack_stat->data.pkts.wakeup_events);
+ printf("app_events: %-17"PRIu64" ", lstack_stat->data.pkts.app_events);
+ printf("read_null: %-18"PRIu64" \n", lstack_stat->data.pkts.read_null);
printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt);
printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail);
printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null);
+ printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc);
printf("send_list: %-18"PRIu64" \n", lstack_stat->data.pkts.send_list);
}
@@ -884,7 +873,7 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
printf("Active Internet connections (servers and established)\n");
do {
printf("\n------ stack tid: %6u ------\n", stat->tid);
- printf("No. Proto recv_cnt recv_ring in_send send_ring event self_event Local Address"
+ printf("No. Proto recv_cnt recv_ring in_send send_ring sem_cnt Local Address "
" Foreign Address State\n");
uint32_t unread_pkts = 0;
uint32_t unsend_pkts = 0;
@@ -894,13 +883,13 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
rip.s_addr = conn_info->rip;
lip.s_addr = conn_info->lip;
if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) {
- printf("%-6utcp %-10u%-11u%-9u%-11u%-7u%-12u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt,
- conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->event_ring_cnt,
- conn_info->self_ring_cnt, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port,
+ printf("%-6utcp %-10u%-11u%-9u%-11u%-9d%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt,
+ conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->sem_cnt,
+ inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port,
inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port,
tcp_state_to_str(conn_info->tcp_sub_state));
} else if (conn_info->state == GAZELLE_LISTEN_LIST) {
- printf("%-6utcp %-60u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt,
+ printf("%-6utcp %-50u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt,
inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port);
} else {
printf("Got unknow tcp conn::%s:%5hu, state:%u\n",
diff --git a/src/ltran/ltran_opt.h b/src/ltran/ltran_opt.h
index e4e085d..1117898 100644
--- a/src/ltran/ltran_opt.h
+++ b/src/ltran/ltran_opt.h
@@ -34,12 +34,12 @@
#define GAZELLE_KNI_ETHERNET_HEADER_SIZE 14
#define GAZELLE_KNI_ETHERNET_FCS_SIZE 4
-#define GAZELLE_PKT_MBUF_RX_POOL_NAME_FMT "rx_pool%d"
-#define GAZELLE_PKT_MBUF_TX_POOL_NAME_FMT "tx_pool%d"
+#define GAZELLE_PKT_MBUF_RX_POOL_NAME_FMT "rx_pool%u"
+#define GAZELLE_PKT_MBUF_TX_POOL_NAME_FMT "tx_pool%u"
#define GAZELLE_PKT_MBUF_POOL_NAME_LENGTH 64
#define GAZELLE_BOND_NAME_LENGTH 64
-#define GAZELLE_BOND_DEV_NAME_FMT "net_bonding%d"
+#define GAZELLE_BOND_DEV_NAME_FMT "net_bonding%hu"
#define GAZELLE_BOND_QUEUE_MIN 1
#define GAZELLE_BOND_QUEUE_MAX 64
--
1.8.3.1