From 9b4379914e97d4c0c267033559bf86d20c7381b6 Mon Sep 17 00:00:00 2001 From: jiangheng Date: Wed, 30 Mar 2022 00:04:46 +0800 Subject: [PATCH] refactor event --- README.md | 3 +- src/common/gazelle_dfx_msg.h | 18 +- src/lstack/api/lstack_epoll.c | 509 +++++++++++++++-------------- src/lstack/api/lstack_signal.c | 1 + src/lstack/core/lstack_cfg.c | 27 +- src/lstack/core/lstack_dpdk.c | 99 ++++-- src/lstack/core/lstack_init.c | 13 +- src/lstack/core/lstack_lwip.c | 352 +++++++++++--------- src/lstack/core/lstack_protocol_stack.c | 345 +++++++++---------- src/lstack/core/lstack_stack_stat.c | 6 - src/lstack/core/lstack_thread_rpc.c | 106 +++--- src/lstack/include/lstack_cfg.h | 2 +- src/lstack/include/lstack_dpdk.h | 7 +- src/lstack/include/lstack_log.h | 9 +- src/lstack/include/lstack_lwip.h | 11 +- src/lstack/include/lstack_protocol_stack.h | 35 +- src/lstack/include/lstack_thread_rpc.h | 5 +- src/lstack/include/lstack_vdev.h | 2 +- src/lstack/include/posix/lstack_epoll.h | 2 + src/lstack/lstack.conf | 1 - src/lstack/netif/lstack_ethdev.c | 25 +- src/lstack/netif/lstack_vdev.c | 2 +- src/ltran/ltran_dfx.c | 31 +- src/ltran/ltran_opt.h | 6 +- 24 files changed, 822 insertions(+), 795 deletions(-) diff --git a/README.md b/README.md index e914b26..24079c7 100644 --- a/README.md +++ b/README.md @@ -236,7 +236,7 @@ Usage: gazellectl [-h | help] - 提供的命令行、配置文件以及配置大页内存需要root权限执行或修改。非root用户使用,需先提权以及修改文件权限。 - 若要把用户态网卡绑回内核驱动,必须先将Gazelle退出。 - 不支持accept阻塞模式或者connect阻塞模式。 -- 最多只支持20000个链接(需要保证进程内,非网络连接的fd个数小于2000个)。 +- 最多只支持1500个连接。 - 协议栈当前只支持tcp、icmp、arp、ipv4。 - 大页内存不支持在挂载点里创建子目录重新挂载。 - 在对端ping时,要求指定报文长度小于等于14000。 @@ -253,6 +253,7 @@ Usage: gazellectl [-h | help] - 不使用ltran模式,kni网口只支持本地通讯使用,且需要启动前配置NetworkManager不管理kni网卡 - 虚拟kni网口的ip及mac地址,需要与lstack配置文件保持一致 - gazelle运行过程中,不允许删除运行文件,如果删除,需要重启gazelle +- lstack配置的ip需要与应用程序的ip保持一致 ## Security risk note gazelle有如下安全风险,用户需要评估使用场景风险 diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h index de669f5..6db67ee 100644 --- a/src/common/gazelle_dfx_msg.h +++ b/src/common/gazelle_dfx_msg.h @@ -65,11 +65,9 @@ struct gazelle_stat_pkts { uint64_t rx_allocmbuf_fail; uint64_t tx_allocmbuf_fail; uint64_t call_msg_cnt; - uint16_t weakup_ring_cnt; uint16_t conn_num; uint16_t send_idle_ring_cnt; uint64_t event_list; - uint64_t wakeup_list; uint64_t read_lwip_drop; uint64_t read_lwip_cnt; uint64_t write_lwip_drop; @@ -79,22 +77,13 @@ struct gazelle_stat_pkts { uint64_t app_write_idlefail; uint64_t app_write_drop; uint64_t recv_list; - uint64_t lwip_events; - uint64_t weakup_events; + uint64_t wakeup_events; uint64_t app_events; uint64_t call_alloc_fail; - uint64_t read_events; - uint64_t write_events; - uint64_t accept_events; uint64_t read_null; - uint64_t remove_event; - uint64_t send_self_rpc; uint64_t call_null; uint64_t arp_copy_fail; - uint64_t epoll_pending; - uint64_t epoll_pending_call; - uint64_t epoll_self_call; - uint64_t epoll_self_event; + uint64_t send_self_rpc; uint64_t send_list; }; @@ -169,8 +158,7 @@ struct gazelle_stat_lstack_conn_info { uint32_t send_ring_cnt; uint32_t recv_ring_cnt; uint32_t tcp_sub_state; - uint32_t event_ring_cnt; - uint32_t self_ring_cnt; + int32_t sem_cnt; }; struct gazelle_stat_lstack_conn { diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c index e54d496..b8d53f6 100644 --- a/src/lstack/api/lstack_epoll.c +++ b/src/lstack/api/lstack_epoll.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -21,6 +22,7 @@ #include #include #include +#include #include "lstack_compiler.h" #include "lstack_ethdev.h" @@ -28,127 +30,103 @@ #include "lstack_cfg.h" #include "lstack_log.h" #include "gazelle_base_func.h" -#include "lstack_weakup.h" #include "lstack_lwip.h" #include "lstack_protocol_stack.h" -#define EPOLL_INTERVAL_10MS 10000000 +#define EPOLL_KERNEL_INTERVAL 10 /* ms */ +#define EPOLL_NSEC_TO_SEC 1000000000 +#define EPOLL_MAX_EVENTS 512 -static PER_THREAD struct weakup_poll g_weakup_poll = {0}; - -enum POLL_TYPE { - TYPE_POLL, - TYPE_EPOLL, -}; - -static inline bool check_event_vaild(struct lwip_sock *sock, uint32_t event) -{ - if ((event & EPOLLIN) && !NETCONN_IS_ACCEPTIN(sock) && !NETCONN_IS_DATAIN(sock)) { - event &= ~EPOLLIN; - } - - if ((event & EPOLLOUT) && !NETCONN_IS_DATAOUT(sock)) { - event &= ~EPOLLOUT; - } - - return (event) ? true : false; -} - -static inline bool report_events(struct lwip_sock *sock, uint32_t event) -{ - /* error event */ - if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) { - return true; - } - - if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) { - return false; - } - - return check_event_vaild(sock, event); -} +static PER_THREAD struct wakeup_poll g_wakeup_poll = {0}; +static bool g_use_epoll = false; /* FIXME: when no epoll close prepare event for performance testing */ void add_epoll_event(struct netconn *conn, uint32_t event) { /* conn sock nerver null, because lwip call this func */ struct lwip_sock *sock = get_socket(conn->socket); - /* close_wait event should be (EPOLLRDHUP | EPOLLIN), but lwip is EPOLLERR */ - if (event == EPOLLERR && conn->pcb.tcp && conn->pcb.tcp->state == CLOSE_WAIT) { - event = EPOLLRDHUP | EPOLLIN | EPOLLERR; - } - if ((event & sock->epoll_events) == 0) { return; } + sock->events |= event & sock->epoll_events; - if (!sock->weakup || !report_events(sock, event)) { - return; +#ifdef GAZELLE_USE_EPOLL_EVENT_STACK + if (g_use_epoll && list_is_empty(&sock->event_list)) { + list_add_node(&sock->stack->event_list, &sock->event_list); } +#endif - if (weakup_enqueue(sock->stack->weakup_ring, sock)) { - if (list_is_empty(&sock->event_list)) { - list_add_node(&sock->stack->event_list, &sock->event_list); + if (sock->wakeup) { + sock->stack->stats.wakeup_events++; + if (get_protocol_stack_group()->wakeup_enable) { + rte_ring_sp_enqueue(sock->stack->wakeup_ring, &sock->wakeup->event_sem); + } else { + sem_post(&sock->wakeup->event_sem); } - } else { - __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE); - sock->stack->stats.weakup_events++; } } -static void raise_pending_events(struct lwip_sock *sock) +static inline uint32_t update_events(struct lwip_sock *sock) { - struct weakup_poll *wakeup = sock->weakup; - struct protocol_stack *stack = sock->stack; - struct netconn *conn = sock->conn; - if (wakeup == NULL || stack == NULL || conn == NULL) { - return; + uint32_t event = 0; + + if (sock->epoll_events & EPOLLIN) { + if (sock->attach_fd > 0 && NETCONN_IS_ACCEPTIN(sock)) { + event |= EPOLLIN; + } + + if (sock->attach_fd < 0 && NETCONN_IS_DATAIN(sock)) { + event |= EPOLLIN; + } } - struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket_by_fd(sock->attach_fd) : sock; - if (attach_sock == NULL) { - return; + if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_OUTIDLE(sock)) { + event |= EPOLLOUT; } - conn = attach_sock->conn; - if (conn == NULL) { - return; + if ((sock->epoll_events & EPOLLERR) && (sock->events & EPOLLERR)) { + event |= EPOLLERR | EPOLLIN; } - struct tcp_pcb *tcp = conn->pcb.tcp; - if ((tcp == NULL) || (tcp->state < ESTABLISHED)) { + + return event; +} + +#ifdef GAZELLE_USE_EPOLL_EVENT_STACK +void update_stack_events(struct protocol_stack *stack) +{ + if (!g_use_epoll) { return; } - uint32_t event = 0; - if (sock->epoll_events & EPOLLIN) { - if (attach_sock->recv_lastdata || rte_ring_count(attach_sock->recv_ring) || NETCONN_IS_ACCEPTIN(attach_sock)) { - event |= EPOLLIN; - } - } + struct list_node *node, *temp; + list_for_each_safe(node, temp, &stack->event_list) { + struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list); - if (sock->epoll_events & EPOLLOUT) { - if ((attach_sock->sendevent > 0) || - ((tcp_sndbuf(conn->pcb.tcp) > TCP_SNDLOWAT) && (tcp_sndqueuelen(conn->pcb.tcp) < TCP_SNDQUEUELOWAT))) { - event |= EPOLLOUT; + sock->events = update_events(sock); + if (sock->events != 0) { + continue; } - } - if (attach_sock->errevent > 0) { - event |= POLLERR | POLLIN; + if (pthread_spin_trylock(&stack->event_lock)) { + continue; + } + list_del_node_init(&sock->event_list); + pthread_spin_unlock(&stack->event_lock); } +} +#endif - if (event == 0) { +static void raise_pending_events(struct lwip_sock *sock) +{ + struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket_by_fd(sock->attach_fd) : sock; + if (attach_sock == NULL) { return; } - attach_sock->events |= event; - if (rte_ring_mp_enqueue(wakeup->event_ring, (void *)sock) == 0 || - rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) { - sem_post(&wakeup->event_sem); - stack->stats.epoll_pending++; - } else { - rpc_call_addevent(stack, attach_sock); - stack->stats.epoll_pending_call++; + + attach_sock->events = update_events(attach_sock); + if (attach_sock->events & attach_sock->epoll_events) { + rpc_call_addevent(attach_sock->stack, attach_sock); } } @@ -166,34 +144,35 @@ int32_t lstack_epoll_create(int32_t size) GAZELLE_RETURN(EINVAL); } - struct weakup_poll *weakup = malloc(sizeof(struct weakup_poll)); - if (weakup == NULL) { + struct wakeup_poll *wakeup = malloc(sizeof(struct wakeup_poll)); + if (wakeup == NULL) { posix_api->close_fn(fd); GAZELLE_RETURN(EINVAL); } - memset_s(weakup, sizeof(struct weakup_poll), 0, sizeof(struct weakup_poll)); - sem_init(&weakup->event_sem, 0, 0); - - weakup->event_ring = create_ring("RING_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd); - if (weakup->event_ring == NULL) { - posix_api->close_fn(fd); - GAZELLE_RETURN(ENOMEM); - } - - weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd); - if (weakup->self_ring == NULL) { - posix_api->close_fn(fd); - GAZELLE_RETURN(ENOMEM); - } + memset_s(wakeup, sizeof(struct wakeup_poll), 0, sizeof(struct wakeup_poll)); + sem_init(&wakeup->event_sem, 0, 0); - sock->weakup = weakup; + sock->wakeup = wakeup; + init_list_node(&wakeup->event_list); + g_use_epoll = true; return fd; } int32_t lstack_epoll_close(int32_t fd) { + struct lwip_sock *sock = get_socket_by_fd(fd); + if (sock == NULL) { + LSTACK_LOG(ERR, LSTACK, "fd=%d sock is NULL errno=%d\n", fd, errno); + GAZELLE_RETURN(EINVAL); + } + + if (sock->wakeup) { + free(sock->wakeup); + } + sock->wakeup = NULL; + return 0; } @@ -219,7 +198,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even } struct lwip_sock *epoll_sock = get_socket_by_fd(epfd); - if (epoll_sock == NULL || epoll_sock->weakup == NULL) { + if (epoll_sock == NULL || epoll_sock->wakeup == NULL) { LSTACK_LOG(ERR, LSTACK, "epfd=%d\n", fd); GAZELLE_RETURN(EINVAL); } @@ -228,7 +207,10 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even do { switch (op) { case EPOLL_CTL_ADD: - sock->weakup = epoll_sock->weakup; + sock->wakeup = epoll_sock->wakeup; + if (list_is_empty(&sock->event_list)) { + list_add_node(&sock->wakeup->event_list, &sock->event_list); + } /* fall through */ case EPOLL_CTL_MOD: sock->epoll_events = events; @@ -238,6 +220,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even } break; case EPOLL_CTL_DEL: + list_del_node_init(&sock->event_list); sock->epoll_events = 0; break; default: @@ -250,176 +233,234 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even return 0; } -static inline int32_t save_poll_event(struct pollfd *fds, uint32_t maxevents, int32_t fd, uint32_t events) +#ifdef GAZELLE_USE_EPOLL_EVENT_STACK +static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents) { int32_t event_num = 0; - for (uint32_t i = 0; i < maxevents; i++) { - /* fds[i].revents != 0, the events is kernel events */ - if (fds[i].fd == fd && fds[i].revents == 0) { - fds[i].revents = events; - event_num = 1; - break; + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + + maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents); + for (uint32_t i = 0; i < stack_group->stack_num && event_num < maxevents; i++) { + struct protocol_stack *stack = stack_group->stacks[i]; + int32_t start_event_num = event_num; + + if (pthread_spin_trylock(&stack->event_lock)) { + continue; + } + + struct list_node *node, *temp; + list_for_each_safe(node, temp, &stack->event_list) { + struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list); + + uint32_t event = sock->events & sock->epoll_events; + if (event == 0 || sock->wait_close) { + continue; + } + + events[event_num].events = event; + events[event_num].data = sock->ep_data; + event_num++; + + if (event_num >= maxevents) { + break; + } } + + pthread_spin_unlock(&stack->event_lock); + + __sync_fetch_and_add(&stack->stats.app_events, event_num - start_event_num); } return event_num; } - -static bool remove_event(enum POLL_TYPE etype, struct lwip_sock **sock_list, int32_t event_num, struct lwip_sock *sock, - struct lwip_sock *attach_sock) +#else +static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents) { - /* remove duplicate event */ - for (uint32_t i = 0; i < event_num && etype == TYPE_EPOLL; i++) { - if (sock_list[i] == sock) { - return true; + int32_t event_num = 0; + struct list_node *node, *temp; + list_for_each_safe(node, temp, &wakeup->event_list) { + struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list); + if (sock->conn == NULL) { + list_del_node_init(&sock->event_list); + continue; } + + struct lwip_sock *temp_sock = sock; + do { + struct lwip_sock *attach_sock = (temp_sock->attach_fd > 0) ? get_socket(temp_sock->attach_fd) : temp_sock; + if (attach_sock == NULL || temp_sock->wait_close) { + temp_sock = (temp_sock->nextfd > 0) ? get_socket(temp_sock->nextfd) : NULL; + continue; + } + + uint32_t event = update_events(attach_sock); + if (event != 0) { + events[event_num].events = event; + events[event_num].data = temp_sock->ep_data; + event_num++; + if (event_num >= maxevents) { + break; + } + } + + temp_sock = (temp_sock->nextfd > 0) ? get_socket(temp_sock->nextfd) : NULL; + } while (temp_sock); } - return !check_event_vaild(attach_sock, attach_sock->events); + return event_num; } +#endif -static int32_t get_lwip_events(struct weakup_poll *weakup, void *out, uint32_t maxevents, enum POLL_TYPE etype) +static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds) { - struct epoll_event *events = (struct epoll_event *)out; - struct pollfd *fds = (struct pollfd *)out; - - if (etype == TYPE_EPOLL) { - maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents); - } int32_t event_num = 0; - struct lwip_sock *sock = NULL; - while (event_num < maxevents) { - if (rte_ring_sc_dequeue(weakup->self_ring, (void **)&sock) && - rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock)) { - break; - } - __atomic_store_n(&sock->have_event, false, __ATOMIC_RELEASE); + for (uint32_t i = 0; i < nfds; i++) { + /* listenfd nextfd pointerto next stack listen, others nextfd=-1 */ + int32_t fd = fds[i].fd; + while (fd > 0) { + struct lwip_sock *sock = get_socket(fd); + if (sock == NULL) { + break; + } - /* sock->stack == NULL mean close sock */ - if (sock->stack == NULL) { - continue; - } + /* attach listen is empty, all event in attached listen. attached listen attach_fd is self */ + struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket(sock->attach_fd) : sock; + if (attach_sock == NULL || sock->wait_close) { + fd = sock->nextfd; + continue; + } - /* attach listen is empty, all event in attached listen. attached listen attach_fd is self */ - struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket(sock->attach_fd) : sock; - if (attach_sock == NULL) { - continue; - } + uint32_t events = update_events(attach_sock); + if (events) { + fds[i].revents = events; + __sync_fetch_and_add(&sock->stack->stats.app_events, 1); + event_num++; + break; + } - if (remove_event(etype, weakup->sock_list, event_num, sock, attach_sock)) { - sock->stack->stats.remove_event++; - continue; + fd = sock->nextfd; } + } - if (etype == TYPE_EPOLL) { - events[event_num].events = attach_sock->events; - events[event_num].data = sock->ep_data; - weakup->sock_list[event_num] = sock; - event_num++; - } else { - /* shadow_fd event notice listen_fd */ - if (attach_sock->shadowed_sock) { - attach_sock = attach_sock->shadowed_sock; - } + return event_num; +} - if (sock->conn) { - event_num += save_poll_event(fds, maxevents, sock->conn->socket, attach_sock->events); - } +static inline bool have_kernel_fd(int32_t epfd, struct pollfd *fds, nfds_t nfds) +{ + /* when epfd > 0 is epoll type */ + for (uint32_t i = 0; i < nfds && epfd < 0; i++) { + if (get_socket(fds[i].fd) == NULL) { + return true; } - - sock->stack->stats.app_events++; - sem_trywait(&weakup->event_sem); /* each event post sem, so every read down sem */ } - return event_num; + return false; } -static inline int32_t remove_kernel_invaild_events(struct pollfd *fds, int32_t nfds, int32_t event_count) +static inline int32_t poll_kernel_event(struct pollfd *fds, nfds_t nfds) { - int32_t real_count = 0; + int32_t event_num = 0; - for (int i = 0; i < nfds && real_count < event_count; i++) { - if (fds[i].fd < 0 || fds[i].revents == 0) { + for (uint32_t i = 0; i < nfds; i++) { + /* lwip event */ + if (get_socket(fds[i].fd) != NULL || fds[i].fd < 0) { continue; } - struct lwip_sock *sock = get_socket(fds[i].fd); - if (sock && CONN_TYPE_IS_LIBOS(sock->conn)) { - fds[i].revents = 0; + int32_t ret = posix_api->poll_fn(&fds[i], 1, 0); + if (ret < 0) { + if (errno != EINTR) { + return ret; + } } else { - real_count++; + event_num += ret; } } - return real_count; + return event_num; } -static int32_t poll_event(struct weakup_poll *weakup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout) +static int32_t get_event(struct wakeup_poll *wakeup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout) { - struct epoll_event *events = (struct epoll_event *)out; struct pollfd *fds = (struct pollfd *)out; + struct epoll_event *events = (struct epoll_event *)out; + bool have_kernel = have_kernel_fd(epfd, fds, maxevents); int32_t event_num = 0; - int32_t event_kernel_num = 0; - struct timespec epoll_interval = { - .tv_sec = 0, - .tv_nsec = EPOLL_INTERVAL_10MS, - }; - uint32_t start_time = sys_now(); + int32_t poll_time = 0; + int32_t ret; + /* when epfd > 0 is epoll type */ do { - /* epoll_wait type */ - if (epfd > 0) { - event_num += get_lwip_events(weakup, &events[event_num], maxevents - event_num, TYPE_EPOLL); - if (event_num >= maxevents) { - break; - } + event_num += (epfd > 0) ? epoll_lwip_event(wakeup, &events[event_num], maxevents - event_num) : + poll_lwip_event(fds, maxevents); - event_kernel_num = posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0); + if (have_kernel) { + int32_t event_kernel_num = (epfd > 0) ? + posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0) : + poll_kernel_event(fds, maxevents); if (event_kernel_num < 0) { - break; + return event_kernel_num; } event_num += event_kernel_num; - } else { - /* for poll events, we need to distiguish kernel events and gazelle events */ - event_kernel_num = posix_api->poll_fn(fds, maxevents, 0); - if (event_kernel_num < 0) { + if (timeout >= 0 && poll_time >= timeout) { break; } - event_kernel_num = remove_kernel_invaild_events(fds, maxevents, event_kernel_num); - event_num += event_kernel_num; - - event_num += get_lwip_events(weakup, fds, maxevents, TYPE_POLL); + poll_time += EPOLL_KERNEL_INTERVAL; } if (event_num > 0) { break; } - sem_timedwait(&weakup->event_sem, &epoll_interval); - if (timeout > 0) { - timeout = update_timeout(timeout, start_time); + int32_t interval = (have_kernel) ? EPOLL_KERNEL_INTERVAL : timeout; + struct timespec epoll_interval; + clock_gettime(CLOCK_REALTIME, &epoll_interval); + epoll_interval.tv_sec += interval / 1000; + epoll_interval.tv_nsec += (interval % 1000) * 1000000; + epoll_interval.tv_sec += epoll_interval.tv_nsec / 1000000000; + epoll_interval.tv_nsec = epoll_interval.tv_nsec % 1000000000; + + if (timeout < 0 && !have_kernel) { + ret = sem_wait(&wakeup->event_sem); + } else { + ret = sem_timedwait(&wakeup->event_sem, &epoll_interval); + } + + if (!have_kernel && ret < 0) { + break; } - } while (timeout != 0); + } while (event_num <= maxevents); - return (event_kernel_num < 0) ? event_kernel_num : event_num; + return event_num; } -static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *weakup) +int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout) { - int32_t stack_id = 0; - int32_t stack_count[PROTOCOL_STACK_MAX] = {0}; + /* avoid the starvation of epoll events from both netstack */ + maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents); - if (weakup->event_ring == NULL) { - weakup->event_ring = create_ring("POLL_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid()); - if (weakup->event_ring == NULL) { - GAZELLE_RETURN(ENOMEM); - } + struct lwip_sock *sock = get_socket_by_fd(epfd); + if (sock == NULL) { + GAZELLE_RETURN(EINVAL); + } - weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid()); - if (weakup->self_ring == NULL) { - GAZELLE_RETURN(ENOMEM); - } + if (sock->wakeup == NULL) { + return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout); + } + + return get_event(sock->wakeup, epfd, events, maxevents, timeout); +} + +static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeup) +{ + int32_t stack_count[PROTOCOL_STACK_MAX] = {0}; + + if (!wakeup->init) { + wakeup->init = true; + sem_init(&wakeup->event_sem, 0, 0); + } else { + while (sem_trywait(&wakeup->event_sem) == 0) {} } for (uint32_t i = 0; i < nfds; i++) { @@ -432,51 +473,33 @@ static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *we break; } sock->epoll_events = fds[i].events | POLLERR; - sock->weakup = weakup; - - raise_pending_events(sock); - - stack_count[sock->stack->queue_id]++; + sock->wakeup = wakeup; /* listenfd list */ fd = sock->nextfd; + stack_count[sock->stack->queue_id]++; } while (fd > 0); } - for (uint32_t i = 0; i < get_protocol_stack_group()->stack_num; i++) { - if (stack_count[i] > stack_count[stack_id]) { - stack_id = i; - } - } - - bind_to_stack_numa(stack_id); - - return 0; -} - -int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout) -{ - /* avoid the starvation of epoll events from both netstack */ - maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents); - - struct lwip_sock *sock = get_socket_by_fd(epfd); - if (sock == NULL) { - GAZELLE_RETURN(EINVAL); + if (wakeup->bind_stack) { + return; } - if (sock->weakup == NULL) { - return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout); + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + uint32_t bind_id = 0; + for (uint32_t i = 0; i < stack_group->stack_num; i++) { + if (stack_count[i] > stack_count[bind_id]) { + bind_id = i; + } } - return poll_event(sock->weakup, epfd, events, maxevents, timeout); + bind_to_stack_numa(stack_group->stacks[bind_id]); + wakeup->bind_stack = stack_group->stacks[bind_id]; } int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout) { - int32_t ret = poll_init(fds, nfds, &g_weakup_poll); - if (ret != 0) { - return -1; - } + poll_init(fds, nfds, &g_wakeup_poll); - return poll_event(&g_weakup_poll, -1, fds, nfds, timeout); + return get_event(&g_wakeup_poll, -1, fds, nfds, timeout); } diff --git a/src/lstack/api/lstack_signal.c b/src/lstack/api/lstack_signal.c index 5e4af56..f4763e8 100644 --- a/src/lstack/api/lstack_signal.c +++ b/src/lstack/api/lstack_signal.c @@ -16,6 +16,7 @@ #include #include #include +#include #include "lstack_log.h" diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c index 53712a8..13086a3 100644 --- a/src/lstack/core/lstack_cfg.c +++ b/src/lstack/core/lstack_cfg.c @@ -33,14 +33,14 @@ #include "gazelle_reg_msg.h" #include "lstack_log.h" #include "gazelle_base_func.h" -#include "gazelle_parse_config.h" #include "lstack_protocol_stack.h" +#include "gazelle_parse_config.h" #define DEFAULT_CONF_FILE "/etc/gazelle/lstack.conf" #define LSTACK_CONF_ENV "LSTACK_CONF_PATH" #define NUMA_CPULIST_PATH "/sys/devices/system/node/node%u/cpulist" #define DEV_MAC_LEN 17 -#define CPUS_RANGE_NUM 32 +#define CPUS_MAX_NUM 256 static struct cfg_params g_config_params; @@ -50,7 +50,7 @@ static int32_t parse_host_addr(void); static int32_t parse_low_power_mode(void); static int32_t parse_stack_cpu_number(void); static int32_t parse_use_ltran(void); -static int32_t parse_weakup_cpu_number(void); +static int32_t parse_wakeup_cpu_number(void); static int32_t parse_mask_addr(void); static int32_t parse_devices(void); static int32_t parse_dpdk_args(void); @@ -70,7 +70,7 @@ static struct config_vector_t g_config_tbl[] = { { "devices", parse_devices }, { "dpdk_args", parse_dpdk_args }, { "num_cpus", parse_stack_cpu_number }, - { "num_wakeup", parse_weakup_cpu_number }, + { "num_wakeup", parse_wakeup_cpu_number }, { "low_power_mode", parse_low_power_mode }, { "kni_switch", parse_kni_switch }, { NULL, NULL } @@ -240,7 +240,6 @@ static int32_t parse_stack_cpu_number(void) } g_config_params.num_cpu = cnt; - get_protocol_stack_group()->stack_num = g_config_params.num_cpu; return 0; } @@ -275,10 +274,10 @@ static int32_t numa_to_cpusnum(unsigned socket_id, uint32_t *cpulist, int32_t nu static int32_t stack_idle_cpuset(struct protocol_stack *stack, cpu_set_t *exclude) { - uint32_t cpulist[CPUS_RANGE_NUM]; + uint32_t cpulist[CPUS_MAX_NUM]; - int32_t cpunum = numa_to_cpusnum(stack->socket_id, cpulist, CPUS_RANGE_NUM); - if (cpunum <= 0 ) { + int32_t cpunum = numa_to_cpusnum(stack->socket_id, cpulist, CPUS_MAX_NUM); + if (cpunum <= 0) { LSTACK_LOG(ERR, LSTACK, "numa_to_cpusnum failed\n"); return -1; } @@ -308,7 +307,7 @@ int32_t init_stack_numa_cpuset(void) CPU_SET(cfg->cpus[idx], &stack_cpuset); } for (int32_t idx = 0; idx < cfg->num_wakeup; ++idx) { - CPU_SET(cfg->weakup[idx], &stack_cpuset); + CPU_SET(cfg->wakeup[idx], &stack_cpuset); } for (int32_t idx = 0; idx < stack_group->stack_num; ++idx) { @@ -621,7 +620,7 @@ static int32_t parse_low_power_mode(void) return 0; } -static int32_t parse_weakup_cpu_number(void) +static int32_t parse_wakeup_cpu_number(void) { const config_setting_t *cfg_args = NULL; const char *args = NULL; @@ -639,13 +638,19 @@ static int32_t parse_weakup_cpu_number(void) } char *tmp_arg = strdup(args); - int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.weakup, CFG_MAX_CPUS); + int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.wakeup, CFG_MAX_CPUS); free(tmp_arg); if (cnt <= 0 || cnt > CFG_MAX_CPUS) { return -EINVAL; } g_config_params.num_wakeup = cnt; + if (g_config_params.num_wakeup < g_config_params.num_cpu) { + LSTACK_PRE_LOG(LSTACK_ERR, "num_wakeup=%d less than num_stack_cpu=%d.\n", g_config_params.num_wakeup, + g_config_params.num_cpu); + return -EINVAL; + } + return 0; } diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c index 430c6e5..3f446ea 100644 --- a/src/lstack/core/lstack_dpdk.c +++ b/src/lstack/core/lstack_dpdk.c @@ -28,6 +28,7 @@ #include #include #include +#include #include "lstack_log.h" #include "dpdk_common.h" @@ -109,35 +110,39 @@ static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_ char pool_name[PATH_MAX]; struct rte_mempool *pool; - ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id); + ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id); if (ret < 0) { return NULL; } /* time stamp before pbuf_custom as priv_data */ + pthread_mutex_lock(get_mem_mutex()); pool = rte_pktmbuf_pool_create(pool_name, nb_mbuf, mbuf_cache_size, sizeof(struct pbuf_custom) + GAZELLE_MBUFF_PRIV_SIZE, MBUF_SZ, rte_socket_id()); if (pool == NULL) { LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno); } + pthread_mutex_unlock(get_mem_mutex()); return pool; } -static struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id) +struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id) { char pool_name[PATH_MAX]; struct rte_mempool *pool; int32_t ret; - ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id); + ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id); if (ret < 0) { return NULL; } + pthread_mutex_lock(get_mem_mutex()); pool = rte_mempool_create(pool_name, CALL_POOL_SZ, sizeof(struct rpc_msg), 0, 0, NULL, NULL, NULL, NULL, rte_socket_id(), 0); if (pool == NULL) { LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno); } + pthread_mutex_unlock(get_mem_mutex()); return pool; } @@ -147,7 +152,7 @@ static struct reg_ring_msg *create_reg_mempool(const char *name, uint16_t queue_ char pool_name[PATH_MAX]; struct reg_ring_msg *reg_buf; - ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id); + ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id); if (ret < 0) { return NULL; } @@ -167,21 +172,18 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num) return -1; } - stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF / stack_num, 0, stack->queue_id); + stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF / stack_num, RX_MBUF_CACHE_SZ, + stack->queue_id); if (stack->rx_pktmbuf_pool == NULL) { return -1; } - stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF / stack_num, 0, stack->queue_id); + stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF / stack_num, TX_MBUF_CACHE_SZ, + stack->queue_id); if (stack->tx_pktmbuf_pool == NULL) { return -1; } - stack->rpc_pool = create_rpc_mempool("rpc_msg", stack->queue_id); - if (stack->rpc_pool == NULL) { - return -1; - } - if (use_ltran()) { stack->reg_buf = create_reg_mempool("reg_ring_msg", stack->queue_id); if (stack->reg_buf == NULL) { @@ -214,16 +216,12 @@ int32_t create_shared_ring(struct protocol_stack *stack) { lockless_queue_init(&stack->rpc_queue); - stack->weakup_ring = create_ring("SHARED_WEAKUP_RING", VDEV_WEAKUP_QUEUE_SZ, 0, stack->queue_id); - if (stack->weakup_ring == NULL) { - return -1; - } - - stack->send_idle_ring = create_ring("SEND_IDLE_RING", VDEV_IDLE_QUEUE_SZ, 0, stack->queue_id); - if (stack->send_idle_ring == NULL) { - return -1; + if (get_protocol_stack_group()->wakeup_enable) { + stack->wakeup_ring = create_ring("WAKEUP_RING", VDEV_WAKEUP_QUEUE_SZ, 0, stack->queue_id); + if (stack->wakeup_ring == NULL) { + return -1; + } } - stack->in_replenish = 0; if (use_ltran()) { stack->rx_ring = create_ring("RING_RX", VDEV_RX_QUEUE_SZ, RING_F_SP_ENQ | RING_F_SC_DEQ, stack->queue_id); @@ -328,8 +326,19 @@ static struct eth_params *alloc_eth_params(uint16_t port_id, uint16_t nb_queues) return eth_params; } +uint64_t get_eth_params_rx_ol(void) +{ + return use_ltran() ? 0 : get_protocol_stack_group()->eth_params->conf.rxmode.offloads; +} + +uint64_t get_eth_params_tx_ol(void) +{ + return use_ltran() ? 0 : get_protocol_stack_group()->eth_params->conf.txmode.offloads; +} + static int eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_info *dev_info) { +#if CHECKSUM_OFFLOAD_ALL uint64_t rx_ol = 0; uint64_t tx_ol = 0; @@ -337,43 +346,48 @@ static int eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_inf uint64_t tx_ol_capa = dev_info->tx_offload_capa; // rx ip - if (rx_ol_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) { #if CHECKSUM_CHECK_IP_HW + if (rx_ol_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) { rx_ol |= DEV_RX_OFFLOAD_IPV4_CKSUM; LSTACK_LOG(INFO, LSTACK, "DEV_RX_OFFLOAD_IPV4_CKSUM\n"); -#endif } +#endif // rx tcp - if (rx_ol_capa & DEV_RX_OFFLOAD_TCP_CKSUM) { #if CHECKSUM_CHECK_TCP_HW + if (rx_ol_capa & DEV_RX_OFFLOAD_TCP_CKSUM) { rx_ol |= DEV_RX_OFFLOAD_TCP_CKSUM; LSTACK_LOG(INFO, LSTACK, "DEV_RX_OFFLOAD_TCP_CKSUM\n"); -#endif } +#endif // tx ip - if (tx_ol_capa & DEV_TX_OFFLOAD_IPV4_CKSUM) { #if CHECKSUM_GEN_IP_HW + if (tx_ol_capa & DEV_TX_OFFLOAD_IPV4_CKSUM) { tx_ol |= DEV_TX_OFFLOAD_IPV4_CKSUM; LSTACK_LOG(INFO, LSTACK, "DEV_TX_OFFLOAD_IPV4_CKSUM\n"); -#endif } +#endif // tx tcp - if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_CKSUM) { #if CHECKSUM_GEN_TCP_HW + if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_CKSUM) { tx_ol |= DEV_TX_OFFLOAD_TCP_CKSUM; LSTACK_LOG(INFO, LSTACK, "DEV_TX_OFFLOAD_TCP_CKSUM\n"); + } #endif + if (!(rx_ol & DEV_RX_OFFLOAD_TCP_CKSUM) || !(rx_ol & DEV_RX_OFFLOAD_IPV4_CKSUM)) { + rx_ol = 0; + } + if (!(tx_ol & DEV_TX_OFFLOAD_TCP_CKSUM) || !(tx_ol & DEV_TX_OFFLOAD_IPV4_CKSUM)) { + tx_ol = 0; } conf->rxmode.offloads = rx_ol; conf->txmode.offloads = tx_ol; -#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW || CHECKSUM_GEN_IP_HW || CHECKSUM_GEN_TCP_HW LSTACK_LOG(INFO, LSTACK, "set checksum offloads\n"); -#endif +#endif /* CHECKSUM_OFFLOAD_ALL */ return 0; } @@ -580,3 +594,30 @@ void dpdk_skip_nic_init(void) } } +int32_t init_dpdk_ethdev(void) +{ + int32_t ret; + + ret = dpdk_ethdev_init(); + if (ret != 0) { + LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_init failed\n"); + return -1; + } + + ret = dpdk_ethdev_start(); + if (ret < 0) { + LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n"); + return -1; + } + + if (get_global_cfg_params()->kni_switch) { + ret = dpdk_init_lstack_kni(); + if (ret < 0) { + return -1; + } + } + + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + sem_post(&stack_group->ethdev_init); + return 0; +} diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c index 17195c8..774d0f3 100644 --- a/src/lstack/core/lstack_init.c +++ b/src/lstack/core/lstack_init.c @@ -30,6 +30,7 @@ #include #include #include +#include #include "lstack_cfg.h" #include "lstack_control_plane.h" @@ -225,16 +226,18 @@ __attribute__((constructor)) void gazelle_network_init(void) lstack_log_level_init(); - /* - * Phase 8: memory and nic */ ret = init_protocol_stack(); if (ret != 0) { LSTACK_EXIT(1, "init_protocol_stack failed\n"); } - ret = create_stack_thread(); - if (ret != 0) { - LSTACK_EXIT(1, "create_stack_thread failed\n"); + /* + * Phase 8: nic */ + if (!use_ltran()) { + ret = init_dpdk_ethdev(); + if (ret != 0) { + LSTACK_EXIT(1, "init_dpdk_ethdev failed\n"); + } } /* diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index b4d75d2..887464d 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -25,7 +26,6 @@ #include "lstack_ethdev.h" #include "lstack_protocol_stack.h" #include "lstack_log.h" -#include "lstack_weakup.h" #include "lstack_dpdk.h" #include "lstack_stack_stat.h" #include "lstack_lwip.h" @@ -49,37 +49,82 @@ void listen_list_add_node(int32_t head_fd, int32_t add_fd) sock->nextfd = add_fd; } +static void free_ring_pbuf(struct rte_ring *ring) +{ + while (1) { + struct pbuf *pbuf = NULL; + int32_t ret = rte_ring_sc_dequeue(ring, (void **)&pbuf); + if (ret != 0) { + break; + } + + pbuf_free(pbuf); + } +} + static void reset_sock_data(struct lwip_sock *sock) { /* check null pointer in ring_free func */ if (sock->recv_ring) { + free_ring_pbuf(sock->recv_ring); rte_ring_free(sock->recv_ring); } sock->recv_ring = NULL; + if (sock->recv_wait_free) { + free_ring_pbuf(sock->recv_wait_free); + rte_ring_free(sock->recv_wait_free); + } + sock->recv_wait_free = NULL; + if (sock->send_ring) { + free_ring_pbuf(sock->send_ring); rte_ring_free(sock->send_ring); } sock->send_ring = NULL; + if (sock->send_idle_ring) { + free_ring_pbuf(sock->send_idle_ring); + rte_ring_free(sock->send_idle_ring); + } + sock->send_idle_ring = NULL; + sock->stack = NULL; - sock->weakup = NULL; + sock->wakeup = NULL; sock->events = 0; sock->nextfd = -1; sock->attach_fd = -1; sock->wait_close = false; - sock->have_event = false; - sock->have_rpc_send = false; sock->shadowed_sock = NULL; + sock->epoll_events = 0; + sock->events = 0; if (sock->recv_lastdata) { pbuf_free(sock->recv_lastdata); - sock->recv_lastdata = NULL; } + sock->recv_lastdata = NULL; if (sock->send_lastdata) { pbuf_free(sock->send_lastdata); - sock->send_lastdata = NULL; + } + sock->send_lastdata = NULL; +} + +static void replenish_send_idlembuf(struct rte_ring *ring) +{ + uint32_t replenish_cnt = rte_ring_free_count(ring); + + for (uint32_t i = 0; i < replenish_cnt; i++) { + struct pbuf *pbuf = lwip_alloc_pbuf(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM); + if (pbuf == NULL) { + break; + } + + int32_t ret = rte_ring_sp_enqueue(ring, (void *)pbuf); + if (ret < 0) { + pbuf_free(pbuf); + break; + } } } @@ -99,19 +144,31 @@ void gazelle_init_sock(int32_t fd) return; } + sock->recv_wait_free = create_ring("wait_free", SOCK_RECV_RING_SIZE, 0, name_tick++); + if (sock->recv_wait_free == NULL) { + LSTACK_LOG(ERR, LSTACK, "wait_free create failed. errno: %d.\n", rte_errno); + return; + } + sock->send_ring = create_ring("sock_send", SOCK_SEND_RING_SIZE, 0, name_tick++); if (sock->send_ring == NULL) { LSTACK_LOG(ERR, LSTACK, "sock_send create failed. errno: %d.\n", rte_errno); return; } + sock->send_idle_ring = create_ring("idle_send", SOCK_SEND_RING_SIZE, 0, name_tick++); + if (sock->send_idle_ring == NULL) { + LSTACK_LOG(ERR, LSTACK, "idle_send create failed. errno: %d.\n", rte_errno); + return; + } + replenish_send_idlembuf(sock->send_idle_ring); + sock->stack = get_protocol_stack(); sock->stack->conn_num++; init_list_node(&sock->recv_list); init_list_node(&sock->attach_list); init_list_node(&sock->listen_list); init_list_node(&sock->event_list); - init_list_node(&sock->wakeup_list); init_list_node(&sock->send_list); } @@ -129,7 +186,9 @@ void gazelle_clean_sock(int32_t fd) list_del_node_init(&sock->recv_list); list_del_node_init(&sock->attach_list); list_del_node_init(&sock->listen_list); +#ifdef GAZELLE_USE_EPOLL_EVENT_STACK list_del_node_init(&sock->event_list); +#endif list_del_node_init(&sock->send_list); } @@ -206,101 +265,60 @@ struct pbuf *lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type) void *data = rte_pktmbuf_mtod(mbuf, void *); struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ); - return pbuf; -} - -void stack_replenish_send_idlembuf(struct protocol_stack *stack) -{ - uint32_t replenish_cnt = rte_ring_free_count(stack->send_idle_ring); - - for (uint32_t i = 0; i < replenish_cnt; i++) { - struct pbuf *pbuf = lwip_alloc_pbuf(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM); - if (pbuf == NULL) { - break; - } - - int32_t ret = rte_ring_sp_enqueue(stack->send_idle_ring, (void *)pbuf); - if (ret < 0) { - gazelle_free_pbuf(pbuf); - break; - } +#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW + if (pbuf) { + pbuf->ol_flags = 0; + pbuf->l2_len = 0; + pbuf->l3_len = 0; } +#endif + + return pbuf; } -ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags) +struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags) { struct pbuf *pbuf = NULL; - ssize_t send_ret = 0; - ssize_t send_len = 0; - do { - if (sock->send_lastdata) { - pbuf = sock->send_lastdata; - sock->send_lastdata = NULL; - } else { - int32_t ret = rte_ring_sc_dequeue(sock->send_ring, (void **)&pbuf); - if (ret != 0) { - break; - } - } - - if (sock->conn == NULL || sock->conn->pcb.tcp == NULL) { - GAZELLE_RETURN(ENOENT); - } - - uint16_t available = tcp_sndbuf(sock->conn->pcb.tcp); - if (available < pbuf->tot_len) { - sock->send_lastdata = pbuf; - break; - } - - ssize_t pbuf_len = pbuf->tot_len; - send_ret = lwip_send(fd, pbuf, pbuf->tot_len, flags); - if (send_ret > 0) { - send_len += send_ret; - } - if (send_ret != pbuf_len) { - sock->stack->stats.write_lwip_drop++; - break; + if (sock->send_lastdata) { + pbuf = sock->send_lastdata; + sock->send_lastdata = NULL; + } else { + int32_t ret = rte_ring_sc_dequeue(sock->send_ring, (void **)&pbuf); + if (ret != 0) { + *apiflags &= ~TCP_WRITE_FLAG_MORE; + return NULL; } + } - sock->stack->stats.write_lwip_cnt++; - } while (1); - - return (send_ret < 0) ? send_ret : send_len; -} - -void add_self_event(struct lwip_sock *sock, uint32_t events) -{ - struct weakup_poll *wakeup = sock->weakup; - struct protocol_stack *stack = sock->stack; - if (wakeup == NULL || stack == NULL) { - return; + if (pbuf->tot_len >= remain_size) { + sock->send_lastdata = pbuf; + *apiflags |= TCP_WRITE_FLAG_MORE; /* set TCP_PSH flag */ + return NULL; } - sock->events |= events; + replenish_send_idlembuf(sock->send_idle_ring); - if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) { - return; + if ((sock->epoll_events & EPOLLOUT) && rte_ring_free_count(sock->send_ring)) { + add_epoll_event(sock->conn, EPOLLOUT); } - if (rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) { - __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE); - sem_post(&sock->weakup->event_sem); - stack->stats.epoll_self_event++; - } else { - rpc_call_addevent(stack, sock); - stack->stats.epoll_self_call++; - } + sock->stack->stats.write_lwip_cnt++; + return pbuf; } ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) { + if (sock->events & EPOLLERR) { + return 0; + } + uint32_t free_count = rte_ring_free_count(sock->send_ring); if (free_count == 0) { - GAZELLE_RETURN(EAGAIN); + return -1; } - uint32_t avaible_cont = rte_ring_count(sock->stack->send_idle_ring); + + uint32_t avaible_cont = rte_ring_count(sock->send_idle_ring); avaible_cont = LWIP_MIN(free_count, avaible_cont); struct pbuf *pbuf = NULL; @@ -309,7 +327,7 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) uint32_t send_pkt = 0; while (send_len < len && send_pkt < avaible_cont) { - int32_t ret = rte_ring_sc_dequeue(sock->stack->send_idle_ring, (void **)&pbuf); + int32_t ret = rte_ring_sc_dequeue(sock->send_idle_ring, (void **)&pbuf); if (ret < 0) { sock->stack->stats.app_write_idlefail++; break; @@ -322,28 +340,16 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) ret = rte_ring_sp_enqueue(sock->send_ring, pbuf); if (ret != 0) { sock->stack->stats.app_write_drop++; - gazelle_free_pbuf(pbuf); + pbuf_free(pbuf); break; } - sock->stack->stats.app_write_cnt++; send_len += copy_len; send_pkt++; } + __sync_fetch_and_add(&sock->stack->stats.app_write_cnt, send_pkt); - if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) { - add_self_event(sock, EPOLLOUT); - sock->stack->stats.write_events++; - } else { - sock->events &= ~EPOLLOUT; - } - - if (rte_ring_free_count(sock->stack->send_idle_ring) > USED_IDLE_WATERMARK && !sock->stack->in_replenish) { - sock->stack->in_replenish = true; - rpc_call_replenish_idlembuf(sock->stack); - } - - return send_len; + return (send_len <= 0) ? -1 : send_len; } void stack_send(struct rpc_msg *msg) @@ -351,27 +357,62 @@ void stack_send(struct rpc_msg *msg) int32_t fd = msg->args[MSG_ARG_0].i; int32_t flags = msg->args[MSG_ARG_2].i; - struct protocol_stack *stack = get_protocol_stack(); struct lwip_sock *sock = get_socket(fd); if (sock == NULL) { msg->result = -1; return; } - msg->result = write_lwip_data(sock, fd, flags); - __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE); + if (!NETCONN_IS_DATAOUT(sock)) { + return; + } - if (msg->result >= 0 && - (rte_ring_count(sock->send_ring) || sock->send_lastdata)) { + /* send all send_ring, so len set lwip send max. */ + ssize_t len = lwip_send(fd, sock, UINT16_MAX, flags); + if (len == 0) { + /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */ + add_epoll_event(sock->conn, EPOLLERR); + } + + /* have remain data add sendlist */ + if (NETCONN_IS_DATAOUT(sock)) { if (list_is_empty(&sock->send_list)) { - __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE); - list_add_node(&stack->send_list, &sock->send_list); - sock->stack->stats.send_self_rpc++; + sock->send_flags = flags; + list_add_node(&sock->stack->send_list, &sock->send_list); } + sock->stack->stats.send_self_rpc++; } +} - if (rte_ring_free_count(sock->send_ring)) { - add_epoll_event(sock->conn, EPOLLOUT); +void send_stack_list(struct protocol_stack *stack, uint32_t send_max) +{ + struct list_node *node, *temp; + struct lwip_sock *sock; + uint32_t read_num = 0; + + list_for_each_safe(node, temp, &stack->send_list) { + sock = container_of(node, struct lwip_sock, send_list); + + if (sock->conn == NULL || !NETCONN_IS_DATAOUT(sock)) { + list_del_node_init(&sock->send_list); + continue; + } + + /* send all send_ring, so len set lwip send max. */ + ssize_t len = lwip_send(sock->conn->socket, sock, UINT16_MAX, sock->send_flags); + if (len == 0) { + /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */ + add_epoll_event(sock->conn, EPOLLERR); + list_del_node_init(&sock->send_list); + } + + if (!NETCONN_IS_DATAOUT(sock)) { + list_del_node_init(&sock->send_list); + } + + if (++read_num >= send_max) { + break; + } } } @@ -381,6 +422,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) return 0; } + if (rte_ring_count(sock->recv_wait_free)) { + free_ring_pbuf(sock->recv_wait_free); + } + uint32_t free_count = rte_ring_free_count(sock->recv_ring); uint32_t data_count = rte_ring_count(sock->conn->recvmbox->ring); uint32_t read_max = LWIP_MIN(free_count, data_count); @@ -411,6 +456,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) read_count++; } + if (get_protocol_stack_group()->latency_start) { + calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_LWIP); + } + recv_len += pbuf->len; /* once we have some data to return, only add more if we don't need to wait */ @@ -425,6 +474,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags) add_epoll_event(sock->conn, EPOLLIN); } sock->stack->stats.read_lwip_cnt += read_count; + + if (recv_len == 0) { + GAZELLE_RETURN(EAGAIN); + } return recv_len; } @@ -440,7 +493,7 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags) if ((message->msg_iov[i].iov_base == NULL) || ((ssize_t)message->msg_iov[i].iov_len <= 0) || ((size_t)(ssize_t)message->msg_iov[i].iov_len != message->msg_iov[i].iov_len) || ((ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len) <= 0)) { - GAZELLE_RETURN(EINVAL); + GAZELLE_RETURN(EINVAL); } buflen = (ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len); } @@ -479,16 +532,14 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags) sock->send_flags = flags; ssize_t send = write_stack_data(sock, buf, len); - - ssize_t ret = 0; - if (!__atomic_load_n(&sock->have_rpc_send, __ATOMIC_ACQUIRE)) { - __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE); - ret = rpc_call_send(fd, buf, len, flags); - } - - if (send <= 0 || ret < 0) { + if (send < 0) { GAZELLE_RETURN(EAGAIN); + } else if (send == 0) { + return 0; } + rte_smp_mb(); + + rpc_call_send(fd, NULL, send, flags); return send; } @@ -505,7 +556,7 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags) if ((message->msg_iov[i].iov_base == NULL) || ((ssize_t)message->msg_iov[i].iov_len <= 0) || ((size_t)(ssize_t)message->msg_iov[i].iov_len != message->msg_iov[i].iov_len) || ((ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len) <= 0)) { - GAZELLE_RETURN(EINVAL); + GAZELLE_RETURN(EINVAL); } buflen = (ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len); } @@ -513,7 +564,7 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags) for (i = 0; i < message->msg_iovlen; i++) { ret = gazelle_send(s, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, flags); if (ret < 0) { - return buflen == 0 ? ret : buflen; + return buflen == 0 ? ret : buflen; } buflen += ret; } @@ -536,6 +587,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags) } sock->recv_flags = flags; + if ((sock->events & EPOLLERR) && !NETCONN_IS_DATAIN(sock)) { + return 0; + } + while (recv_left > 0) { if (sock->recv_lastdata) { pbuf = sock->recv_lastdata; @@ -556,22 +611,18 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags) if (pbuf->tot_len > copy_len) { sock->recv_lastdata = pbuf_free_header(pbuf, copy_len); } else { - pbuf_free(pbuf); - sock->recv_lastdata = NULL; - sock->stack->stats.app_read_cnt++; if (get_protocol_stack_group()->latency_start) { calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_READ); } + ret = rte_ring_sp_enqueue(sock->recv_wait_free, pbuf); + if (ret != 0) { + pbuf_free(pbuf); + } + sock->recv_lastdata = NULL; + __sync_fetch_and_add(&sock->stack->stats.app_read_cnt, 1); } } - if ((sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) { - add_self_event(sock, EPOLLIN); - sock->stack->stats.read_events++; - } else { - sock->events &= ~EPOLLIN; - } - if (recvd == 0) { sock->stack->stats.read_null++; GAZELLE_RETURN(EAGAIN); @@ -588,30 +639,32 @@ void add_recv_list(int32_t fd) } } -void read_recv_list(void) +void read_recv_list(struct protocol_stack *stack, uint32_t max_num) { - struct protocol_stack *stack = get_protocol_stack(); struct list_node *list = &(stack->recv_list); struct list_node *node, *temp; struct lwip_sock *sock; - struct lwip_sock *first_sock = NULL; + uint32_t read_num = 0; list_for_each_safe(node, temp, list) { sock = container_of(node, struct lwip_sock, recv_list); - /* when read_lwip_data have data wait to read, add sock into recv_list. read_recv_list read this sock again. - this is dead loop. so every sock just read one time */ - if (sock == first_sock) { - break; - } - if (first_sock == NULL) { - first_sock = sock; + if (sock->conn == NULL || sock->recv_ring == NULL || sock->send_ring == NULL || sock->conn->pcb.tcp == NULL) { + list_del_node_init(&sock->recv_list); + continue; } - /* recv_ring and send_ring maybe create fail, so check here */ - if (sock->conn && sock->recv_ring && sock->send_ring && rte_ring_free_count(sock->recv_ring)) { + if (rte_ring_free_count(sock->recv_ring)) { list_del_node_init(&sock->recv_list); - lwip_recv(sock->conn->socket, NULL, 0, sock->recv_flags); + ssize_t len = lwip_recv(sock->conn->socket, NULL, 0, sock->recv_flags); + if (len == 0) { + /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */ + add_epoll_event(sock->conn, EPOLLERR); + } + } + + if (++read_num >= max_num) { + break; } } } @@ -633,11 +686,13 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s struct lwip_sock *sock = get_socket(netconn->socket); if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) { conn->recv_ring_cnt = rte_ring_count(sock->recv_ring); + conn->recv_ring_cnt += (sock->recv_lastdata) ? 1 : 0; + conn->send_ring_cnt = rte_ring_count(sock->send_ring); - struct weakup_poll *weakup = sock->weakup; - if (weakup) { - conn->event_ring_cnt = rte_ring_count(weakup->event_ring); - conn->self_ring_cnt = rte_ring_count(weakup->self_ring); + conn->send_ring_cnt += (sock->send_lastdata) ? 1 : 0; + + if (sock->wakeup) { + sem_getvalue(&sock->wakeup->event_sem, &conn->sem_cnt); } } } @@ -786,11 +841,6 @@ static uint32_t get_list_count(struct list_node *list) return count; } -void stack_wakeuplist_count(struct rpc_msg *msg) -{ - msg->result = get_list_count(get_protocol_stack()->wakeup_list); -} - void stack_eventlist_count(struct rpc_msg *msg) { msg->result = get_list_count(&get_protocol_stack()->event_list); diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index e5761a4..da320e2 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -28,29 +29,34 @@ #include "lstack_lwip.h" #include "lstack_protocol_stack.h" #include "lstack_cfg.h" -#include "lstack_weakup.h" #include "lstack_control_plane.h" #include "lstack_stack_stat.h" +#define READ_LIST_MAX 32 +#define SEND_LIST_MAX 32 +#define HANDLE_RPC_MSG_MAX 32 + static PER_THREAD uint16_t g_stack_idx = PROTOCOL_STACK_MAX; static struct protocol_stack_group g_stack_group = {0}; static PER_THREAD long g_stack_tid = 0; +static pthread_mutex_t g_mem_mutex = PTHREAD_MUTEX_INITIALIZER; typedef void *(*stack_thread_func)(void *arg); -int32_t bind_to_stack_numa(int32_t stack_id) +#ifdef GAZELLE_USE_EPOLL_EVENT_STACK +void update_stack_events(struct protocol_stack *stack); +#endif + +pthread_mutex_t *get_mem_mutex(void) { - static PER_THREAD int32_t last_stack_id = -1; + return &g_mem_mutex; +} +int32_t bind_to_stack_numa(struct protocol_stack *stack) +{ int32_t ret; - struct protocol_stack *stack = get_protocol_stack_group()->stacks[stack_id]; pthread_t tid = pthread_self(); - if (last_stack_id == stack_id) { - return 0; - } - last_stack_id = stack_id; - ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset); if (ret != 0) { LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %d failed\n", rte_gettid(), stack->queue_id); @@ -159,88 +165,27 @@ void lstack_low_power_idling(void) } } -int32_t init_protocol_stack(void) +static int32_t create_thread(uint16_t queue_id, char *thread_name, stack_thread_func func) { - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - struct protocol_stack *stack = NULL; + /* thread may run slow, if arg is temp var maybe have relese */ + static uint16_t queue[PROTOCOL_STACK_MAX]; + char name[PATH_MAX]; + pthread_t tid; int32_t ret; - for (uint32_t i = 0; i < stack_group->stack_num; i++) { - stack = malloc(sizeof(*stack)); - if (stack == NULL) { - return -ENOMEM; - } - memset_s(stack, sizeof(*stack), 0, sizeof(*stack)); - - stack->queue_id = i; - stack->port_id = stack_group->port_id; - stack->cpu_id = get_global_cfg_params()->cpus[i]; - stack->socket_id = numa_node_of_cpu(stack->cpu_id); - if (stack->socket_id < 0) { - LSTACK_LOG(ERR, PORT, "numa_node_of_cpu failed\n"); - return -EINVAL; - } - - ret = pktmbuf_pool_init(stack, stack_group->stack_num); - if (ret != 0) { - return ret; - } - - ret = create_shared_ring(stack); - if (ret != 0) { - return ret; - } - - init_list_node(&stack->recv_list); - init_list_node(&stack->listen_list); - init_list_node(&stack->event_list); - init_list_node(&stack->send_list); - - stack_group->stacks[i] = stack; - } - - ret = init_stack_numa_cpuset(); - if (ret < 0) { + if (queue_id >= PROTOCOL_STACK_MAX) { + LSTACK_LOG(ERR, LSTACK, "queue_id is %d exceed max=%d\n", queue_id, PROTOCOL_STACK_MAX); return -1; } + queue[queue_id] = queue_id; - if (!use_ltran()) { - ret = dpdk_ethdev_init(); - if (ret != 0) { - LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_init failed\n"); - return -1; - } - - ret = dpdk_ethdev_start(); - if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n"); - return -1; - } - - if (get_global_cfg_params()->kni_switch) { - ret = dpdk_init_lstack_kni(); - if (ret < 0) { - return -1; - } - } - } - - return 0; -} - -static int32_t create_thread(struct protocol_stack *stack, char *thread_name, stack_thread_func func) -{ - char name[PATH_MAX]; - pthread_t tid; - int32_t ret; - - ret = sprintf_s(name, sizeof(name), "%s%02d", thread_name, stack->queue_id); + ret = sprintf_s(name, sizeof(name), "%s%02d", thread_name, queue[queue_id]); if (ret < 0) { LSTACK_LOG(ERR, LSTACK, "set name failed\n"); return -1; } - ret = pthread_create(&tid, NULL, func, stack); + ret = pthread_create(&tid, NULL, func, &queue[queue_id]); if (ret != 0) { LSTACK_LOG(ERR, LSTACK, "pthread_create ret=%d\n", ret); return -1; @@ -257,148 +202,185 @@ static int32_t create_thread(struct protocol_stack *stack, char *thread_name, st static void* gazelle_weakup_thread(void *arg) { - struct protocol_stack *stack = (struct protocol_stack *)arg; - int32_t lcore_id = get_global_cfg_params()->weakup[stack->queue_id]; + uint16_t queue_id = *(uint16_t *)arg; + struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id]; + int32_t lcore_id = get_global_cfg_params()->wakeup[stack->queue_id]; thread_affinity_init(lcore_id); - LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id); - struct list_node wakeup_list; - init_list_node(&wakeup_list); - stack->wakeup_list = &wakeup_list; + LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id); for (;;) { - wakeup_list_sock(&wakeup_list); + sem_t *event_sem; + if (rte_ring_sc_dequeue(stack->wakeup_ring, (void **)&event_sem)) { + continue; + } - weakup_thread(stack->weakup_ring, &wakeup_list); + sem_post(event_sem); } return NULL; } -static void stack_thread_init(struct protocol_stack *stack) +static void init_stack_value(struct protocol_stack *stack, uint16_t queue_id) { - uint16_t queue_id = stack->queue_id; - int32_t ret; + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + + memset_s(stack, sizeof(*stack), 0, sizeof(*stack)); set_stack_idx(queue_id); stack->tid = gettid(); + stack->queue_id = queue_id; + stack->port_id = stack_group->port_id; + stack->cpu_id = get_global_cfg_params()->cpus[queue_id]; stack->lwip_stats = &lwip_stats; - RTE_PER_LCORE(_lcore_id) = stack->cpu_id; - thread_affinity_init(stack->cpu_id); + init_list_node(&stack->recv_list); + init_list_node(&stack->listen_list); + init_list_node(&stack->event_list); + init_list_node(&stack->send_list); + + pthread_spin_init(&stack->event_lock, PTHREAD_PROCESS_SHARED); sys_calibrate_tsc(); + stack_stat_init(); - hugepage_init(); + stack_group->stacks[queue_id] = stack; +} - stack_replenish_send_idlembuf(stack); +static struct protocol_stack * stack_thread_init(uint16_t queue_id) +{ + struct protocol_stack_group *stack_group = get_protocol_stack_group(); - tcpip_init(NULL, NULL); + struct protocol_stack *stack = malloc(sizeof(*stack)); + if (stack == NULL) { + LSTACK_LOG(ERR, LSTACK, "malloc stack failed\n"); + return NULL; + } + init_stack_value(stack, queue_id); - if (use_ltran()) { - ret = client_reg_thrd_ring(); - if (ret != 0) { - LSTACK_EXIT(1, "failed reg thread ret=%d\n", ret); - } + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(stack->cpu_id, &cpuset); + if (rte_thread_set_affinity(&cpuset) != 0) { + LSTACK_LOG(ERR, LSTACK, "rte_thread_set_affinity failed\n"); + free(stack); + return NULL; } + RTE_PER_LCORE(_lcore_id) = stack->cpu_id; - ret = ethdev_init(stack); + stack->socket_id = numa_node_of_cpu(stack->cpu_id); + if (stack->socket_id < 0) { + LSTACK_LOG(ERR, LSTACK, "numa_node_of_cpu failed\n"); + free(stack); + return NULL; + } + + int32_t ret = pktmbuf_pool_init(stack, stack_group->stack_num); if (ret != 0) { - LSTACK_EXIT(1, "failed reg thread ret=%d\n", ret); + free(stack); + return NULL; } - stack_stat_init(); + ret = create_shared_ring(stack); + if (ret != 0) { + free(stack); + return NULL; + } - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - sem_post(&stack_group->thread_inited); - LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id); -} + thread_affinity_init(stack->cpu_id); -static void report_stack_event(struct protocol_stack *stack) -{ - struct list_node *list = &(stack->event_list); - struct list_node *node, *temp; - struct lwip_sock *sock; + hugepage_init(); - list_for_each_safe(node, temp, list) { - sock = container_of(node, struct lwip_sock, event_list); + tcpip_init(NULL, NULL); - if (weakup_enqueue(stack->weakup_ring, sock) == 0) { - __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE); - list_del_node_init(&sock->event_list); - stack->stats.weakup_events++; - } else { - break; + if (use_ltran()) { + ret = client_reg_thrd_ring(); + if (ret != 0) { + free(stack); + return NULL; } } -} - -static void send_stack_list(struct protocol_stack *stack) -{ - struct list_node *list = &(stack->send_list); - struct list_node *node, *temp; - struct lwip_sock *sock; - list_for_each_safe(node, temp, list) { - sock = container_of(node, struct lwip_sock, send_list); + sem_post(&stack_group->thread_phase1); - if (sock->conn == NULL || sock->stack == NULL) { - list_del_node_init(&sock->send_list); - continue; - } + int32_t sem_val; + do { + sem_getvalue(&stack_group->ethdev_init, &sem_val); + } while (!sem_val && !use_ltran()); - ssize_t ret = write_lwip_data(sock, sock->conn->socket, sock->send_flags); - __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE); - if (ret >= 0 && - (rte_ring_count(sock->send_ring) || sock->send_lastdata)) { - __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE); - } else { - list_del_node_init(&sock->send_list); - } + ret = ethdev_init(stack); + if (ret != 0) { + free(stack); + return NULL; + } - if (rte_ring_free_count(sock->send_ring)) { - add_epoll_event(sock->conn, EPOLLOUT); + if (stack_group->wakeup_enable) { + ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread); + if (ret != 0) { + free(stack); + return NULL; } } + + return stack; } static void* gazelle_stack_thread(void *arg) { - struct protocol_stack *stack = (struct protocol_stack *)arg; + uint16_t queue_id = *(uint16_t *)arg; - stack_thread_init(stack); + struct protocol_stack *stack = stack_thread_init(queue_id); + if (stack == NULL) { + pthread_mutex_lock(&g_mem_mutex); + LSTACK_EXIT(1, "stack_thread_init failed\n"); + pthread_mutex_unlock(&g_mem_mutex); + } + LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id); for (;;) { - poll_rpc_msg(stack); + poll_rpc_msg(stack, HANDLE_RPC_MSG_MAX); eth_dev_poll(); - read_recv_list(); + read_recv_list(stack, READ_LIST_MAX); - sys_timer_run(); + send_stack_list(stack, SEND_LIST_MAX); - report_stack_event(stack); + sys_timer_run(); - send_stack_list(stack); +#ifdef GAZELLE_USE_EPOLL_EVENT_STACK + update_stack_events(stack); +#endif } return NULL; } -int32_t create_stack_thread(void) +int32_t init_protocol_stack(void) { struct protocol_stack_group *stack_group = get_protocol_stack_group(); int32_t ret; - ret = sem_init(&stack_group->thread_inited, 0, 0); + stack_group->stack_num = get_global_cfg_params()->num_cpu; + stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? true : false; + + if (!use_ltran()) { + ret = sem_init(&stack_group->ethdev_init, 0, 0); + if (ret < 0) { + LSTACK_LOG(ERR, PORT, "sem_init failed\n"); + return -1; + } + } + + ret = sem_init(&stack_group->thread_phase1, 0, 0); if (ret < 0) { LSTACK_LOG(ERR, PORT, "sem_init failed\n"); return -1; } for (uint32_t i = 0; i < stack_group->stack_num; i++) { - ret = create_thread(stack_group->stacks[i], "gazellestack", gazelle_stack_thread); + ret = create_thread(i, "gazellestack", gazelle_stack_thread); if (ret != 0) { return ret; } @@ -406,14 +388,12 @@ int32_t create_stack_thread(void) int32_t thread_inited_num; do { - sem_getvalue(&stack_group->thread_inited, &thread_inited_num); + sem_getvalue(&stack_group->thread_phase1, &thread_inited_num); } while (thread_inited_num < stack_group->stack_num); - for (uint32_t i = 0; i < stack_group->stack_num; i++) { - ret = create_thread(stack_group->stacks[i], "gazelleweakup", gazelle_weakup_thread); - if (ret != 0) { - return ret; - } + ret = init_stack_numa_cpuset(); + if (ret < 0) { + return -1; } return 0; @@ -440,7 +420,6 @@ static inline bool is_real_close(int32_t fd) /* last sock */ if (list_is_empty(&sock->attach_list)) { - list_del_node_init(&sock->attach_list); return true; } @@ -557,24 +536,6 @@ void stack_listen(struct rpc_msg *msg) } } -static bool have_accept_event(int32_t fd) -{ - do { - struct lwip_sock *sock = get_socket(fd); - if (sock == NULL) { - break; - } - - if (NETCONN_IS_ACCEPTIN(sock)) { - return true; - } - - fd = sock->nextfd; - } while (fd > 0); - - return false; -} - void stack_accept(struct rpc_msg *msg) { int32_t fd = msg->args[MSG_ARG_0].i; @@ -593,7 +554,7 @@ void stack_accept(struct rpc_msg *msg) } LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d attach_fd %d failed %d\n", get_stack_tid(), msg->args[MSG_ARG_0].i, - fd, accept_fd); + fd, accept_fd); msg->result = -1; } @@ -768,13 +729,11 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog) int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen) { struct lwip_sock *sock = get_socket(fd); - if (sock == NULL) { + if (sock == NULL || sock->attach_fd < 0) { errno = EINVAL; return -1; } - fd = sock->attach_fd; - int32_t head_fd = fd; struct lwip_sock *min_sock = NULL; int32_t min_fd = fd; @@ -783,15 +742,19 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add if (sock == NULL) { GAZELLE_RETURN(EINVAL); } + struct lwip_sock *attach_sock = get_socket(sock->attach_fd); + if (attach_sock == NULL) { + GAZELLE_RETURN(EINVAL); + } - if (!NETCONN_IS_ACCEPTIN(sock)) { + if (!NETCONN_IS_ACCEPTIN(attach_sock)) { fd = sock->nextfd; continue; } - if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) { - min_sock = sock; - min_fd = fd; + if (min_sock == NULL || min_sock->stack->conn_num > attach_sock->stack->conn_num) { + min_sock = attach_sock; + min_fd = sock->attach_fd; } fd = sock->nextfd; @@ -802,13 +765,7 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add ret = rpc_call_accept(min_fd, addr, addrlen); } - if (have_accept_event(head_fd)) { - add_self_event(sock, EPOLLIN); - sock = get_socket(head_fd); - sock->stack->stats.accept_events++; - } - - if(ret < 0) { + if (ret < 0) { errno = EAGAIN; } return ret; diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c index 1813906..743857f 100644 --- a/src/lstack/core/lstack_stack_stat.c +++ b/src/lstack/core/lstack_stack_stat.c @@ -105,8 +105,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_ lstack_get_low_power_info(&dfx->low_power_info); memcpy_s(&dfx->data.pkts, sizeof(dfx->data.pkts), &stack->stats, sizeof(dfx->data.pkts)); dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail; - dfx->data.pkts.weakup_ring_cnt = rte_ring_count(stack->weakup_ring); - dfx->data.pkts.send_idle_ring_cnt = rte_ring_count(stack->send_idle_ring); int32_t rpc_call_result = rpc_call_msgcnt(stack); dfx->data.pkts.call_msg_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result; @@ -120,10 +118,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_ rpc_call_result = rpc_call_sendlistcnt(stack); dfx->data.pkts.send_list = (rpc_call_result < 0) ? 0 : rpc_call_result; - if (stack->wakeup_list) { - rpc_call_result = rpc_call_eventlistcnt(stack); - dfx->data.pkts.wakeup_list = (rpc_call_result < 0) ? 0 : rpc_call_result; - } dfx->data.pkts.conn_num = stack->conn_num; } diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index 2a67333..26725f7 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -19,9 +19,10 @@ #include "lstack_protocol_stack.h" #include "lstack_control_plane.h" #include "gazelle_base_func.h" +#include "lstack_dpdk.h" #include "lstack_thread_rpc.h" -#define HANDLE_RPC_MSG_MAX (8) +static PER_THREAD struct rte_mempool *rpc_pool = NULL; static inline __attribute__((always_inline)) struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func) @@ -33,11 +34,20 @@ struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func) return NULL; } - ret = rte_mempool_get(stack->rpc_pool, (void **)&msg); + static uint16_t pool_index = 0; + if (rpc_pool == NULL) { + rpc_pool = create_rpc_mempool("rpc_msg", pool_index++); + if (rpc_pool == NULL) { + return NULL; + } + } + + ret = rte_mempool_get(rpc_pool, (void **)&msg); if (ret < 0) { get_protocol_stack_group()->call_alloc_fail++; return NULL; } + msg->pool = rpc_pool; pthread_spin_init(&msg->lock, PTHREAD_PROCESS_SHARED); msg->func = func; @@ -47,13 +57,13 @@ struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func) } static inline __attribute__((always_inline)) -void rpc_msg_free(struct rte_mempool *pool, struct rpc_msg *msg) +void rpc_msg_free(struct rpc_msg *msg) { pthread_spin_destroy(&msg->lock); msg->self_release = 0; msg->func = NULL; - rte_mempool_put(pool, (void *)msg); + rte_mempool_put(msg->pool, (void *)msg); } static inline __attribute__((always_inline)) @@ -64,7 +74,7 @@ void rpc_call(lockless_queue *queue, struct rpc_msg *msg) } static inline __attribute__((always_inline)) -int32_t rpc_sync_call(lockless_queue *queue, struct rte_mempool *pool, struct rpc_msg *msg) +int32_t rpc_sync_call(lockless_queue *queue, struct rpc_msg *msg) { int32_t ret; @@ -74,20 +84,20 @@ int32_t rpc_sync_call(lockless_queue *queue, struct rte_mempool *pool, struct rp pthread_spin_lock(&msg->lock); ret = msg->result; - rpc_msg_free(pool, msg); + rpc_msg_free(msg); return ret; } -void poll_rpc_msg(struct protocol_stack *stack) +void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num) { - int32_t num; + uint32_t num; struct rpc_msg *msg = NULL; num = 0; - while (num++ < HANDLE_RPC_MSG_MAX) { + while (num++ < max_num) { lockless_queue_node *node = lockless_queue_mpsc_pop(&stack->rpc_queue); if (node == NULL) { - return; + break; } msg = container_of(node, struct rpc_msg, queue_node); @@ -103,7 +113,7 @@ void poll_rpc_msg(struct protocol_stack *stack) if (msg->self_release) { pthread_spin_unlock(&msg->lock); } else { - rpc_msg_free(stack->rpc_pool, msg); + rpc_msg_free(msg); } } } @@ -118,7 +128,7 @@ int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint3 msg->args[MSG_ARG_0].p = conn_table; msg->args[MSG_ARG_1].u = max_conn; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_connnum(struct protocol_stack *stack) @@ -128,7 +138,7 @@ int32_t rpc_call_connnum(struct protocol_stack *stack) return -1; } - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen) @@ -142,7 +152,7 @@ int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struc msg->args[MSG_ARG_1].cp = addr; msg->args[MSG_ARG_2].socklen = addrlen; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } static void rpc_msgcnt(struct rpc_msg *msg) @@ -158,7 +168,7 @@ int32_t rpc_call_msgcnt(struct protocol_stack *stack) return -1; } - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn) @@ -168,7 +178,7 @@ int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn) return -1; } msg->args[MSG_ARG_0].p = conn; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn) @@ -178,17 +188,7 @@ int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn) return -1; } msg->args[MSG_ARG_0].p = conn; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); -} - -int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack) -{ - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_wakeuplist_count); - if (msg == NULL) { - return -1; - } - - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_eventlistcnt(struct protocol_stack *stack) @@ -198,7 +198,7 @@ int32_t rpc_call_eventlistcnt(struct protocol_stack *stack) return -1; } - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_sendlistcnt(struct protocol_stack *stack) @@ -208,7 +208,7 @@ int32_t rpc_call_sendlistcnt(struct protocol_stack *stack) return -1; } - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) @@ -218,7 +218,7 @@ int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) return -1; } - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } void add_epoll_event(struct netconn *conn, uint32_t event); @@ -243,24 +243,6 @@ void rpc_call_addevent(struct protocol_stack *stack, void *sock) rpc_call(&stack->rpc_queue, msg); } -static void rpc_replenish_idlembuf(struct rpc_msg *msg) -{ - struct protocol_stack *stack = get_protocol_stack(); - stack_replenish_send_idlembuf(stack); - stack->in_replenish = 0; -} - -void rpc_call_replenish_idlembuf(struct protocol_stack *stack) -{ - struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_replenish_idlembuf); - if (msg == NULL) { - return; - } - - msg->self_release = 0; - rpc_call(&stack->rpc_queue, msg); -} - int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf) { struct rpc_msg *msg = rpc_msg_alloc(stack, stack_arp); @@ -287,7 +269,7 @@ int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol) msg->args[MSG_ARG_1].i = type; msg->args[MSG_ARG_2].i = protocol; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_close(int fd) @@ -300,7 +282,7 @@ int32_t rpc_call_close(int fd) msg->args[MSG_ARG_0].i = fd; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen) @@ -315,7 +297,7 @@ int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen msg->args[MSG_ARG_1].cp = addr; msg->args[MSG_ARG_2].socklen = addrlen; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_listen(int s, int backlog) @@ -329,7 +311,7 @@ int32_t rpc_call_listen(int s, int backlog) msg->args[MSG_ARG_0].i = s; msg->args[MSG_ARG_1].i = backlog; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen) @@ -344,7 +326,7 @@ int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen) msg->args[MSG_ARG_1].p = addr; msg->args[MSG_ARG_2].p = addrlen; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen) @@ -359,7 +341,7 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen) msg->args[MSG_ARG_1].cp = addr; msg->args[MSG_ARG_2].socklen = addrlen; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen) @@ -374,7 +356,7 @@ int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen) msg->args[MSG_ARG_1].p = addr; msg->args[MSG_ARG_2].p = addrlen; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen) @@ -389,7 +371,7 @@ int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen) msg->args[MSG_ARG_1].p = addr; msg->args[MSG_ARG_2].p = addrlen; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen) @@ -406,7 +388,7 @@ int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, sockle msg->args[MSG_ARG_3].p = optval; msg->args[MSG_ARG_4].p = optlen; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen) @@ -423,7 +405,7 @@ int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, msg->args[MSG_ARG_3].cp = optval; msg->args[MSG_ARG_4].socklen = optlen; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_fcntl(int fd, int cmd, long val) @@ -438,7 +420,7 @@ int32_t rpc_call_fcntl(int fd, int cmd, long val) msg->args[MSG_ARG_1].i = cmd; msg->args[MSG_ARG_2].l = val; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_ioctl(int fd, long cmd, void *argp) @@ -453,7 +435,7 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp) msg->args[MSG_ARG_1].l = cmd; msg->args[MSG_ARG_2].p = argp; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } ssize_t rpc_call_send(int fd, const void *buf, size_t len, int flags) @@ -486,7 +468,7 @@ int32_t rpc_call_sendmsg(int fd, const struct msghdr *msghdr, int flags) msg->args[MSG_ARG_1].cp = msghdr; msg->args[MSG_ARG_2].i = flags; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags) @@ -501,5 +483,5 @@ int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags) msg->args[MSG_ARG_1].p = msghdr; msg->args[MSG_ARG_2].i = flags; - return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg); + return rpc_sync_call(&stack->rpc_queue, msg); } diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h index 48b7e44..345a373 100644 --- a/src/lstack/include/lstack_cfg.h +++ b/src/lstack/include/lstack_cfg.h @@ -66,7 +66,7 @@ struct cfg_params { uint16_t num_cpu; uint32_t cpus[CFG_MAX_CPUS]; uint16_t num_wakeup; - uint32_t weakup[CFG_MAX_CPUS]; + uint32_t wakeup[CFG_MAX_CPUS]; uint8_t num_ports; uint16_t ports[CFG_MAX_PORTS]; char log_file[PATH_MAX]; diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h index e76a9a6..e8080e1 100644 --- a/src/lstack/include/lstack_dpdk.h +++ b/src/lstack/include/lstack_dpdk.h @@ -37,10 +37,10 @@ struct protocol_stack; #define MBUF_SZ (MAX_PACKET_SZ + RTE_PKTMBUF_HEADROOM) -#define MAX_CORE_NUM 256 +#define MAX_CORE_NUM 256 #define CALL_MSG_RING_SIZE (unsigned long long)32 -#define CALL_CACHE_SZ 64 -#define CALL_POOL_SZ ((VDEV_CALL_QUEUE_SZ << 1) + (2 * CALL_CACHE_SZ * MAX_CORE_NUM)) +#define CALL_CACHE_SZ 0 +#define CALL_POOL_SZ 128 /* Layout: * | rte_mbuf | pbuf | custom_free_function | payload | @@ -62,6 +62,7 @@ void dpdk_eal_init(void); int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num); struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, int32_t queue_id); int32_t create_shared_ring(struct protocol_stack *stack); +struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id); void lstack_log_level_init(void); int dpdk_ethdev_init(void); int dpdk_ethdev_start(void); diff --git a/src/lstack/include/lstack_log.h b/src/lstack/include/lstack_log.h index 383495d..8b4209a 100644 --- a/src/lstack/include/lstack_log.h +++ b/src/lstack/include/lstack_log.h @@ -15,17 +15,14 @@ #include #include - #include -#include "lwipopts.h" - -#define RTE_LOGTYPE_LSTACK RTE_LOGTYPE_USER1 +#define RTE_LOGTYPE_LSTACK RTE_LOGTYPE_USER1 #define LSTACK_EXIT(a, fmt, ...) rte_exit(a, "%s:%d "fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__) #define LSTACK_LOG(a, b, fmt, ...) (void)RTE_LOG(a, b, "%s:%d "fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__) -#define LSTACK_INFO LOG_INFO -#define LSTACK_ERR LOG_ERR +#define LSTACK_INFO LOG_INFO +#define LSTACK_ERR LOG_ERR /* before rte_eal_init */ #define LSTACK_PRE_LOG(level, fmt, ...) \ diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h index ffd3b80..c73e3a7 100644 --- a/src/lstack/include/lstack_lwip.h +++ b/src/lstack/include/lstack_lwip.h @@ -21,32 +21,31 @@ #define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox)) #define NETCONN_IS_DATAIN(sock) ((rte_ring_count((sock)->recv_ring) || (sock)->recv_lastdata)) -#define NETCONN_IS_DATAOUT(sock) rte_ring_free_count((sock)->send_ring) +#define NETCONN_IS_DATAOUT(sock) ((rte_ring_count((sock)->send_ring) || (sock)->send_lastdata)) +#define NETCONN_IS_OUTIDLE(sock) rte_ring_free_count((sock)->send_ring) void create_shadow_fd(struct rpc_msg *msg); void listen_list_add_node(int32_t head_fd, int32_t add_fd); void gazelle_init_sock(int32_t fd); int32_t gazelle_socket(int domain, int type, int protocol); void gazelle_clean_sock(int32_t fd); -ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags); +struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags); ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len); ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags); ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags); -void read_recv_list(void); +void read_recv_list(struct protocol_stack *stack, uint32_t max_num); +void send_stack_list(struct protocol_stack *stack, uint32_t send_max); void add_recv_list(int32_t fd); void stack_sendlist_count(struct rpc_msg *msg); void stack_eventlist_count(struct rpc_msg *msg); -void stack_wakeuplist_count(struct rpc_msg *msg); void get_lwip_conntable(struct rpc_msg *msg); void get_lwip_connnum(struct rpc_msg *msg); void stack_recvlist_count(struct rpc_msg *msg); void stack_send(struct rpc_msg *msg); -void stack_replenish_send_idlembuf(struct protocol_stack *stack); int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num); void gazelle_free_pbuf(struct pbuf *pbuf); ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags); ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags); ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags); -void add_self_event(struct lwip_sock *sock, uint32_t events); #endif diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h index 39052e1..9753385 100644 --- a/src/lstack/include/lstack_protocol_stack.h +++ b/src/lstack/include/lstack_protocol_stack.h @@ -14,6 +14,7 @@ #define __GAZELLE_PROTOCOL_STACK_H__ #include +#include #include #include #include "dpdk_common.h" @@ -28,38 +29,32 @@ struct protocol_stack { uint16_t socket_id; uint16_t cpu_id; volatile uint16_t conn_num; - volatile bool in_replenish; - - // for dispatcher thread - cpu_set_t idle_cpuset; + cpu_set_t idle_cpuset; /* idle cpu in numa of stack, app thread bind to it */ lockless_queue rpc_queue; - struct rte_ring *weakup_ring; - struct rte_mempool *rx_pktmbuf_pool; struct rte_mempool *tx_pktmbuf_pool; - struct rte_mempool *rpc_pool; struct rte_ring *rx_ring; struct rte_ring *tx_ring; struct rte_ring *reg_ring; - struct rte_ring *send_idle_ring; + struct rte_ring *wakeup_ring; + struct reg_ring_msg *reg_buf; struct netif netif; uint32_t rx_ring_used; uint32_t tx_ring_used; + struct eth_dev_ops *dev_ops; struct list_node recv_list; struct list_node listen_list; - struct list_node event_list; struct list_node send_list; - struct list_node *wakeup_list; + struct list_node event_list; + pthread_spinlock_t event_lock; struct gazelle_stat_pkts stats; struct gazelle_stack_latency latency; struct stats_ *lwip_stats; - - struct eth_dev_ops *dev_ops; }; struct eth_params; @@ -67,25 +62,35 @@ struct eth_params; struct protocol_stack_group { uint16_t stack_num; uint16_t port_id; - sem_t thread_inited; + sem_t thread_phase1; + sem_t ethdev_init; struct rte_mempool *kni_pktmbuf_pool; struct eth_params *eth_params; struct protocol_stack *stacks[PROTOCOL_STACK_MAX]; + bool wakeup_enable; /* dfx stats */ bool latency_start; uint64_t call_alloc_fail; }; +struct wakeup_poll { + bool init; + struct protocol_stack *bind_stack; + struct list_node event_list; /* epoll temp use poll */ + sem_t event_sem; +}; + long get_stack_tid(void); +pthread_mutex_t *get_mem_mutex(void); struct protocol_stack *get_protocol_stack(void); struct protocol_stack *get_protocol_stack_by_fd(int32_t fd); struct protocol_stack *get_minconn_protocol_stack(void); struct protocol_stack_group *get_protocol_stack_group(void); int32_t init_protocol_stack(void); -int32_t create_stack_thread(void); -int bind_to_stack_numa(int stack_id); +int32_t bind_to_stack_numa(struct protocol_stack *stack); +int32_t init_dpdk_ethdev(void); /* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack); diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index 20539d9..61bcd38 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -42,21 +42,20 @@ struct rpc_msg { int32_t self_release; /* 0:msg handler release msg 1:msg sender release msg */ int64_t result; /* func return val */ lockless_queue_node queue_node; + struct rte_mempool *pool; rpc_msg_func func; /* msg handle func hook */ union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */ }; struct protocol_stack; -void poll_rpc_msg(struct protocol_stack *stack); -void rpc_call_replenish_idlembuf(struct protocol_stack *stack); +void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num); void rpc_call_addevent(struct protocol_stack *stack, void *sock); int32_t rpc_call_msgcnt(struct protocol_stack *stack); int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); int32_t rpc_call_recvlistcnt(struct protocol_stack *stack); int32_t rpc_call_eventlistcnt(struct protocol_stack *stack); int32_t rpc_call_sendlistcnt(struct protocol_stack *stack); -int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack); int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn); int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn); int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn); diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h index 916b1e2..31a997d 100644 --- a/src/lstack/include/lstack_vdev.h +++ b/src/lstack/include/lstack_vdev.h @@ -23,7 +23,7 @@ #define VDEV_EVENT_QUEUE_SZ (DEFAULT_RING_SIZE) #define VDEV_REG_QUEUE_SZ (DEFAULT_RING_SIZE) #define VDEV_CALL_QUEUE_SZ (DEFAULT_RING_SIZE) -#define VDEV_WEAKUP_QUEUE_SZ (DEFAULT_RING_SIZE) +#define VDEV_WAKEUP_QUEUE_SZ (DEFAULT_RING_SIZE) #define VDEV_IDLE_QUEUE_SZ (DEFAULT_RING_SIZE) #define VDEV_TX_QUEUE_SZ (DEFAULT_RING_SIZE) diff --git a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h index 2b3cff4..cac640b 100644 --- a/src/lstack/include/posix/lstack_epoll.h +++ b/src/lstack/include/posix/lstack_epoll.h @@ -17,6 +17,8 @@ extern "C" { #endif +#include + int32_t lstack_epoll_create(int32_t size); int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event); int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event *events, int32_t maxevents, int32_t timeout); diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf index fdca602..696dfb9 100644 --- a/src/lstack/lstack.conf +++ b/src/lstack/lstack.conf @@ -16,7 +16,6 @@ kni_switch=0 low_power_mode=0 num_cpus="2" -num_wakeup="3" host_addr="192.168.1.10" mask_addr="255.255.255.0" diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c index 8b2193f..ae39403 100644 --- a/src/lstack/netif/lstack_ethdev.c +++ b/src/lstack/netif/lstack_ethdev.c @@ -51,6 +51,9 @@ void eth_dev_recv(struct rte_mbuf *mbuf) stack->stats.rx_allocmbuf_fail++; break; } +#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW + next->ol_flags = m->ol_flags; +#endif if (head == NULL) { head = next; @@ -73,18 +76,19 @@ void eth_dev_recv(struct rte_mbuf *mbuf) } } +#define READ_PKTS_MAX 32 int32_t eth_dev_poll(void) { uint32_t nr_pkts; - struct rte_mbuf *pkts[DPDK_PKT_BURST_SIZE]; + struct rte_mbuf *pkts[READ_PKTS_MAX]; struct protocol_stack *stack = get_protocol_stack(); - nr_pkts = stack->dev_ops->rx_poll(stack, pkts, DPDK_PKT_BURST_SIZE); + nr_pkts = stack->dev_ops->rx_poll(stack, pkts, READ_PKTS_MAX); if (nr_pkts == 0) { return nr_pkts; } - if (get_protocol_stack_group()->latency_start) { + if (!use_ltran() && get_protocol_stack_group()->latency_start) { uint64_t time_stamp = get_current_time(); time_stamp_into_mbuf(nr_pkts, pkts, time_stamp); } @@ -146,19 +150,6 @@ static err_t eth_dev_output(struct netif *netif, struct pbuf *pbuf) return ERR_OK; } -static err_t eth_dev_input(struct pbuf *p, struct netif *netif) -{ - err_t ret = ethernet_input(p, netif); - if (ret != ERR_OK) { - return ret; - } - - if (get_protocol_stack_group()->latency_start) { - calculate_lstack_latency(&get_protocol_stack()->latency, p, GAZELLE_LATENCY_LWIP); - } - return ret; -} - static err_t eth_dev_init(struct netif *netif) { struct cfg_params *cfg = get_global_cfg_params(); @@ -200,7 +191,7 @@ int32_t ethdev_init(struct protocol_stack *stack) netif_set_default(&stack->netif); struct netif *netif = netif_add(&stack->netif, &cfg->host_addr, &cfg->netmask, &cfg->gateway_addr, NULL, - eth_dev_init, eth_dev_input); + eth_dev_init, ethernet_input); if (netif == NULL) { LSTACK_LOG(ERR, LSTACK, "netif_add failed\n"); return ERR_IF; diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c index 57d3bce..5a4e86a 100644 --- a/src/lstack/netif/lstack_vdev.c +++ b/src/lstack/netif/lstack_vdev.c @@ -91,7 +91,7 @@ static uint32_t vdev_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pkt uint32_t sent_pkts = 0; do { - sent_pkts += rte_eth_tx_burst(stack->port_id, stack->queue_id, &pkts[sent_pkts], nr_pkts); + sent_pkts += rte_eth_tx_burst(stack->port_id, stack->queue_id, &pkts[sent_pkts], nr_pkts - sent_pkts); } while (sent_pkts < nr_pkts); return sent_pkts; diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c index 8db5791..8d71966 100644 --- a/src/ltran/ltran_dfx.c +++ b/src/ltran/ltran_dfx.c @@ -561,27 +561,16 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat) printf("app_write_drop: %-13"PRIu64" ", lstack_stat->data.pkts.app_write_drop); printf("write_lwip_drop: %-12"PRIu64" ", lstack_stat->data.pkts.write_lwip_drop); printf("app_write_idlebuf: %-10"PRIu16" \n", lstack_stat->data.pkts.send_idle_ring_cnt); + printf("event_list: %-17"PRIu64" ", lstack_stat->data.pkts.event_list); printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list); - printf("weakup_ring_cnt: %-12"PRIu16" ", lstack_stat->data.pkts.weakup_ring_cnt); printf("conn_num: %-19"PRIu16" \n", lstack_stat->data.pkts.conn_num); - printf("weakup_events: %-14"PRIu64" ", lstack_stat->data.pkts.weakup_events); - printf("lwip_events: %-16"PRIu64" ", lstack_stat->data.pkts.lwip_events); - printf("app_events: %-17"PRIu64"\n", lstack_stat->data.pkts.app_events); - printf("epoll_pending: %-14"PRIu64" ", lstack_stat->data.pkts.epoll_pending); - printf("epoll_self_event: %-11"PRIu64" ", lstack_stat->data.pkts.epoll_self_event); - printf("remove_event: %-15"PRIu64" \n", lstack_stat->data.pkts.remove_event); - printf("read_events: %-16"PRIu64" ", lstack_stat->data.pkts.read_events); - printf("write_events: %-15"PRIu64" ", lstack_stat->data.pkts.write_events); - printf("accept_events: %-14"PRIu64" \n", lstack_stat->data.pkts.accept_events); - printf("read_null: %-18"PRIu64" ", lstack_stat->data.pkts.read_null); - printf("wakeup_list: %-16"PRIu64" ", lstack_stat->data.pkts.wakeup_list); - printf("event_list: %-17"PRIu64" \n", lstack_stat->data.pkts.event_list); - printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc); - printf("epoll_pending_call: %-9"PRIu64" ", lstack_stat->data.pkts.epoll_pending_call); - printf("epoll_self_call: %-12"PRIu64" \n", lstack_stat->data.pkts.epoll_self_call); + printf("wakeup_events: %-14"PRIu64" ", lstack_stat->data.pkts.wakeup_events); + printf("app_events: %-17"PRIu64" ", lstack_stat->data.pkts.app_events); + printf("read_null: %-18"PRIu64" \n", lstack_stat->data.pkts.read_null); printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt); printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail); printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null); + printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc); printf("send_list: %-18"PRIu64" \n", lstack_stat->data.pkts.send_list); } @@ -884,7 +873,7 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_ printf("Active Internet connections (servers and established)\n"); do { printf("\n------ stack tid: %6u ------\n", stat->tid); - printf("No. Proto recv_cnt recv_ring in_send send_ring event self_event Local Address" + printf("No. Proto recv_cnt recv_ring in_send send_ring sem_cnt Local Address " " Foreign Address State\n"); uint32_t unread_pkts = 0; uint32_t unsend_pkts = 0; @@ -894,13 +883,13 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_ rip.s_addr = conn_info->rip; lip.s_addr = conn_info->lip; if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) { - printf("%-6utcp %-10u%-11u%-9u%-11u%-7u%-12u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt, - conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->event_ring_cnt, - conn_info->self_ring_cnt, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, + printf("%-6utcp %-10u%-11u%-9u%-11u%-9d%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt, + conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->sem_cnt, + inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port, tcp_state_to_str(conn_info->tcp_sub_state)); } else if (conn_info->state == GAZELLE_LISTEN_LIST) { - printf("%-6utcp %-60u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt, + printf("%-6utcp %-50u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port); } else { printf("Got unknow tcp conn::%s:%5hu, state:%u\n", diff --git a/src/ltran/ltran_opt.h b/src/ltran/ltran_opt.h index e4e085d..1117898 100644 --- a/src/ltran/ltran_opt.h +++ b/src/ltran/ltran_opt.h @@ -34,12 +34,12 @@ #define GAZELLE_KNI_ETHERNET_HEADER_SIZE 14 #define GAZELLE_KNI_ETHERNET_FCS_SIZE 4 -#define GAZELLE_PKT_MBUF_RX_POOL_NAME_FMT "rx_pool%d" -#define GAZELLE_PKT_MBUF_TX_POOL_NAME_FMT "tx_pool%d" +#define GAZELLE_PKT_MBUF_RX_POOL_NAME_FMT "rx_pool%u" +#define GAZELLE_PKT_MBUF_TX_POOL_NAME_FMT "tx_pool%u" #define GAZELLE_PKT_MBUF_POOL_NAME_LENGTH 64 #define GAZELLE_BOND_NAME_LENGTH 64 -#define GAZELLE_BOND_DEV_NAME_FMT "net_bonding%d" +#define GAZELLE_BOND_DEV_NAME_FMT "net_bonding%hu" #define GAZELLE_BOND_QUEUE_MIN 1 #define GAZELLE_BOND_QUEUE_MAX 64 -- 1.8.3.1