1708 lines
62 KiB
Diff
1708 lines
62 KiB
Diff
From 409b6155a1ec9324bd68aae97a07e33560c19028 Mon Sep 17 00:00:00 2001
|
||
From: jiangheng <jiangheng14@huawei.com>
|
||
Date: Wed, 21 Feb 2024 08:05:03 +0800
|
||
Subject: [PATCH] split the flow fules related functions into separate file
|
||
|
||
---
|
||
src/lstack/core/lstack_init.c | 1 +
|
||
src/lstack/core/lstack_protocol_stack.c | 25 +-
|
||
src/lstack/include/lstack_ethdev.h | 27 -
|
||
src/lstack/include/lstack_flow.h | 51 ++
|
||
src/lstack/include/lstack_vdev.h | 9 -
|
||
src/lstack/netif/dir.mk | 2 +-
|
||
src/lstack/netif/lstack_ethdev.c | 687 +-----------------------
|
||
src/lstack/netif/lstack_flow.c | 680 +++++++++++++++++++++++
|
||
src/lstack/netif/lstack_vdev.c | 1 +
|
||
9 files changed, 747 insertions(+), 736 deletions(-)
|
||
create mode 100644 src/lstack/include/lstack_flow.h
|
||
create mode 100644 src/lstack/netif/lstack_flow.c
|
||
|
||
diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c
|
||
index 31fd91d..d22a295 100644
|
||
--- a/src/lstack/core/lstack_init.c
|
||
+++ b/src/lstack/core/lstack_init.c
|
||
@@ -48,6 +48,7 @@
|
||
#include "lstack_protocol_stack.h"
|
||
#include "lstack_preload.h"
|
||
#include "lstack_wrap.h"
|
||
+#include "lstack_flow.h"
|
||
|
||
static void check_process_start(void)
|
||
{
|
||
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
|
||
index 18e5df7..a545b73 100644
|
||
--- a/src/lstack/core/lstack_protocol_stack.c
|
||
+++ b/src/lstack/core/lstack_protocol_stack.c
|
||
@@ -454,14 +454,12 @@ int stack_polling(uint32_t wakeup_tick)
|
||
{
|
||
int force_quit;
|
||
struct cfg_params *cfg = get_global_cfg_params();
|
||
- uint8_t use_ltran_flag = cfg->use_ltran;
|
||
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
|
||
bool kni_switch = cfg->kni_switch;
|
||
#endif
|
||
bool use_sockmap = cfg->use_sockmap;
|
||
bool stack_mode_rtc = cfg->stack_mode_rtc;
|
||
uint32_t rpc_number = cfg->rpc_number;
|
||
- uint32_t nic_read_number = cfg->nic_read_number;
|
||
uint32_t read_connect_number = cfg->read_connect_number;
|
||
struct protocol_stack *stack = get_protocol_stack();
|
||
|
||
@@ -469,7 +467,7 @@ int stack_polling(uint32_t wakeup_tick)
|
||
rpc_poll_msg(&stack->dfx_rpc_queue, 2);
|
||
force_quit = rpc_poll_msg(&stack->rpc_queue, rpc_number);
|
||
|
||
- gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number);
|
||
+ eth_dev_poll();
|
||
sys_timer_run();
|
||
if (cfg->low_power_mod != 0) {
|
||
low_power_idling(stack);
|
||
@@ -525,10 +523,6 @@ static void* gazelle_stack_thread(void *arg)
|
||
}
|
||
sem_post(&g_stack_group.sem_stack_setup);
|
||
|
||
- if (!use_ltran() && queue_id == 0) {
|
||
- init_listen_and_user_ports();
|
||
- }
|
||
-
|
||
LSTACK_LOG(INFO, LSTACK, "stack_%02hu init success\n", queue_id);
|
||
if (get_global_cfg_params()->stack_mode_rtc) {
|
||
return NULL;
|
||
@@ -545,12 +539,6 @@ static void* gazelle_stack_thread(void *arg)
|
||
return NULL;
|
||
}
|
||
|
||
-static void gazelle_listen_thread(void *arg)
|
||
-{
|
||
- struct cfg_params *cfg_param = get_global_cfg_params();
|
||
- recv_pkts_from_other_process(cfg_param->process_idx, arg);
|
||
-}
|
||
-
|
||
int32_t stack_group_init_mempool(void)
|
||
{
|
||
struct cfg_params *global_cfg_parmas = get_global_cfg_params();
|
||
@@ -611,17 +599,6 @@ int32_t stack_group_init(void)
|
||
}
|
||
}
|
||
|
||
- /* run to completion mode does not currently support multiple process */
|
||
- if (!use_ltran() && !get_global_cfg_params()->stack_mode_rtc) {
|
||
- char name[PATH_MAX];
|
||
- sem_init(&stack_group->sem_listen_thread, 0, 0);
|
||
- sprintf_s(name, sizeof(name), "%s", "listen_thread");
|
||
- struct sys_thread *thread = sys_thread_new(name, gazelle_listen_thread,
|
||
- (void*)(&stack_group->sem_listen_thread), 0, 0);
|
||
- free(thread);
|
||
- sem_wait(&stack_group->sem_listen_thread);
|
||
- }
|
||
-
|
||
return 0;
|
||
}
|
||
|
||
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
|
||
index 3252906..0c3d906 100644
|
||
--- a/src/lstack/include/lstack_ethdev.h
|
||
+++ b/src/lstack/include/lstack_ethdev.h
|
||
@@ -16,25 +16,6 @@
|
||
#include <rte_eal.h>
|
||
#include <rte_version.h>
|
||
|
||
-#define INVAILD_PROCESS_IDX 255
|
||
-
|
||
-enum port_type {
|
||
- PORT_LISTEN,
|
||
- PORT_CONNECT,
|
||
-};
|
||
-
|
||
-enum PACKET_TRANSFER_TYPE {
|
||
- TRANSFER_KERNEL = -1,
|
||
- TRANSFER_OTHER_THREAD,
|
||
- TRANSFER_CURRENT_THREAD,
|
||
-};
|
||
-
|
||
-enum TRANSFER_MESSAGE_RESULT {
|
||
- CONNECT_ERROR = -2,
|
||
- REPLY_ERROR = -1,
|
||
- TRANSFER_SUCESS = 0,
|
||
-};
|
||
-
|
||
struct protocol_stack;
|
||
struct rte_mbuf;
|
||
struct lstack_dev_ops {
|
||
@@ -44,21 +25,13 @@ struct lstack_dev_ops {
|
||
|
||
int32_t ethdev_init(struct protocol_stack *stack);
|
||
int32_t eth_dev_poll(void);
|
||
-int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number);
|
||
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack);
|
||
|
||
-int recv_pkts_from_other_process(int process_index, void* arg);
|
||
-int32_t check_params_from_primary(void);
|
||
-
|
||
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
|
||
void kni_handle_rx(uint16_t port_id);
|
||
void kni_handle_tx(struct rte_mbuf *mbuf);
|
||
#endif
|
||
|
||
-void delete_user_process_port(uint16_t dst_port, enum port_type type);
|
||
-void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
|
||
-void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
|
||
-void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
|
||
void netif_poll(struct netif *netif);
|
||
|
||
#endif /* __GAZELLE_ETHDEV_H__ */
|
||
diff --git a/src/lstack/include/lstack_flow.h b/src/lstack/include/lstack_flow.h
|
||
new file mode 100644
|
||
index 0000000..ad35cdf
|
||
--- /dev/null
|
||
+++ b/src/lstack/include/lstack_flow.h
|
||
@@ -0,0 +1,51 @@
|
||
+/*
|
||
+* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
|
||
+* gazelle is licensed under the Mulan PSL v2.
|
||
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
|
||
+* You may obtain a copy of Mulan PSL v2 at:
|
||
+* http://license.coscl.org.cn/MulanPSL2
|
||
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
|
||
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
|
||
+* PURPOSE.
|
||
+* See the Mulan PSL v2 for more details.
|
||
+*/
|
||
+
|
||
+#ifndef __LSTACK_FLOW_H__
|
||
+#define __LSTACK_FLOW_H__
|
||
+
|
||
+#include <rte_mbuf.h>
|
||
+
|
||
+enum port_type {
|
||
+ PORT_LISTEN,
|
||
+ PORT_CONNECT,
|
||
+};
|
||
+
|
||
+enum PACKET_TRANSFER_TYPE {
|
||
+ TRANSFER_KERNEL = -1,
|
||
+ TRANSFER_OTHER_THREAD,
|
||
+ TRANSFER_CURRENT_THREAD,
|
||
+};
|
||
+
|
||
+enum TRANSFER_MESSAGE_RESULT {
|
||
+ CONNECT_ERROR = -2,
|
||
+ REPLY_ERROR = -1,
|
||
+ TRANSFER_SUCESS = 0,
|
||
+};
|
||
+
|
||
+int distribute_pakages(struct rte_mbuf *mbuf);
|
||
+void flow_init(void);
|
||
+int32_t check_params_from_primary(void);
|
||
+
|
||
+int recv_pkts_from_other_process(int process_index, void* arg);
|
||
+void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
|
||
+void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip,
|
||
+ uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
|
||
+void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add);
|
||
+void transfer_arp_to_other_process(struct rte_mbuf *mbuf);
|
||
+
|
||
+void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
|
||
+void delete_user_process_port(uint16_t dst_port, enum port_type type);
|
||
+
|
||
+void gazelle_listen_thread(void *arg);
|
||
+
|
||
+#endif
|
||
diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h
|
||
index 007eec7..4e5d191 100644
|
||
--- a/src/lstack/include/lstack_vdev.h
|
||
+++ b/src/lstack/include/lstack_vdev.h
|
||
@@ -13,19 +13,10 @@
|
||
#ifndef _GAZELLE_VDEV_H_
|
||
#define _GAZELLE_VDEV_H_
|
||
|
||
-#include <stdbool.h>
|
||
-
|
||
struct lstack_dev_ops;
|
||
struct gazelle_quintuple;
|
||
enum reg_ring_type;
|
||
void vdev_dev_ops_init(struct lstack_dev_ops *dev_ops);
|
||
int vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple);
|
||
|
||
-int recv_pkts_from_other_process(int process_index, void* arg);
|
||
-void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
|
||
-void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip,
|
||
- uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
|
||
-void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add);
|
||
-void init_listen_and_user_ports();
|
||
-
|
||
#endif /* _GAZELLE_VDEV_H_ */
|
||
diff --git a/src/lstack/netif/dir.mk b/src/lstack/netif/dir.mk
|
||
index ec7c4ad..20fb5d6 100644
|
||
--- a/src/lstack/netif/dir.mk
|
||
+++ b/src/lstack/netif/dir.mk
|
||
@@ -8,5 +8,5 @@
|
||
# PURPOSE.
|
||
# See the Mulan PSL v2 for more details.
|
||
|
||
-SRC = lstack_ethdev.c lstack_vdev.c
|
||
+SRC = lstack_ethdev.c lstack_vdev.c lstack_flow.c
|
||
$(eval $(call register_dir, netif, $(SRC)))
|
||
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
|
||
index 25c94eb..2e938b0 100644
|
||
--- a/src/lstack/netif/lstack_ethdev.c
|
||
+++ b/src/lstack/netif/lstack_ethdev.c
|
||
@@ -10,74 +10,36 @@
|
||
* See the Mulan PSL v2 for more details.
|
||
*/
|
||
|
||
-#include <sys/socket.h>
|
||
-#include <sys/un.h>
|
||
-
|
||
#include <rte_eal.h>
|
||
#include <rte_version.h>
|
||
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
|
||
#include <rte_kni.h>
|
||
#endif
|
||
#include <rte_ethdev.h>
|
||
-#include <rte_malloc.h>
|
||
-#include <rte_ether.h>
|
||
|
||
-#include <lwip/debug.h>
|
||
#include <lwip/etharp.h>
|
||
#include <lwip/ethip6.h>
|
||
#include <lwip/posix_api.h>
|
||
#include <netif/ethernet.h>
|
||
-#include <lwip/tcp.h>
|
||
-#include <lwip/prot/tcp.h>
|
||
|
||
#include <securec.h>
|
||
-#include <rte_jhash.h>
|
||
-#include <uthash.h>
|
||
|
||
+#include "dpdk_common.h"
|
||
#include "lstack_cfg.h"
|
||
#include "lstack_vdev.h"
|
||
#include "lstack_stack_stat.h"
|
||
#include "lstack_log.h"
|
||
#include "lstack_dpdk.h"
|
||
#include "lstack_lwip.h"
|
||
-#include "dpdk_common.h"
|
||
#include "lstack_protocol_stack.h"
|
||
#include "lstack_thread_rpc.h"
|
||
+#include "lstack_flow.h"
|
||
#include "lstack_ethdev.h"
|
||
|
||
/* FRAME_MTU + 14byte header */
|
||
#define MBUF_MAX_LEN 1514
|
||
-#define MAX_PATTERN_NUM 4
|
||
-#define MAX_ACTION_NUM 2
|
||
-#define FULL_MASK 0xffffffff /* full mask */
|
||
-#define EMPTY_MASK 0x0 /* empty mask */
|
||
-#define LSTACK_MBUF_LEN 64
|
||
-#define TRANSFER_TCP_MUBF_LEN (LSTACK_MBUF_LEN + 3)
|
||
-#define DELETE_FLOWS_PARAMS_NUM 3
|
||
-#define DELETE_FLOWS_PARAMS_LENGTH 30
|
||
-#define CREATE_FLOWS_PARAMS_NUM 6
|
||
-#define CREATE_FLOWS_PARAMS_LENGTH 60
|
||
-#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH 25
|
||
-#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM 3
|
||
-#define REPLY_LEN 10
|
||
-#define SUCCESS_REPLY "success"
|
||
-#define ERROR_REPLY "error"
|
||
#define PACKET_READ_SIZE 32
|
||
|
||
-#define GET_LSTACK_NUM 14
|
||
-#define GET_LSTACK_NUM_STRING "get_lstack_num"
|
||
-
|
||
-#define SERVER_PATH "/var/run/gazelle/server.socket"
|
||
-#define SPLIT_DELIM ","
|
||
-
|
||
-#define UNIX_TCP_PORT_MAX 65535
|
||
-
|
||
-#define IPV4_VERSION_OFFSET 4
|
||
-#define IPV4_VERSION 4
|
||
-
|
||
-static uint8_t g_user_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, };
|
||
-static uint8_t g_listen_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, };
|
||
-
|
||
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
|
||
{
|
||
int32_t ret;
|
||
@@ -126,636 +88,6 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
|
||
}
|
||
}
|
||
|
||
-int32_t eth_dev_poll(void)
|
||
-{
|
||
- uint32_t nr_pkts;
|
||
- struct cfg_params *cfg = get_global_cfg_params();
|
||
- struct protocol_stack *stack = get_protocol_stack();
|
||
-
|
||
- nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, cfg->nic_read_number);
|
||
- if (nr_pkts == 0) {
|
||
- return 0;
|
||
- }
|
||
-
|
||
- if (!cfg->use_ltran && get_protocol_stack_group()->latency_start) {
|
||
- uint64_t time_stamp = get_current_time();
|
||
- time_stamp_into_mbuf(nr_pkts, stack->pkts, time_stamp);
|
||
- }
|
||
-
|
||
- for (uint32_t i = 0; i < nr_pkts; i++) {
|
||
- /* copy arp into other stack */
|
||
- if (!cfg->use_ltran) {
|
||
- struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(stack->pkts[i], struct rte_ether_hdr *);
|
||
- if (unlikely(RTE_BE16(RTE_ETHER_TYPE_ARP) == ethh->ether_type)) {
|
||
- stack_broadcast_arp(stack->pkts[i], stack);
|
||
- }
|
||
- }
|
||
-
|
||
- eth_dev_recv(stack->pkts[i], stack);
|
||
- }
|
||
-
|
||
- stack->stats.rx += nr_pkts;
|
||
-
|
||
- return nr_pkts;
|
||
-}
|
||
-
|
||
-/* flow rule map */
|
||
-#define RULE_KEY_LEN 23
|
||
-struct flow_rule {
|
||
- char rule_key[RULE_KEY_LEN];
|
||
- struct rte_flow *flow;
|
||
- UT_hash_handle hh;
|
||
-};
|
||
-
|
||
-static uint16_t g_flow_num = 0;
|
||
-struct flow_rule *g_flow_rules = NULL;
|
||
-struct flow_rule *find_rule(char *rule_key)
|
||
-{
|
||
- struct flow_rule *fl;
|
||
- HASH_FIND_STR(g_flow_rules, rule_key, fl);
|
||
- return fl;
|
||
-}
|
||
-
|
||
-void add_rule(char* rule_key, struct rte_flow *flow)
|
||
-{
|
||
- struct flow_rule *rule;
|
||
- HASH_FIND_STR(g_flow_rules, rule_key, rule);
|
||
- if (rule == NULL) {
|
||
- rule = (struct flow_rule*)malloc(sizeof(struct flow_rule));
|
||
- strcpy_s(rule->rule_key, RULE_KEY_LEN, rule_key);
|
||
- HASH_ADD_STR(g_flow_rules, rule_key, rule);
|
||
- }
|
||
- rule->flow = flow;
|
||
-}
|
||
-
|
||
-void delete_rule(char* rule_key)
|
||
-{
|
||
- struct flow_rule *rule = NULL;
|
||
- HASH_FIND_STR(g_flow_rules, rule_key, rule);
|
||
- if (rule != NULL) {
|
||
- HASH_DEL(g_flow_rules, rule);
|
||
- free(rule);
|
||
- }
|
||
-}
|
||
-
|
||
-void init_listen_and_user_ports(void)
|
||
-{
|
||
- memset_s(g_user_ports, sizeof(g_user_ports), INVAILD_PROCESS_IDX, sizeof(g_user_ports));
|
||
- memset_s(g_listen_ports, sizeof(g_listen_ports), INVAILD_PROCESS_IDX, sizeof(g_listen_ports));
|
||
-}
|
||
-
|
||
-int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply)
|
||
-{
|
||
- /* other process queue_id */
|
||
- struct sockaddr_un serun;
|
||
- int sockfd;
|
||
- int ret = 0;
|
||
-
|
||
- sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0);
|
||
- memset_s(&serun, sizeof(serun), 0, sizeof(serun));
|
||
- serun.sun_family = AF_UNIX;
|
||
- sprintf_s(serun.sun_path, PATH_MAX, "%s%d", SERVER_PATH, process_index);
|
||
- int32_t len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
|
||
- if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0) {
|
||
- return CONNECT_ERROR;
|
||
- }
|
||
- posix_api->write_fn(sockfd, buf, write_len);
|
||
- if (need_reply) {
|
||
- char reply_message[REPLY_LEN];
|
||
- int32_t read_result = posix_api->read_fn(sockfd, reply_message, REPLY_LEN);
|
||
- if (read_result > 0) {
|
||
- if (strcmp(reply_message, SUCCESS_REPLY) == 0) {
|
||
- ret = TRANSFER_SUCESS;
|
||
- } else if (strcmp(reply_message, ERROR_REPLY) == 0) {
|
||
- ret = REPLY_ERROR;
|
||
- } else {
|
||
- ret = atoi(reply_message);
|
||
- }
|
||
- } else {
|
||
- ret = REPLY_ERROR;
|
||
- }
|
||
- }
|
||
- posix_api->close_fn(sockfd);
|
||
-
|
||
- return ret;
|
||
-}
|
||
-
|
||
-int32_t check_params_from_primary(void)
|
||
-{
|
||
- struct cfg_params *cfg = get_global_cfg_params();
|
||
- if (cfg->is_primary) {
|
||
- return 0;
|
||
- }
|
||
- // check lstack num
|
||
- char get_lstack_num[GET_LSTACK_NUM];
|
||
- sprintf_s(get_lstack_num, GET_LSTACK_NUM, "%s", GET_LSTACK_NUM_STRING);
|
||
- int32_t ret = transfer_pkt_to_other_process(get_lstack_num, 0, GET_LSTACK_NUM, true);
|
||
- if (ret != cfg->num_cpu) {
|
||
- return -1;
|
||
- }
|
||
- return 0;
|
||
-}
|
||
-
|
||
-struct rte_flow *create_flow_director(uint16_t port_id, uint16_t queue_id,
|
||
- uint32_t src_ip, uint32_t dst_ip,
|
||
- uint16_t src_port, uint16_t dst_port,
|
||
- struct rte_flow_error *error)
|
||
-{
|
||
- struct rte_flow_attr attr;
|
||
- struct rte_flow_item pattern[MAX_PATTERN_NUM];
|
||
- struct rte_flow_action action[MAX_ACTION_NUM];
|
||
- struct rte_flow *flow = NULL;
|
||
- struct rte_flow_action_queue queue = { .index = queue_id };
|
||
- struct rte_flow_item_ipv4 ip_spec;
|
||
- struct rte_flow_item_ipv4 ip_mask;
|
||
-
|
||
- struct rte_flow_item_tcp tcp_spec;
|
||
- struct rte_flow_item_tcp tcp_mask;
|
||
- int res;
|
||
-
|
||
- memset_s(pattern, sizeof(pattern), 0, sizeof(pattern));
|
||
- memset_s(action, sizeof(action), 0, sizeof(action));
|
||
-
|
||
- /*
|
||
- * set the rule attribute.
|
||
- * in this case only ingress packets will be checked.
|
||
- */
|
||
- memset_s(&attr, sizeof(struct rte_flow_attr), 0, sizeof(struct rte_flow_attr));
|
||
- attr.ingress = 1;
|
||
-
|
||
- /*
|
||
- * create the action sequence.
|
||
- * one action only, move packet to queue
|
||
- */
|
||
- action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE;
|
||
- action[0].conf = &queue;
|
||
- action[1].type = RTE_FLOW_ACTION_TYPE_END;
|
||
-
|
||
- // not limit eth header
|
||
- pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH;
|
||
-
|
||
- // ip header
|
||
- memset_s(&ip_spec, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4));
|
||
- memset_s(&ip_mask, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4));
|
||
- ip_spec.hdr.dst_addr = dst_ip;
|
||
- ip_mask.hdr.dst_addr = FULL_MASK;
|
||
- ip_spec.hdr.src_addr = src_ip;
|
||
- ip_mask.hdr.src_addr = FULL_MASK;
|
||
- pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
|
||
- pattern[1].spec = &ip_spec;
|
||
- pattern[1].mask = &ip_mask;
|
||
-
|
||
- // tcp header, full mask 0xffff
|
||
- memset_s(&tcp_spec, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp));
|
||
- memset_s(&tcp_mask, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp));
|
||
- pattern[2].type = RTE_FLOW_ITEM_TYPE_TCP; // 2: pattern 2 is tcp header
|
||
- tcp_spec.hdr.src_port = src_port;
|
||
- tcp_spec.hdr.dst_port = dst_port;
|
||
- tcp_mask.hdr.src_port = rte_flow_item_tcp_mask.hdr.src_port;
|
||
- tcp_mask.hdr.dst_port = rte_flow_item_tcp_mask.hdr.dst_port;
|
||
- pattern[2].spec = &tcp_spec;
|
||
- pattern[2].mask = &tcp_mask;
|
||
-
|
||
- /* the final level must be always type end */
|
||
- pattern[3].type = RTE_FLOW_ITEM_TYPE_END;
|
||
- res = rte_flow_validate(port_id, &attr, pattern, action, error);
|
||
- if (!res) {
|
||
- flow = rte_flow_create(port_id, &attr, pattern, action, error);
|
||
- } else {
|
||
- LSTACK_LOG(ERR, LSTACK, "rte_flow_create.rte_flow_validate error, res %d \n", res);
|
||
- }
|
||
-
|
||
- return flow;
|
||
-}
|
||
-
|
||
-void config_flow_director(uint16_t queue_id, uint32_t src_ip,
|
||
- uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
|
||
-{
|
||
- uint16_t port_id = get_protocol_stack_group()->port_id;
|
||
- char rule_key[RULE_KEY_LEN] = {0};
|
||
- sprintf_s(rule_key, sizeof(rule_key), "%u_%u_%u", src_ip, src_port, dst_port);
|
||
- struct flow_rule *fl_exist = find_rule(rule_key);
|
||
- if (fl_exist != NULL) {
|
||
- return;
|
||
- }
|
||
-
|
||
- LSTACK_LOG(INFO, LSTACK,
|
||
- "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs:%u\n",
|
||
- queue_id, src_ip, ntohs(src_port), ntohs(dst_port));
|
||
-
|
||
- struct rte_flow_error error;
|
||
- struct rte_flow *flow = create_flow_director(port_id, queue_id, src_ip, dst_ip, src_port, dst_port, &error);
|
||
- if (!flow) {
|
||
- LSTACK_LOG(ERR, LSTACK,"flow can not be created. queue_id %u, src_ip %u, src_port %u,"
|
||
- "dst_port %u, dst_port_ntohs :%u, type %d. message: %s\n",
|
||
- queue_id, src_ip, src_port, dst_port, ntohs(dst_port),
|
||
- error.type, error.message ? error.message : "(no stated reason)");
|
||
- return;
|
||
- }
|
||
- __sync_fetch_and_add(&g_flow_num, 1);
|
||
- add_rule(rule_key, flow);
|
||
-}
|
||
-
|
||
-void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
|
||
-{
|
||
- uint16_t port_id = get_protocol_stack_group()->port_id;
|
||
- char rule_key[RULE_KEY_LEN] = {0};
|
||
- sprintf_s(rule_key, RULE_KEY_LEN, "%u_%u_%u",dst_ip, dst_port, src_port);
|
||
- struct flow_rule *fl = find_rule(rule_key);
|
||
-
|
||
- if(fl != NULL){
|
||
- struct rte_flow_error error;
|
||
- int ret = rte_flow_destroy(port_id, fl->flow, &error);
|
||
- if(ret != 0){
|
||
- LSTACK_LOG(ERR, LSTACK, "Flow can't be delete %d message: %s\n",
|
||
- error.type, error.message ? error.message : "(no stated reason)");
|
||
- }
|
||
- delete_rule(rule_key);
|
||
- __sync_fetch_and_sub(&g_flow_num, 1);
|
||
- }
|
||
-}
|
||
-
|
||
-/* if process 0, delete directly, else transfer 'dst_ip,src_port,dst_port' to process 0. */
|
||
-void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
|
||
-{
|
||
- if (get_global_cfg_params()->is_primary) {
|
||
- delete_flow_director(dst_ip, src_port, dst_port);
|
||
- } else {
|
||
- char process_server_path[DELETE_FLOWS_PARAMS_LENGTH];
|
||
- sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u",
|
||
- dst_ip, SPLIT_DELIM, src_port, SPLIT_DELIM, dst_port);
|
||
- int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false);
|
||
- if(ret != TRANSFER_SUCESS){
|
||
- LSTACK_LOG(ERR, LSTACK, "error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n",
|
||
- rte_gettid(), dst_ip, src_port, dst_port);
|
||
- }
|
||
- }
|
||
-}
|
||
-
|
||
-// if process 0, add directly, else transfer 'src_ip,dst_ip,src_port,dst_port,queue_id' to process 0.
|
||
-void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip,
|
||
- uint32_t dst_ip, uint16_t src_port,
|
||
- uint16_t dst_port)
|
||
-{
|
||
- char process_server_path[CREATE_FLOWS_PARAMS_LENGTH];
|
||
- /* exchage src_ip and dst_ip, src_port and dst_port */
|
||
- uint8_t process_idx = get_global_cfg_params()->process_idx;
|
||
- sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u",
|
||
- dst_ip, SPLIT_DELIM, src_ip, SPLIT_DELIM,
|
||
- dst_port, SPLIT_DELIM, src_port, SPLIT_DELIM,
|
||
- queue_id, SPLIT_DELIM, process_idx);
|
||
- int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true);
|
||
- if (ret != TRANSFER_SUCESS) {
|
||
- LSTACK_LOG(ERR, LSTACK, "error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u,"
|
||
- "queue_id %u, process_idx %u\n",
|
||
- rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx);
|
||
- }
|
||
-}
|
||
-
|
||
-void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add)
|
||
-{
|
||
- char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH];
|
||
- sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH,
|
||
- "%u%s%u%s%u", listen_port, SPLIT_DELIM, process_idx, SPLIT_DELIM, is_add);
|
||
- int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true);
|
||
- if(ret != TRANSFER_SUCESS) {
|
||
- LSTACK_LOG(ERR, LSTACK, "error. tid %d. listen_port %u, process_idx %u\n",
|
||
- rte_gettid(), listen_port, process_idx);
|
||
- }
|
||
-}
|
||
-
|
||
-static int str_to_array(char *args, uint32_t *array, int size)
|
||
-{
|
||
- int val;
|
||
- uint16_t cnt = 0;
|
||
- char *elem = NULL;
|
||
- char *next_token = NULL;
|
||
-
|
||
- memset_s(array, sizeof(*array) * size, 0, sizeof(*array) * size);
|
||
- elem = strtok_s((char *)args, SPLIT_DELIM, &next_token);
|
||
- while (elem != NULL) {
|
||
- if (cnt >= size) {
|
||
- return -1;
|
||
- }
|
||
- val = atoi(elem);
|
||
- if (val < 0) {
|
||
- return -1;
|
||
- }
|
||
- array[cnt] = (uint32_t)val;
|
||
- cnt++;
|
||
-
|
||
- elem = strtok_s(NULL, SPLIT_DELIM, &next_token);
|
||
- }
|
||
-
|
||
- return cnt;
|
||
-}
|
||
-
|
||
-void parse_and_delete_rule(char* buf)
|
||
-{
|
||
- uint32_t array[DELETE_FLOWS_PARAMS_NUM];
|
||
- str_to_array(buf, array, DELETE_FLOWS_PARAMS_NUM);
|
||
- uint32_t dst_ip = array[0];
|
||
- uint16_t src_port = array[1];
|
||
- uint16_t dst_port = array[2];
|
||
- delete_flow_director(dst_ip, src_port, dst_port);
|
||
-}
|
||
-
|
||
-void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type)
|
||
-{
|
||
- if (type == PORT_LISTEN) {
|
||
- g_listen_ports[dst_port] = process_idx;
|
||
- } else {
|
||
- g_user_ports[dst_port] = process_idx;
|
||
- }
|
||
-}
|
||
-
|
||
-void delete_user_process_port(uint16_t dst_port, enum port_type type)
|
||
-{
|
||
- if (type == PORT_LISTEN) {
|
||
- g_listen_ports[dst_port] = INVAILD_PROCESS_IDX;
|
||
- } else {
|
||
- g_user_ports[dst_port] = INVAILD_PROCESS_IDX;
|
||
- }
|
||
-}
|
||
-
|
||
-void parse_and_create_rule(char* buf)
|
||
-{
|
||
- uint32_t array[CREATE_FLOWS_PARAMS_NUM];
|
||
- str_to_array(buf, array, CREATE_FLOWS_PARAMS_NUM);
|
||
- uint32_t src_ip = array[0];
|
||
- uint32_t dst_ip = array[1];
|
||
- uint16_t src_port = array[2];
|
||
- uint16_t dst_port = array[3];
|
||
- uint16_t queue_id = array[4];
|
||
- uint8_t process_idx = array[5];
|
||
- config_flow_director(queue_id, src_ip, dst_ip, src_port, dst_port);
|
||
- add_user_process_port(dst_port, process_idx, PORT_CONNECT);
|
||
-}
|
||
-
|
||
-void parse_and_add_or_delete_listen_port(char* buf)
|
||
-{
|
||
- uint32_t array[ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM];
|
||
- str_to_array(buf, array, ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM);
|
||
- uint16_t listen_port = array[0];
|
||
- uint8_t process_idx = array[1];
|
||
- uint8_t is_add = array[2];
|
||
- if (is_add == 1) {
|
||
- add_user_process_port(listen_port, process_idx, PORT_LISTEN);
|
||
- } else {
|
||
- delete_user_process_port(listen_port, PORT_LISTEN);
|
||
- }
|
||
-
|
||
-}
|
||
-
|
||
-void transfer_arp_to_other_process(struct rte_mbuf *mbuf)
|
||
-{
|
||
- struct cfg_params *cfgs = get_global_cfg_params();
|
||
-
|
||
- for(int i = 1; i < cfgs->num_process; i++){
|
||
- char arp_mbuf[LSTACK_MBUF_LEN] = {0};
|
||
- sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf);
|
||
- int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false);
|
||
- if (result == CONNECT_ERROR) {
|
||
- LSTACK_LOG(INFO, LSTACK,"connect process %d failed, ensure the process is started.\n", i);
|
||
- } else if (result == REPLY_ERROR) {
|
||
- LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. %m\n", i);
|
||
- }
|
||
- }
|
||
-}
|
||
-
|
||
-void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx)
|
||
-{
|
||
- /* current process queue_id */
|
||
- struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx];
|
||
- int ret = -1;
|
||
- while(ret != 0) {
|
||
- ret = rpc_call_arp(&stack->rpc_queue, mbuf);
|
||
- printf("transfer_tcp_to_thread, ret : %d \n", ret);
|
||
- }
|
||
-}
|
||
-
|
||
-void parse_arp_and_transefer(char* buf)
|
||
-{
|
||
- struct rte_mbuf *mbuf = (struct rte_mbuf *)atoll(buf);
|
||
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
|
||
- struct rte_mbuf *mbuf_copy = NULL;
|
||
- struct protocol_stack *stack = NULL;
|
||
- int32_t ret;
|
||
- for (int32_t i = 0; i < stack_group->stack_num; i++) {
|
||
- stack = stack_group->stacks[i];
|
||
- ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
|
||
- while (ret != 0) {
|
||
- ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
|
||
- stack->stats.rx_allocmbuf_fail++;
|
||
- }
|
||
- copy_mbuf(mbuf_copy, mbuf);
|
||
-
|
||
- ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
|
||
-
|
||
- while (ret != 0) {
|
||
- rpc_call_arp(&stack->rpc_queue, mbuf_copy);
|
||
- }
|
||
- }
|
||
-}
|
||
-
|
||
-void parse_tcp_and_transefer(char* buf)
|
||
-{
|
||
- char *next_token = NULL;
|
||
- char *elem = strtok_s(buf, SPLIT_DELIM, &next_token);
|
||
- struct rte_mbuf *mbuf = (struct rte_mbuf *) atoll(elem);
|
||
- elem = strtok_s(NULL, SPLIT_DELIM, &next_token);
|
||
- uint16_t queue_id = atoll(elem);
|
||
-
|
||
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
|
||
- uint16_t num_queue = get_global_cfg_params()->num_queue;
|
||
- uint16_t stk_index = queue_id % num_queue;
|
||
- struct rte_mbuf *mbuf_copy = NULL;
|
||
- struct protocol_stack *stack = stack_group->stacks[stk_index];
|
||
-
|
||
- int32_t ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
|
||
- while (ret != 0) {
|
||
- ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
|
||
- stack->stats.rx_allocmbuf_fail++;
|
||
- }
|
||
-
|
||
- copy_mbuf(mbuf_copy,mbuf);
|
||
-
|
||
- transfer_tcp_to_thread(mbuf_copy, stk_index);
|
||
-}
|
||
-
|
||
-int recv_pkts_from_other_process(int process_index, void* arg)
|
||
-{
|
||
- struct sockaddr_un serun, cliun;
|
||
- socklen_t cliun_len;
|
||
- int listenfd, connfd, size;
|
||
- char buf[132];
|
||
- /* socket */
|
||
- if ((listenfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
|
||
- perror("socket error");
|
||
- return -1;
|
||
- }
|
||
- /* bind */
|
||
- memset_s(&serun, sizeof(serun), 0, sizeof(serun));
|
||
- serun.sun_family = AF_UNIX;
|
||
- char process_server_path[PATH_MAX];
|
||
- sprintf_s(process_server_path, sizeof(process_server_path), "%s%d", SERVER_PATH, process_index);
|
||
- strcpy_s(serun.sun_path, sizeof(serun.sun_path), process_server_path);
|
||
- size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
|
||
- unlink(process_server_path);
|
||
- if (posix_api->bind_fn(listenfd, (struct sockaddr *)&serun, size) < 0) {
|
||
- perror("bind error");
|
||
- return -1;
|
||
- }
|
||
- if (posix_api->listen_fn(listenfd, 20) < 0) { /* 20: max backlog */
|
||
- perror("listen error");
|
||
- return -1;
|
||
- }
|
||
- sem_post((sem_t *)arg);
|
||
- /* block */
|
||
- while(1) {
|
||
- cliun_len = sizeof(cliun);
|
||
- if ((connfd = posix_api->accept_fn(listenfd, (struct sockaddr *)&cliun, &cliun_len)) < 0) {
|
||
- perror("accept error");
|
||
- continue;
|
||
- }
|
||
- while(1) {
|
||
- int n = posix_api->read_fn(connfd, buf, sizeof(buf));
|
||
- if (n < 0) {
|
||
- perror("read error");
|
||
- break;
|
||
- } else if (n == 0) {
|
||
- break;
|
||
- }
|
||
-
|
||
- if(n == LSTACK_MBUF_LEN) {
|
||
- /* arp */
|
||
- parse_arp_and_transefer(buf);
|
||
- } else if (n == TRANSFER_TCP_MUBF_LEN) {
|
||
- /* tcp. lstack_mbuf_queue_id */
|
||
- parse_tcp_and_transefer(buf);
|
||
- } else if (n == DELETE_FLOWS_PARAMS_LENGTH) {
|
||
- /* delete rule */
|
||
- parse_and_delete_rule(buf);
|
||
- } else if(n == CREATE_FLOWS_PARAMS_LENGTH) {
|
||
- /* add rule */
|
||
- parse_and_create_rule(buf);
|
||
- char reply_buf[REPLY_LEN];
|
||
- sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
|
||
- posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
|
||
- } else if (n == GET_LSTACK_NUM) {
|
||
- char reply_buf[REPLY_LEN];
|
||
- sprintf_s(reply_buf, sizeof(reply_buf), "%d", get_global_cfg_params()->num_cpu);
|
||
- posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
|
||
- } else {
|
||
- /* add port */
|
||
- parse_and_add_or_delete_listen_port(buf);
|
||
- char reply_buf[REPLY_LEN];
|
||
- sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
|
||
- posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
|
||
- }
|
||
-
|
||
- }
|
||
- posix_api->close_fn(connfd);
|
||
- }
|
||
- posix_api->close_fn(listenfd);
|
||
- return 0;
|
||
-}
|
||
-
|
||
-void concat_mbuf_and_queue_id(struct rte_mbuf *mbuf, uint16_t queue_id,
|
||
- char* mbuf_and_queue_id, int write_len)
|
||
-{
|
||
- sprintf_s(mbuf_and_queue_id, write_len, "%lu%s%u", mbuf, SPLIT_DELIM, queue_id);
|
||
-}
|
||
-
|
||
-static int mbuf_to_idx(struct rte_mbuf *mbuf, uint16_t *dst_port)
|
||
-{
|
||
- struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
|
||
- u16_t type = rte_be_to_cpu_16(ethh->ether_type);
|
||
- uint32_t index = 0;
|
||
- if (type == RTE_ETHER_TYPE_IPV4) {
|
||
- struct rte_ipv4_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));
|
||
- uint8_t ip_version = (iph->version_ihl & 0xf0) >> IPV4_VERSION_OFFSET;
|
||
- if (likely(ip_version == IPV4_VERSION)) {
|
||
- if (likely(iph->next_proto_id == IPPROTO_TCP)) {
|
||
- struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *,
|
||
- sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
|
||
- *dst_port = tcp_hdr->dst_port;
|
||
-
|
||
- if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) {
|
||
- uint32_t src_ip = iph->src_addr;
|
||
- uint16_t src_port = tcp_hdr->src_port;
|
||
- index = rte_jhash_3words(src_ip, src_port | ((*dst_port) << 16), 0, 0);
|
||
- } else {
|
||
- return -1;
|
||
- }
|
||
- }
|
||
- }
|
||
- } else if (type == RTE_ETHER_TYPE_IPV6) {
|
||
- struct rte_ipv6_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv6_hdr *, sizeof(struct rte_ether_hdr));
|
||
- if (likely(iph->proto == IPPROTO_TCP)) {
|
||
- struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *,
|
||
- sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv6_hdr));
|
||
- *dst_port = tcp_hdr->dst_port;
|
||
-
|
||
- if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) {
|
||
- uint32_t *src_ip = (uint32_t *) &iph->src_addr;
|
||
- uint16_t src_port = tcp_hdr->src_port;
|
||
- uint32_t v = rte_jhash_3words(src_ip[0], src_ip[1], src_ip[2], 0);
|
||
- index = rte_jhash_3words(src_ip[3], src_port | ((*dst_port) << 16), v, 0);
|
||
- } else {
|
||
- return -1;
|
||
- }
|
||
- }
|
||
- } else {
|
||
- return -1;
|
||
- }
|
||
- return index;
|
||
-}
|
||
-
|
||
-int distribute_pakages(struct rte_mbuf *mbuf)
|
||
-{
|
||
- uint16_t dst_port = 0;
|
||
- uint32_t index = mbuf_to_idx(mbuf, &dst_port);
|
||
- if (index == -1) {
|
||
- return TRANSFER_CURRENT_THREAD;
|
||
- }
|
||
-
|
||
- uint16_t queue_id = 0;
|
||
- uint32_t user_process_idx = 0;
|
||
- int each_process_queue_num = get_global_cfg_params()->num_queue;
|
||
- index = index % each_process_queue_num;
|
||
- if (g_listen_ports[dst_port] != INVAILD_PROCESS_IDX) {
|
||
- user_process_idx = g_listen_ports[dst_port];
|
||
- } else {
|
||
- user_process_idx = g_user_ports[dst_port];
|
||
- }
|
||
-
|
||
- if (user_process_idx == INVAILD_PROCESS_IDX) {
|
||
- return TRANSFER_KERNEL;
|
||
- }
|
||
-
|
||
- if (get_global_cfg_params()->seperate_send_recv) {
|
||
- queue_id = user_process_idx * each_process_queue_num + (index / 2) * 2;
|
||
- } else {
|
||
- queue_id = user_process_idx * each_process_queue_num + index;
|
||
- }
|
||
- if (queue_id != 0) {
|
||
- if (user_process_idx == 0) {
|
||
- transfer_tcp_to_thread(mbuf, queue_id);
|
||
- } else {
|
||
- char mbuf_and_queue_id[TRANSFER_TCP_MUBF_LEN];
|
||
- concat_mbuf_and_queue_id(mbuf, queue_id, mbuf_and_queue_id, TRANSFER_TCP_MUBF_LEN);
|
||
- transfer_pkt_to_other_process(mbuf_and_queue_id, user_process_idx,
|
||
- TRANSFER_TCP_MUBF_LEN, false);
|
||
- }
|
||
- return TRANSFER_OTHER_THREAD;
|
||
- } else {
|
||
- return TRANSFER_CURRENT_THREAD;
|
||
- }
|
||
-
|
||
- return TRANSFER_KERNEL;
|
||
-}
|
||
-
|
||
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
|
||
void kni_handle_rx(uint16_t port_id)
|
||
{
|
||
@@ -797,17 +129,18 @@ void kni_handle_tx(struct rte_mbuf *mbuf)
|
||
}
|
||
#endif
|
||
|
||
-/* optimized eth_dev_poll() in lstack */
|
||
-int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number)
|
||
+int32_t eth_dev_poll(void)
|
||
{
|
||
uint32_t nr_pkts;
|
||
+ struct cfg_params *cfg = get_global_cfg_params();
|
||
+ struct protocol_stack *stack = get_protocol_stack();
|
||
|
||
- nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, nic_read_number);
|
||
+ nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, cfg->nic_read_number);
|
||
if (nr_pkts == 0) {
|
||
return 0;
|
||
}
|
||
|
||
- if (!use_ltran_flag && get_protocol_stack_group()->latency_start) {
|
||
+ if (!use_ltran() && get_protocol_stack_group()->latency_start) {
|
||
uint64_t time_stamp = get_current_time();
|
||
time_stamp_into_mbuf(nr_pkts, stack->pkts, time_stamp);
|
||
}
|
||
@@ -816,7 +149,7 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_fla
|
||
/* 1 current thread recv; 0 other thread recv; -1 kni recv; */
|
||
int transfer_type = TRANSFER_CURRENT_THREAD;
|
||
/* copy arp into other stack */
|
||
- if (!use_ltran_flag) {
|
||
+ if (!use_ltran()) {
|
||
struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(stack->pkts[i], struct rte_ether_hdr *);
|
||
u16_t type;
|
||
type = ethh->ether_type;
|
||
@@ -946,6 +279,10 @@ int32_t ethdev_init(struct protocol_stack *stack)
|
||
LSTACK_LOG(ERR, LSTACK, "fill mbuf to rx_ring failed ret=%d\n", ret);
|
||
return ret;
|
||
}
|
||
+ } else {
|
||
+ if (cfg->tuple_filter && stack->queue_id == 0) {
|
||
+ flow_init();
|
||
+ }
|
||
}
|
||
|
||
netif_set_default(&stack->netif);
|
||
diff --git a/src/lstack/netif/lstack_flow.c b/src/lstack/netif/lstack_flow.c
|
||
new file mode 100644
|
||
index 0000000..4e04209
|
||
--- /dev/null
|
||
+++ b/src/lstack/netif/lstack_flow.c
|
||
@@ -0,0 +1,680 @@
|
||
+/*
|
||
+* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
|
||
+* gazelle is licensed under the Mulan PSL v2.
|
||
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
|
||
+* You may obtain a copy of Mulan PSL v2 at:
|
||
+* http://license.coscl.org.cn/MulanPSL2
|
||
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
|
||
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
|
||
+* PURPOSE.
|
||
+* See the Mulan PSL v2 for more details.
|
||
+*/
|
||
+#include <sys/socket.h>
|
||
+#include <sys/un.h>
|
||
+#include <securec.h>
|
||
+
|
||
+#include <rte_mbuf.h>
|
||
+#include <rte_flow.h>
|
||
+#include <rte_jhash.h>
|
||
+#include <uthash.h>
|
||
+
|
||
+#include <lwip/posix_api.h>
|
||
+#include <lwip/tcp.h>
|
||
+#include <lwip/prot/tcp.h>
|
||
+
|
||
+#include "dpdk_common.h"
|
||
+#include "lstack_log.h"
|
||
+#include "lstack_dpdk.h"
|
||
+#include "lstack_cfg.h"
|
||
+#include "lstack_protocol_stack.h"
|
||
+#include "lstack_flow.h"
|
||
+
|
||
+#define MAX_PATTERN_NUM 4
|
||
+#define MAX_ACTION_NUM 2
|
||
+#define FULL_MASK 0xffffffff /* full mask */
|
||
+#define EMPTY_MASK 0x0 /* empty mask */
|
||
+#define LSTACK_MBUF_LEN 64
|
||
+#define TRANSFER_TCP_MUBF_LEN (LSTACK_MBUF_LEN + 3)
|
||
+#define DELETE_FLOWS_PARAMS_NUM 3
|
||
+#define DELETE_FLOWS_PARAMS_LENGTH 30
|
||
+#define CREATE_FLOWS_PARAMS_NUM 6
|
||
+#define CREATE_FLOWS_PARAMS_LENGTH 60
|
||
+#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH 25
|
||
+#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM 3
|
||
+#define REPLY_LEN 10
|
||
+#define SUCCESS_REPLY "success"
|
||
+#define ERROR_REPLY "error"
|
||
+
|
||
+#define GET_LSTACK_NUM 14
|
||
+#define GET_LSTACK_NUM_STRING "get_lstack_num"
|
||
+
|
||
+#define SERVER_PATH "/var/run/gazelle/server.socket"
|
||
+#define SPLIT_DELIM ","
|
||
+
|
||
+#define UNIX_TCP_PORT_MAX 65535
|
||
+
|
||
+#define INVAILD_PROCESS_IDX 255
|
||
+
|
||
+#define IPV4_VERSION_OFFSET 4
|
||
+#define IPV4_VERSION 4
|
||
+
|
||
+static uint8_t g_user_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, };
|
||
+static uint8_t g_listen_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, };
|
||
+
|
||
+/* flow rule map */
|
||
+#define RULE_KEY_LEN 23
|
||
+struct flow_rule {
|
||
+ char rule_key[RULE_KEY_LEN];
|
||
+ struct rte_flow *flow;
|
||
+ UT_hash_handle hh;
|
||
+};
|
||
+
|
||
+static uint16_t g_flow_num = 0;
|
||
+static struct flow_rule *g_flow_rules = NULL;
|
||
+static struct flow_rule *find_rule(char *rule_key)
|
||
+{
|
||
+ struct flow_rule *fl;
|
||
+ HASH_FIND_STR(g_flow_rules, rule_key, fl);
|
||
+ return fl;
|
||
+}
|
||
+
|
||
+static void add_rule(char* rule_key, struct rte_flow *flow)
|
||
+{
|
||
+ struct flow_rule *rule;
|
||
+ HASH_FIND_STR(g_flow_rules, rule_key, rule);
|
||
+ if (rule == NULL) {
|
||
+ rule = (struct flow_rule*)malloc(sizeof(struct flow_rule));
|
||
+ strcpy_s(rule->rule_key, RULE_KEY_LEN, rule_key);
|
||
+ HASH_ADD_STR(g_flow_rules, rule_key, rule);
|
||
+ }
|
||
+ rule->flow = flow;
|
||
+}
|
||
+
|
||
+static void delete_rule(char* rule_key)
|
||
+{
|
||
+ struct flow_rule *rule = NULL;
|
||
+ HASH_FIND_STR(g_flow_rules, rule_key, rule);
|
||
+ if (rule != NULL) {
|
||
+ HASH_DEL(g_flow_rules, rule);
|
||
+ free(rule);
|
||
+ }
|
||
+}
|
||
+
|
||
+static void init_listen_and_user_ports(void)
|
||
+{
|
||
+ memset_s(g_user_ports, sizeof(g_user_ports), INVAILD_PROCESS_IDX, sizeof(g_user_ports));
|
||
+ memset_s(g_listen_ports, sizeof(g_listen_ports), INVAILD_PROCESS_IDX, sizeof(g_listen_ports));
|
||
+}
|
||
+
|
||
+static int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply)
|
||
+{
|
||
+ /* other process queue_id */
|
||
+ struct sockaddr_un serun;
|
||
+ int sockfd;
|
||
+ int ret = 0;
|
||
+
|
||
+ sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0);
|
||
+ memset_s(&serun, sizeof(serun), 0, sizeof(serun));
|
||
+ serun.sun_family = AF_UNIX;
|
||
+ sprintf_s(serun.sun_path, PATH_MAX, "%s%d", SERVER_PATH, process_index);
|
||
+ int32_t len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
|
||
+ if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0) {
|
||
+ return CONNECT_ERROR;
|
||
+ }
|
||
+ posix_api->write_fn(sockfd, buf, write_len);
|
||
+ if (need_reply) {
|
||
+ char reply_message[REPLY_LEN];
|
||
+ int32_t read_result = posix_api->read_fn(sockfd, reply_message, REPLY_LEN);
|
||
+ if (read_result > 0) {
|
||
+ if (strcmp(reply_message, SUCCESS_REPLY) == 0) {
|
||
+ ret = TRANSFER_SUCESS;
|
||
+ } else if (strcmp(reply_message, ERROR_REPLY) == 0) {
|
||
+ ret = REPLY_ERROR;
|
||
+ } else {
|
||
+ ret = atoi(reply_message);
|
||
+ }
|
||
+ } else {
|
||
+ ret = REPLY_ERROR;
|
||
+ }
|
||
+ }
|
||
+ posix_api->close_fn(sockfd);
|
||
+
|
||
+ return ret;
|
||
+}
|
||
+
|
||
+int32_t check_params_from_primary(void)
|
||
+{
|
||
+ struct cfg_params *cfg = get_global_cfg_params();
|
||
+ if (cfg->is_primary) {
|
||
+ return 0;
|
||
+ }
|
||
+ // check lstack num
|
||
+ char get_lstack_num[GET_LSTACK_NUM];
|
||
+ sprintf_s(get_lstack_num, GET_LSTACK_NUM, "%s", GET_LSTACK_NUM_STRING);
|
||
+ int32_t ret = transfer_pkt_to_other_process(get_lstack_num, 0, GET_LSTACK_NUM, true);
|
||
+ if (ret != cfg->num_cpu) {
|
||
+ return -1;
|
||
+ }
|
||
+ return 0;
|
||
+}
|
||
+
|
||
+static struct rte_flow *create_flow_director(uint16_t port_id, uint16_t queue_id,
|
||
+ uint32_t src_ip, uint32_t dst_ip,
|
||
+ uint16_t src_port, uint16_t dst_port,
|
||
+ struct rte_flow_error *error)
|
||
+{
|
||
+ struct rte_flow_attr attr;
|
||
+ struct rte_flow_item pattern[MAX_PATTERN_NUM];
|
||
+ struct rte_flow_action action[MAX_ACTION_NUM];
|
||
+ struct rte_flow *flow = NULL;
|
||
+ struct rte_flow_action_queue queue = { .index = queue_id };
|
||
+ struct rte_flow_item_ipv4 ip_spec;
|
||
+ struct rte_flow_item_ipv4 ip_mask;
|
||
+
|
||
+ struct rte_flow_item_tcp tcp_spec;
|
||
+ struct rte_flow_item_tcp tcp_mask;
|
||
+ int res;
|
||
+
|
||
+ memset_s(pattern, sizeof(pattern), 0, sizeof(pattern));
|
||
+ memset_s(action, sizeof(action), 0, sizeof(action));
|
||
+
|
||
+ /*
|
||
+ * set the rule attribute.
|
||
+ * in this case only ingress packets will be checked.
|
||
+ */
|
||
+ memset_s(&attr, sizeof(struct rte_flow_attr), 0, sizeof(struct rte_flow_attr));
|
||
+ attr.ingress = 1;
|
||
+
|
||
+ /*
|
||
+ * create the action sequence.
|
||
+ * one action only, move packet to queue
|
||
+ */
|
||
+ action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE;
|
||
+ action[0].conf = &queue;
|
||
+ action[1].type = RTE_FLOW_ACTION_TYPE_END;
|
||
+
|
||
+ // not limit eth header
|
||
+ pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH;
|
||
+
|
||
+ // ip header
|
||
+ memset_s(&ip_spec, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4));
|
||
+ memset_s(&ip_mask, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4));
|
||
+ ip_spec.hdr.dst_addr = dst_ip;
|
||
+ ip_mask.hdr.dst_addr = FULL_MASK;
|
||
+ ip_spec.hdr.src_addr = src_ip;
|
||
+ ip_mask.hdr.src_addr = FULL_MASK;
|
||
+ pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
|
||
+ pattern[1].spec = &ip_spec;
|
||
+ pattern[1].mask = &ip_mask;
|
||
+
|
||
+ // tcp header, full mask 0xffff
|
||
+ memset_s(&tcp_spec, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp));
|
||
+ memset_s(&tcp_mask, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp));
|
||
+ pattern[2].type = RTE_FLOW_ITEM_TYPE_TCP; // 2: pattern 2 is tcp header
|
||
+ tcp_spec.hdr.src_port = src_port;
|
||
+ tcp_spec.hdr.dst_port = dst_port;
|
||
+ tcp_mask.hdr.src_port = rte_flow_item_tcp_mask.hdr.src_port;
|
||
+ tcp_mask.hdr.dst_port = rte_flow_item_tcp_mask.hdr.dst_port;
|
||
+ pattern[2].spec = &tcp_spec;
|
||
+ pattern[2].mask = &tcp_mask;
|
||
+
|
||
+ /* the final level must be always type end */
|
||
+ pattern[3].type = RTE_FLOW_ITEM_TYPE_END;
|
||
+ res = rte_flow_validate(port_id, &attr, pattern, action, error);
|
||
+ if (!res) {
|
||
+ flow = rte_flow_create(port_id, &attr, pattern, action, error);
|
||
+ } else {
|
||
+ LSTACK_LOG(ERR, LSTACK, "rte_flow_create.rte_flow_validate error, res %d \n", res);
|
||
+ }
|
||
+
|
||
+ return flow;
|
||
+}
|
||
+
|
||
+static void config_flow_director(uint16_t queue_id, uint32_t src_ip,
|
||
+ uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
|
||
+{
|
||
+ uint16_t port_id = get_protocol_stack_group()->port_id;
|
||
+ char rule_key[RULE_KEY_LEN] = {0};
|
||
+ sprintf_s(rule_key, sizeof(rule_key), "%u_%u_%u", src_ip, src_port, dst_port);
|
||
+ struct flow_rule *fl_exist = find_rule(rule_key);
|
||
+ if (fl_exist != NULL) {
|
||
+ return;
|
||
+ }
|
||
+
|
||
+ LSTACK_LOG(INFO, LSTACK,
|
||
+ "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs:%u\n",
|
||
+ queue_id, src_ip, ntohs(src_port), ntohs(dst_port));
|
||
+
|
||
+ struct rte_flow_error error;
|
||
+ struct rte_flow *flow = create_flow_director(port_id, queue_id, src_ip, dst_ip, src_port, dst_port, &error);
|
||
+ if (!flow) {
|
||
+ LSTACK_LOG(ERR, LSTACK,"flow can not be created. queue_id %u, src_ip %u, src_port %u,"
|
||
+ "dst_port %u, dst_port_ntohs :%u, type %d. message: %s\n",
|
||
+ queue_id, src_ip, src_port, dst_port, ntohs(dst_port),
|
||
+ error.type, error.message ? error.message : "(no stated reason)");
|
||
+ return;
|
||
+ }
|
||
+ __sync_fetch_and_add(&g_flow_num, 1);
|
||
+ add_rule(rule_key, flow);
|
||
+}
|
||
+
|
||
+static void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
|
||
+{
|
||
+ uint16_t port_id = get_protocol_stack_group()->port_id;
|
||
+ char rule_key[RULE_KEY_LEN] = {0};
|
||
+ sprintf_s(rule_key, RULE_KEY_LEN, "%u_%u_%u",dst_ip, dst_port, src_port);
|
||
+ struct flow_rule *fl = find_rule(rule_key);
|
||
+
|
||
+ if(fl != NULL) {
|
||
+ struct rte_flow_error error;
|
||
+ int ret = rte_flow_destroy(port_id, fl->flow, &error);
|
||
+ if (ret != 0) {
|
||
+ LSTACK_LOG(ERR, LSTACK, "Flow can't be delete %d message: %s\n",
|
||
+ error.type, error.message ? error.message : "(no stated reason)");
|
||
+ }
|
||
+ delete_rule(rule_key);
|
||
+ __sync_fetch_and_sub(&g_flow_num, 1);
|
||
+ }
|
||
+}
|
||
+
|
||
+/* if process 0, delete directly, else transfer 'dst_ip,src_port,dst_port' to process 0. */
|
||
+void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
|
||
+{
|
||
+ if (get_global_cfg_params()->is_primary) {
|
||
+ delete_flow_director(dst_ip, src_port, dst_port);
|
||
+ } else {
|
||
+ char process_server_path[DELETE_FLOWS_PARAMS_LENGTH];
|
||
+ sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u",
|
||
+ dst_ip, SPLIT_DELIM, src_port, SPLIT_DELIM, dst_port);
|
||
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false);
|
||
+ if (ret != TRANSFER_SUCESS) {
|
||
+ LSTACK_LOG(ERR, LSTACK, "error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n",
|
||
+ rte_gettid(), dst_ip, src_port, dst_port);
|
||
+ }
|
||
+ }
|
||
+}
|
||
+
|
||
+// if process 0, add directly, else transfer 'src_ip,dst_ip,src_port,dst_port,queue_id' to process 0.
|
||
+void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip,
|
||
+ uint32_t dst_ip, uint16_t src_port,
|
||
+ uint16_t dst_port)
|
||
+{
|
||
+ char process_server_path[CREATE_FLOWS_PARAMS_LENGTH];
|
||
+ /* exchage src_ip and dst_ip, src_port and dst_port */
|
||
+ uint8_t process_idx = get_global_cfg_params()->process_idx;
|
||
+ sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u",
|
||
+ dst_ip, SPLIT_DELIM, src_ip, SPLIT_DELIM,
|
||
+ dst_port, SPLIT_DELIM, src_port, SPLIT_DELIM,
|
||
+ queue_id, SPLIT_DELIM, process_idx);
|
||
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true);
|
||
+ if (ret != TRANSFER_SUCESS) {
|
||
+ LSTACK_LOG(ERR, LSTACK, "error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u,"
|
||
+ "queue_id %u, process_idx %u\n",
|
||
+ rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx);
|
||
+ }
|
||
+}
|
||
+
|
||
+void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add)
|
||
+{
|
||
+ char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH];
|
||
+ sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH,
|
||
+ "%u%s%u%s%u", listen_port, SPLIT_DELIM, process_idx, SPLIT_DELIM, is_add);
|
||
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true);
|
||
+ if (ret != TRANSFER_SUCESS) {
|
||
+ LSTACK_LOG(ERR, LSTACK, "error. tid %d. listen_port %u, process_idx %u\n",
|
||
+ rte_gettid(), listen_port, process_idx);
|
||
+ }
|
||
+}
|
||
+
|
||
+static int str_to_array(char *args, uint32_t *array, int size)
|
||
+{
|
||
+ int val;
|
||
+ uint16_t cnt = 0;
|
||
+ char *elem = NULL;
|
||
+ char *next_token = NULL;
|
||
+
|
||
+ memset_s(array, sizeof(*array) * size, 0, sizeof(*array) * size);
|
||
+ elem = strtok_s((char *)args, SPLIT_DELIM, &next_token);
|
||
+ while (elem != NULL) {
|
||
+ if (cnt >= size) {
|
||
+ return -1;
|
||
+ }
|
||
+ val = atoi(elem);
|
||
+ if (val < 0) {
|
||
+ return -1;
|
||
+ }
|
||
+ array[cnt] = (uint32_t)val;
|
||
+ cnt++;
|
||
+
|
||
+ elem = strtok_s(NULL, SPLIT_DELIM, &next_token);
|
||
+ }
|
||
+
|
||
+ return cnt;
|
||
+}
|
||
+
|
||
+static void parse_and_delete_rule(char* buf)
|
||
+{
|
||
+ uint32_t array[DELETE_FLOWS_PARAMS_NUM];
|
||
+ str_to_array(buf, array, DELETE_FLOWS_PARAMS_NUM);
|
||
+ uint32_t dst_ip = array[0];
|
||
+ uint16_t src_port = array[1];
|
||
+ uint16_t dst_port = array[2];
|
||
+ delete_flow_director(dst_ip, src_port, dst_port);
|
||
+}
|
||
+
|
||
+void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type)
|
||
+{
|
||
+ if (type == PORT_LISTEN) {
|
||
+ g_listen_ports[dst_port] = process_idx;
|
||
+ } else {
|
||
+ g_user_ports[dst_port] = process_idx;
|
||
+ }
|
||
+}
|
||
+
|
||
+void delete_user_process_port(uint16_t dst_port, enum port_type type)
|
||
+{
|
||
+ if (type == PORT_LISTEN) {
|
||
+ g_listen_ports[dst_port] = INVAILD_PROCESS_IDX;
|
||
+ } else {
|
||
+ g_user_ports[dst_port] = INVAILD_PROCESS_IDX;
|
||
+ }
|
||
+}
|
||
+
|
||
+static void parse_and_create_rule(char* buf)
|
||
+{
|
||
+ uint32_t array[CREATE_FLOWS_PARAMS_NUM];
|
||
+ str_to_array(buf, array, CREATE_FLOWS_PARAMS_NUM);
|
||
+ uint32_t src_ip = array[0];
|
||
+ uint32_t dst_ip = array[1];
|
||
+ uint16_t src_port = array[2];
|
||
+ uint16_t dst_port = array[3];
|
||
+ uint16_t queue_id = array[4];
|
||
+ uint8_t process_idx = array[5];
|
||
+ config_flow_director(queue_id, src_ip, dst_ip, src_port, dst_port);
|
||
+ add_user_process_port(dst_port, process_idx, PORT_CONNECT);
|
||
+}
|
||
+
|
||
+static void parse_and_add_or_delete_listen_port(char* buf)
|
||
+{
|
||
+ uint32_t array[ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM];
|
||
+ str_to_array(buf, array, ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM);
|
||
+ uint16_t listen_port = array[0];
|
||
+ uint8_t process_idx = array[1];
|
||
+ uint8_t is_add = array[2];
|
||
+ if (is_add == 1) {
|
||
+ add_user_process_port(listen_port, process_idx, PORT_LISTEN);
|
||
+ } else {
|
||
+ delete_user_process_port(listen_port, PORT_LISTEN);
|
||
+ }
|
||
+}
|
||
+
|
||
+void transfer_arp_to_other_process(struct rte_mbuf *mbuf)
|
||
+{
|
||
+ struct cfg_params *cfgs = get_global_cfg_params();
|
||
+
|
||
+ for (int i = 1; i < cfgs->num_process; i++) {
|
||
+ char arp_mbuf[LSTACK_MBUF_LEN] = {0};
|
||
+ sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf);
|
||
+ int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false);
|
||
+ if (result == CONNECT_ERROR) {
|
||
+ LSTACK_LOG(INFO, LSTACK,"connect process %d failed, ensure the process is started.\n", i);
|
||
+ } else if (result == REPLY_ERROR) {
|
||
+ LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. %m\n", i);
|
||
+ }
|
||
+ }
|
||
+}
|
||
+
|
||
+static void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx)
|
||
+{
|
||
+ /* current process queue_id */
|
||
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx];
|
||
+ int ret = -1;
|
||
+ while (ret != 0) {
|
||
+ ret = rpc_call_arp(&stack->rpc_queue, mbuf);
|
||
+ printf("transfer_tcp_to_thread, ret : %d \n", ret);
|
||
+ }
|
||
+}
|
||
+
|
||
+static void parse_arp_and_transefer(char* buf)
|
||
+{
|
||
+ struct rte_mbuf *mbuf = (struct rte_mbuf *)atoll(buf);
|
||
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
|
||
+ struct rte_mbuf *mbuf_copy = NULL;
|
||
+ struct protocol_stack *stack = NULL;
|
||
+ int32_t ret;
|
||
+ for (int32_t i = 0; i < stack_group->stack_num; i++) {
|
||
+ stack = stack_group->stacks[i];
|
||
+ ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false);
|
||
+ while (ret != 0) {
|
||
+ ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false);
|
||
+ stack->stats.rx_allocmbuf_fail++;
|
||
+ }
|
||
+ copy_mbuf(mbuf_copy, mbuf);
|
||
+
|
||
+ ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
|
||
+
|
||
+ while (ret != 0) {
|
||
+ rpc_call_arp(&stack->rpc_queue, mbuf_copy);
|
||
+ }
|
||
+ }
|
||
+}
|
||
+
|
||
+static void parse_tcp_and_transefer(char* buf)
|
||
+{
|
||
+ char *next_token = NULL;
|
||
+ char *elem = strtok_s(buf, SPLIT_DELIM, &next_token);
|
||
+ struct rte_mbuf *mbuf = (struct rte_mbuf *) atoll(elem);
|
||
+ elem = strtok_s(NULL, SPLIT_DELIM, &next_token);
|
||
+ uint16_t queue_id = atoll(elem);
|
||
+
|
||
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
|
||
+ uint16_t num_queue = get_global_cfg_params()->num_queue;
|
||
+ uint16_t stk_index = queue_id % num_queue;
|
||
+ struct rte_mbuf *mbuf_copy = NULL;
|
||
+ struct protocol_stack *stack = stack_group->stacks[stk_index];
|
||
+
|
||
+ int32_t ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false);
|
||
+ while (ret != 0) {
|
||
+ ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false);
|
||
+ stack->stats.rx_allocmbuf_fail++;
|
||
+ }
|
||
+
|
||
+ copy_mbuf(mbuf_copy,mbuf);
|
||
+ transfer_tcp_to_thread(mbuf_copy, stk_index);
|
||
+}
|
||
+
|
||
+int recv_pkts_from_other_process(int process_index, void* arg)
|
||
+{
|
||
+ struct sockaddr_un serun, cliun;
|
||
+ socklen_t cliun_len;
|
||
+ int listenfd, connfd, size;
|
||
+ char buf[132];
|
||
+ /* socket */
|
||
+ if ((listenfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
|
||
+ perror("socket error");
|
||
+ return -1;
|
||
+ }
|
||
+ /* bind */
|
||
+ memset_s(&serun, sizeof(serun), 0, sizeof(serun));
|
||
+ serun.sun_family = AF_UNIX;
|
||
+ char process_server_path[PATH_MAX];
|
||
+ sprintf_s(process_server_path, sizeof(process_server_path), "%s%d", SERVER_PATH, process_index);
|
||
+ strcpy_s(serun.sun_path, sizeof(serun.sun_path), process_server_path);
|
||
+ size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
|
||
+ unlink(process_server_path);
|
||
+ if (posix_api->bind_fn(listenfd, (struct sockaddr *)&serun, size) < 0) {
|
||
+ perror("bind error");
|
||
+ return -1;
|
||
+ }
|
||
+ if (posix_api->listen_fn(listenfd, 20) < 0) { /* 20: max backlog */
|
||
+ perror("listen error");
|
||
+ return -1;
|
||
+ }
|
||
+ sem_post((sem_t *)arg);
|
||
+ /* block */
|
||
+ while (1) {
|
||
+ cliun_len = sizeof(cliun);
|
||
+ if ((connfd = posix_api->accept_fn(listenfd, (struct sockaddr *)&cliun, &cliun_len)) < 0) {
|
||
+ perror("accept error");
|
||
+ continue;
|
||
+ }
|
||
+ while (1) {
|
||
+ int n = posix_api->read_fn(connfd, buf, sizeof(buf));
|
||
+ if (n < 0) {
|
||
+ perror("read error");
|
||
+ break;
|
||
+ } else if (n == 0) {
|
||
+ break;
|
||
+ }
|
||
+
|
||
+ if (n == LSTACK_MBUF_LEN) {
|
||
+ /* arp */
|
||
+ parse_arp_and_transefer(buf);
|
||
+ } else if (n == TRANSFER_TCP_MUBF_LEN) {
|
||
+ /* tcp. lstack_mbuf_queue_id */
|
||
+ parse_tcp_and_transefer(buf);
|
||
+ } else if (n == DELETE_FLOWS_PARAMS_LENGTH) {
|
||
+ /* delete rule */
|
||
+ parse_and_delete_rule(buf);
|
||
+ } else if (n == CREATE_FLOWS_PARAMS_LENGTH) {
|
||
+ /* add rule */
|
||
+ parse_and_create_rule(buf);
|
||
+ char reply_buf[REPLY_LEN];
|
||
+ sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
|
||
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
|
||
+ } else if (n == GET_LSTACK_NUM) {
|
||
+ char reply_buf[REPLY_LEN];
|
||
+ sprintf_s(reply_buf, sizeof(reply_buf), "%d", get_global_cfg_params()->num_cpu);
|
||
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
|
||
+ } else {
|
||
+ /* add port */
|
||
+ parse_and_add_or_delete_listen_port(buf);
|
||
+ char reply_buf[REPLY_LEN];
|
||
+ sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
|
||
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
|
||
+ }
|
||
+
|
||
+ }
|
||
+ posix_api->close_fn(connfd);
|
||
+ }
|
||
+ posix_api->close_fn(listenfd);
|
||
+ return 0;
|
||
+}
|
||
+
|
||
+void concat_mbuf_and_queue_id(struct rte_mbuf *mbuf, uint16_t queue_id,
|
||
+ char* mbuf_and_queue_id, int write_len)
|
||
+{
|
||
+ sprintf_s(mbuf_and_queue_id, write_len, "%lu%s%u", mbuf, SPLIT_DELIM, queue_id);
|
||
+}
|
||
+
|
||
+static int mbuf_to_idx(struct rte_mbuf *mbuf, uint16_t *dst_port)
|
||
+{
|
||
+ struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
|
||
+ u16_t type = rte_be_to_cpu_16(ethh->ether_type);
|
||
+ uint32_t index = 0;
|
||
+ if (type == RTE_ETHER_TYPE_IPV4) {
|
||
+ struct rte_ipv4_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));
|
||
+ uint8_t ip_version = (iph->version_ihl & 0xf0) >> IPV4_VERSION_OFFSET;
|
||
+ if (likely(ip_version == IPV4_VERSION)) {
|
||
+ if (likely(iph->next_proto_id == IPPROTO_TCP)) {
|
||
+ struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *,
|
||
+ sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
|
||
+ *dst_port = tcp_hdr->dst_port;
|
||
+
|
||
+ if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) {
|
||
+ uint32_t src_ip = iph->src_addr;
|
||
+ uint16_t src_port = tcp_hdr->src_port;
|
||
+ index = rte_jhash_3words(src_ip, src_port | ((*dst_port) << 16), 0, 0);
|
||
+ } else {
|
||
+ return -1;
|
||
+ }
|
||
+ }
|
||
+ }
|
||
+ } else if (type == RTE_ETHER_TYPE_IPV6) {
|
||
+ struct rte_ipv6_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv6_hdr *, sizeof(struct rte_ether_hdr));
|
||
+ if (likely(iph->proto == IPPROTO_TCP)) {
|
||
+ struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *,
|
||
+ sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv6_hdr));
|
||
+ *dst_port = tcp_hdr->dst_port;
|
||
+
|
||
+ if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) {
|
||
+ uint32_t *src_ip = (uint32_t *) &iph->src_addr;
|
||
+ uint16_t src_port = tcp_hdr->src_port;
|
||
+ uint32_t v = rte_jhash_3words(src_ip[0], src_ip[1], src_ip[2], 0);
|
||
+ index = rte_jhash_3words(src_ip[3], src_port | ((*dst_port) << 16), v, 0);
|
||
+ } else {
|
||
+ return -1;
|
||
+ }
|
||
+ }
|
||
+ } else {
|
||
+ return -1;
|
||
+ }
|
||
+ return index;
|
||
+}
|
||
+
|
||
+int distribute_pakages(struct rte_mbuf *mbuf)
|
||
+{
|
||
+ uint16_t dst_port = 0;
|
||
+ uint32_t index = mbuf_to_idx(mbuf, &dst_port);
|
||
+ if (index == -1) {
|
||
+ return TRANSFER_CURRENT_THREAD;
|
||
+ }
|
||
+
|
||
+ uint16_t queue_id = 0;
|
||
+ uint32_t user_process_idx = 0;
|
||
+ int each_process_queue_num = get_global_cfg_params()->num_queue;
|
||
+ index = index % each_process_queue_num;
|
||
+ if (g_listen_ports[dst_port] != INVAILD_PROCESS_IDX) {
|
||
+ user_process_idx = g_listen_ports[dst_port];
|
||
+ } else {
|
||
+ user_process_idx = g_user_ports[dst_port];
|
||
+ }
|
||
+
|
||
+ if (user_process_idx == INVAILD_PROCESS_IDX) {
|
||
+ return TRANSFER_KERNEL;
|
||
+ }
|
||
+
|
||
+ if (get_global_cfg_params()->seperate_send_recv) {
|
||
+ queue_id = user_process_idx * each_process_queue_num + (index / 2) * 2;
|
||
+ } else {
|
||
+ queue_id = user_process_idx * each_process_queue_num + index;
|
||
+ }
|
||
+ if (queue_id != 0) {
|
||
+ if (user_process_idx == 0) {
|
||
+ transfer_tcp_to_thread(mbuf, queue_id);
|
||
+ } else {
|
||
+ char mbuf_and_queue_id[TRANSFER_TCP_MUBF_LEN];
|
||
+ concat_mbuf_and_queue_id(mbuf, queue_id, mbuf_and_queue_id, TRANSFER_TCP_MUBF_LEN);
|
||
+ transfer_pkt_to_other_process(mbuf_and_queue_id, user_process_idx,
|
||
+ TRANSFER_TCP_MUBF_LEN, false);
|
||
+ }
|
||
+ return TRANSFER_OTHER_THREAD;
|
||
+ } else {
|
||
+ return TRANSFER_CURRENT_THREAD;
|
||
+ }
|
||
+
|
||
+ return TRANSFER_KERNEL;
|
||
+}
|
||
+
|
||
+void gazelle_listen_thread(void *arg)
|
||
+{
|
||
+ struct cfg_params *cfg_param = get_global_cfg_params();
|
||
+ recv_pkts_from_other_process(cfg_param->process_idx, arg);
|
||
+}
|
||
+
|
||
+void flow_init(void)
|
||
+{
|
||
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
|
||
+ init_listen_and_user_ports();
|
||
+
|
||
+ /* run to completion mode does not currently support multiple process */
|
||
+ if (!use_ltran() && !get_global_cfg_params()->stack_mode_rtc) {
|
||
+ char name[PATH_MAX];
|
||
+ sem_init(&stack_group->sem_listen_thread, 0, 0);
|
||
+ sprintf_s(name, sizeof(name), "%s", "listen_thread");
|
||
+ struct sys_thread *thread = sys_thread_new(name, gazelle_listen_thread,
|
||
+ (void*)(&stack_group->sem_listen_thread), 0, 0);
|
||
+ free(thread);
|
||
+ sem_wait(&stack_group->sem_listen_thread);
|
||
+ }
|
||
+}
|
||
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
|
||
index f78e48a..3703092 100644
|
||
--- a/src/lstack/netif/lstack_vdev.c
|
||
+++ b/src/lstack/netif/lstack_vdev.c
|
||
@@ -33,6 +33,7 @@
|
||
#include "lstack_protocol_stack.h"
|
||
#include "gazelle_reg_msg.h"
|
||
#include "lstack_lwip.h"
|
||
+#include "lstack_flow.h"
|
||
#include "lstack_vdev.h"
|
||
|
||
/* INUSE_TX_PKTS_WATERMARK < VDEV_RX_QUEUE_SZ;
|
||
--
|
||
2.27.0
|
||
|