From 200ee63e092824edf12c7cc3172c61ba04a57d56 Mon Sep 17 00:00:00 2001 From: jinag12 Date: Sat, 11 Mar 2023 15:59:51 +0000 Subject: [PATCH] support multi process --- src/common/gazelle_opt.h | 4 + src/lstack/api/lstack_epoll.c | 16 +- src/lstack/api/lstack_wrap.c | 3 +- src/lstack/core/lstack_cfg.c | 201 +++++++- src/lstack/core/lstack_dpdk.c | 192 ++++--- src/lstack/core/lstack_init.c | 2 + src/lstack/core/lstack_protocol_stack.c | 225 +++++++-- src/lstack/include/lstack_cfg.h | 17 +- src/lstack/include/lstack_dpdk.h | 3 +- src/lstack/include/lstack_ethdev.h | 23 +- src/lstack/include/lstack_protocol_stack.h | 10 + src/lstack/include/lstack_vdev.h | 7 + src/lstack/lstack.conf | 4 + src/lstack/netif/lstack_ethdev.c | 555 ++++++++++++++++++++- src/lstack/netif/lstack_vdev.c | 49 +- 15 files changed, 1143 insertions(+), 168 deletions(-) diff --git a/src/common/gazelle_opt.h b/src/common/gazelle_opt.h index 745fdd8..e278107 100644 --- a/src/common/gazelle_opt.h +++ b/src/common/gazelle_opt.h @@ -90,4 +90,8 @@ #define SEND_TIME_WAIT_NS 20000 #define SECOND_NSECOND 1000000000 +#define LSTACK_SEND_THREAD_NAME "lstack_send" +#define LSTACK_RECV_THREAD_NAME "lstack_recv" +#define LSTACK_THREAD_NAME "gazellelstack" + #endif /* _GAZELLE_OPT_H_ */ diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c index da29590..4a10b09 100644 --- a/src/lstack/api/lstack_epoll.c +++ b/src/lstack/api/lstack_epoll.c @@ -74,8 +74,8 @@ void add_sock_event(struct lwip_sock *sock, uint32_t event) } struct protocol_stack *stack = sock->stack; - if (list_is_null(&wakeup->wakeup_list[stack->queue_id])) { - list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->queue_id]); + if (list_is_null(&wakeup->wakeup_list[stack->stack_idx])) { + list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->stack_idx]); } } @@ -95,7 +95,7 @@ void wakeup_stack_epoll(struct protocol_stack *stack, bool wakeup_thread_enable) temp = nod; } - struct wakeup_poll *wakeup = 
container_of((node - stack->queue_id), struct wakeup_poll, wakeup_list); + struct wakeup_poll *wakeup = container_of((node - stack->stack_idx), struct wakeup_poll, wakeup_list); if (!wakeup_thread_enable) { if (__atomic_load_n(&wakeup->in_wait, __ATOMIC_ACQUIRE)) { @@ -109,7 +109,7 @@ void wakeup_stack_epoll(struct protocol_stack *stack, bool wakeup_thread_enable) stack->stats.wakeup_events++; } - list_del_node_null(&wakeup->wakeup_list[stack->queue_id]); + list_del_node_null(&wakeup->wakeup_list[stack->stack_idx]); } } @@ -291,7 +291,7 @@ static uint16_t find_max_cnt_stack(int32_t *stack_count, uint16_t stack_num, str /* all stack same, don't change */ if (all_same_cnt && last_stack) { - return last_stack->queue_id; + return last_stack->stack_idx; } /* first bind and all stack same. choice tick as queue_id, avoid all bind to statck_0. */ @@ -343,7 +343,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even switch (op) { case EPOLL_CTL_ADD: sock->wakeup = wakeup; - wakeup->stack_fd_cnt[sock->stack->queue_id]++; + wakeup->stack_fd_cnt[sock->stack->stack_idx]++; /* fall through */ case EPOLL_CTL_MOD: sock->epoll_events = event->events | EPOLLERR | EPOLLHUP; @@ -352,7 +352,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even break; case EPOLL_CTL_DEL: sock->epoll_events = 0; - wakeup->stack_fd_cnt[sock->stack->queue_id]--; + wakeup->stack_fd_cnt[sock->stack->stack_idx]--; pthread_spin_lock(&wakeup->event_list_lock); list_del_node_null(&sock->event_list); pthread_spin_unlock(&wakeup->event_list_lock); @@ -652,7 +652,7 @@ static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfd while (sock && sock->conn) { sock->epoll_events = fds[i].events | POLLERR; sock->wakeup = wakeup; - stack_count[sock->stack->queue_id]++; + stack_count[sock->stack->stack_idx]++; sock = sock->listen_next; } } diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c index f438529..561c6e4 100644 
--- a/src/lstack/api/lstack_wrap.c +++ b/src/lstack/api/lstack_wrap.c @@ -238,8 +238,7 @@ static inline int32_t do_listen(int32_t s, int32_t backlog) return posix_api->listen_fn(s, backlog); } - int32_t ret = get_global_cfg_params()->listen_shadow ? stack_broadcast_listen(s, backlog) : - stack_single_listen(s, backlog); + int32_t ret = stack_broadcast_listen(s, backlog); if (ret != 0) { return ret; } diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c index 9195f34..72a3292 100644 --- a/src/lstack/core/lstack_cfg.c +++ b/src/lstack/core/lstack_cfg.c @@ -65,6 +65,10 @@ static int32_t parse_tcp_conn_count(void); static int32_t parse_mbuf_count_per_conn(void); static int32_t parse_send_ring_size(void); static int32_t parse_expand_send_ring(void); +static int32_t parse_num_process(void); +static int32_t parse_process_numa(void); +static int32_t parse_process_index(void); +static int32_t parse_seperate_sendrecv_args(void); static inline int32_t parse_int(void *arg, char * arg_string, int32_t default_val, int32_t min_val, int32_t max_val) @@ -97,6 +101,7 @@ static struct config_vector_t g_config_tbl[] = { { "use_ltran", parse_use_ltran }, { "devices", parse_devices }, { "dpdk_args", parse_dpdk_args }, + { "seperate_send_recv", parse_seperate_sendrecv_args }, { "num_cpus", parse_stack_cpu_number }, { "num_wakeup", parse_wakeup_cpu_number }, { "low_power_mode", parse_low_power_mode }, @@ -112,6 +117,9 @@ static struct config_vector_t g_config_tbl[] = { { "nic_read_number", parse_nic_read_number }, { "send_ring_size", parse_send_ring_size }, { "expand_send_ring", parse_expand_send_ring }, + { "num_process", parse_num_process }, + { "process_numa", parse_process_numa }, + { "process_idx", parse_process_index }, { NULL, NULL } }; @@ -277,35 +285,99 @@ static int32_t parse_stack_cpu_number(void) const config_setting_t *num_cpus = NULL; const char *args = NULL; - num_cpus = config_lookup(&g_config, "num_cpus"); - if (num_cpus == NULL) { - return -EINVAL; - 
} + if (!g_config_params.seperate_send_recv) { + num_cpus = config_lookup(&g_config, "num_cpus"); + if (num_cpus == NULL) { + return -EINVAL; + } - args = config_setting_get_string(num_cpus); - if (args == NULL) { - return -EINVAL; - } + args = config_setting_get_string(num_cpus); + if (args == NULL) { + return -EINVAL; + } - if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) { - int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST); - if (idx < 0) { - g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST); - g_config_params.dpdk_argc++; + if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) { + int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST); + if (idx < 0) { + g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST); + g_config_params.dpdk_argc++; - g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args); - g_config_params.dpdk_argc++; + g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args); + g_config_params.dpdk_argc++; + } } - } - char *tmp_arg = strdup(args); - int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.cpus, CFG_MAX_CPUS); - free(tmp_arg); - if (cnt <= 0 || cnt > CFG_MAX_CPUS) { - return -EINVAL; - } + char *tmp_arg = strdup(args); + int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.cpus, CFG_MAX_CPUS); + free(tmp_arg); + if (cnt <= 0 || cnt > CFG_MAX_CPUS) { + return -EINVAL; + } + + g_config_params.num_cpu = cnt; + g_config_params.num_queue = (uint16_t)cnt; + g_config_params.tot_queue_num = g_config_params.num_queue; + } else { + // send_num_cpus + num_cpus = config_lookup(&g_config, "send_num_cpus"); + if (num_cpus == NULL) { + return -EINVAL; + } + + args = config_setting_get_string(num_cpus); + if (args == NULL) { + return -EINVAL; + } + + if (!have_corelist_arg(g_config_params.dpdk_argc, 
g_config_params.dpdk_argv)) { + int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST); + if (idx < 0) { + g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST); + g_config_params.dpdk_argc++; + + g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args); + g_config_params.dpdk_argc++; + } + } + + char *tmp_arg_send = strdup(args); + int32_t cnt = separate_str_to_array(tmp_arg_send, g_config_params.send_cpus, CFG_MAX_CPUS); + free(tmp_arg_send); + + // recv_num_cpus + num_cpus = config_lookup(&g_config, "recv_num_cpus"); + if (num_cpus == NULL) { + return -EINVAL; + } + + args = config_setting_get_string(num_cpus); + if (args == NULL) { + return -EINVAL; + } - g_config_params.num_cpu = cnt; + if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) { + int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST); + if (idx < 0) { + g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST); + g_config_params.dpdk_argc++; + + g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args); + g_config_params.dpdk_argc++; + } + } + + char *tmp_arg_recv = strdup(args); + cnt = separate_str_to_array(tmp_arg_recv, g_config_params.recv_cpus, CFG_MAX_CPUS); + free(tmp_arg_recv); + + if (cnt <= 0 || cnt > CFG_MAX_CPUS / 2) { + return -EINVAL; + } + + g_config_params.num_cpu = cnt; + g_config_params.num_queue = (uint16_t)cnt * 2; + g_config_params.tot_queue_num = g_config_params.num_queue; + } return 0; } @@ -369,7 +441,12 @@ int32_t init_stack_numa_cpuset(struct protocol_stack *stack) cpu_set_t stack_cpuset; CPU_ZERO(&stack_cpuset); for (int32_t idx = 0; idx < cfg->num_cpu; ++idx) { - CPU_SET(cfg->cpus[idx], &stack_cpuset); + if (!cfg->seperate_send_recv) { + CPU_SET(cfg->cpus[idx], &stack_cpuset); + }else { + CPU_SET(cfg->send_cpus[idx], &stack_cpuset); + CPU_SET(cfg->recv_cpus[idx], &stack_cpuset); + } } 
for (int32_t idx = 0; idx < cfg->num_wakeup; ++idx) { CPU_SET(cfg->wakeup[idx], &stack_cpuset); @@ -643,6 +720,13 @@ static int32_t parse_dpdk_args(void) goto free_dpdk_args; } g_config_params.dpdk_argv[start_index + i] = p; + + const char *primary = "primary"; + if(strcmp(p, primary) == 0){ + struct cfg_params *global_params = get_global_cfg_params(); + global_params->is_primary = 1; + } + (void)fprintf(stderr, "%s ", g_config_params.dpdk_argv[start_index + i]); } (void)fprintf(stderr, "\n"); @@ -877,3 +961,72 @@ static int32_t parse_unix_prefix(void) return 0; } +static int32_t parse_seperate_sendrecv_args(void) +{ + return parse_int(&g_config_params.seperate_send_recv, "seperate_send_recv", 0, 0, 1); +} + +static int32_t parse_num_process(void) +{ + if (g_config_params.use_ltran) { + return 0; + } + + const config_setting_t *num_process = NULL; + + num_process = config_lookup(&g_config, "num_process"); + if (num_process == NULL) { + g_config_params.num_process = 1; + }else { + g_config_params.num_process = (uint8_t)config_setting_get_int(num_process); + } + + g_config_params.tot_queue_num = g_config_params.num_queue * g_config_params.num_process; + + return 0; +} + +static int32_t parse_process_numa(void) +{ + const config_setting_t *cfg_args = NULL; + const char *args = NULL; + + int ret; + cfg_args = config_lookup(&g_config, "process_numa"); + if (cfg_args == NULL) + return 0; + + args = config_setting_get_string(cfg_args); + if (cfg_args == NULL) { + return 0; + } + + ret = separate_str_to_array((char *)args, g_config_params.process_numa, PROTOCOL_STACK_MAX); + if (ret <= 0) { + return -EINVAL; + } + + return 0; +} + +static int parse_process_index(void) +{ + if (g_config_params.use_ltran) { + return 0; + } + + const config_setting_t *process_idx = NULL; + process_idx = config_lookup(&g_config, "process_idx"); + if (process_idx == NULL) { + if (g_config_params.num_process == 1) { + g_config_params.process_idx = 0; + }else { + return -EINVAL; + } + } else { + 
g_config_params.process_idx = (uint8_t)config_setting_get_int(process_idx); + } + + return 0; +} + diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c index f60963f..1beb66b 100644 --- a/src/lstack/core/lstack_dpdk.c +++ b/src/lstack/core/lstack_dpdk.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -131,7 +132,7 @@ int32_t dpdk_eal_init(void) return ret; } -static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf, +struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf, uint32_t mbuf_cache_size, uint16_t queue_id) { int32_t ret; @@ -149,7 +150,27 @@ static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_ if (pool == NULL) { LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno); } + + return pool; +} + +static struct rte_mempool* get_pktmbuf_mempool(const char *name, uint16_t queue_id){ + int32_t ret; + char pool_name[PATH_MAX]; + struct rte_mempool *pool; + + ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id); + if (ret < 0) { + return NULL; + } + pool = rte_mempool_lookup(pool_name); + if (pool == NULL) { + LSTACK_LOG(ERR, LSTACK, "look up %s pool rte_err=%d\n", pool_name, rte_errno); + } + + // rte_mempool_dump(stdout, pool) ; return pool; + } static struct reg_ring_msg *create_reg_mempool(const char *name, uint16_t queue_id) @@ -178,10 +199,7 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num) return -1; } - stack->rxtx_pktmbuf_pool = create_pktmbuf_mempool("rxtx_mbuf", - get_global_cfg_params()->mbuf_count_per_conn * get_global_cfg_params()->tcp_conn_count / stack_num, - RXTX_CACHE_SZ, - stack->queue_id); + stack->rxtx_pktmbuf_pool = get_pktmbuf_mempool("rxtx_mbuf", stack->queue_id); if (stack->rxtx_pktmbuf_pool == NULL) { return -1; } @@ -201,7 +219,7 @@ struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, i char 
ring_name[RTE_RING_NAMESIZE] = {0}; struct rte_ring *ring; - int32_t ret = snprintf_s(ring_name, sizeof(ring_name), RTE_RING_NAMESIZE - 1, "%s_%d", name, queue_id); + int32_t ret = snprintf_s(ring_name, sizeof(ring_name), RTE_RING_NAMESIZE - 1, "%s_%d_%d", name, get_global_cfg_params()->process_idx, queue_id); if (ret < 0) { return NULL; } @@ -286,6 +304,12 @@ void lstack_log_level_init(void) } } +// get port id +inline uint16_t get_port_id(){ + uint16_t port_id = get_global_cfg_params()->port_id; + return port_id; +} + static int32_t ethdev_port_id(uint8_t *mac) { int32_t port_id; @@ -412,89 +436,111 @@ static void rss_setup(const int port_id, const uint16_t nb_queues) int32_t dpdk_ethdev_init(void) { uint16_t nb_queues = get_global_cfg_params()->num_cpu; - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - - int32_t port_id = ethdev_port_id(get_global_cfg_params()->mac_addr); - if (port_id < 0) { - return port_id; + if (get_global_cfg_params()->seperate_send_recv) { + nb_queues = get_global_cfg_params()->num_cpu * 2; } - struct rte_eth_dev_info dev_info; - int32_t ret = rte_eth_dev_info_get(port_id, &dev_info); - if (ret != 0) { - LSTACK_LOG(ERR, LSTACK, "get dev info ret=%d\n", ret); - return ret; + if (!use_ltran()) { + nb_queues = get_global_cfg_params()->tot_queue_num; } - int32_t max_queues = LWIP_MIN(dev_info.max_rx_queues, dev_info.max_tx_queues); - if (max_queues < nb_queues) { - LSTACK_LOG(ERR, LSTACK, "port_id %d max_queues=%d\n", port_id, max_queues); - return -EINVAL; - } + struct protocol_stack_group *stack_group = get_protocol_stack_group(); - struct eth_params *eth_params = alloc_eth_params(port_id, nb_queues); - if (eth_params == NULL) { - return -ENOMEM; - } - eth_params_checksum(ð_params->conf, &dev_info); - int32_t rss_enable = eth_params_rss(ð_params->conf, &dev_info); - stack_group->eth_params = eth_params; - stack_group->port_id = eth_params->port_id; - stack_group->rx_offload = eth_params->conf.rxmode.offloads; - 
stack_group->tx_offload = eth_params->conf.txmode.offloads; - - for (uint32_t i = 0; i < stack_group->stack_num; i++) { - struct protocol_stack *stack = stack_group->stacks[i]; - if (likely(stack)) { - stack->port_id = stack_group->port_id; - } else { - LSTACK_LOG(ERR, LSTACK, "empty stack at stack_num %d\n", i); - stack_group->eth_params = NULL; - free(eth_params); - return -EINVAL; - } - } - - ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, ð_params->conf); - if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "cannot config eth dev at port %d: %s\n", port_id, rte_strerror(-ret)); - stack_group->eth_params = NULL; - free(eth_params); - return ret; - } + int32_t port_id = ethdev_port_id(get_global_cfg_params()->mac_addr); + if (port_id < 0) { + return port_id; + } - ret = dpdk_ethdev_start(); - if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n"); - stack_group->eth_params = NULL; - free(eth_params); - return ret; - } + struct rte_eth_dev_info dev_info; + int32_t ret = rte_eth_dev_info_get(port_id, &dev_info); + if (ret != 0) { + LSTACK_LOG(ERR, LSTACK, "get dev info ret=%d\n", ret); + return ret; + } + + int32_t max_queues = LWIP_MIN(dev_info.max_rx_queues, dev_info.max_tx_queues); + if (max_queues < nb_queues) { + LSTACK_LOG(ERR, LSTACK, "port_id %d max_queues=%d\n", port_id, max_queues); + return -EINVAL; + } - if (rss_enable) { - rss_setup(port_id, nb_queues); - stack_group->reta_mask = dev_info.reta_size - 1; + struct eth_params *eth_params = alloc_eth_params(port_id, nb_queues); + if (eth_params == NULL) { + return -ENOMEM; + } + eth_params_checksum(ð_params->conf, &dev_info); + int32_t rss_enable = 0; + if (use_ltran()) { + rss_enable = eth_params_rss(ð_params->conf, &dev_info); + } + stack_group->eth_params = eth_params; + stack_group->port_id = eth_params->port_id; + stack_group->rx_offload = eth_params->conf.rxmode.offloads; + stack_group->tx_offload = eth_params->conf.txmode.offloads; + + if (get_global_cfg_params()->is_primary) { + 
for (uint32_t i = 0; i < stack_group->stack_num; i++) { + struct protocol_stack *stack = stack_group->stacks[i]; + if (likely(stack)) { + stack->port_id = stack_group->port_id; + } else { + LSTACK_LOG(ERR, LSTACK, "empty stack at stack_num %d\n", i); + stack_group->eth_params = NULL; + free(eth_params); + return -EINVAL; + } + } + + ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, ð_params->conf); + if (ret < 0) { + LSTACK_LOG(ERR, LSTACK, "cannot config eth dev at port %d: %s\n", port_id, rte_strerror(-ret)); + stack_group->eth_params = NULL; + free(eth_params); + return ret; + } + + ret = dpdk_ethdev_start(); + if (ret < 0) { + LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n"); + stack_group->eth_params = NULL; + free(eth_params); + return ret; + } + + if (rss_enable && use_ltran()) { + rss_setup(port_id, nb_queues); + stack_group->reta_mask = dev_info.reta_size - 1; + } + stack_group->nb_queues = nb_queues; } - stack_group->nb_queues = nb_queues; return 0; } -static int32_t dpdk_ethdev_setup(const struct eth_params *eth_params, const struct protocol_stack *stack) +static int32_t dpdk_ethdev_setup(const struct eth_params *eth_params, uint16_t idx) { int32_t ret; - ret = rte_eth_rx_queue_setup(eth_params->port_id, stack->queue_id, eth_params->nb_rx_desc, stack->socket_id, - ð_params->rx_conf, stack->rxtx_pktmbuf_pool); + struct rte_mempool *rxtx_pktmbuf_pool = get_protocol_stack_group()->total_rxtx_pktmbuf_pool[idx]; + + uint16_t socket_id = 0; + struct cfg_params * cfg = get_global_cfg_params(); + if (!cfg->use_ltran && cfg->num_process == 1) { + socket_id = numa_node_of_cpu(cfg->cpus[idx]); + }else { + socket_id = cfg->process_numa[idx]; + } + ret = rte_eth_rx_queue_setup(eth_params->port_id, idx, eth_params->nb_rx_desc, socket_id, + ð_params->rx_conf, rxtx_pktmbuf_pool); if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "cannot setup rx_queue %hu: %s\n", stack->queue_id, rte_strerror(-ret)); + LSTACK_LOG(ERR, LSTACK, "cannot setup rx_queue %hu: %s\n", idx, 
rte_strerror(-ret)); return -1; } - ret = rte_eth_tx_queue_setup(eth_params->port_id, stack->queue_id, eth_params->nb_tx_desc, stack->socket_id, + ret = rte_eth_tx_queue_setup(eth_params->port_id, idx, eth_params->nb_tx_desc, socket_id, ð_params->tx_conf); if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "cannot setup tx_queue %hu: %s\n", stack->queue_id, rte_strerror(-ret)); + LSTACK_LOG(ERR, LSTACK, "cannot setup tx_queue %hu: %s\n", idx, rte_strerror(-ret)); return -1; } @@ -505,12 +551,9 @@ int32_t dpdk_ethdev_start(void) { int32_t ret; const struct protocol_stack_group *stack_group = get_protocol_stack_group(); - const struct protocol_stack *stack = NULL; - - for (int32_t i = 0; i < stack_group->stack_num; i++) { - stack = stack_group->stacks[i]; - ret = dpdk_ethdev_setup(stack_group->eth_params, stack); + for (int32_t i = 0; i < get_global_cfg_params()->tot_queue_num; i++) { + ret = dpdk_ethdev_setup(stack_group->eth_params, i); if (ret < 0) { return ret; } @@ -529,6 +572,7 @@ int32_t dpdk_init_lstack_kni(void) { struct protocol_stack_group *stack_group = get_protocol_stack_group(); + stack_group->kni_pktmbuf_pool = create_pktmbuf_mempool("kni_mbuf", KNI_NB_MBUF, 0, 0); if (stack_group->kni_pktmbuf_pool == NULL) { return -1; @@ -568,7 +612,7 @@ int32_t init_dpdk_ethdev(void) return -1; } - if (get_global_cfg_params()->kni_switch) { + if (get_global_cfg_params()->kni_switch && get_global_cfg_params()->is_primary) { ret = dpdk_init_lstack_kni(); if (ret < 0) { return -1; diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c index 34b2c0d..e8fa0dc 100644 --- a/src/lstack/core/lstack_init.c +++ b/src/lstack/core/lstack_init.c @@ -356,6 +356,8 @@ __attribute__((constructor)) void gazelle_network_init(void) } } + // @todo, check process 2 dumped, resorce need to release. 
+ gazelle_signal_init(); /* diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index 300d7af..48eff1d 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -13,13 +13,14 @@ #include #include +#include + #include #include #include #include #include #include -#include #include #include @@ -29,6 +30,7 @@ #include "lstack_log.h" #include "lstack_dpdk.h" #include "lstack_ethdev.h" +#include "lstack_vdev.h" #include "lstack_lwip.h" #include "lstack_protocol_stack.h" #include "lstack_cfg.h" @@ -79,6 +81,28 @@ struct protocol_stack_group *get_protocol_stack_group(void) return &g_stack_group; } +int get_min_conn_stack(struct protocol_stack_group *stack_group){ + int min_conn_stk_idx = 0; + int min_conn_num = GAZELLE_MAX_CLIENTS; + for (int i = 0; i < stack_group->stack_num; i++) { + struct protocol_stack* stack = stack_group->stacks[i]; + if (get_global_cfg_params()->seperate_send_recv) { + if (!stack->is_send_thread && stack->conn_num < min_conn_num) { + min_conn_stk_idx = i; + min_conn_num = stack->conn_num; + } + }else { + if (stack->conn_num < min_conn_num) { + min_conn_stk_idx = i; + min_conn_num = stack->conn_num; + } + } + + } + return min_conn_stk_idx; + +} + struct protocol_stack *get_protocol_stack(void) { return g_stack_p; @@ -105,22 +129,23 @@ struct protocol_stack *get_bind_protocol_stack(void) struct protocol_stack_group *stack_group = get_protocol_stack_group(); uint16_t index = 0; + int min_conn_num = GAZELLE_MAX_CLIENTS; /* close listen shadow, per app communication thread select only one stack */ - if (get_global_cfg_params()->listen_shadow == false) { - static _Atomic uint16_t stack_index = 0; - index = atomic_fetch_add(&stack_index, 1); - if (index >= stack_group->stack_num) { - LSTACK_LOG(ERR, LSTACK, "thread =%hu larger than stack num = %hu\n", index, stack_group->stack_num); - return NULL; - } - /* use listen shadow, app communication thread maybe more than 
stack num, select the least load stack */ - } else { - for (uint16_t i = 1; i < stack_group->stack_num; i++) { - if (stack_group->stacks[i]->conn_num < stack_group->stacks[index]->conn_num) { + for (uint16_t i = 0; i < stack_group->stack_num; i++) { + struct protocol_stack* stack = stack_group->stacks[i]; + if (get_global_cfg_params()->seperate_send_recv) { + if (stack->is_send_thread && stack->conn_num < min_conn_num) { index = i; + min_conn_num = stack->conn_num; + } + }else { + if (stack->conn_num < min_conn_num) { + index = i; + min_conn_num = stack->conn_num; } } + } bind_stack = stack_group->stacks[index]; @@ -180,27 +205,35 @@ void low_power_idling(struct protocol_stack *stack) } } -static int32_t create_thread(uint16_t queue_id, char *thread_name, stack_thread_func func) +static int32_t create_thread(void *arg, char *thread_name, stack_thread_func func) { /* thread may run slow, if arg is temp var maybe have relese */ - static uint16_t queue[PROTOCOL_STACK_MAX]; char name[PATH_MAX]; pthread_t tid; int32_t ret; + struct thread_params *t_params = (struct thread_params*) arg; - if (queue_id >= PROTOCOL_STACK_MAX) { - LSTACK_LOG(ERR, LSTACK, "queue_id is %hu exceed max=%d\n", queue_id, PROTOCOL_STACK_MAX); + if (t_params->queue_id >= PROTOCOL_STACK_MAX) { + LSTACK_LOG(ERR, LSTACK, "queue_id is %hu exceed max=%d\n", t_params->queue_id, PROTOCOL_STACK_MAX); return -1; } - queue[queue_id] = queue_id; - ret = sprintf_s(name, sizeof(name), "%s%02hu", thread_name, queue[queue_id]); - if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "set name failed\n"); - return -1; + if (get_global_cfg_params()->seperate_send_recv){ + ret = sprintf_s(name, sizeof(name), "%s", thread_name); + if (ret < 0) { + LSTACK_LOG(ERR, LSTACK, "set name failed\n"); + return -1; + } + + }else { + ret = sprintf_s(name, sizeof(name), "%s%02hu", thread_name, t_params->queue_id); + if (ret < 0) { + LSTACK_LOG(ERR, LSTACK, "set name failed\n"); + return -1; + } } - ret = pthread_create(&tid, NULL, func, 
&queue[queue_id]); + ret = pthread_create(&tid, NULL, func, arg); if (ret != 0) { LSTACK_LOG(ERR, LSTACK, "pthread_create ret=%d\n", ret); return -1; @@ -221,7 +254,7 @@ static void* gazelle_wakeup_thread(void *arg) struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id]; struct cfg_params *cfg = get_global_cfg_params(); - int32_t lcore_id = cfg->wakeup[stack->queue_id]; + int32_t lcore_id = cfg->wakeup[stack->stack_idx]; thread_affinity_init(lcore_id); struct timespec st = { @@ -252,12 +285,13 @@ static void* gazelle_wakeup_thread(void *arg) static void* gazelle_kernelevent_thread(void *arg) { - uint16_t queue_id = *(uint16_t *)arg; - struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id]; + struct thread_params *t_params = (struct thread_params*) arg; + uint16_t idx = t_params->idx; + struct protocol_stack *stack = get_protocol_stack_group()->stacks[idx]; bind_to_stack_numa(stack); - LSTACK_LOG(INFO, LSTACK, "kernelevent_%02hu start\n", queue_id); + LSTACK_LOG(INFO, LSTACK, "kernelevent_%02hu start\n", idx); for (;;) { stack->kernel_event_num = posix_api->epoll_wait_fn(stack->epollfd, stack->kernel_events, KERNEL_EPOLL_MAX, -1); @@ -269,13 +303,14 @@ static void* gazelle_kernelevent_thread(void *arg) return NULL; } -static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id) +static int32_t init_stack_value(struct protocol_stack *stack, void *arg) { + struct thread_params *t_params = (struct thread_params*) arg; struct protocol_stack_group *stack_group = get_protocol_stack_group(); stack->tid = rte_gettid(); - stack->queue_id = queue_id; - stack->cpu_id = get_global_cfg_params()->cpus[queue_id]; + stack->queue_id = t_params->queue_id; + stack->stack_idx = t_params->idx; stack->lwip_stats = &lwip_stats; init_list_node(&stack->recv_list); @@ -284,14 +319,27 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id) sys_calibrate_tsc(); stack_stat_init(); - 
stack_group->stacks[queue_id] = stack; - set_stack_idx(queue_id); + stack_group->stacks[t_params->idx] = stack; + set_stack_idx(t_params->idx); stack->epollfd = posix_api->epoll_create_fn(GAZELLE_LSTACK_MAX_CONN); if (stack->epollfd < 0) { return -1; } + int idx = t_params->idx; + if (get_global_cfg_params()->seperate_send_recv) { + if (idx % 2 == 0) { + stack->cpu_id = get_global_cfg_params()->recv_cpus[idx/2]; + stack->is_send_thread = 0; + }else { + stack->cpu_id = get_global_cfg_params()->send_cpus[idx/2]; + stack->is_send_thread = 1; + } + }else { + stack->cpu_id = get_global_cfg_params()->cpus[idx]; + } + stack->socket_id = numa_node_of_cpu(stack->cpu_id); if (stack->socket_id < 0) { LSTACK_LOG(ERR, LSTACK, "numa_node_of_cpu failed\n"); @@ -317,16 +365,17 @@ void wait_sem_value(sem_t *sem, int32_t wait_value) } while (sem_val < wait_value); } -static int32_t create_affiliate_thread(uint16_t queue_id, bool wakeup_enable) +static int32_t create_affiliate_thread(void *arg, bool wakeup_enable) { + if (wakeup_enable) { - if (create_thread(queue_id, "gazelleweakup", gazelle_wakeup_thread) != 0) { + if (create_thread(arg, "gazelleweakup", gazelle_wakeup_thread) != 0) { LSTACK_LOG(ERR, LSTACK, "gazelleweakup errno=%d\n", errno); return -1; } } - if (create_thread(queue_id, "gazellekernel", gazelle_kernelevent_thread) != 0) { + if (create_thread(arg, "gazellekernel", gazelle_kernelevent_thread) != 0) { LSTACK_LOG(ERR, LSTACK, "gazellekernel errno=%d\n", errno); return -1; } @@ -334,10 +383,9 @@ static int32_t create_affiliate_thread(uint16_t queue_id, bool wakeup_enable) return 0; } -static struct protocol_stack *stack_thread_init(uint16_t queue_id) +static struct protocol_stack *stack_thread_init(void *arg) { - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - + struct protocol_stack_group *stack_group = get_protocol_stack_group(); struct protocol_stack *stack = calloc(1, sizeof(*stack)); if (stack == NULL) { LSTACK_LOG(ERR, LSTACK, "malloc 
stack failed\n"); @@ -345,14 +393,14 @@ static struct protocol_stack *stack_thread_init(uint16_t queue_id) return NULL; } - if (init_stack_value(stack, queue_id) != 0) { + if (init_stack_value(stack, arg) != 0) { goto END; } if (init_stack_numa_cpuset(stack) < 0) { goto END; } - if (create_affiliate_thread(queue_id, stack_group->wakeup_enable) < 0) { + if (create_affiliate_thread(arg, stack_group->wakeup_enable) < 0) { goto END; } @@ -402,19 +450,22 @@ static void wakeup_kernel_event(struct protocol_stack *stack) } __atomic_store_n(&wakeup->have_kernel_event, true, __ATOMIC_RELEASE); - if (list_is_null(&wakeup->wakeup_list[stack->queue_id])) { - list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->queue_id]); + if (list_is_null(&wakeup->wakeup_list[stack->stack_idx])) { + list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->stack_idx]); } } stack->kernel_event_num = 0; } + static void* gazelle_stack_thread(void *arg) { - uint16_t queue_id = *(uint16_t *)arg; + struct thread_params *t_params = (struct thread_params*) arg; + + uint16_t queue_id = t_params->queue_id; struct cfg_params *cfg = get_global_cfg_params(); - bool use_ltran_flag = cfg->use_ltran;; + uint8_t use_ltran_flag = cfg->use_ltran;; bool kni_switch = cfg->kni_switch; uint32_t read_connect_number = cfg->read_connect_number; uint32_t rpc_number = cfg->rpc_number; @@ -424,7 +475,8 @@ static void* gazelle_stack_thread(void *arg) struct protocol_stack_group *stack_group = get_protocol_stack_group(); bool wakeup_thread_enable = stack_group->wakeup_enable; - struct protocol_stack *stack = stack_thread_init(queue_id); + struct protocol_stack *stack = stack_thread_init(arg); + if (stack == NULL) { /* exit in main thread, avoid create mempool and exit at the same time */ set_init_fail(); @@ -432,8 +484,12 @@ static void* gazelle_stack_thread(void *arg) LSTACK_LOG(ERR, LSTACK, "stack_thread_init failed queue_id=%hu\n", queue_id); return NULL; } + if (!use_ltran() && queue_id == 0) { + 
init_listen_and_user_ports(); + } sem_post(&stack_group->all_init); + LSTACK_LOG(INFO, LSTACK, "stack_%02hu init success\n", queue_id); for (;;) { @@ -452,6 +508,7 @@ static void* gazelle_stack_thread(void *arg) * so processing KNI requests only in the thread with queue_id No.0 is sufficient. */ if (kni_switch && !queue_id && !(wakeup_tick & 0xfff)) { rte_kni_handle_request(get_gazelle_kni()); + kni_handle_rx(get_port_id()); } wakeup_tick++; @@ -466,6 +523,11 @@ static void* gazelle_stack_thread(void *arg) return NULL; } +static void libnet_listen_thread(void *arg){ + struct cfg_params * cfg_param = get_global_cfg_params(); + recv_pkts_from_other_process(cfg_param->process_idx, arg); +} + static int32_t init_protocol_sem(void) { int32_t ret; @@ -498,8 +560,14 @@ int32_t init_protocol_stack(void) { struct protocol_stack_group *stack_group = get_protocol_stack_group(); int32_t ret; + char name[PATH_MAX]; + + if (!get_global_cfg_params()->seperate_send_recv) { + stack_group->stack_num = get_global_cfg_params()->num_cpu; + }else { + stack_group->stack_num = get_global_cfg_params()->num_cpu * 2; + } - stack_group->stack_num = get_global_cfg_params()->num_cpu; stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? 
true : false; init_list_node(&stack_group->poll_list); pthread_spin_init(&stack_group->poll_list_lock, PTHREAD_PROCESS_PRIVATE); @@ -508,9 +576,43 @@ int32_t init_protocol_stack(void) if (init_protocol_sem() != 0) { return -1; } + int queue_num = get_global_cfg_params()->num_queue; + struct thread_params *t_params[queue_num]; + int process_index = get_global_cfg_params()->process_idx; + + if (get_global_cfg_params()->is_primary) { + for (uint16_t idx = 0; idx < get_global_cfg_params()->tot_queue_num; idx++) { + struct rte_mempool* rxtx_mbuf = create_pktmbuf_mempool("rxtx_mbuf", + get_global_cfg_params()->mbuf_count_per_conn * get_global_cfg_params()->tcp_conn_count / stack_group->stack_num, RXTX_CACHE_SZ, idx); + get_protocol_stack_group()->total_rxtx_pktmbuf_pool[idx] = rxtx_mbuf; + } + } - for (uint32_t i = 0; i < stack_group->stack_num; i++) { - ret = create_thread(i, "gazellestack", gazelle_stack_thread); + for (uint32_t i = 0; i < queue_num; i++) { + if (get_global_cfg_params()->seperate_send_recv) { + if (i % 2 == 0) { + ret = sprintf_s(name, sizeof(name), "%s_%d_%d", LSTACK_RECV_THREAD_NAME, process_index, i/2); + if (ret < 0) { + return -1; + } + }else { + ret = sprintf_s(name, sizeof(name), "%s_%d_%d", LSTACK_SEND_THREAD_NAME, process_index, i/2); + if (ret < 0) { + return -1; + } + } + }else { + ret = sprintf_s(name, sizeof(name), "%s", LSTACK_THREAD_NAME); + if (ret < 0) { + return -1; + } + } + + t_params[i] = malloc(sizeof(struct thread_params)); + t_params[i]->idx = i; + t_params[i]->queue_id = process_index * queue_num + i; + + ret = create_thread((void *)t_params[i], name, gazelle_stack_thread); if (ret != 0) { return ret; } @@ -518,6 +620,20 @@ int32_t init_protocol_stack(void) wait_sem_value(&stack_group->thread_phase1, stack_group->stack_num); + for(int idx = 0; idx < queue_num; idx++){ + free(t_params[idx]); + } + + if (!use_ltran()) { + ret = sem_init(&stack_group->sem_listen_thread, 0, 0); + ret = sprintf_s(name, sizeof(name), "%s", 
"listen_thread"); + struct sys_thread *thread = sys_thread_new(name, libnet_listen_thread, (void*)(&stack_group->sem_listen_thread), 0, 0); + free(thread); + sem_wait(&stack_group->sem_listen_thread); + + create_flow_rule_map(); + } + if (get_init_fail()) { return -1; } @@ -684,7 +800,7 @@ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack for (int32_t i = 0; i < stack_group->stack_num; i++) { stack = stack_group->stacks[i]; - if (cur_stack == stack) { + if (cur_stack == stack && use_ltran()) { continue; } @@ -718,7 +834,7 @@ void stack_clean_epoll(struct rpc_msg *msg) struct protocol_stack *stack = get_protocol_stack(); struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p; - list_del_node_null(&wakeup->wakeup_list[stack->queue_id]); + list_del_node_null(&wakeup->wakeup_list[stack->stack_idx]); } /* when fd is listenfd, listenfd of all protocol stack thread will be closed */ @@ -769,8 +885,13 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog) } struct protocol_stack_group *stack_group = get_protocol_stack_group(); + int min_conn_stk_idx = get_min_conn_stack(stack_group); + for (int32_t i = 0; i < stack_group->stack_num; ++i) { stack = stack_group->stacks[i]; + if (get_global_cfg_params()->seperate_send_recv && stack->is_send_thread) { + continue; + } if (stack != cur_stack) { clone_fd = rpc_call_shadow_fd(stack, fd, &addr, sizeof(addr)); if (clone_fd < 0) { @@ -781,6 +902,12 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog) clone_fd = fd; } + if (min_conn_stk_idx == i) { + get_socket_by_fd(clone_fd)->conn->is_master_fd = 1; + }else { + get_socket_by_fd(clone_fd)->conn->is_master_fd = 0; + } + ret = rpc_call_listen(clone_fd, backlog); if (ret < 0) { stack_broadcast_close(fd); diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h index 2705fee..942c0b7 100644 --- a/src/lstack/include/lstack_cfg.h +++ b/src/lstack/include/lstack_cfg.h @@ -65,6 +65,8 @@ struct 
cfg_params { uint8_t mac_addr[ETHER_ADDR_LEN]; uint16_t num_cpu; uint32_t cpus[CFG_MAX_CPUS]; + uint32_t send_cpus[CFG_MAX_CPUS]; + uint32_t recv_cpus[CFG_MAX_CPUS]; uint16_t num_wakeup; uint32_t wakeup[CFG_MAX_CPUS]; uint8_t num_ports; @@ -79,11 +81,22 @@ struct cfg_params { uint32_t read_connect_number; uint32_t rpc_number; uint32_t nic_read_number; - bool use_ltran; // ture:lstack read from nic false:read form ltran + uint8_t use_ltran; // true:lstack read from nic false:read from ltran + + uint16_t num_process; + uint16_t num_listen_port; + uint16_t port_id; + uint16_t is_primary; + uint16_t num_queue; + uint16_t tot_queue_num; + uint8_t process_idx; + uint32_t process_numa[PROTOCOL_STACK_MAX]; + bool kni_switch; bool listen_shadow; // true:listen in all stack thread. false:listen in one stack thread. bool app_bind_numa; bool main_thread_affinity; + bool seperate_send_recv; int dpdk_argc; char **dpdk_argv; struct secondary_attach_arg sec_attach_arg; @@ -94,7 +107,7 @@ struct cfg_params { struct cfg_params *get_global_cfg_params(void); -static inline bool use_ltran(void) +static inline uint8_t use_ltran(void) { return get_global_cfg_params()->use_ltran; } diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h index c3bc527..55ca7a1 100644 --- a/src/lstack/include/lstack_dpdk.h +++ b/src/lstack/include/lstack_dpdk.h @@ -51,5 +51,6 @@ void dpdk_skip_nic_init(void); int32_t dpdk_init_lstack_kni(void); void dpdk_restore_pci(void); bool port_in_stack_queue(uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); - +uint16_t get_port_id(); +struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,uint32_t mbuf_cache_size, uint16_t queue_id); #endif /* GAZELLE_DPDK_H */ diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h index 0b53cde..a690adb 100644 --- a/src/lstack/include/lstack_ethdev.h +++ b/src/lstack/include/lstack_ethdev.h @@ -13,6 +13,19 @@ #ifndef 
__GAZELLE_ETHDEV_H__ #define __GAZELLE_ETHDEV_H__ +#define INVAILD_PROCESS_IDX 255 + +enum port_type { + PORT_LISTEN, + PORT_CONNECT, +}; + +enum PACKET_TRANSFER_TYPE{ + TRANSFER_KERNEL = -1, + TRANSFER_OTHER_THREAD, + TRANSFER_CURRENT_THREAD, +}; + struct protocol_stack; struct rte_mbuf; struct lstack_dev_ops { @@ -22,7 +35,15 @@ struct lstack_dev_ops { int32_t ethdev_init(struct protocol_stack *stack); int32_t eth_dev_poll(void); -int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, bool use_ltran_flag, uint32_t nic_read_number); +int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number); void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack); +int recv_pkts_from_other_process(int process_index, void* arg); +void create_flow_rule_map(); +void kni_handle_rx(uint16_t port_id); +void delete_user_process_port(uint16_t dst_port, enum port_type type); +void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type); +void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); +void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); + #endif /* __GAZELLE_ETHDEV_H__ */ diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h index 11b001c..b5d3f7d 100644 --- a/src/lstack/include/lstack_protocol_stack.h +++ b/src/lstack/include/lstack_protocol_stack.h @@ -41,6 +41,7 @@ struct protocol_stack { uint16_t port_id; uint16_t socket_id; uint16_t cpu_id; + uint32_t stack_idx; cpu_set_t idle_cpuset; /* idle cpu in numa of stack, app thread bind to it */ int32_t epollfd; /* kernel event thread epoll fd */ @@ -53,6 +54,8 @@ struct protocol_stack { uint32_t reg_head; volatile bool low_power; + bool is_send_thread; + lockless_queue rpc_queue __rte_cache_aligned; char pad __rte_cache_aligned; @@ -93,6 +96,8 @@ struct protocol_stack_group { bool wakeup_enable; 
struct list_node poll_list; pthread_spinlock_t poll_list_lock; + sem_t sem_listen_thread; + struct rte_mempool *total_rxtx_pktmbuf_pool[PROTOCOL_STACK_MAX]; /* dfx stats */ bool latency_start; @@ -131,6 +136,10 @@ void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup); void stack_send_pkts(struct protocol_stack *stack); struct rpc_msg; +struct thread_params{ + uint16_t queue_id; + uint16_t idx; +}; void stack_clean_epoll(struct rpc_msg *msg); void stack_arp(struct rpc_msg *msg); void stack_socket(struct rpc_msg *msg); @@ -146,4 +155,5 @@ void stack_getsockopt(struct rpc_msg *msg); void stack_setsockopt(struct rpc_msg *msg); void stack_fcntl(struct rpc_msg *msg); void stack_ioctl(struct rpc_msg *msg); +void kni_handle_tx(struct rte_mbuf *mbuf); #endif diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h index 0693c4d..0995277 100644 --- a/src/lstack/include/lstack_vdev.h +++ b/src/lstack/include/lstack_vdev.h @@ -21,4 +21,11 @@ enum reg_ring_type; void vdev_dev_ops_init(struct lstack_dev_ops *dev_ops); int vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple); +int recv_pkts_from_other_process(int process_index, void* arg); +void create_flow_rule_map(); +void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); +void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); +void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add); +void init_listen_and_user_ports(); + #endif /* _GAZELLE_VDEV_H_ */ diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf index cf81954..389a81c 100644 --- a/src/lstack/lstack.conf +++ b/src/lstack/lstack.conf @@ -49,3 +49,7 @@ host_addr="192.168.1.10" mask_addr="255.255.255.0" gateway_addr="192.168.1.1" devices="aa:bb:cc:dd:ee:ff" + +num_process=2 +process_numa="0,1" +process_idx=0 diff --git 
a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c index 1441f64..60ea897 100644 --- a/src/lstack/netif/lstack_ethdev.c +++ b/src/lstack/netif/lstack_ethdev.c @@ -10,14 +10,22 @@ * See the Mulan PSL v2 for more details. */ +#include +#include + +#include #include #include #include #include +#include #include +#include "lwip/tcp.h" +#include #include +#include #include "lstack_cfg.h" #include "lstack_vdev.h" @@ -28,9 +36,33 @@ #include "dpdk_common.h" #include "lstack_protocol_stack.h" #include "lstack_ethdev.h" +#include "lstack_thread_rpc.h" /* FRAME_MTU + 14byte header */ #define MBUF_MAX_LEN 1514 +#define MAX_PATTERN_NUM 4 +#define MAX_ACTION_NUM 2 +#define FULL_MASK 0xffffffff /* full mask */ +#define EMPTY_MASK 0x0 /* empty mask */ +#define LSTACK_MBUF_LEN 64 +#define TRANSFER_TCP_MUBF_LEN LSTACK_MBUF_LEN + 3 +#define DELETE_FLOWS_PARAMS_NUM 3 +#define DELETE_FLOWS_PARAMS_LENGTH 30 +#define CREATE_FLOWS_PARAMS_NUM 6 +#define CREATE_FLOWS_PARAMS_LENGTH 60 +#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH 25 +#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM 3 +#define REPLY_LEN 10 +#define SUCCESS_REPLY "success" +#define ERROR_REPLY "error" +#define PACKET_READ_SIZE 32 + +char *client_path = "/var/run/gazelle/client.socket"; +char *server_path = "/var/run/gazelle/server.socket"; +const char *split_delim = ","; + +uint8_t g_user_ports[65535] = {INVAILD_PROCESS_IDX,}; +uint8_t g_listen_ports[65535] = {INVAILD_PROCESS_IDX,}; void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack) { @@ -45,6 +77,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack) struct rte_mbuf *next_m = NULL; pkt_len = (uint16_t)rte_pktmbuf_pkt_len(m); + while (m != NULL) { len = (uint16_t)rte_pktmbuf_data_len(m); payload = rte_pktmbuf_mtod(m, void *); @@ -82,6 +115,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack) } } + int32_t eth_dev_poll(void) { uint32_t nr_pkts; @@ -115,8 +149,507 @@ int32_t eth_dev_poll(void) 
return nr_pkts; } +void init_listen_and_user_ports(){ + memset(g_user_ports, INVAILD_PROCESS_IDX, sizeof(g_user_ports)); + memset(g_listen_ports, INVAILD_PROCESS_IDX, sizeof(g_listen_ports)); +} + +int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply) +{ + /* other process queue_id */ + struct sockaddr_un serun; + int sockfd; + int ret = 0; + + if ((sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) { + return -1; + } + + memset(&serun, 0, sizeof(serun)); + serun.sun_family = AF_UNIX; + sprintf_s(serun.sun_path, PATH_MAX,"%s%d", server_path, process_index); + int len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path); + if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0){ + return -1; + } + posix_api->write_fn(sockfd, buf, write_len); + if (need_reply) { + char reply_message[REPLY_LEN]; + posix_api->read_fn(sockfd, reply_message, REPLY_LEN); + if (strcmp(reply_message, SUCCESS_REPLY) == 0) { + ret = 0; + }else { + ret = -1; + } + } + posix_api->close_fn(sockfd); + + return ret; +} + +struct rte_flow * +create_flow_director(uint16_t port_id, uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, + uint16_t src_port, uint16_t dst_port, struct rte_flow_error *error) +{ + struct rte_flow_attr attr; + struct rte_flow_item pattern[MAX_PATTERN_NUM]; + struct rte_flow_action action[MAX_ACTION_NUM]; + struct rte_flow *flow = NULL; + struct rte_flow_action_queue queue = { .index = queue_id }; + struct rte_flow_item_ipv4 ip_spec; + struct rte_flow_item_ipv4 ip_mask; + + struct rte_flow_item_tcp tcp_spec; + struct rte_flow_item_tcp tcp_mask; + int res; + + memset(pattern, 0, sizeof(pattern)); + memset(action, 0, sizeof(action)); + + /* + * set the rule attribute. + * in this case only ingress packets will be checked. + */ + memset(&attr, 0, sizeof(struct rte_flow_attr)); + attr.ingress = 1; + + /* + * create the action sequence. 
+ * one action only, move packet to queue + */ + action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE; + action[0].conf = &queue; + action[1].type = RTE_FLOW_ACTION_TYPE_END; + + // not limit eth header + pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; + + // ip header + memset(&ip_spec, 0, sizeof(struct rte_flow_item_ipv4)); + memset(&ip_mask, 0, sizeof(struct rte_flow_item_ipv4)); + ip_spec.hdr.dst_addr = dst_ip; + ip_mask.hdr.dst_addr = FULL_MASK; + ip_spec.hdr.src_addr = src_ip; + ip_mask.hdr.src_addr = FULL_MASK; + pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4; + pattern[1].spec = &ip_spec; + pattern[1].mask = &ip_mask; + + // tcp header, full mask 0xffff + memset(&tcp_spec, 0, sizeof(struct rte_flow_item_tcp)); + memset(&tcp_mask, 0, sizeof(struct rte_flow_item_tcp)); + pattern[2].type = RTE_FLOW_ITEM_TYPE_TCP; + tcp_spec.hdr.src_port = src_port; + tcp_spec.hdr.dst_port = dst_port; + tcp_mask.hdr.src_port = rte_flow_item_tcp_mask.hdr.src_port; + tcp_mask.hdr.dst_port = rte_flow_item_tcp_mask.hdr.dst_port; + pattern[2].spec = &tcp_spec; + pattern[2].mask = &tcp_mask; + + /* the final level must be always type end */ + pattern[3].type = RTE_FLOW_ITEM_TYPE_END; + res = rte_flow_validate(port_id, &attr, pattern, action, error); + if (!res){ + flow = rte_flow_create(port_id, &attr, pattern, action, error); + }else { + LSTACK_LOG(ERR, PORT,"rte_flow_create.rte_flow_validate error, res %d \n", res); + } + + return flow; +} + +void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port){ + + uint16_t port_id = get_port_id(); + + LSTACK_LOG(INFO, LSTACK, "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs :%u \n", + queue_id, src_ip,ntohs(src_port), ntohs(dst_port) ); + + struct rte_flow_error error; + struct rte_flow *flow = create_flow_director(port_id, queue_id, src_ip, dst_ip, src_port, dst_port, &error); + if (!flow) { + LSTACK_LOG(ERR, LSTACK,"config_flow_director, flow can not be created. 
queue_id %u, src_ip %u, src_port %u, dst_port %u, dst_port_ntohs :%u, type %d. message: %s\n", + queue_id, src_ip,src_port,dst_port,ntohs(dst_port), error.type, error.message ? error.message : "(no stated reason)"); + return; + } +} + +void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) +{ + uint16_t port_id = get_port_id(); + (void)port_id; +} + +/* + * delete flows + * if process 0, delete directly, else transfer 'dst_ip,src_port,dst_port' to process 0. + */ +void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) +{ + if (get_global_cfg_params()->is_primary){ + delete_flow_director(dst_ip, src_port, dst_port); + }else { + char process_server_path[DELETE_FLOWS_PARAMS_LENGTH]; + sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u", dst_ip,split_delim, src_port,split_delim,dst_port); + int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false); + if(ret != 0){ + LSTACK_LOG(ERR, LSTACK,"transfer_delete_rule_info_to_process0 error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n", + rte_gettid(), dst_ip, src_port, dst_port); + } + } +} + +/* + * add flows + * if process 0, add directly, else transfer 'src_ip,dst_ip,src_port,dst_port,queue_id' to process 0. 
+ */ +void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) +{ + char process_server_path[CREATE_FLOWS_PARAMS_LENGTH]; + /* exchange src_ip and dst_ip, src_port and dst_port */ + uint8_t process_idx = get_global_cfg_params()->process_idx; + sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u", + dst_ip,split_delim,src_ip,split_delim, dst_port,split_delim,src_port, split_delim,queue_id,split_delim,process_idx); + int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true); + if(ret != 0){ + LSTACK_LOG(ERR, LSTACK,"transfer_create_rule_info_to_process0 error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u, queue_id %u, process_idx %u\n", + rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx); + } +} + +void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add) +{ + char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH]; + sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, "%u%s%u%s%u", listen_port,split_delim,process_idx, split_delim, is_add); + int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true); + if(ret != 0){ + LSTACK_LOG(ERR, LSTACK,"transfer_add_or_delete_listen_port_to_process0 error. tid %d. 
listen_port %u, process_idx %u\n", + rte_gettid(), listen_port, process_idx); + } +} + +static int str_to_array(char *args, uint32_t *array, int size) +{ + int val; + uint16_t cnt = 0; + char *elem = NULL; + char *next_token = NULL; + + memset(array, 0, sizeof(*array) * size); + elem = strtok_s((char *)args, split_delim, &next_token); + while (elem != NULL) { + if (cnt >= size) { + return -1; + } + val = atoi(elem); + if (val < 0) { + return -1; + } + array[cnt] = (uint32_t)val; + cnt++; + + elem = strtok_s(NULL, split_delim, &next_token); + } + + return cnt; +} + +void parse_and_delete_rule(char* buf) +{ + uint32_t array[DELETE_FLOWS_PARAMS_NUM]; + str_to_array(buf, array, DELETE_FLOWS_PARAMS_NUM); + uint32_t dst_ip = array[0]; + uint16_t src_port = array[1]; + uint16_t dst_port = array[2]; + delete_flow_director(dst_ip, src_port, dst_port); +} + +void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type){ + if (type == PORT_LISTEN) { + g_listen_ports[dst_port] = process_idx; + }else { + g_user_ports[dst_port] = process_idx; + } +} + +void delete_user_process_port(uint16_t dst_port, enum port_type type){ + if (type == PORT_LISTEN) { + g_listen_ports[dst_port] = INVAILD_PROCESS_IDX; + }else { + g_user_ports[dst_port] = INVAILD_PROCESS_IDX; + } +} + +void parse_and_create_rule(char* buf) +{ + uint32_t array[CREATE_FLOWS_PARAMS_NUM]; + str_to_array(buf, array, CREATE_FLOWS_PARAMS_NUM); + uint32_t src_ip = array[0]; + uint32_t dst_ip = array[1]; + uint16_t src_port = array[2]; + uint16_t dst_port = array[3]; + uint16_t queue_id = array[4]; + uint8_t process_idx = array[5]; + config_flow_director(queue_id, src_ip, dst_ip, src_port, dst_port); + add_user_process_port(dst_port, process_idx, PORT_CONNECT); +} + +void parse_and_add_or_delete_listen_port(char* buf) +{ + uint32_t array[ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM]; + str_to_array(buf, array, ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM); + uint16_t listen_port = array[0]; + uint8_t process_idx = 
array[1]; + uint8_t is_add = array[2]; + if (is_add == 1) { + add_user_process_port(listen_port,process_idx, PORT_LISTEN); + }else { + delete_user_process_port(listen_port, PORT_LISTEN); + } + +} + +void transfer_arp_to_other_process(struct rte_mbuf *mbuf) +{ + struct cfg_params *cfgs = get_global_cfg_params(); + + for(int i = 1; i < cfgs->num_process; i++){ + char arp_mbuf[LSTACK_MBUF_LEN] = {0}; + sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf); + int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false); + if(result < 0){ + LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. \n", i); + } + } +} + +void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx) +{ + /* current process queue_id */ + struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx]; + int ret = -1; + while(ret != 0) { + ret = rpc_call_arp(stack, mbuf); + printf("transfer_tcp_to_thread, ret : %d \n", ret); + } +} + +void parse_arp_and_transefer(char* buf) +{ + struct rte_mbuf *mbuf = (struct rte_mbuf *)atoll(buf); + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + struct rte_mbuf *mbuf_copy = NULL; + struct protocol_stack *stack = NULL; + int32_t ret; + for (int32_t i = 0; i < stack_group->stack_num; i++) { + stack = stack_group->stacks[i]; + ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1); + while (ret != 0) { + ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1); + stack->stats.rx_allocmbuf_fail++; + } + copy_mbuf(mbuf_copy, mbuf); + + ret = rpc_call_arp(stack, mbuf_copy); + + while (ret != 0) { + rpc_call_arp(stack, mbuf_copy);; + } + } +} + +void parse_tcp_and_transefer(char* buf) +{ + char *next_token = NULL; + char *elem = strtok_s(buf, split_delim, &next_token); + struct rte_mbuf *mbuf = (struct rte_mbuf *) atoll(elem); + elem = strtok_s(NULL, split_delim, &next_token); + uint16_t queue_id = atoll(elem); + + struct protocol_stack_group *stack_group = 
get_protocol_stack_group(); + uint16_t num_queue = get_global_cfg_params()->num_queue; + uint16_t stk_index = queue_id % num_queue; + struct rte_mbuf *mbuf_copy = NULL; + struct protocol_stack *stack = stack_group->stacks[stk_index]; + + int32_t ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1); + while (ret != 0) { + ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1); + stack->stats.rx_allocmbuf_fail++; + } + + copy_mbuf(mbuf_copy,mbuf); + + transfer_tcp_to_thread(mbuf_copy, stk_index); +} + +int recv_pkts_from_other_process(int process_index, void* arg){ + struct sockaddr_un serun, cliun; + socklen_t cliun_len; + int listenfd, connfd, size; + char buf[132]; + /* socket */ + if ((listenfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) { + perror("socket error"); + return -1; + } + /* bind */ + memset(&serun, 0, sizeof(serun)); + serun.sun_family = AF_UNIX; + char process_server_path[PATH_MAX]; + sprintf_s(process_server_path, sizeof(process_server_path), "%s%d", server_path, process_index); + strcpy(serun.sun_path, process_server_path); + size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path); + unlink(process_server_path); + if (posix_api->bind_fn(listenfd, (struct sockaddr *)&serun, size) < 0) { + perror("bind error"); + return -1; + } + if (posix_api->listen_fn(listenfd, 20) < 0) { + perror("listen error"); + return -1; + } + sem_post((sem_t *)arg); + /* block */ + while(1) { + cliun_len = sizeof(cliun); + if ((connfd = posix_api->accept_fn(listenfd, (struct sockaddr *)&cliun, &cliun_len)) < 0){ + perror("accept error"); + continue; + } + while(1) { + int n = posix_api->read_fn(connfd, buf, sizeof(buf)); + if (n < 0) { + perror("read error"); + break; + } else if(n == 0) { + break; + } + + if(n == LSTACK_MBUF_LEN){ + /* arp */ + parse_arp_and_transefer(buf); + }else if(n == TRANSFER_TCP_MUBF_LEN) { + /* tcp. 
lstack_mbuf_queue_id */ + printf("recv_pkts_from_other_process, process idx %d \n ", process_index); + parse_tcp_and_transefer(buf); + }else if (n == DELETE_FLOWS_PARAMS_LENGTH) { + /* delete rule */ + parse_and_delete_rule(buf); + }else if(n == CREATE_FLOWS_PARAMS_LENGTH){ + /* add rule */ + parse_and_create_rule(buf); + char reply_buf[REPLY_LEN]; + sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY); + posix_api->write_fn(connfd, reply_buf, REPLY_LEN); + }else { + /* add port */ + parse_and_add_or_delete_listen_port(buf); + char reply_buf[REPLY_LEN]; + sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY); + posix_api->write_fn(connfd, reply_buf, REPLY_LEN); + } + + } + posix_api->close_fn(connfd); + } + posix_api->close_fn(listenfd); + return 0; +} + +void concat_mbuf_and_queue_id(struct rte_mbuf *mbuf, uint16_t queue_id, char* mbuf_and_queue_id, int write_len){ + + sprintf_s(mbuf_and_queue_id, write_len, "%lu%s%u", mbuf,split_delim,queue_id); +} + +const int32_t ipv4_version_offset = 4; +const int32_t ipv4_version = 4; + +int distribute_pakages(struct rte_mbuf *mbuf) +{ + struct rte_ipv4_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr)); + uint8_t ip_version = (iph->version_ihl & 0xf0) >> ipv4_version_offset; + if (likely(ip_version == ipv4_version)) { + if (likely(iph->next_proto_id == IPPROTO_TCP)) { + int each_process_queue_num = get_global_cfg_params()->num_queue; + + struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *, sizeof(struct rte_ether_hdr) + + sizeof(struct rte_ipv4_hdr)); + uint16_t dst_port = tcp_hdr->dst_port; + + int user_process_idx = (g_listen_ports[dst_port] != INVAILD_PROCESS_IDX) ? 
g_listen_ports[dst_port] : g_user_ports[dst_port]; + + if (user_process_idx == INVAILD_PROCESS_IDX) { + return TRANSFER_KERNEL; + } + if(unlikely(tcp_hdr->tcp_flags == TCP_SYN)){ + uint32_t src_ip = iph->src_addr; + uint16_t src_port = tcp_hdr->src_port; + uint32_t index = rte_jhash_3words(src_ip, src_port | (dst_port) << 16, 0, 0) % each_process_queue_num; + uint16_t queue_id = 0; + if (get_global_cfg_params()->seperate_send_recv) { + queue_id = user_process_idx * each_process_queue_num + (index/2) * 2; + }else { + queue_id = user_process_idx * each_process_queue_num + index; + } + if(queue_id != 0){ + if(user_process_idx == 0){ + transfer_tcp_to_thread(mbuf, queue_id); + }else { + char mbuf_and_queue_id[TRANSFER_TCP_MUBF_LEN]; + concat_mbuf_and_queue_id(mbuf, queue_id, mbuf_and_queue_id, TRANSFER_TCP_MUBF_LEN); + transfer_pkt_to_other_process(mbuf_and_queue_id, user_process_idx, TRANSFER_TCP_MUBF_LEN, false); + } + return TRANSFER_OTHER_THREAD; + }else { + return TRANSFER_CURRENT_THREAD; + } + }else { + return TRANSFER_CURRENT_THREAD; + } + } + }else { + return TRANSFER_KERNEL; + } + return TRANSFER_KERNEL; +} + +void kni_handle_rx(uint16_t port_id) +{ + struct rte_mbuf *pkts_burst[PACKET_READ_SIZE]; + uint32_t nb_kni_rx = rte_kni_rx_burst(get_gazelle_kni(), pkts_burst, PACKET_READ_SIZE); + if (nb_kni_rx > 0) { + uint16_t nb_rx = rte_eth_tx_burst(port_id, 0, pkts_burst, nb_kni_rx); + for (uint16_t i = nb_rx; i < nb_kni_rx; ++i) { + rte_pktmbuf_free(pkts_burst[i]); + } + } + return; +} + +void kni_handle_tx(struct rte_mbuf *mbuf) +{ + if (!get_global_cfg_params()->kni_switch) { + return; + } + struct rte_ipv4_hdr *ipv4_hdr; + uint16_t l3_offset = mbuf->l2_len; + + ipv4_hdr = (struct rte_ipv4_hdr *)(rte_pktmbuf_mtod(mbuf, char*) + + l3_offset); + if (mbuf->nb_segs > 1) { + ipv4_hdr->hdr_checksum = 0; + ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr); + } + + // send to the kernel protocol stack + if (!rte_kni_tx_burst(get_gazelle_kni(), &mbuf, 1)) { + rte_pktmbuf_free(mbuf); + } +} + /* 
optimized eth_dev_poll() in lstack */ -int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, bool use_ltran_flag, uint32_t nic_read_number) +int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number) { uint32_t nr_pkts; @@ -131,15 +664,33 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, bool use_ltran_flag, } for (uint32_t i = 0; i < nr_pkts; i++) { + /* 1 current thread recv; 0 other thread recv; -1 kni recv; */ + int transfer_type = TRANSFER_CURRENT_THREAD; /* copy arp into other stack */ if (!use_ltran_flag) { struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(stack->pkts[i], struct rte_ether_hdr *); if (unlikely(RTE_BE16(RTE_ETHER_TYPE_ARP) == ethh->ether_type)) { stack_broadcast_arp(stack->pkts[i], stack); + if (!use_ltran_flag) { + // copy arp into other process + transfer_arp_to_other_process(stack->pkts[i]); + transfer_type = TRANSFER_KERNEL; + } + }else { + if (!use_ltran_flag && stack->queue_id == 0) { + transfer_type = distribute_pakages(stack->pkts[i]); + } } } - eth_dev_recv(stack->pkts[i], stack); + if (likely(transfer_type == TRANSFER_CURRENT_THREAD)) { + eth_dev_recv(stack->pkts[i], stack); + + }else if (transfer_type == TRANSFER_KERNEL) { + kni_handle_tx(stack->pkts[i]); + }else { + /*transfer to other thread*/ + } } stack->stats.rx += nr_pkts; diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c index 3d1204e..1752853 100644 --- a/src/lstack/netif/lstack_vdev.c +++ b/src/lstack/netif/lstack_vdev.c @@ -102,7 +102,9 @@ static uint32_t vdev_rx_poll(struct protocol_stack *stack, struct rte_mbuf **pkt pkts[i]->packet_type = RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_TCP; } - return rte_gro_reassemble_burst(pkts, pkt_num, &gro_param); + pkt_num = rte_gro_reassemble_burst(pkts, pkt_num, &gro_param); + + return pkt_num; } static uint32_t ltran_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pkts, uint32_t nr_pkts) @@ -145,14 +147,51 @@ static uint32_t vdev_tx_xmit(struct 
protocol_stack *stack, struct rte_mbuf **pkt int32_t vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple) { - if (!use_ltran()) { - return 0; - } - if (qtuple == NULL) { return -1; } + if (!use_ltran()) { + if(type == REG_RING_TCP_LISTEN_CLOSE){ + if (get_global_cfg_params()->is_primary) { + delete_user_process_port(qtuple->src_port, PORT_LISTEN); + }else{ + transfer_add_or_delete_listen_port_to_process0(qtuple->src_port,get_global_cfg_params()->process_idx, 0); + } + } + + if (type == REG_RING_TCP_CONNECT_CLOSE) { + if (get_global_cfg_params()->is_primary) { + delete_user_process_port(qtuple->src_port, PORT_CONNECT); + delete_flow_director(qtuple->dst_ip, qtuple->src_port, qtuple->dst_port); + }else{ + transfer_delete_rule_info_to_process0(qtuple->dst_ip,qtuple->src_port,qtuple->dst_port); + } + } + + if (type == REG_RING_TCP_CONNECT) { + uint16_t queue_id = get_protocol_stack()->queue_id; + if (get_global_cfg_params()->is_primary){ + add_user_process_port(qtuple->src_port, get_global_cfg_params()->process_idx, PORT_CONNECT); + if (queue_id != 0) { + config_flow_director(queue_id, qtuple->dst_ip, qtuple->src_ip, qtuple->dst_port, qtuple->src_port); + } + }else { + transfer_create_rule_info_to_process0(queue_id, qtuple->src_ip, qtuple->dst_ip, qtuple->src_port, qtuple->dst_port); + } + } + + if (type == REG_RING_TCP_LISTEN){ + if (get_global_cfg_params()->is_primary) { + add_user_process_port(qtuple->src_port, get_global_cfg_params()->process_idx, PORT_LISTEN); + }else { + transfer_add_or_delete_listen_port_to_process0(qtuple->src_port,get_global_cfg_params()->process_idx, 1); + } + } + return 0; + } + + int32_t ret; uint32_t sent_pkts = 0; void *free_buf[VDEV_REG_QUEUE_SZ]; -- 2.23.0