gazelle/0204-supprot-multi-process.patch
jiangheng12 f133fa18a1 syn add pbuf lock when aggregate pbuf
(cherry picked from commit f101784b6c8a7494a49b219c9a46e9d86abc7e0b)
2023-03-20 10:07:43 +08:00

From 200ee63e092824edf12c7cc3172c61ba04a57d56 Mon Sep 17 00:00:00 2001
From: jinag12 <jiangheng14@huawei.com>
Date: Sat, 11 Mar 2023 15:59:51 +0000
Subject: [PATCH] supprot multi process
---
src/common/gazelle_opt.h | 4 +
src/lstack/api/lstack_epoll.c | 16 +-
src/lstack/api/lstack_wrap.c | 3 +-
src/lstack/core/lstack_cfg.c | 201 +++++++-
src/lstack/core/lstack_dpdk.c | 192 ++++---
src/lstack/core/lstack_init.c | 2 +
src/lstack/core/lstack_protocol_stack.c | 225 +++++++--
src/lstack/include/lstack_cfg.h | 17 +-
src/lstack/include/lstack_dpdk.h | 3 +-
src/lstack/include/lstack_ethdev.h | 23 +-
src/lstack/include/lstack_protocol_stack.h | 10 +
src/lstack/include/lstack_vdev.h | 7 +
src/lstack/lstack.conf | 4 +
src/lstack/netif/lstack_ethdev.c | 555 ++++++++++++++++++++-
src/lstack/netif/lstack_vdev.c | 49 +-
15 files changed, 1143 insertions(+), 168 deletions(-)
diff --git a/src/common/gazelle_opt.h b/src/common/gazelle_opt.h
index 745fdd8..e278107 100644
--- a/src/common/gazelle_opt.h
+++ b/src/common/gazelle_opt.h
@@ -90,4 +90,8 @@
#define SEND_TIME_WAIT_NS 20000
#define SECOND_NSECOND 1000000000
+#define LSTACK_SEND_THREAD_NAME "lstack_send"
+#define LSTACK_RECV_THREAD_NAME "lstack_recv"
+#define LSTACK_THREAD_NAME "gazellelstack"
+
#endif /* _GAZELLE_OPT_H_ */
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index da29590..4a10b09 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -74,8 +74,8 @@ void add_sock_event(struct lwip_sock *sock, uint32_t event)
}
struct protocol_stack *stack = sock->stack;
- if (list_is_null(&wakeup->wakeup_list[stack->queue_id])) {
- list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->queue_id]);
+ if (list_is_null(&wakeup->wakeup_list[stack->stack_idx])) {
+ list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->stack_idx]);
}
}
@@ -95,7 +95,7 @@ void wakeup_stack_epoll(struct protocol_stack *stack, bool wakeup_thread_enable)
temp = nod;
}
- struct wakeup_poll *wakeup = container_of((node - stack->queue_id), struct wakeup_poll, wakeup_list);
+ struct wakeup_poll *wakeup = container_of((node - stack->stack_idx), struct wakeup_poll, wakeup_list);
if (!wakeup_thread_enable) {
if (__atomic_load_n(&wakeup->in_wait, __ATOMIC_ACQUIRE)) {
@@ -109,7 +109,7 @@ void wakeup_stack_epoll(struct protocol_stack *stack, bool wakeup_thread_enable)
stack->stats.wakeup_events++;
}
- list_del_node_null(&wakeup->wakeup_list[stack->queue_id]);
+ list_del_node_null(&wakeup->wakeup_list[stack->stack_idx]);
}
}
@@ -291,7 +291,7 @@ static uint16_t find_max_cnt_stack(int32_t *stack_count, uint16_t stack_num, str
/* all stack same, don't change */
if (all_same_cnt && last_stack) {
- return last_stack->queue_id;
+ return last_stack->stack_idx;
}
/* first bind and all stack same. choice tick as queue_id, avoid all bind to statck_0. */
@@ -343,7 +343,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
switch (op) {
case EPOLL_CTL_ADD:
sock->wakeup = wakeup;
- wakeup->stack_fd_cnt[sock->stack->queue_id]++;
+ wakeup->stack_fd_cnt[sock->stack->stack_idx]++;
/* fall through */
case EPOLL_CTL_MOD:
sock->epoll_events = event->events | EPOLLERR | EPOLLHUP;
@@ -352,7 +352,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
break;
case EPOLL_CTL_DEL:
sock->epoll_events = 0;
- wakeup->stack_fd_cnt[sock->stack->queue_id]--;
+ wakeup->stack_fd_cnt[sock->stack->stack_idx]--;
pthread_spin_lock(&wakeup->event_list_lock);
list_del_node_null(&sock->event_list);
pthread_spin_unlock(&wakeup->event_list_lock);
@@ -652,7 +652,7 @@ static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfd
while (sock && sock->conn) {
sock->epoll_events = fds[i].events | POLLERR;
sock->wakeup = wakeup;
- stack_count[sock->stack->queue_id]++;
+ stack_count[sock->stack->stack_idx]++;
sock = sock->listen_next;
}
}
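
Note: the hunks above switch per-wakeup bookkeeping (wakeup_list, stack_fd_cnt, stack_count) from stack->queue_id to the new stack->stack_idx. A minimal sketch of the distinction, assuming the initialization added later in this patch (queue_id = process_idx * num_queue + i, stack_idx = i):

    /* sketch only: queue_id is global across processes, stack_idx is local */
    stack->stack_idx = i;                                      /* 0 .. stack_num-1 inside this process */
    stack->queue_id  = cfg->process_idx * cfg->num_queue + i;  /* NIC queue owned by this stack thread */
    /* per-process arrays therefore stay indexed by the local id: */
    list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->stack_idx]);
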
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index f438529..561c6e4 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -238,8 +238,7 @@ static inline int32_t do_listen(int32_t s, int32_t backlog)
return posix_api->listen_fn(s, backlog);
}
- int32_t ret = get_global_cfg_params()->listen_shadow ? stack_broadcast_listen(s, backlog) :
- stack_single_listen(s, backlog);
+ int32_t ret = stack_broadcast_listen(s, backlog);
if (ret != 0) {
return ret;
}
diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c
index 9195f34..72a3292 100644
--- a/src/lstack/core/lstack_cfg.c
+++ b/src/lstack/core/lstack_cfg.c
@@ -65,6 +65,10 @@ static int32_t parse_tcp_conn_count(void);
static int32_t parse_mbuf_count_per_conn(void);
static int32_t parse_send_ring_size(void);
static int32_t parse_expand_send_ring(void);
+static int32_t parse_num_process(void);
+static int32_t parse_process_numa(void);
+static int32_t parse_process_index(void);
+static int32_t parse_seperate_sendrecv_args(void);
static inline int32_t parse_int(void *arg, char * arg_string, int32_t default_val,
int32_t min_val, int32_t max_val)
@@ -97,6 +101,7 @@ static struct config_vector_t g_config_tbl[] = {
{ "use_ltran", parse_use_ltran },
{ "devices", parse_devices },
{ "dpdk_args", parse_dpdk_args },
+ { "seperate_send_recv", parse_seperate_sendrecv_args },
{ "num_cpus", parse_stack_cpu_number },
{ "num_wakeup", parse_wakeup_cpu_number },
{ "low_power_mode", parse_low_power_mode },
@@ -112,6 +117,9 @@ static struct config_vector_t g_config_tbl[] = {
{ "nic_read_number", parse_nic_read_number },
{ "send_ring_size", parse_send_ring_size },
{ "expand_send_ring", parse_expand_send_ring },
+ { "num_process", parse_num_process },
+ { "process_numa", parse_process_numa },
+ { "process_idx", parse_process_index },
{ NULL, NULL }
};
@@ -277,35 +285,99 @@ static int32_t parse_stack_cpu_number(void)
const config_setting_t *num_cpus = NULL;
const char *args = NULL;
- num_cpus = config_lookup(&g_config, "num_cpus");
- if (num_cpus == NULL) {
- return -EINVAL;
- }
+ if (!g_config_params.seperate_send_recv) {
+ num_cpus = config_lookup(&g_config, "num_cpus");
+ if (num_cpus == NULL) {
+ return -EINVAL;
+ }
- args = config_setting_get_string(num_cpus);
- if (args == NULL) {
- return -EINVAL;
- }
+ args = config_setting_get_string(num_cpus);
+ if (args == NULL) {
+ return -EINVAL;
+ }
- if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) {
- int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST);
- if (idx < 0) {
- g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST);
- g_config_params.dpdk_argc++;
+ if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) {
+ int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST);
+ if (idx < 0) {
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST);
+ g_config_params.dpdk_argc++;
- g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args);
- g_config_params.dpdk_argc++;
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args);
+ g_config_params.dpdk_argc++;
+ }
}
- }
- char *tmp_arg = strdup(args);
- int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.cpus, CFG_MAX_CPUS);
- free(tmp_arg);
- if (cnt <= 0 || cnt > CFG_MAX_CPUS) {
- return -EINVAL;
- }
+ char *tmp_arg = strdup(args);
+ int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.cpus, CFG_MAX_CPUS);
+ free(tmp_arg);
+ if (cnt <= 0 || cnt > CFG_MAX_CPUS) {
+ return -EINVAL;
+ }
+
+ g_config_params.num_cpu = cnt;
+ g_config_params.num_queue = (uint16_t)cnt;
+ g_config_params.tot_queue_num = g_config_params.num_queue;
+ } else {
+ // send_num_cpus
+ num_cpus = config_lookup(&g_config, "send_num_cpus");
+ if (num_cpus == NULL) {
+ return -EINVAL;
+ }
+
+ args = config_setting_get_string(num_cpus);
+ if (args == NULL) {
+ return -EINVAL;
+ }
+
+ if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) {
+ int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST);
+ if (idx < 0) {
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST);
+ g_config_params.dpdk_argc++;
+
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args);
+ g_config_params.dpdk_argc++;
+ }
+ }
+
+ char *tmp_arg_send = strdup(args);
+ int32_t cnt = separate_str_to_array(tmp_arg_send, g_config_params.send_cpus, CFG_MAX_CPUS);
+ free(tmp_arg_send);
+
+ // recv_num_cpus
+ num_cpus = config_lookup(&g_config, "recv_num_cpus");
+ if (num_cpus == NULL) {
+ return -EINVAL;
+ }
+
+ args = config_setting_get_string(num_cpus);
+ if (args == NULL) {
+ return -EINVAL;
+ }
- g_config_params.num_cpu = cnt;
+ if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) {
+ int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST);
+ if (idx < 0) {
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST);
+ g_config_params.dpdk_argc++;
+
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args);
+ g_config_params.dpdk_argc++;
+ }
+ }
+
+ char *tmp_arg_recv = strdup(args);
+ cnt = separate_str_to_array(tmp_arg_recv, g_config_params.recv_cpus, CFG_MAX_CPUS);
+ free(tmp_arg_recv);
+
+ if (cnt <= 0 || cnt > CFG_MAX_CPUS / 2) {
+ return -EINVAL;
+ }
+
+ g_config_params.num_cpu = cnt;
+ g_config_params.num_queue = (uint16_t)cnt * 2;
+ g_config_params.tot_queue_num = g_config_params.num_queue;
+ }
return 0;
}
@@ -369,7 +441,12 @@ int32_t init_stack_numa_cpuset(struct protocol_stack *stack)
cpu_set_t stack_cpuset;
CPU_ZERO(&stack_cpuset);
for (int32_t idx = 0; idx < cfg->num_cpu; ++idx) {
- CPU_SET(cfg->cpus[idx], &stack_cpuset);
+ if (!cfg->seperate_send_recv) {
+ CPU_SET(cfg->cpus[idx], &stack_cpuset);
+ }else {
+ CPU_SET(cfg->send_cpus[idx], &stack_cpuset);
+ CPU_SET(cfg->recv_cpus[idx], &stack_cpuset);
+ }
}
for (int32_t idx = 0; idx < cfg->num_wakeup; ++idx) {
CPU_SET(cfg->wakeup[idx], &stack_cpuset);
@@ -643,6 +720,13 @@ static int32_t parse_dpdk_args(void)
goto free_dpdk_args;
}
g_config_params.dpdk_argv[start_index + i] = p;
+
+ const char *primary = "primary";
+ if(strcmp(p, primary) == 0){
+ struct cfg_params *global_params = get_global_cfg_params();
+ global_params->is_primary = 1;
+ }
+
(void)fprintf(stderr, "%s ", g_config_params.dpdk_argv[start_index + i]);
}
(void)fprintf(stderr, "\n");
@@ -877,3 +961,72 @@ static int32_t parse_unix_prefix(void)
return 0;
}
+static int32_t parse_seperate_sendrecv_args(void)
+{
+ return parse_int(&g_config_params.seperate_send_recv, "seperate_send_recv", 0, 0, 1);
+}
+
+static int32_t parse_num_process(void)
+{
+ if (g_config_params.use_ltran) {
+ return 0;
+ }
+
+ const config_setting_t *num_process = NULL;
+
+ num_process = config_lookup(&g_config, "num_process");
+ if (num_process == NULL) {
+ g_config_params.num_process = 1;
+ }else {
+ g_config_params.num_process = (uint8_t)config_setting_get_int(num_process);
+ }
+
+ g_config_params.tot_queue_num = g_config_params.num_queue * g_config_params.num_process;
+
+ return 0;
+}
+
+static int32_t parse_process_numa(void)
+{
+ const config_setting_t *cfg_args = NULL;
+ const char *args = NULL;
+
+ int ret;
+ cfg_args = config_lookup(&g_config, "process_numa");
+ if (cfg_args == NULL)
+ return 0;
+
+ args = config_setting_get_string(cfg_args);
+ if (cfg_args == NULL) {
+ return 0;
+ }
+
+ ret = separate_str_to_array((char *)args, g_config_params.process_numa, PROTOCOL_STACK_MAX);
+ if (ret <= 0) {
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int parse_process_index(void)
+{
+ if (g_config_params.use_ltran) {
+ return 0;
+ }
+
+ const config_setting_t *process_idx = NULL;
+ process_idx = config_lookup(&g_config, "process_idx");
+ if (process_idx == NULL) {
+ if (g_config_params.num_process == 1) {
+ g_config_params.process_idx = 0;
+ }else {
+ return -EINVAL;
+ }
+ } else {
+ g_config_params.process_idx = (uint8_t)config_setting_get_int(process_idx);
+ }
+
+ return 0;
+}
+
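
Note: with the parsing added above, a configuration either keeps a single num_cpus list or splits stack threads into send/recv pairs. A minimal lstack.conf fragment exercising the new keys (CPU ids and counts here are placeholders, not recommendations):

    seperate_send_recv=1
    send_num_cpus="2"
    recv_num_cpus="3"
    num_process=2
    process_numa="0,1"
    process_idx=0

With seperate_send_recv=1 the parser requires the recv CPU list to hold at most CFG_MAX_CPUS/2 entries and sets num_queue to twice the per-direction CPU count; tot_queue_num is then scaled by num_process in parse_num_process().
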
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index f60963f..1beb66b 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -19,6 +19,7 @@
#include <sys/socket.h>
#include <net/if.h>
#include <net/if_arp.h>
+#include <numa.h>
#include <rte_eal.h>
#include <rte_lcore.h>
@@ -131,7 +132,7 @@ int32_t dpdk_eal_init(void)
return ret;
}
-static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,
+struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,
uint32_t mbuf_cache_size, uint16_t queue_id)
{
int32_t ret;
@@ -149,7 +150,27 @@ static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_
if (pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno);
}
+
+ return pool;
+}
+
+static struct rte_mempool* get_pktmbuf_mempool(const char *name, uint16_t queue_id){
+ int32_t ret;
+ char pool_name[PATH_MAX];
+ struct rte_mempool *pool;
+
+ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id);
+ if (ret < 0) {
+ return NULL;
+ }
+ pool = rte_mempool_lookup(pool_name);
+ if (pool == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "look up %s pool rte_err=%d\n", pool_name, rte_errno);
+ }
+
+ // rte_mempool_dump(stdout, pool) ;
return pool;
+
}
static struct reg_ring_msg *create_reg_mempool(const char *name, uint16_t queue_id)
@@ -178,10 +199,7 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num)
return -1;
}
- stack->rxtx_pktmbuf_pool = create_pktmbuf_mempool("rxtx_mbuf",
- get_global_cfg_params()->mbuf_count_per_conn * get_global_cfg_params()->tcp_conn_count / stack_num,
- RXTX_CACHE_SZ,
- stack->queue_id);
+ stack->rxtx_pktmbuf_pool = get_pktmbuf_mempool("rxtx_mbuf", stack->queue_id);
if (stack->rxtx_pktmbuf_pool == NULL) {
return -1;
}
@@ -201,7 +219,7 @@ struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, i
char ring_name[RTE_RING_NAMESIZE] = {0};
struct rte_ring *ring;
- int32_t ret = snprintf_s(ring_name, sizeof(ring_name), RTE_RING_NAMESIZE - 1, "%s_%d", name, queue_id);
+ int32_t ret = snprintf_s(ring_name, sizeof(ring_name), RTE_RING_NAMESIZE - 1, "%s_%d_%d", name, get_global_cfg_params()->process_idx, queue_id);
if (ret < 0) {
return NULL;
}
@@ -286,6 +304,12 @@ void lstack_log_level_init(void)
}
}
+// get port id
+inline uint16_t get_port_id(){
+ uint16_t port_id = get_global_cfg_params()->port_id;
+ return port_id;
+}
+
static int32_t ethdev_port_id(uint8_t *mac)
{
int32_t port_id;
@@ -412,89 +436,111 @@ static void rss_setup(const int port_id, const uint16_t nb_queues)
int32_t dpdk_ethdev_init(void)
{
uint16_t nb_queues = get_global_cfg_params()->num_cpu;
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
-
- int32_t port_id = ethdev_port_id(get_global_cfg_params()->mac_addr);
- if (port_id < 0) {
- return port_id;
+ if (get_global_cfg_params()->seperate_send_recv) {
+ nb_queues = get_global_cfg_params()->num_cpu * 2;
}
- struct rte_eth_dev_info dev_info;
- int32_t ret = rte_eth_dev_info_get(port_id, &dev_info);
- if (ret != 0) {
- LSTACK_LOG(ERR, LSTACK, "get dev info ret=%d\n", ret);
- return ret;
+ if (!use_ltran()) {
+ nb_queues = get_global_cfg_params()->tot_queue_num;
}
- int32_t max_queues = LWIP_MIN(dev_info.max_rx_queues, dev_info.max_tx_queues);
- if (max_queues < nb_queues) {
- LSTACK_LOG(ERR, LSTACK, "port_id %d max_queues=%d\n", port_id, max_queues);
- return -EINVAL;
- }
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
- struct eth_params *eth_params = alloc_eth_params(port_id, nb_queues);
- if (eth_params == NULL) {
- return -ENOMEM;
- }
- eth_params_checksum(&eth_params->conf, &dev_info);
- int32_t rss_enable = eth_params_rss(&eth_params->conf, &dev_info);
- stack_group->eth_params = eth_params;
- stack_group->port_id = eth_params->port_id;
- stack_group->rx_offload = eth_params->conf.rxmode.offloads;
- stack_group->tx_offload = eth_params->conf.txmode.offloads;
-
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- struct protocol_stack *stack = stack_group->stacks[i];
- if (likely(stack)) {
- stack->port_id = stack_group->port_id;
- } else {
- LSTACK_LOG(ERR, LSTACK, "empty stack at stack_num %d\n", i);
- stack_group->eth_params = NULL;
- free(eth_params);
- return -EINVAL;
- }
- }
-
- ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, &eth_params->conf);
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "cannot config eth dev at port %d: %s\n", port_id, rte_strerror(-ret));
- stack_group->eth_params = NULL;
- free(eth_params);
- return ret;
- }
+ int32_t port_id = ethdev_port_id(get_global_cfg_params()->mac_addr);
+ if (port_id < 0) {
+ return port_id;
+ }
- ret = dpdk_ethdev_start();
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n");
- stack_group->eth_params = NULL;
- free(eth_params);
- return ret;
- }
+ struct rte_eth_dev_info dev_info;
+ int32_t ret = rte_eth_dev_info_get(port_id, &dev_info);
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "get dev info ret=%d\n", ret);
+ return ret;
+ }
+
+ int32_t max_queues = LWIP_MIN(dev_info.max_rx_queues, dev_info.max_tx_queues);
+ if (max_queues < nb_queues) {
+ LSTACK_LOG(ERR, LSTACK, "port_id %d max_queues=%d\n", port_id, max_queues);
+ return -EINVAL;
+ }
- if (rss_enable) {
- rss_setup(port_id, nb_queues);
- stack_group->reta_mask = dev_info.reta_size - 1;
+ struct eth_params *eth_params = alloc_eth_params(port_id, nb_queues);
+ if (eth_params == NULL) {
+ return -ENOMEM;
+ }
+ eth_params_checksum(&eth_params->conf, &dev_info);
+ int32_t rss_enable = 0;
+ if (use_ltran()) {
+ rss_enable = eth_params_rss(&eth_params->conf, &dev_info);
+ }
+ stack_group->eth_params = eth_params;
+ stack_group->port_id = eth_params->port_id;
+ stack_group->rx_offload = eth_params->conf.rxmode.offloads;
+ stack_group->tx_offload = eth_params->conf.txmode.offloads;
+
+ if (get_global_cfg_params()->is_primary) {
+ for (uint32_t i = 0; i < stack_group->stack_num; i++) {
+ struct protocol_stack *stack = stack_group->stacks[i];
+ if (likely(stack)) {
+ stack->port_id = stack_group->port_id;
+ } else {
+ LSTACK_LOG(ERR, LSTACK, "empty stack at stack_num %d\n", i);
+ stack_group->eth_params = NULL;
+ free(eth_params);
+ return -EINVAL;
+ }
+ }
+
+ ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, &eth_params->conf);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "cannot config eth dev at port %d: %s\n", port_id, rte_strerror(-ret));
+ stack_group->eth_params = NULL;
+ free(eth_params);
+ return ret;
+ }
+
+ ret = dpdk_ethdev_start();
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n");
+ stack_group->eth_params = NULL;
+ free(eth_params);
+ return ret;
+ }
+
+ if (rss_enable && use_ltran()) {
+ rss_setup(port_id, nb_queues);
+ stack_group->reta_mask = dev_info.reta_size - 1;
+ }
+ stack_group->nb_queues = nb_queues;
}
- stack_group->nb_queues = nb_queues;
return 0;
}
-static int32_t dpdk_ethdev_setup(const struct eth_params *eth_params, const struct protocol_stack *stack)
+static int32_t dpdk_ethdev_setup(const struct eth_params *eth_params, uint16_t idx)
{
int32_t ret;
- ret = rte_eth_rx_queue_setup(eth_params->port_id, stack->queue_id, eth_params->nb_rx_desc, stack->socket_id,
- &eth_params->rx_conf, stack->rxtx_pktmbuf_pool);
+ struct rte_mempool *rxtx_pktmbuf_pool = get_protocol_stack_group()->total_rxtx_pktmbuf_pool[idx];
+
+ uint16_t socket_id = 0;
+ struct cfg_params * cfg = get_global_cfg_params();
+ if (!cfg->use_ltran && cfg->num_process == 1) {
+ socket_id = numa_node_of_cpu(cfg->cpus[idx]);
+ }else {
+ socket_id = cfg->process_numa[idx];
+ }
+ ret = rte_eth_rx_queue_setup(eth_params->port_id, idx, eth_params->nb_rx_desc, socket_id,
+ &eth_params->rx_conf, rxtx_pktmbuf_pool);
if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "cannot setup rx_queue %hu: %s\n", stack->queue_id, rte_strerror(-ret));
+ LSTACK_LOG(ERR, LSTACK, "cannot setup rx_queue %hu: %s\n", idx, rte_strerror(-ret));
return -1;
}
- ret = rte_eth_tx_queue_setup(eth_params->port_id, stack->queue_id, eth_params->nb_tx_desc, stack->socket_id,
+ ret = rte_eth_tx_queue_setup(eth_params->port_id, idx, eth_params->nb_tx_desc, socket_id,
&eth_params->tx_conf);
if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "cannot setup tx_queue %hu: %s\n", stack->queue_id, rte_strerror(-ret));
+ LSTACK_LOG(ERR, LSTACK, "cannot setup tx_queue %hu: %s\n", idx, rte_strerror(-ret));
return -1;
}
@@ -505,12 +551,9 @@ int32_t dpdk_ethdev_start(void)
{
int32_t ret;
const struct protocol_stack_group *stack_group = get_protocol_stack_group();
- const struct protocol_stack *stack = NULL;
-
- for (int32_t i = 0; i < stack_group->stack_num; i++) {
- stack = stack_group->stacks[i];
- ret = dpdk_ethdev_setup(stack_group->eth_params, stack);
+ for (int32_t i = 0; i < get_global_cfg_params()->tot_queue_num; i++) {
+ ret = dpdk_ethdev_setup(stack_group->eth_params, i);
if (ret < 0) {
return ret;
}
@@ -529,6 +572,7 @@ int32_t dpdk_init_lstack_kni(void)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
+
stack_group->kni_pktmbuf_pool = create_pktmbuf_mempool("kni_mbuf", KNI_NB_MBUF, 0, 0);
if (stack_group->kni_pktmbuf_pool == NULL) {
return -1;
@@ -568,7 +612,7 @@ int32_t init_dpdk_ethdev(void)
return -1;
}
- if (get_global_cfg_params()->kni_switch) {
+ if (get_global_cfg_params()->kni_switch && get_global_cfg_params()->is_primary) {
ret = dpdk_init_lstack_kni();
if (ret < 0) {
return -1;
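
Note: the mempool and ring changes above define the naming convention that lets several processes share DPDK objects: the primary creates one "rxtx_mbuf_<queue_id>" pool per global NIC queue, every process attaches to its own queues by name, and rings additionally embed the process index ("<name>_<process_idx>_<queue_id>"). A sketch of the attach path that get_pktmbuf_mempool() implements, assuming the primary has already created the pools:

    char pool_name[PATH_MAX];
    /* queue_id is the global NIC queue this stack thread owns */
    if (snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", "rxtx_mbuf", queue_id) < 0) {
        return NULL;
    }
    struct rte_mempool *pool = rte_mempool_lookup(pool_name);  /* created by the primary in init_protocol_stack() */
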
diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c
index 34b2c0d..e8fa0dc 100644
--- a/src/lstack/core/lstack_init.c
+++ b/src/lstack/core/lstack_init.c
@@ -356,6 +356,8 @@ __attribute__((constructor)) void gazelle_network_init(void)
}
}
+ // @todo: if process 2 dumps (crashes), its resources need to be released.
+
gazelle_signal_init();
/*
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 300d7af..48eff1d 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -13,13 +13,14 @@
#include <pthread.h>
#include <stdatomic.h>
+#include <rte_kni.h>
+
#include <lwip/sockets.h>
#include <lwip/tcpip.h>
#include <lwip/tcp.h>
#include <lwip/memp_def.h>
#include <lwipsock.h>
#include <lwip/posix_api.h>
-#include <rte_kni.h>
#include <securec.h>
#include <numa.h>
@@ -29,6 +30,7 @@
#include "lstack_log.h"
#include "lstack_dpdk.h"
#include "lstack_ethdev.h"
+#include "lstack_vdev.h"
#include "lstack_lwip.h"
#include "lstack_protocol_stack.h"
#include "lstack_cfg.h"
@@ -79,6 +81,28 @@ struct protocol_stack_group *get_protocol_stack_group(void)
return &g_stack_group;
}
+int get_min_conn_stack(struct protocol_stack_group *stack_group){
+ int min_conn_stk_idx = 0;
+ int min_conn_num = GAZELLE_MAX_CLIENTS;
+ for (int i = 0; i < stack_group->stack_num; i++) {
+ struct protocol_stack* stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (!stack->is_send_thread && stack->conn_num < min_conn_num) {
+ min_conn_stk_idx = i;
+ min_conn_num = stack->conn_num;
+ }
+ }else {
+ if (stack->conn_num < min_conn_num) {
+ min_conn_stk_idx = i;
+ min_conn_num = stack->conn_num;
+ }
+ }
+
+ }
+ return min_conn_stk_idx;
+
+}
+
struct protocol_stack *get_protocol_stack(void)
{
return g_stack_p;
@@ -105,22 +129,23 @@ struct protocol_stack *get_bind_protocol_stack(void)
struct protocol_stack_group *stack_group = get_protocol_stack_group();
uint16_t index = 0;
+ int min_conn_num = GAZELLE_MAX_CLIENTS;
/* close listen shadow, per app communication thread select only one stack */
- if (get_global_cfg_params()->listen_shadow == false) {
- static _Atomic uint16_t stack_index = 0;
- index = atomic_fetch_add(&stack_index, 1);
- if (index >= stack_group->stack_num) {
- LSTACK_LOG(ERR, LSTACK, "thread =%hu larger than stack num = %hu\n", index, stack_group->stack_num);
- return NULL;
- }
- /* use listen shadow, app communication thread maybe more than stack num, select the least load stack */
- } else {
- for (uint16_t i = 1; i < stack_group->stack_num; i++) {
- if (stack_group->stacks[i]->conn_num < stack_group->stacks[index]->conn_num) {
+ for (uint16_t i = 0; i < stack_group->stack_num; i++) {
+ struct protocol_stack* stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (stack->is_send_thread && stack->conn_num < min_conn_num) {
index = i;
+ min_conn_num = stack->conn_num;
+ }
+ }else {
+ if (stack->conn_num < min_conn_num) {
+ index = i;
+ min_conn_num = stack->conn_num;
}
}
+
}
bind_stack = stack_group->stacks[index];
@@ -180,27 +205,35 @@ void low_power_idling(struct protocol_stack *stack)
}
}
-static int32_t create_thread(uint16_t queue_id, char *thread_name, stack_thread_func func)
+static int32_t create_thread(void *arg, char *thread_name, stack_thread_func func)
{
/* thread may run slow, if arg is temp var maybe have relese */
- static uint16_t queue[PROTOCOL_STACK_MAX];
char name[PATH_MAX];
pthread_t tid;
int32_t ret;
+ struct thread_params *t_params = (struct thread_params*) arg;
- if (queue_id >= PROTOCOL_STACK_MAX) {
- LSTACK_LOG(ERR, LSTACK, "queue_id is %hu exceed max=%d\n", queue_id, PROTOCOL_STACK_MAX);
+ if (t_params->queue_id >= PROTOCOL_STACK_MAX) {
+ LSTACK_LOG(ERR, LSTACK, "queue_id is %hu exceed max=%d\n", t_params->queue_id, PROTOCOL_STACK_MAX);
return -1;
}
- queue[queue_id] = queue_id;
- ret = sprintf_s(name, sizeof(name), "%s%02hu", thread_name, queue[queue_id]);
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "set name failed\n");
- return -1;
+ if (get_global_cfg_params()->seperate_send_recv){
+ ret = sprintf_s(name, sizeof(name), "%s", thread_name);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "set name failed\n");
+ return -1;
+ }
+
+ }else {
+ ret = sprintf_s(name, sizeof(name), "%s%02hu", thread_name, t_params->queue_id);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "set name failed\n");
+ return -1;
+ }
}
- ret = pthread_create(&tid, NULL, func, &queue[queue_id]);
+ ret = pthread_create(&tid, NULL, func, arg);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "pthread_create ret=%d\n", ret);
return -1;
@@ -221,7 +254,7 @@ static void* gazelle_wakeup_thread(void *arg)
struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
struct cfg_params *cfg = get_global_cfg_params();
- int32_t lcore_id = cfg->wakeup[stack->queue_id];
+ int32_t lcore_id = cfg->wakeup[stack->stack_idx];
thread_affinity_init(lcore_id);
struct timespec st = {
@@ -252,12 +285,13 @@ static void* gazelle_wakeup_thread(void *arg)
static void* gazelle_kernelevent_thread(void *arg)
{
- uint16_t queue_id = *(uint16_t *)arg;
- struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
+ struct thread_params *t_params = (struct thread_params*) arg;
+ uint16_t idx = t_params->idx;
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[idx];
bind_to_stack_numa(stack);
- LSTACK_LOG(INFO, LSTACK, "kernelevent_%02hu start\n", queue_id);
+ LSTACK_LOG(INFO, LSTACK, "kernelevent_%02hu start\n", idx);
for (;;) {
stack->kernel_event_num = posix_api->epoll_wait_fn(stack->epollfd, stack->kernel_events, KERNEL_EPOLL_MAX, -1);
@@ -269,13 +303,14 @@ static void* gazelle_kernelevent_thread(void *arg)
return NULL;
}
-static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
+static int32_t init_stack_value(struct protocol_stack *stack, void *arg)
{
+ struct thread_params *t_params = (struct thread_params*) arg;
struct protocol_stack_group *stack_group = get_protocol_stack_group();
stack->tid = rte_gettid();
- stack->queue_id = queue_id;
- stack->cpu_id = get_global_cfg_params()->cpus[queue_id];
+ stack->queue_id = t_params->queue_id;
+ stack->stack_idx = t_params->idx;
stack->lwip_stats = &lwip_stats;
init_list_node(&stack->recv_list);
@@ -284,14 +319,27 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
sys_calibrate_tsc();
stack_stat_init();
- stack_group->stacks[queue_id] = stack;
- set_stack_idx(queue_id);
+ stack_group->stacks[t_params->idx] = stack;
+ set_stack_idx(t_params->idx);
stack->epollfd = posix_api->epoll_create_fn(GAZELLE_LSTACK_MAX_CONN);
if (stack->epollfd < 0) {
return -1;
}
+ int idx = t_params->idx;
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (idx % 2 == 0) {
+ stack->cpu_id = get_global_cfg_params()->recv_cpus[idx/2];
+ stack->is_send_thread = 0;
+ }else {
+ stack->cpu_id = get_global_cfg_params()->send_cpus[idx/2];
+ stack->is_send_thread = 1;
+ }
+ }else {
+ stack->cpu_id = get_global_cfg_params()->cpus[idx];
+ }
+
stack->socket_id = numa_node_of_cpu(stack->cpu_id);
if (stack->socket_id < 0) {
LSTACK_LOG(ERR, LSTACK, "numa_node_of_cpu failed\n");
@@ -317,16 +365,17 @@ void wait_sem_value(sem_t *sem, int32_t wait_value)
} while (sem_val < wait_value);
}
-static int32_t create_affiliate_thread(uint16_t queue_id, bool wakeup_enable)
+static int32_t create_affiliate_thread(void *arg, bool wakeup_enable)
{
+
if (wakeup_enable) {
- if (create_thread(queue_id, "gazelleweakup", gazelle_wakeup_thread) != 0) {
+ if (create_thread(arg, "gazelleweakup", gazelle_wakeup_thread) != 0) {
LSTACK_LOG(ERR, LSTACK, "gazelleweakup errno=%d\n", errno);
return -1;
}
}
- if (create_thread(queue_id, "gazellekernel", gazelle_kernelevent_thread) != 0) {
+ if (create_thread(arg, "gazellekernel", gazelle_kernelevent_thread) != 0) {
LSTACK_LOG(ERR, LSTACK, "gazellekernel errno=%d\n", errno);
return -1;
}
@@ -334,10 +383,9 @@ static int32_t create_affiliate_thread(uint16_t queue_id, bool wakeup_enable)
return 0;
}
-static struct protocol_stack *stack_thread_init(uint16_t queue_id)
+static struct protocol_stack *stack_thread_init(void *arg)
{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
-
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
struct protocol_stack *stack = calloc(1, sizeof(*stack));
if (stack == NULL) {
LSTACK_LOG(ERR, LSTACK, "malloc stack failed\n");
@@ -345,14 +393,14 @@ static struct protocol_stack *stack_thread_init(uint16_t queue_id)
return NULL;
}
- if (init_stack_value(stack, queue_id) != 0) {
+ if (init_stack_value(stack, arg) != 0) {
goto END;
}
if (init_stack_numa_cpuset(stack) < 0) {
goto END;
}
- if (create_affiliate_thread(queue_id, stack_group->wakeup_enable) < 0) {
+ if (create_affiliate_thread(arg, stack_group->wakeup_enable) < 0) {
goto END;
}
@@ -402,19 +450,22 @@ static void wakeup_kernel_event(struct protocol_stack *stack)
}
__atomic_store_n(&wakeup->have_kernel_event, true, __ATOMIC_RELEASE);
- if (list_is_null(&wakeup->wakeup_list[stack->queue_id])) {
- list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->queue_id]);
+ if (list_is_null(&wakeup->wakeup_list[stack->stack_idx])) {
+ list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->stack_idx]);
}
}
stack->kernel_event_num = 0;
}
+
static void* gazelle_stack_thread(void *arg)
{
- uint16_t queue_id = *(uint16_t *)arg;
+ struct thread_params *t_params = (struct thread_params*) arg;
+
+ uint16_t queue_id = t_params->queue_id;
struct cfg_params *cfg = get_global_cfg_params();
- bool use_ltran_flag = cfg->use_ltran;;
+ uint8_t use_ltran_flag = cfg->use_ltran;
bool kni_switch = cfg->kni_switch;
uint32_t read_connect_number = cfg->read_connect_number;
uint32_t rpc_number = cfg->rpc_number;
@@ -424,7 +475,8 @@ static void* gazelle_stack_thread(void *arg)
struct protocol_stack_group *stack_group = get_protocol_stack_group();
bool wakeup_thread_enable = stack_group->wakeup_enable;
- struct protocol_stack *stack = stack_thread_init(queue_id);
+ struct protocol_stack *stack = stack_thread_init(arg);
+
if (stack == NULL) {
/* exit in main thread, avoid create mempool and exit at the same time */
set_init_fail();
@@ -432,8 +484,12 @@ static void* gazelle_stack_thread(void *arg)
LSTACK_LOG(ERR, LSTACK, "stack_thread_init failed queue_id=%hu\n", queue_id);
return NULL;
}
+ if (!use_ltran() && queue_id == 0) {
+ init_listen_and_user_ports();
+ }
sem_post(&stack_group->all_init);
+
LSTACK_LOG(INFO, LSTACK, "stack_%02hu init success\n", queue_id);
for (;;) {
@@ -452,6 +508,7 @@ static void* gazelle_stack_thread(void *arg)
* so processing KNI requests only in the thread with queue_id No.0 is sufficient. */
if (kni_switch && !queue_id && !(wakeup_tick & 0xfff)) {
rte_kni_handle_request(get_gazelle_kni());
+ kni_handle_rx(get_port_id());
}
wakeup_tick++;
@@ -466,6 +523,11 @@ static void* gazelle_stack_thread(void *arg)
return NULL;
}
+static void libnet_listen_thread(void *arg){
+ struct cfg_params * cfg_param = get_global_cfg_params();
+ recv_pkts_from_other_process(cfg_param->process_idx, arg);
+}
+
static int32_t init_protocol_sem(void)
{
int32_t ret;
@@ -498,8 +560,14 @@ int32_t init_protocol_stack(void)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
int32_t ret;
+ char name[PATH_MAX];
+
+ if (!get_global_cfg_params()->seperate_send_recv) {
+ stack_group->stack_num = get_global_cfg_params()->num_cpu;
+ }else {
+ stack_group->stack_num = get_global_cfg_params()->num_cpu * 2;
+ }
- stack_group->stack_num = get_global_cfg_params()->num_cpu;
stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? true : false;
init_list_node(&stack_group->poll_list);
pthread_spin_init(&stack_group->poll_list_lock, PTHREAD_PROCESS_PRIVATE);
@@ -508,9 +576,43 @@ int32_t init_protocol_stack(void)
if (init_protocol_sem() != 0) {
return -1;
}
+ int queue_num = get_global_cfg_params()->num_queue;
+ struct thread_params *t_params[queue_num];
+ int process_index = get_global_cfg_params()->process_idx;
+
+ if (get_global_cfg_params()->is_primary) {
+ for (uint16_t idx = 0; idx < get_global_cfg_params()->tot_queue_num; idx++) {
+ struct rte_mempool* rxtx_mbuf = create_pktmbuf_mempool("rxtx_mbuf",
+ get_global_cfg_params()->mbuf_count_per_conn * get_global_cfg_params()->tcp_conn_count / stack_group->stack_num, RXTX_CACHE_SZ, idx);
+ get_protocol_stack_group()->total_rxtx_pktmbuf_pool[idx] = rxtx_mbuf;
+ }
+ }
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- ret = create_thread(i, "gazellestack", gazelle_stack_thread);
+ for (uint32_t i = 0; i < queue_num; i++) {
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (i % 2 == 0) {
+ ret = sprintf_s(name, sizeof(name), "%s_%d_%d", LSTACK_RECV_THREAD_NAME, process_index, i/2);
+ if (ret < 0) {
+ return -1;
+ }
+ }else {
+ ret = sprintf_s(name, sizeof(name), "%s_%d_%d", LSTACK_SEND_THREAD_NAME, process_index, i/2);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+ }else {
+ ret = sprintf_s(name, sizeof(name), "%s", LSTACK_THREAD_NAME);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+
+ t_params[i] = malloc(sizeof(struct thread_params));
+ t_params[i]->idx = i;
+ t_params[i]->queue_id = process_index * queue_num + i;
+
+ ret = create_thread((void *)t_params[i], name, gazelle_stack_thread);
if (ret != 0) {
return ret;
}
@@ -518,6 +620,20 @@ int32_t init_protocol_stack(void)
wait_sem_value(&stack_group->thread_phase1, stack_group->stack_num);
+ for(int idx = 0; idx < queue_num; idx++){
+ free(t_params[idx]);
+ }
+
+ if (!use_ltran()) {
+ ret = sem_init(&stack_group->sem_listen_thread, 0, 0);
+ ret = sprintf_s(name, sizeof(name), "%s", "listen_thread");
+ struct sys_thread *thread = sys_thread_new(name, libnet_listen_thread, (void*)(&stack_group->sem_listen_thread), 0, 0);
+ free(thread);
+ sem_wait(&stack_group->sem_listen_thread);
+
+ create_flow_rule_map();
+ }
+
if (get_init_fail()) {
return -1;
}
@@ -684,7 +800,7 @@ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack
for (int32_t i = 0; i < stack_group->stack_num; i++) {
stack = stack_group->stacks[i];
- if (cur_stack == stack) {
+ if (cur_stack == stack && use_ltran()) {
continue;
}
@@ -718,7 +834,7 @@ void stack_clean_epoll(struct rpc_msg *msg)
struct protocol_stack *stack = get_protocol_stack();
struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p;
- list_del_node_null(&wakeup->wakeup_list[stack->queue_id]);
+ list_del_node_null(&wakeup->wakeup_list[stack->stack_idx]);
}
/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
@@ -769,8 +885,13 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
}
struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ int min_conn_stk_idx = get_min_conn_stack(stack_group);
+
for (int32_t i = 0; i < stack_group->stack_num; ++i) {
stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv && stack->is_send_thread) {
+ continue;
+ }
if (stack != cur_stack) {
clone_fd = rpc_call_shadow_fd(stack, fd, &addr, sizeof(addr));
if (clone_fd < 0) {
@@ -781,6 +902,12 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
clone_fd = fd;
}
+ if (min_conn_stk_idx == i) {
+ get_socket_by_fd(clone_fd)->conn->is_master_fd = 1;
+ }else {
+ get_socket_by_fd(clone_fd)->conn->is_master_fd = 0;
+ }
+
ret = rpc_call_listen(clone_fd, backlog);
if (ret < 0) {
stack_broadcast_close(fd);
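
Note: a worked example of the thread/queue layout produced by init_protocol_stack() and create_thread() above, assuming num_process=2, each process configured with two stack CPUs and seperate_send_recv left off: num_queue=2 per process and tot_queue_num=4, so the primary pre-creates rxtx_mbuf_0 .. rxtx_mbuf_3, and

    process_idx = 0:  stack_idx 0,1  ->  queue_id 0,1   (queue_id = 0 * 2 + i)
    process_idx = 1:  stack_idx 0,1  ->  queue_id 2,3   (queue_id = 1 * 2 + i)

With seperate_send_recv on, stack_num doubles and threads are named per direction: an even stack_idx becomes a recv thread ("lstack_recv_<process_idx>_<idx/2>") bound to recv_cpus, an odd one a send thread ("lstack_send_<process_idx>_<idx/2>") bound to send_cpus.
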
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 2705fee..942c0b7 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -65,6 +65,8 @@ struct cfg_params {
uint8_t mac_addr[ETHER_ADDR_LEN];
uint16_t num_cpu;
uint32_t cpus[CFG_MAX_CPUS];
+ uint32_t send_cpus[CFG_MAX_CPUS];
+ uint32_t recv_cpus[CFG_MAX_CPUS];
uint16_t num_wakeup;
uint32_t wakeup[CFG_MAX_CPUS];
uint8_t num_ports;
@@ -79,11 +81,22 @@ struct cfg_params {
uint32_t read_connect_number;
uint32_t rpc_number;
uint32_t nic_read_number;
- bool use_ltran; // ture:lstack read from nic false:read form ltran
+ uint8_t use_ltran; // true: lstack reads from the nic; false: reads from ltran
+
+ uint16_t num_process;
+ uint16_t num_listen_port;
+ uint16_t port_id;
+ uint16_t is_primary;
+ uint16_t num_queue;
+ uint16_t tot_queue_num;
+ uint8_t process_idx;
+ uint32_t process_numa[PROTOCOL_STACK_MAX];
+
bool kni_switch;
bool listen_shadow; // true:listen in all stack thread. false:listen in one stack thread.
bool app_bind_numa;
bool main_thread_affinity;
+ bool seperate_send_recv;
int dpdk_argc;
char **dpdk_argv;
struct secondary_attach_arg sec_attach_arg;
@@ -94,7 +107,7 @@ struct cfg_params {
struct cfg_params *get_global_cfg_params(void);
-static inline bool use_ltran(void)
+static inline uint8_t use_ltran(void)
{
return get_global_cfg_params()->use_ltran;
}
diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h
index c3bc527..55ca7a1 100644
--- a/src/lstack/include/lstack_dpdk.h
+++ b/src/lstack/include/lstack_dpdk.h
@@ -51,5 +51,6 @@ void dpdk_skip_nic_init(void);
int32_t dpdk_init_lstack_kni(void);
void dpdk_restore_pci(void);
bool port_in_stack_queue(uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
-
+uint16_t get_port_id();
+struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,uint32_t mbuf_cache_size, uint16_t queue_id);
#endif /* GAZELLE_DPDK_H */
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
index 0b53cde..a690adb 100644
--- a/src/lstack/include/lstack_ethdev.h
+++ b/src/lstack/include/lstack_ethdev.h
@@ -13,6 +13,19 @@
#ifndef __GAZELLE_ETHDEV_H__
#define __GAZELLE_ETHDEV_H__
+#define INVAILD_PROCESS_IDX 255
+
+enum port_type {
+ PORT_LISTEN,
+ PORT_CONNECT,
+};
+
+enum PACKET_TRANSFER_TYPE{
+ TRANSFER_KERNEL = -1,
+ TRANSFER_OTHER_THREAD,
+ TRANSFER_CURRENT_THREAD,
+};
+
struct protocol_stack;
struct rte_mbuf;
struct lstack_dev_ops {
@@ -22,7 +35,15 @@ struct lstack_dev_ops {
int32_t ethdev_init(struct protocol_stack *stack);
int32_t eth_dev_poll(void);
-int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, bool use_ltran_flag, uint32_t nic_read_number);
+int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number);
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack);
+int recv_pkts_from_other_process(int process_index, void* arg);
+void create_flow_rule_map();
+void kni_handle_rx(uint16_t port_id);
+void delete_user_process_port(uint16_t dst_port, enum port_type type);
+void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
+void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+
#endif /* __GAZELLE_ETHDEV_H__ */
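
Note: the PACKET_TRANSFER_TYPE values describe where an incoming mbuf should be handled once the owning process/queue has been looked up in the port tables (g_listen_ports / g_user_ports in lstack_ethdev.c). A rough sketch of how a dispatcher would consume them; the actual rx-path wiring is outside this excerpt, so the surrounding names are assumptions:

    /* sketch only, not the patch's literal dispatch code */
    switch (distribute_pakages(mbuf)) {          /* defined later in lstack_ethdev.c */
    case TRANSFER_KERNEL:                        /* no lstack process owns the dst port */
        kni_handle_tx(mbuf);                     /* assumed: hand the frame to the kernel via KNI */
        break;
    case TRANSFER_OTHER_THREAD:                  /* another queue/process owns it */
        break;                                   /* forwarded via the unix-socket / rpc path */
    case TRANSFER_CURRENT_THREAD:
        eth_dev_recv(mbuf, stack);               /* normal local receive */
        break;
    }
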
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 11b001c..b5d3f7d 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -41,6 +41,7 @@ struct protocol_stack {
uint16_t port_id;
uint16_t socket_id;
uint16_t cpu_id;
+ uint32_t stack_idx;
cpu_set_t idle_cpuset; /* idle cpu in numa of stack, app thread bind to it */
int32_t epollfd; /* kernel event thread epoll fd */
@@ -53,6 +54,8 @@ struct protocol_stack {
uint32_t reg_head;
volatile bool low_power;
+ bool is_send_thread;
+
lockless_queue rpc_queue __rte_cache_aligned;
char pad __rte_cache_aligned;
@@ -93,6 +96,8 @@ struct protocol_stack_group {
bool wakeup_enable;
struct list_node poll_list;
pthread_spinlock_t poll_list_lock;
+ sem_t sem_listen_thread;
+ struct rte_mempool *total_rxtx_pktmbuf_pool[PROTOCOL_STACK_MAX];
/* dfx stats */
bool latency_start;
@@ -131,6 +136,10 @@ void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup);
void stack_send_pkts(struct protocol_stack *stack);
struct rpc_msg;
+struct thread_params{
+ uint16_t queue_id;
+ uint16_t idx;
+};
void stack_clean_epoll(struct rpc_msg *msg);
void stack_arp(struct rpc_msg *msg);
void stack_socket(struct rpc_msg *msg);
@@ -146,4 +155,5 @@ void stack_getsockopt(struct rpc_msg *msg);
void stack_setsockopt(struct rpc_msg *msg);
void stack_fcntl(struct rpc_msg *msg);
void stack_ioctl(struct rpc_msg *msg);
+void kni_handle_tx(struct rte_mbuf *mbuf);
#endif
diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h
index 0693c4d..0995277 100644
--- a/src/lstack/include/lstack_vdev.h
+++ b/src/lstack/include/lstack_vdev.h
@@ -21,4 +21,11 @@ enum reg_ring_type;
void vdev_dev_ops_init(struct lstack_dev_ops *dev_ops);
int vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple);
+int recv_pkts_from_other_process(int process_index, void* arg);
+void create_flow_rule_map();
+void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add);
+void init_listen_and_user_ports();
+
#endif /* _GAZELLE_VDEV_H_ */
diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf
index cf81954..389a81c 100644
--- a/src/lstack/lstack.conf
+++ b/src/lstack/lstack.conf
@@ -49,3 +49,7 @@ host_addr="192.168.1.10"
mask_addr="255.255.255.0"
gateway_addr="192.168.1.1"
devices="aa:bb:cc:dd:ee:ff"
+
+num_process=2
+process_numa="0,1"
+process_idx=0
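
Note: the fragment above is for the first (primary) process. A sketch of what the second process's lstack.conf would change, assuming DPDK's --proc-type option carries the "primary"/"secondary" token that parse_dpdk_args() scans for (other values are illustrative; the primary must start first so the shared pools and rings exist):

    num_process=2
    process_numa="0,1"
    process_idx=1
    dpdk_args=["--socket-mem", "2048,0,0,0", "--huge-dir", "/mnt/hugepages-lstack", "--proc-type", "secondary"]

Everything else (devices, addresses, queue counts) stays identical across processes so that the "rxtx_mbuf_<queue_id>" pools and per-process ring names line up.
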
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 1441f64..60ea897 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -10,14 +10,22 @@
* See the Mulan PSL v2 for more details.
*/
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <rte_kni.h>
#include <rte_ethdev.h>
#include <rte_malloc.h>
#include <lwip/debug.h>
#include <lwip/etharp.h>
+#include <lwip/posix_api.h>
#include <netif/ethernet.h>
+#include "lwip/tcp.h"
+#include <lwip/prot/tcp.h>
#include <securec.h>
+#include <rte_jhash.h>
#include "lstack_cfg.h"
#include "lstack_vdev.h"
@@ -28,9 +36,33 @@
#include "dpdk_common.h"
#include "lstack_protocol_stack.h"
#include "lstack_ethdev.h"
+#include "lstack_thread_rpc.h"
/* FRAME_MTU + 14byte header */
#define MBUF_MAX_LEN 1514
+#define MAX_PATTERN_NUM 4
+#define MAX_ACTION_NUM 2
+#define FULL_MASK 0xffffffff /* full mask */
+#define EMPTY_MASK 0x0 /* empty mask */
+#define LSTACK_MBUF_LEN 64
+#define TRANSFER_TCP_MUBF_LEN LSTACK_MBUF_LEN + 3
+#define DELETE_FLOWS_PARAMS_NUM 3
+#define DELETE_FLOWS_PARAMS_LENGTH 30
+#define CREATE_FLOWS_PARAMS_NUM 6
+#define CREATE_FLOWS_PARAMS_LENGTH 60
+#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH 25
+#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM 3
+#define REPLY_LEN 10
+#define SUCCESS_REPLY "success"
+#define ERROR_REPLY "error"
+#define PACKET_READ_SIZE 32
+
+char *client_path = "/var/run/gazelle/client.socket";
+char *server_path = "/var/run/gazelle/server.socket";
+const char *split_delim = ",";
+
+uint8_t g_user_ports[65535] = {INVAILD_PROCESS_IDX,};
+uint8_t g_listen_ports[65535] = {INVAILD_PROCESS_IDX,};
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
{
@@ -45,6 +77,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
struct rte_mbuf *next_m = NULL;
pkt_len = (uint16_t)rte_pktmbuf_pkt_len(m);
+
while (m != NULL) {
len = (uint16_t)rte_pktmbuf_data_len(m);
payload = rte_pktmbuf_mtod(m, void *);
@@ -82,6 +115,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
}
}
+
int32_t eth_dev_poll(void)
{
uint32_t nr_pkts;
@@ -115,8 +149,507 @@ int32_t eth_dev_poll(void)
return nr_pkts;
}
+void init_listen_and_user_ports(){
+ memset(g_user_ports, INVAILD_PROCESS_IDX, sizeof(g_user_ports));
+ memset(g_listen_ports, INVAILD_PROCESS_IDX, sizeof(g_listen_ports));
+}
+
+int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply)
+{
+ /* other process queue_id */
+ struct sockaddr_un serun;
+ int sockfd;
+ int ret = 0;
+
+ if ((sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ return -1;
+ }
+
+ memset(&serun, 0, sizeof(serun));
+ serun.sun_family = AF_UNIX;
+ sprintf_s(serun.sun_path, PATH_MAX,"%s%d", server_path, process_index);
+ int len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
+ if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0){
+ return -1;
+ }
+ posix_api->write_fn(sockfd, buf, write_len);
+ if (need_reply) {
+ char reply_message[REPLY_LEN];
+ posix_api->read_fn(sockfd, reply_message, REPLY_LEN);
+ if (strcmp(reply_message, SUCCESS_REPLY) == 0) {
+ ret = 0;
+ }else {
+ ret = -1;
+ }
+ }
+ posix_api->close_fn(sockfd);
+
+ return ret;
+}
+
+struct rte_flow *
+create_flow_director(uint16_t port_id, uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip,
+ uint16_t src_port, uint16_t dst_port, struct rte_flow_error *error)
+{
+ struct rte_flow_attr attr;
+ struct rte_flow_item pattern[MAX_PATTERN_NUM];
+ struct rte_flow_action action[MAX_ACTION_NUM];
+ struct rte_flow *flow = NULL;
+ struct rte_flow_action_queue queue = { .index = queue_id };
+ struct rte_flow_item_ipv4 ip_spec;
+ struct rte_flow_item_ipv4 ip_mask;
+
+ struct rte_flow_item_tcp tcp_spec;
+ struct rte_flow_item_tcp tcp_mask;
+ int res;
+
+ memset(pattern, 0, sizeof(pattern));
+ memset(action, 0, sizeof(action));
+
+ /*
+ * set the rule attribute.
+ * in this case only ingress packets will be checked.
+ */
+ memset(&attr, 0, sizeof(struct rte_flow_attr));
+ attr.ingress = 1;
+
+ /*
+ * create the action sequence.
+ * one action only, move packet to queue
+ */
+ action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE;
+ action[0].conf = &queue;
+ action[1].type = RTE_FLOW_ACTION_TYPE_END;
+
+ // not limit eth header
+ pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH;
+
+ // ip header
+ memset(&ip_spec, 0, sizeof(struct rte_flow_item_ipv4));
+ memset(&ip_mask, 0, sizeof(struct rte_flow_item_ipv4));
+ ip_spec.hdr.dst_addr = dst_ip;
+ ip_mask.hdr.dst_addr = FULL_MASK;
+ ip_spec.hdr.src_addr = src_ip;
+ ip_mask.hdr.src_addr = FULL_MASK;
+ pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
+ pattern[1].spec = &ip_spec;
+ pattern[1].mask = &ip_mask;
+
+ // tcp header, full mask 0xffff
+ memset(&tcp_spec, 0, sizeof(struct rte_flow_item_tcp));
+ memset(&tcp_mask, 0, sizeof(struct rte_flow_item_tcp));
+ pattern[2].type = RTE_FLOW_ITEM_TYPE_TCP;
+ tcp_spec.hdr.src_port = src_port;
+ tcp_spec.hdr.dst_port = dst_port;
+ tcp_mask.hdr.src_port = rte_flow_item_tcp_mask.hdr.src_port;
+ tcp_mask.hdr.dst_port = rte_flow_item_tcp_mask.hdr.dst_port;
+ pattern[2].spec = &tcp_spec;
+ pattern[2].mask = &tcp_mask;
+
+ /* the final level must be always type end */
+ pattern[3].type = RTE_FLOW_ITEM_TYPE_END;
+ res = rte_flow_validate(port_id, &attr, pattern, action, error);
+ if (!res){
+ flow = rte_flow_create(port_id, &attr, pattern, action, error);
+ }else {
+ LSTACK_LOG(ERR, PORT,"rte_flow_create.rte_flow_validate error, res %d \n", res);
+ }
+
+ return flow;
+}
+
+void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port){
+
+ uint16_t port_id = get_port_id();
+
+ LSTACK_LOG(INFO, LSTACK, "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs :%u \n",
+ queue_id, src_ip,ntohs(src_port), ntohs(dst_port) );
+
+ struct rte_flow_error error;
+ struct rte_flow *flow = create_flow_director(port_id, queue_id, src_ip, dst_ip, src_port, dst_port, &error);
+ if (!flow) {
+ LSTACK_LOG(ERR, LSTACK,"config_flow_director, flow can not be created. queue_id %u, src_ip %u, src_port %u, dst_port %u, dst_port_ntohs :%u, type %d. message: %s\n",
+ queue_id, src_ip,src_port,dst_port,ntohs(dst_port), error.type, error.message ? error.message : "(no stated reason)");
+ return;
+ }
+}
+
+void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
+{
+ uint16_t port_id = get_port_id();
+ (void)port_id;
+}
+
+/*
+ * delete flows
+ * if process 0, delete directly, else transfer 'dst_ip,src_port,dst_port' to process 0.
+ */
+void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
+{
+ if (get_global_cfg_params()->is_primary){
+ delete_flow_director(dst_ip, src_port, dst_port);
+ }else {
+ char process_server_path[DELETE_FLOWS_PARAMS_LENGTH];
+ sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u", dst_ip,split_delim, src_port,split_delim,dst_port);
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false);
+ if(ret != 0){
+ LSTACK_LOG(ERR, LSTACK,"transfer_delete_rule_info_to_process0 error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n",
+ rte_gettid(), dst_ip, src_port, dst_port);
+ }
+ }
+}
+
+/*
+ * add flows
+ * if process 0, add directly, else transfer 'dst_ip,src_ip,dst_port,src_port,queue_id,process_idx' to process 0.
+ */
+void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
+{
+ char process_server_path[CREATE_FLOWS_PARAMS_LENGTH];
+ /* exchange src_ip and dst_ip, src_port and dst_port */
+ uint8_t process_idx = get_global_cfg_params()->process_idx;
+ sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u",
+ dst_ip,split_delim,src_ip,split_delim, dst_port,split_delim,src_port, split_delim,queue_id,split_delim,process_idx);
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true);
+ if(ret != 0){
+ LSTACK_LOG(ERR, LSTACK,"transfer_create_rule_info_to_process0 error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u, queue_id %u, process_idx %u\n",
+ rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx);
+ }
+}
+
+void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add)
+{
+ char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH];
+ sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, "%u%s%u%s%u", listen_port,split_delim,process_idx, split_delim, is_add);
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true);
+ if(ret != 0){
+ LSTACK_LOG(ERR, LSTACK,"transfer_add_or_delete_listen_port_to_process0 error. tid %d. listen_port %u, process_idx %u\n",
+ rte_gettid(), listen_port, process_idx);
+ }
+}
+
+static int str_to_array(char *args, uint32_t *array, int size)
+{
+ int val;
+ uint16_t cnt = 0;
+ char *elem = NULL;
+ char *next_token = NULL;
+
+ memset(array, 0, sizeof(*array) * size);
+ elem = strtok_s((char *)args, split_delim, &next_token);
+ while (elem != NULL) {
+ if (cnt >= size) {
+ return -1;
+ }
+ val = atoi(elem);
+ if (val < 0) {
+ return -1;
+ }
+ array[cnt] = (uint32_t)val;
+ cnt++;
+
+ elem = strtok_s(NULL, split_delim, &next_token);
+ }
+
+ return cnt;
+}
+
+void parse_and_delete_rule(char* buf)
+{
+ uint32_t array[DELETE_FLOWS_PARAMS_NUM];
+ str_to_array(buf, array, DELETE_FLOWS_PARAMS_NUM);
+ uint32_t dst_ip = array[0];
+ uint16_t src_port = array[1];
+ uint16_t dst_port = array[2];
+ delete_flow_director(dst_ip, src_port, dst_port);
+}
+
+void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type){
+ if (type == PORT_LISTEN) {
+ g_listen_ports[dst_port] = process_idx;
+ }else {
+ g_user_ports[dst_port] = process_idx;
+ }
+}
+
+void delete_user_process_port(uint16_t dst_port, enum port_type type){
+ if (type == PORT_LISTEN) {
+ g_listen_ports[dst_port] = INVAILD_PROCESS_IDX;
+ }else {
+ g_user_ports[dst_port] = INVAILD_PROCESS_IDX;
+ }
+}
+
+void parse_and_create_rule(char* buf)
+{
+ uint32_t array[CREATE_FLOWS_PARAMS_NUM];
+ str_to_array(buf, array, CREATE_FLOWS_PARAMS_NUM);
+ uint32_t src_ip = array[0];
+ uint32_t dst_ip = array[1];
+ uint16_t src_port = array[2];
+ uint16_t dst_port = array[3];
+ uint16_t queue_id = array[4];
+ uint8_t process_idx = array[5];
+ config_flow_director(queue_id, src_ip, dst_ip, src_port, dst_port);
+ add_user_process_port(dst_port, process_idx, PORT_CONNECT);
+}
+
+void parse_and_add_or_delete_listen_port(char* buf)
+{
+ uint32_t array[ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM];
+ str_to_array(buf, array, ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM);
+ uint16_t listen_port = array[0];
+ uint8_t process_idx = array[1];
+ uint8_t is_add = array[2];
+ if (is_add == 1) {
+ add_user_process_port(listen_port,process_idx, PORT_LISTEN);
+ }else {
+ delete_user_process_port(listen_port, PORT_LISTEN);
+ }
+
+}
+
+void transfer_arp_to_other_process(struct rte_mbuf *mbuf)
+{
+ struct cfg_params *cfgs = get_global_cfg_params();
+
+ for(int i = 1; i < cfgs->num_process; i++){
+ char arp_mbuf[LSTACK_MBUF_LEN] = {0};
+ sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf);
+ int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false);
+ if(result < 0){
+ LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. \n", i);
+ }
+ }
+}
+
+void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx)
+{
+ /* current process queue_id */
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx];
+ int ret = -1;
+ while(ret != 0) {
+ ret = rpc_call_arp(stack, mbuf);
+ printf("transfer_tcp_to_thread, ret : %d \n", ret);
+ }
+}
+
+void parse_arp_and_transefer(char* buf)
+{
+ struct rte_mbuf *mbuf = (struct rte_mbuf *)atoll(buf);
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct rte_mbuf *mbuf_copy = NULL;
+ struct protocol_stack *stack = NULL;
+ int32_t ret;
+ for (int32_t i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1);
+ while (ret != 0) {
+ ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1);
+ stack->stats.rx_allocmbuf_fail++;
+ }
+ copy_mbuf(mbuf_copy, mbuf);
+
+ ret = rpc_call_arp(stack, mbuf_copy);
+
+ while (ret != 0) {
+ rpc_call_arp(stack, mbuf_copy);;
+ }
+ }
+}
+
+void parse_tcp_and_transefer(char* buf)
+{
+ char *next_token = NULL;
+ char *elem = strtok_s(buf, split_delim, &next_token);
+ struct rte_mbuf *mbuf = (struct rte_mbuf *) atoll(elem);
+ elem = strtok_s(NULL, split_delim, &next_token);
+ uint16_t queue_id = atoll(elem);
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ uint16_t num_queue = get_global_cfg_params()->num_queue;
+ uint16_t stk_index = queue_id % num_queue;
+ struct rte_mbuf *mbuf_copy = NULL;
+ struct protocol_stack *stack = stack_group->stacks[stk_index];
+
+ int32_t ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1);
+ while (ret != 0) {
+ ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1);
+ stack->stats.rx_allocmbuf_fail++;
+ }
+
+ copy_mbuf(mbuf_copy, mbuf);
+
+ transfer_tcp_to_thread(mbuf_copy, stk_index);
+}
+
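+/*
+ * Control-channel server, one unix socket per process at <server_path><process_index>.
+ * Peers write fixed-size messages and the read length alone selects the handler:
+ * LSTACK_MBUF_LEN -> arp, TRANSFER_TCP_MUBF_LEN -> forwarded tcp,
+ * DELETE/CREATE_FLOWS_PARAMS_LENGTH -> flow director rules, anything else -> listen
+ * port add/delete. sem_post() signals the caller that the socket is ready before the
+ * accept loop starts.
+ */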
+int recv_pkts_from_other_process(int process_index, void* arg)
+{
+ struct sockaddr_un serun, cliun;
+ socklen_t cliun_len;
+ int listenfd, connfd, size;
+ char buf[132];
+ /* socket */
+ if ((listenfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ perror("socket error");
+ return -1;
+ }
+ /* bind */
+ memset(&serun, 0, sizeof(serun));
+ serun.sun_family = AF_UNIX;
+ char process_server_path[PATH_MAX];
+ sprintf_s(process_server_path, sizeof(process_server_path), "%s%d", server_path, process_index);
+ strcpy_s(serun.sun_path, sizeof(serun.sun_path), process_server_path);
+ size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
+ unlink(process_server_path);
+ if (posix_api->bind_fn(listenfd, (struct sockaddr *)&serun, size) < 0) {
+ perror("bind error");
+ return -1;
+ }
+ if (posix_api->listen_fn(listenfd, 20) < 0) {
+ perror("listen error");
+ return -1;
+ }
+ sem_post((sem_t *)arg);
+ /* block */
+ while (1) {
+ cliun_len = sizeof(cliun);
+ if ((connfd = posix_api->accept_fn(listenfd, (struct sockaddr *)&cliun, &cliun_len)) < 0) {
+ perror("accept error");
+ continue;
+ }
+ while (1) {
+ int n = posix_api->read_fn(connfd, buf, sizeof(buf));
+ if (n < 0) {
+ perror("read error");
+ break;
+ } else if(n == 0) {
+ break;
+ }
+
+ if (n == LSTACK_MBUF_LEN) {
+ /* arp */
+ parse_arp_and_transefer(buf);
+ } else if (n == TRANSFER_TCP_MUBF_LEN) {
+ /* tcp, message carries mbuf and target queue_id */
+ LSTACK_LOG(INFO, LSTACK, "recv_pkts_from_other_process, process idx %d\n", process_index);
+ parse_tcp_and_transefer(buf);
+ } else if (n == DELETE_FLOWS_PARAMS_LENGTH) {
+ /* delete rule */
+ parse_and_delete_rule(buf);
+ } else if (n == CREATE_FLOWS_PARAMS_LENGTH) {
+ /* add rule */
+ parse_and_create_rule(buf);
+ char reply_buf[REPLY_LEN];
+ sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
+ } else {
+ /* add or delete listen port */
+ parse_and_add_or_delete_listen_port(buf);
+ char reply_buf[REPLY_LEN];
+ sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
+ }
+ }
+ posix_api->close_fn(connfd);
+ }
+ posix_api->close_fn(listenfd);
+ return 0;
+}
+
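+/*
+ * Pack the mbuf pointer and target queue_id into one message of TRANSFER_TCP_MUBF_LEN
+ * bytes, so the receiver can identify it by length.
+ */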
+void concat_mbuf_and_queue_id(struct rte_mbuf *mbuf, uint16_t queue_id, char* mbuf_and_queue_id, int write_len)
+{
+ sprintf_s(mbuf_and_queue_id, write_len, "%lu%s%u", (uintptr_t)mbuf, split_delim, queue_id);
+}
+
+const int32_t ipv4_version_offset = 4;
+const int32_t ipv4_version = 4;
+
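+/*
+ * Packet dispatch for multi-process mode; called from gazelle_eth_dev_poll() on the
+ * stack that owns NIC queue 0. For a pure TCP SYN the owning process is looked up in
+ * the port tables, rte_jhash_3words over the source ip and ports picks one of that
+ * process's queues, and the mbuf is either injected into a local stack (process 0) or
+ * forwarded over the unix socket. Non-SYN TCP stays on the current thread (flow
+ * director rules installed for established connections are expected to have steered
+ * it here already); unknown ports and non-IPv4 traffic go to the kernel through KNI.
+ * Returns TRANSFER_CURRENT_THREAD, TRANSFER_OTHER_THREAD or TRANSFER_KERNEL.
+ */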
+int distribute_pakages(struct rte_mbuf *mbuf)
+{
+ struct rte_ipv4_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));
+ uint8_t ip_version = (iph->version_ihl & 0xf0) >> ipv4_version_offset;
+ if (likely(ip_version == ipv4_version)) {
+ if (likely(iph->next_proto_id == IPPROTO_TCP)) {
+ int each_process_queue_num = get_global_cfg_params()->num_queue;
+
+ struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *, sizeof(struct rte_ether_hdr) +
+ sizeof(struct rte_ipv4_hdr));
+ uint16_t dst_port = tcp_hdr->dst_port;
+
+ int user_process_idx = (g_listen_ports[dst_port] != INVAILD_PROCESS_IDX) ? g_listen_ports[dst_port] : g_user_ports[dst_port];
+
+ if (user_process_idx == INVAILD_PROCESS_IDX) {
+ return TRANSFER_KERNEL;
+ }
+ if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) {
+ uint32_t src_ip = iph->src_addr;
+ uint16_t src_port = tcp_hdr->src_port;
+ uint32_t index = rte_jhash_3words(src_ip, src_port | (dst_port) << 16, 0, 0) % each_process_queue_num;
+ uint16_t queue_id = 0;
+ if (get_global_cfg_params()->seperate_send_recv) {
+ queue_id = user_process_idx * each_process_queue_num + (index / 2) * 2;
+ } else {
+ queue_id = user_process_idx * each_process_queue_num + index;
+ }
+ if (queue_id != 0) {
+ if (user_process_idx == 0) {
+ transfer_tcp_to_thread(mbuf, queue_id);
+ } else {
+ char mbuf_and_queue_id[TRANSFER_TCP_MUBF_LEN];
+ concat_mbuf_and_queue_id(mbuf, queue_id, mbuf_and_queue_id, TRANSFER_TCP_MUBF_LEN);
+ transfer_pkt_to_other_process(mbuf_and_queue_id, user_process_idx, TRANSFER_TCP_MUBF_LEN, false);
+ }
+ return TRANSFER_OTHER_THREAD;
+ } else {
+ return TRANSFER_CURRENT_THREAD;
+ }
+ } else {
+ return TRANSFER_CURRENT_THREAD;
+ }
+ }
+ } else {
+ return TRANSFER_KERNEL;
+ }
+ return TRANSFER_KERNEL;
+}
+
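+/*
+ * kni_handle_rx drains packets that the kernel has written to the KNI device and
+ * transmits them on NIC queue 0; whatever the NIC does not accept is freed.
+ */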
+void kni_handle_rx(uint16_t port_id)
+{
+ struct rte_mbuf *pkts_burst[PACKET_READ_SIZE];
+ uint32_t nb_kni_rx = rte_kni_rx_burst(get_gazelle_kni(), pkts_burst, PACKET_READ_SIZE);
+ if (nb_kni_rx > 0) {
+ uint16_t nb_rx = rte_eth_tx_burst(port_id, 0, pkts_burst, nb_kni_rx);
+ for (uint16_t i = nb_rx; i < nb_kni_rx; ++i) {
+ rte_pktmbuf_free(pkts_burst[i]);
+ }
+ }
+ return;
+}
+
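+/*
+ * kni_handle_tx pushes a received mbuf up to the kernel protocol stack through KNI.
+ * For multi-segment mbufs the IPv4 header checksum is recalculated first; if KNI
+ * cannot accept the packet it is dropped.
+ */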
+void kni_handle_tx(struct rte_mbuf *mbuf)
+{
+ if (!get_global_cfg_params()->kni_switch) {
+ return;
+ }
+ struct rte_ipv4_hdr *ipv4_hdr;
+ uint16_t l3_offset = mbuf->l2_len;
+
+ ipv4_hdr = (struct rte_ipv4_hdr *)(rte_pktmbuf_mtod(mbuf, char*) +
+ l3_offset);
+ if (mbuf->nb_segs > 1) {
+ ipv4_hdr->hdr_checksum = 0;
+ ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
+ }
+
+ /* send the packet to the kernel protocol stack via KNI */
+ if (!rte_kni_tx_burst(get_gazelle_kni(), &mbuf, 1)) {
+ rte_pktmbuf_free(mbuf);
+ }
+}
+
/* optimized eth_dev_poll() in lstack */
-int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, bool use_ltran_flag, uint32_t nic_read_number)
+int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number)
{
uint32_t nr_pkts;
@@ -131,15 +664,33 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, bool use_ltran_flag,
}
for (uint32_t i = 0; i < nr_pkts; i++) {
+ /* TRANSFER_CURRENT_THREAD: recv on this stack; TRANSFER_OTHER_THREAD: already forwarded; TRANSFER_KERNEL: send to kni */
+ int transfer_type = TRANSFER_CURRENT_THREAD;
/* copy arp into other stack */
if (!use_ltran_flag) {
struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(stack->pkts[i], struct rte_ether_hdr *);
if (unlikely(RTE_BE16(RTE_ETHER_TYPE_ARP) == ethh->ether_type)) {
stack_broadcast_arp(stack->pkts[i], stack);
+ if (!use_ltran_flag) {
+ // copy arp into other process
+ transfer_arp_to_other_process(stack->pkts[i]);
+ transfer_type = TRANSFER_KERNEL;
+ }
+ } else {
+ if (!use_ltran_flag && stack->queue_id == 0) {
+ transfer_type = distribute_pakages(stack->pkts[i]);
+ }
}
}
- eth_dev_recv(stack->pkts[i], stack);
+ if (likely(transfer_type == TRANSFER_CURRENT_THREAD)) {
+ eth_dev_recv(stack->pkts[i], stack);
+ } else if (transfer_type == TRANSFER_KERNEL) {
+ kni_handle_tx(stack->pkts[i]);
+ } else {
+ /* TRANSFER_OTHER_THREAD: already handed off to another stack thread or process */
+ }
}
stack->stats.rx += nr_pkts;
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
index 3d1204e..1752853 100644
--- a/src/lstack/netif/lstack_vdev.c
+++ b/src/lstack/netif/lstack_vdev.c
@@ -102,7 +102,9 @@ static uint32_t vdev_rx_poll(struct protocol_stack *stack, struct rte_mbuf **pkt
pkts[i]->packet_type = RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_TCP;
}
- return rte_gro_reassemble_burst(pkts, pkt_num, &gro_param);
+ pkt_num = rte_gro_reassemble_burst(pkts, pkt_num, &gro_param);
+
+ return pkt_num;
}
static uint32_t ltran_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pkts, uint32_t nr_pkts)
@@ -145,14 +147,51 @@ static uint32_t vdev_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pkt
int32_t vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple)
{
- if (!use_ltran()) {
- return 0;
- }
-
if (qtuple == NULL) {
return -1;
}
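+ /*
+ * Without ltran, registration is handled inside lstack itself: the primary process
+ * updates the port tables and flow director rules directly, while a secondary
+ * process forwards the request to process 0 over the unix-socket control channel
+ * (the transfer_*_to_process0 helpers added elsewhere in this patch).
+ */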
+ if (!use_ltran()) {
+ if (type == REG_RING_TCP_LISTEN_CLOSE) {
+ if (get_global_cfg_params()->is_primary) {
+ delete_user_process_port(qtuple->src_port, PORT_LISTEN);
+ } else {
+ transfer_add_or_delete_listen_port_to_process0(qtuple->src_port, get_global_cfg_params()->process_idx, 0);
+ }
+ }
+
+ if (type == REG_RING_TCP_CONNECT_CLOSE) {
+ if (get_global_cfg_params()->is_primary) {
+ delete_user_process_port(qtuple->src_port, PORT_CONNECT);
+ delete_flow_director(qtuple->dst_ip, qtuple->src_port, qtuple->dst_port);
+ } else {
+ transfer_delete_rule_info_to_process0(qtuple->dst_ip, qtuple->src_port, qtuple->dst_port);
+ }
+ }
+
+ if (type == REG_RING_TCP_CONNECT) {
+ uint16_t queue_id = get_protocol_stack()->queue_id;
+ if (get_global_cfg_params()->is_primary) {
+ add_user_process_port(qtuple->src_port, get_global_cfg_params()->process_idx, PORT_CONNECT);
+ if (queue_id != 0) {
+ config_flow_director(queue_id, qtuple->dst_ip, qtuple->src_ip, qtuple->dst_port, qtuple->src_port);
+ }
+ } else {
+ transfer_create_rule_info_to_process0(queue_id, qtuple->src_ip, qtuple->dst_ip, qtuple->src_port, qtuple->dst_port);
+ }
+ }
+
+ if (type == REG_RING_TCP_LISTEN) {
+ if (get_global_cfg_params()->is_primary) {
+ add_user_process_port(qtuple->src_port, get_global_cfg_params()->process_idx, PORT_LISTEN);
+ } else {
+ transfer_add_or_delete_listen_port_to_process0(qtuple->src_port, get_global_cfg_params()->process_idx, 1);
+ }
+ }
+ return 0;
+ }
+
int32_t ret;
uint32_t sent_pkts = 0;
void *free_buf[VDEV_REG_QUEUE_SZ];
--
2.23.0