syn add pbuf lock when aggregate pbuf

(cherry picked from commit f101784b6c8a7494a49b219c9a46e9d86abc7e0b)
This commit is contained in:
jiangheng12 2023-03-18 15:13:27 +08:00 committed by openeuler-sync-bot
parent a5c4f95262
commit f133fa18a1
10 changed files with 4793 additions and 2 deletions

View File

@ -0,0 +1,563 @@
From c095a3145da08e976bb6e1e2a2abc6bc3d3aac31 Mon Sep 17 00:00:00 2001
From: jianheng <jiangheng14@huawei.com>
Date: Sun, 12 Mar 2023 01:30:34 +0800
Subject: [PATCH] add pbuf lock when aggregate pbuf reduce invalid send rpc
count
---
src/common/gazelle_dfx_msg.h | 1 -
src/lstack/core/lstack_cfg.c | 8 --
src/lstack/core/lstack_lwip.c | 150 ++++++++-------------
src/lstack/core/lstack_protocol_stack.c | 4 -
src/lstack/core/lstack_stack_stat.c | 3 -
src/lstack/core/lstack_thread_rpc.c | 46 +------
src/lstack/include/lstack_cfg.h | 1 -
src/lstack/include/lstack_lwip.h | 1 -
src/lstack/include/lstack_protocol_stack.h | 1 -
src/lstack/include/lstack_thread_rpc.h | 25 +++-
src/lstack/lstack.conf | 2 -
src/ltran/ltran_dfx.c | 1 -
12 files changed, 87 insertions(+), 156 deletions(-)
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index 674f2d7..608a023 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -81,7 +81,6 @@ struct gazelle_stat_pkts {
uint16_t conn_num;
uint64_t recv_list_cnt;
uint64_t call_alloc_fail;
- uint64_t send_list_cnt;
uint32_t mempool_freecnt;
struct gazelle_stack_stat stack_stat;
struct gazelle_wakeup_stat wakeup_stat;
diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c
index 86d0f14..9195f34 100644
--- a/src/lstack/core/lstack_cfg.c
+++ b/src/lstack/core/lstack_cfg.c
@@ -58,7 +58,6 @@ static int32_t parse_listen_shadow(void);
static int32_t parse_app_bind_numa(void);
static int32_t parse_main_thread_affinity(void);
static int32_t parse_unix_prefix(void);
-static int32_t parse_send_connect_number(void);
static int32_t parse_read_connect_number(void);
static int32_t parse_rpc_number(void);
static int32_t parse_nic_read_number(void);
@@ -108,7 +107,6 @@ static struct config_vector_t g_config_tbl[] = {
{ "unix_prefix", parse_unix_prefix },
{ "tcp_conn_count", parse_tcp_conn_count },
{ "mbuf_count_per_conn", parse_mbuf_count_per_conn },
- { "send_connect_number", parse_send_connect_number },
{ "read_connect_number", parse_read_connect_number },
{ "rpc_number", parse_rpc_number },
{ "nic_read_number", parse_nic_read_number },
@@ -734,12 +732,6 @@ static int32_t parse_mbuf_count_per_conn(void)
MBUF_COUNT_PER_CONN, 1, INT32_MAX);
}
-static int32_t parse_send_connect_number(void)
-{
- return parse_int(&g_config_params.send_connect_number, "send_connect_number",
- STACK_THREAD_DEFAULT, 1, INT32_MAX);
-}
-
static int32_t parse_read_connect_number(void)
{
return parse_int(&g_config_params.read_connect_number, "read_connect_number",
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index dcd7e05..e83bffa 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -94,7 +94,7 @@ static void reset_sock_data(struct lwip_sock *sock)
sock->listen_next = NULL;
sock->epoll_events = 0;
sock->events = 0;
- sock->in_send = 0;
+ sock->call_num = 0;
sock->remain_len = 0;
if (sock->recv_lastdata) {
@@ -117,9 +117,10 @@ static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, u
pbuf->l4_len = 0;
pbuf->header_off = 0;
pbuf->rexmit = 0;
- pbuf->in_write = 0;
+ pbuf->allow_in = 1;
pbuf->head = 0;
pbuf->last = pbuf;
+ pthread_spin_init(&pbuf->pbuf_lock, PTHREAD_PROCESS_SHARED);
}
return pbuf;
@@ -193,7 +194,6 @@ void gazelle_init_sock(int32_t fd)
sock->stack->conn_num++;
init_list_node_null(&sock->recv_list);
init_list_node_null(&sock->event_list);
- init_list_node_null(&sock->send_list);
}
void gazelle_clean_sock(int32_t fd)
@@ -214,7 +214,6 @@ void gazelle_clean_sock(int32_t fd)
reset_sock_data(sock);
list_del_node_null(&sock->recv_list);
- list_del_node_null(&sock->send_list);
}
void gazelle_free_pbuf(struct pbuf *pbuf)
@@ -264,11 +263,16 @@ struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8
if (unlikely(sock->send_pre_del)) {
pbuf = sock->send_pre_del;
- if (pbuf->tot_len > remain_size ||
- (pbuf->head && __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE))) {
+ pthread_spin_lock(&pbuf->pbuf_lock);
+ if (pbuf->tot_len > remain_size) {
+ pthread_spin_unlock(&pbuf->pbuf_lock);
*apiflags &= ~TCP_WRITE_FLAG_MORE;
return NULL;
}
+ if (pbuf->allow_in == 1) {
+ __sync_fetch_and_sub(&pbuf->allow_in, 1);
+ }
+ pthread_spin_unlock(&pbuf->pbuf_lock);
if (pbuf->next) {
sock->send_lastdata = pbuf->next;
@@ -295,10 +299,24 @@ struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8
}
sock->send_pre_del = pbuf;
- if (pbuf->tot_len > remain_size || __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE)) {
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
- pbuf->head = 1;
- return NULL;
+ if (!gazelle_ring_readover_count(sock->send_ring)) {
+ pthread_spin_lock(&pbuf->pbuf_lock);
+ if (pbuf->tot_len > remain_size) {
+ pthread_spin_unlock(&pbuf->pbuf_lock);
+ *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ pbuf->head = 1;
+ return NULL;
+ }
+ if(pbuf->allow_in == 1){
+ __sync_fetch_and_sub(&pbuf->allow_in, 1);
+ }
+ pthread_spin_unlock(&pbuf->pbuf_lock);
+ } else {
+ if (pbuf->tot_len > remain_size) {
+ *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ pbuf->head = 1;
+ return NULL;
+ }
}
sock->send_lastdata = pbuf->next;
@@ -442,18 +460,17 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r)
struct pbuf *last_pbuf = NULL;
volatile uint32_t tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
uint32_t last = r->prod.tail - 1;
- if (last == tail || last - tail > r->capacity) {
+ if (last + 1 == tail || last + 1 - tail > r->capacity) {
return NULL;
}
__rte_ring_dequeue_elems(r, last, (void **)&last_pbuf, sizeof(void *), 1);
- __atomic_store_n(&last_pbuf->in_write, 1, __ATOMIC_RELEASE);
-
- rte_mb();
- tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
- if (last == tail || last - tail > r->capacity) {
- __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE);
+ if (pthread_spin_trylock(&last_pbuf->pbuf_lock) != 0) {
+ return NULL;
+ }
+ if (last_pbuf->allow_in != 1) {
+ pthread_spin_unlock(&last_pbuf->pbuf_lock);
return NULL;
}
@@ -462,7 +479,7 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r)
static inline void gazelle_ring_lastover(struct pbuf *last_pbuf)
{
- __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE);
+ pthread_spin_unlock(&last_pbuf->pbuf_lock);
}
static inline size_t merge_data_lastpbuf(struct lwip_sock *sock, void *buf, size_t len)
@@ -623,56 +640,28 @@ void stack_send(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_3].p;
+ bool replenish_again;
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
msg->result = -1;
+ LSTACK_LOG(ERR, LSTACK, "stack_send: sock error!\n");
+ rpc_msg_free(msg);
return;
}
- __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
- rte_mb();
-
- /* have remain data or replenish again add sendlist */
- if (sock->errevent == 0 && NETCONN_IS_DATAOUT(sock)) {
- if (list_is_null(&sock->send_list)) {
- list_add_node(&stack->send_list, &sock->send_list);
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
- }
- }
-}
-
-void send_stack_list(struct protocol_stack *stack, uint32_t send_max)
-{
- struct list_node *node, *temp;
- struct lwip_sock *sock;
- uint32_t read_num = 0;
- bool replenish_again;
-
- list_for_each_safe(node, temp, &stack->send_list) {
- sock = container_of(node, struct lwip_sock, send_list);
-
- if (++read_num > send_max) {
- /* list head move to next send */
- list_del_node(&stack->send_list);
- list_add_node(&sock->send_list, &stack->send_list);
- break;
- }
-
- __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
- rte_mb();
-
- if (sock->conn == NULL || sock->errevent > 0) {
- list_del_node_null(&sock->send_list);
- continue;
- }
-
- replenish_again = do_lwip_send(stack, sock->conn->socket, sock, 0);
-
- if (!NETCONN_IS_DATAOUT(sock) && !replenish_again) {
- list_del_node_null(&sock->send_list);
+ replenish_again = do_lwip_send(stack, sock->conn->socket, sock, 0);
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ if (!NETCONN_IS_DATAOUT(sock) && !replenish_again) {
+ rpc_msg_free(msg);
+ return;
+ } else {
+ if(__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 0){
+ rpc_call(&stack->rpc_queue, msg);
+ __sync_fetch_and_add(&sock->call_num, 1);
} else {
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
+ rpc_msg_free(msg);
+ return;
}
}
}
@@ -686,29 +675,6 @@ static inline void free_recv_ring_readover(struct rte_ring *ring)
}
}
-static inline struct pbuf *gazelle_ring_enqueuelast(struct rte_ring *r)
-{
- struct pbuf *last_pbuf = NULL;
- volatile uint32_t head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE);
- uint32_t last = r->cons.head - 1;
- if (last == head || last - head > r->capacity) {
- return NULL;
- }
-
- __rte_ring_dequeue_elems(r, last, (void **)&last_pbuf, sizeof(void *), 1);
- __atomic_store_n(&last_pbuf->in_write, 1, __ATOMIC_RELEASE);
-
- rte_mb();
-
- head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE);
- if (last == head || last - head > r->capacity) {
- __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE);
- return NULL;
- }
-
- return last_pbuf;
-}
-
static inline struct pbuf *pbuf_last(struct pbuf *pbuf)
{
while (pbuf->next) {
@@ -824,11 +790,14 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags)
static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
{
- if (__atomic_load_n(&sock->in_send, __ATOMIC_ACQUIRE) == 0) {
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
- if (rpc_call_send(fd, NULL, len, flags) != 0) {
- __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
+ if(__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) >= 2){
+ ;;
+ } else {
+ while (rpc_call_send(fd, NULL, len, flags) < 0) {
+ usleep(1000); // wait 1ms to exec again
+ LSTACK_LOG(INFO, LSTACK, "rpc_call_send failed, try again\n");
}
+ __sync_fetch_and_add(&sock->call_num, 1);
}
}
@@ -1240,13 +1209,6 @@ void stack_mempool_size(struct rpc_msg *msg)
msg->result = rte_mempool_avail_count(stack->rxtx_pktmbuf_pool);
}
-void stack_sendlist_count(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_0].p;
-
- msg->result = get_list_count(&stack->send_list);
-}
-
void stack_recvlist_count(struct rpc_msg *msg)
{
struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_0].p;
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index e13034f..300d7af 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -279,7 +279,6 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
stack->lwip_stats = &lwip_stats;
init_list_node(&stack->recv_list);
- init_list_node(&stack->send_list);
init_list_node(&stack->wakeup_list);
sys_calibrate_tsc();
@@ -417,7 +416,6 @@ static void* gazelle_stack_thread(void *arg)
struct cfg_params *cfg = get_global_cfg_params();
bool use_ltran_flag = cfg->use_ltran;;
bool kni_switch = cfg->kni_switch;
- uint32_t send_connect_number = cfg->send_connect_number;
uint32_t read_connect_number = cfg->read_connect_number;
uint32_t rpc_number = cfg->rpc_number;
uint32_t nic_read_number = cfg->nic_read_number;
@@ -441,8 +439,6 @@ static void* gazelle_stack_thread(void *arg)
for (;;) {
poll_rpc_msg(stack, rpc_number);
- send_stack_list(stack, send_connect_number);
-
gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number);
read_recv_list(stack, read_connect_number);
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 75322d5..a39e372 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -152,9 +152,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
rpc_call_result = rpc_call_recvlistcnt(stack);
dfx->data.pkts.recv_list_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
- rpc_call_result = rpc_call_sendlistcnt(stack);
- dfx->data.pkts.send_list_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
-
dfx->data.pkts.conn_num = stack->conn_num;
}
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 08fe20d..fe3b757 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -23,14 +23,6 @@
#include "lstack_dpdk.h"
#include "lstack_thread_rpc.h"
-#define RPC_MSG_MAX 512
-#define RPC_MSG_MASK (RPC_MSG_MAX - 1)
-struct rpc_msg_pool {
- struct rpc_msg msgs[RPC_MSG_MAX];
- uint32_t prod __rte_cache_aligned;
- uint32_t cons __rte_cache_aligned;
-};
-
static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL;
static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
@@ -76,21 +68,6 @@ static struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func
return msg;
}
-static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *msg)
-{
- pthread_spin_destroy(&msg->lock);
-
- msg->self_release = 0;
- msg->func = NULL;
-
- atomic_fetch_add((_Atomic uint32_t *)&msg->pool->cons, 1);
-}
-
-static inline __attribute__((always_inline)) void rpc_call(lockless_queue *queue, struct rpc_msg *msg)
-{
- lockless_queue_mpsc_push(queue, &msg->queue_node);
-}
-
static inline __attribute__((always_inline)) int32_t rpc_sync_call(lockless_queue *queue, struct rpc_msg *msg)
{
int32_t ret;
@@ -124,10 +101,13 @@ void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num)
stack->stats.call_null++;
}
- if (msg->self_release) {
- pthread_spin_unlock(&msg->lock);
- } else {
- rpc_msg_free(msg);
+ /* stack_send free msg in stack_send */
+ if (msg->func != stack_send) {
+ if (msg->self_release) {
+ pthread_spin_unlock(&msg->lock);
+ } else {
+ rpc_msg_free(msg);
+ }
}
}
}
@@ -217,18 +197,6 @@ int32_t rpc_call_mempoolsize(struct protocol_stack *stack)
return rpc_sync_call(&stack->rpc_queue, msg);
}
-int32_t rpc_call_sendlistcnt(struct protocol_stack *stack)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendlist_count);
- if (msg == NULL) {
- return -1;
- }
-
- msg->args[MSG_ARG_0].p = stack;
-
- return rpc_sync_call(&stack->rpc_queue, msg);
-}
-
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
{
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count);
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 5f16c19..2705fee 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -76,7 +76,6 @@ struct cfg_params {
uint32_t lpm_pkts_in_detect;
uint32_t tcp_conn_count;
uint32_t mbuf_count_per_conn;
- uint32_t send_connect_number;
uint32_t read_connect_number;
uint32_t rpc_number;
uint32_t nic_read_number;
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index 6f5b4f4..02110e0 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -35,7 +35,6 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
void read_recv_list(struct protocol_stack *stack, uint32_t max_num);
void send_stack_list(struct protocol_stack *stack, uint32_t send_max);
void add_recv_list(int32_t fd);
-void stack_sendlist_count(struct rpc_msg *msg);
void get_lwip_conntable(struct rpc_msg *msg);
void get_lwip_connnum(struct rpc_msg *msg);
void stack_recvlist_count(struct rpc_msg *msg);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 795db39..11b001c 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -68,7 +68,6 @@ struct protocol_stack {
struct rte_mbuf *pkts[RTE_TEST_RX_DESC_DEFAULT];
struct list_node recv_list;
- struct list_node send_list;
struct list_node wakeup_list;
volatile uint16_t conn_num;
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index aff30dc..ed111fb 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -24,6 +24,10 @@
#define MSG_ARG_3 (3)
#define MSG_ARG_4 (4)
#define RPM_MSG_ARG_SIZE (5)
+
+#define RPC_MSG_MAX 512
+#define RPC_MSG_MASK (RPC_MSG_MAX - 1)
+
struct rpc_msg;
typedef void (*rpc_msg_func)(struct rpc_msg *msg);
union rpc_msg_arg {
@@ -48,6 +52,12 @@ struct rpc_msg {
union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */
};
+struct rpc_msg_pool {
+ struct rpc_msg msgs[RPC_MSG_MAX];
+ uint32_t prod __rte_cache_aligned;
+ uint32_t cons __rte_cache_aligned;
+};
+
struct protocol_stack;
struct rte_mbuf;
struct wakeup_poll;
@@ -57,7 +67,6 @@ void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wake
int32_t rpc_call_msgcnt(struct protocol_stack *stack);
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
-int32_t rpc_call_sendlistcnt(struct protocol_stack *stack);
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn);
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn);
int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn);
@@ -79,4 +88,18 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp);
int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock);
int32_t rpc_call_mempoolsize(struct protocol_stack *stack);
+static inline __attribute__((always_inline)) void rpc_call(lockless_queue *queue, struct rpc_msg *msg)
+{
+ lockless_queue_mpsc_push(queue, &msg->queue_node);
+}
+
+static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *msg)
+{
+ pthread_spin_destroy(&msg->lock);
+
+ msg->self_release = 0;
+
+ __atomic_fetch_add((_Atomic uint32_t *)&msg->pool->cons, 1, __ATOMIC_SEQ_CST);
+}
+
#endif
diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf
index a4571ff..cf81954 100644
--- a/src/lstack/lstack.conf
+++ b/src/lstack/lstack.conf
@@ -28,8 +28,6 @@ send_ring_size = 256
expand_send_ring = 0
#protocol stack thread per loop params
-#send connect to nic
-send_connect_number = 4
#read data form protocol stack into recv_ring
read_connect_number = 4
#process rpc msg number
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 1c9d4fa..051787e 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -572,7 +572,6 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
printf("write_lwip: %-17"PRIu64" ", lstack_stat->data.pkts.stack_stat.write_lwip_cnt);
printf("app_write_rpc: %-14"PRIu64" \n", lstack_stat->data.pkts.wakeup_stat.app_write_rpc);
printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list_cnt);
- printf("send_list: %-18"PRIu64" ", lstack_stat->data.pkts.send_list_cnt);
printf("conn_num: %-19hu \n", lstack_stat->data.pkts.conn_num);
printf("wakeup_events: %-14"PRIu64" ", lstack_stat->data.pkts.stack_stat.wakeup_events);
printf("app_events: %-17"PRIu64" ", lstack_stat->data.pkts.wakeup_stat.app_events);
--
2.23.0

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,457 @@
From a991e372a23ac1a065f4a4a99096729a688d2856 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Mon, 13 Mar 2023 14:25:17 +0800
Subject: [PATCH] add gazellectl -x to show nic stats add gazellectl -a to show
small packet aggregtion degree
---
src/common/gazelle_dfx_msg.h | 28 ++++
src/lstack/core/lstack_dpdk.c | 19 +++
src/lstack/core/lstack_lwip.c | 1 +
src/lstack/core/lstack_stack_stat.c | 41 ++++++
src/lstack/include/lstack_dpdk.h | 3 +
src/lstack/include/lstack_protocol_stack.h | 1 +
src/lstack/include/lstack_stack_stat.h | 1 +
src/ltran/ltran_dfx.c | 148 +++++++++++++++++----
8 files changed, 213 insertions(+), 29 deletions(-)
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index 608a023..1863837 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -43,6 +43,8 @@ enum GAZELLE_STAT_MODE {
GAZELLE_STAT_LSTACK_SHOW_CONN,
GAZELLE_STAT_LSTACK_SHOW_LATENCY,
GAZELLE_STAT_LSTACK_LOW_POWER_MDF,
+ GAZELLE_STAT_LSTACK_SHOW_XSTATS,
+ GAZELLE_STAT_LSTACK_SHOW_AGGREGATE,
GAZELLE_STAT_MODE_MAX,
};
@@ -197,6 +199,30 @@ struct gazelle_stat_low_power_info {
uint16_t lpm_rx_pkts;
};
+#define RTE_ETH_XSTATS_NAME_SIZE 64
+struct nic_eth_xstats_name {
+ char name[RTE_ETH_XSTATS_NAME_SIZE];
+};
+
+struct nic_eth_xstats {
+ struct nic_eth_xstats_name xstats_name[128];
+ uint64_t values[128];
+ uint32_t len;
+ uint16_t port_id;
+};
+
+struct gazelle_stack_aggregate_stats {
+ /* 0: RX, 1: TX, 2: APP_TX */
+ uint32_t size_1_64[3];
+ uint32_t size_65_512[3];
+ uint32_t size_513_1460[3];
+ uint32_t size_1461_8192[3];
+ uint32_t size_8193_max[3];
+
+ uint64_t rx_bytes;
+ uint64_t tx_bytes;
+};
+
struct gazelle_stack_dfx_data {
/* indicates whether the current message is the last */
uint32_t eof;
@@ -209,6 +235,8 @@ struct gazelle_stack_dfx_data {
struct gazelle_stack_latency latency;
struct gazelle_stat_lstack_conn conn;
struct gazelle_stat_lstack_snmp snmp;
+ struct nic_eth_xstats nic_xstats;
+ struct gazelle_stack_aggregate_stats aggregate_stats;
} data;
};
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index 1beb66b..c6d6290 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -645,3 +645,22 @@ bool port_in_stack_queue(uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, ui
return (reta_index % stack_group->nb_queues) == stack->queue_id;
}
+void dpdk_nic_xstats_get(struct gazelle_stack_dfx_data *dfx, uint16_t port_id)
+{
+ int32_t ret;
+ int32_t len = rte_eth_xstats_get_names_by_id(port_id, NULL, 0, NULL);
+ dfx->data.nic_xstats.len = len;
+ dfx->data.nic_xstats.port_id = port_id;
+ if (len < 0) {
+ return;
+ }
+ if (len != rte_eth_xstats_get_names_by_id(port_id, (struct rte_eth_xstat_name *)dfx->data.nic_xstats.xstats_name, len, NULL)) {
+ dfx->data.nic_xstats.len = -1;
+ return;
+ }
+
+ ret = rte_eth_xstats_get_by_id(port_id, NULL, dfx->data.nic_xstats.values, len);
+ if (ret < 0 || ret > len) {
+ dfx->data.nic_xstats.len = -1;
+ }
+}
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index e83bffa..063eea4 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -714,6 +714,7 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
}
recv_len += pbufs[i]->tot_len;
+ lstack_calculate_aggregate(0, pbufs[i]->tot_len);
read_count++;
/* once we have some data to return, only add more if we don't need to wait */
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index a39e372..92d7a39 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -27,6 +27,7 @@
#include "lstack_protocol_stack.h"
#include "lstack_stack_stat.h"
#include "posix/lstack_epoll.h"
+#include "lstack_dpdk.h"
#define US_PER_SEC 1000000
@@ -67,6 +68,31 @@ void calculate_lstack_latency(struct gazelle_stack_latency *stack_latency, const
latency_stat->latency_pkts++;
}
+void lstack_calculate_aggregate(int type, uint32_t len)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ if (stack_group->latency_start) {
+ struct protocol_stack *stack = get_protocol_stack();
+ if (type == 1) {
+ stack->aggregate_stats.tx_bytes += len;
+ } else if (type == 0) {
+ stack->aggregate_stats.rx_bytes += len;
+ }
+
+ if (len <= 64) {
+ stack->aggregate_stats.size_1_64[type]++;
+ } else if (len <= 512) {
+ stack->aggregate_stats.size_65_512[type]++;
+ } else if (len <= 1460) {
+ stack->aggregate_stats.size_513_1460[type]++;
+ } else if (len <= 8192) {
+ stack->aggregate_stats.size_1461_8192[type]++;
+ } else {
+ stack->aggregate_stats.size_8193_max[type]++;
+ }
+ }
+}
+
static void set_latency_start_flag(bool start)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
@@ -89,6 +115,7 @@ static void set_latency_start_flag(bool start)
stack->latency.start_time = get_current_time();
stack->latency.lwip_latency.latency_min = ~((uint64_t)0);
stack->latency.read_latency.latency_min = ~((uint64_t)0);
+ memset_s(&stack->aggregate_stats, sizeof(struct gazelle_stack_aggregate_stats), 0, sizeof(stack->aggregate_stats));
}
}
@@ -185,6 +212,12 @@ static void get_stack_dfx_data(struct gazelle_stack_dfx_data *dfx, struct protoc
LSTACK_LOG(ERR, LSTACK, "memcpy_s err ret=%d \n", ret);
}
break;
+ case GAZELLE_STAT_LSTACK_SHOW_AGGREGATE:
+ ret = memcpy_s(&dfx->data.aggregate_stats, sizeof(dfx->data.aggregate_stats), &stack->aggregate_stats, sizeof(stack->aggregate_stats));
+ if (ret != EOK) {
+ LSTACK_LOG(ERR, LSTACK, "memcpy_s err ret=%d \n", ret);
+ }
+ break;
case GAZELLE_STAT_LTRAN_START_LATENCY:
set_latency_start_flag(true);
break;
@@ -221,6 +254,14 @@ int32_t handle_stack_cmd(int32_t fd, enum GAZELLE_STAT_MODE stat_mode)
struct gazelle_stack_dfx_data dfx;
struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ if (stat_mode == GAZELLE_STAT_LSTACK_SHOW_XSTATS) {
+ dpdk_nic_xstats_get(&dfx, 0);
+ dfx.tid = 0;
+ dfx.eof = 1;
+ send_control_cmd_data(fd, &dfx);
+ return 0;
+ }
+
for (uint32_t i = 0; i < stack_group->stack_num; i++) {
struct protocol_stack *stack = stack_group->stacks[i];
diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h
index 55ca7a1..7c222ca 100644
--- a/src/lstack/include/lstack_dpdk.h
+++ b/src/lstack/include/lstack_dpdk.h
@@ -14,6 +14,7 @@
#define _GAZELLE_DPDK_H_
#include "gazelle_opt.h"
+#include "gazelle_dfx_msg.h"
#define RXTX_CACHE_SZ (VDEV_RX_QUEUE_SZ)
#define KNI_NB_MBUF (DEFAULT_RING_SIZE << 4)
@@ -53,4 +54,6 @@ void dpdk_restore_pci(void);
bool port_in_stack_queue(uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
uint16_t get_port_id();
struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,uint32_t mbuf_cache_size, uint16_t queue_id);
+
+void dpdk_nic_xstats_get(struct gazelle_stack_dfx_data *dfx, uint16_t port_id);
#endif /* GAZELLE_DPDK_H */
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index b5d3f7d..d58c98a 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -77,6 +77,7 @@ struct protocol_stack {
struct stats_ *lwip_stats;
struct gazelle_stack_latency latency;
struct gazelle_stack_stat stats;
+ struct gazelle_stack_aggregate_stats aggregate_stats;
};
struct eth_params;
diff --git a/src/lstack/include/lstack_stack_stat.h b/src/lstack/include/lstack_stack_stat.h
index 6057fe1..5737bae 100644
--- a/src/lstack/include/lstack_stack_stat.h
+++ b/src/lstack/include/lstack_stack_stat.h
@@ -28,5 +28,6 @@ int32_t handle_stack_cmd(int fd, enum GAZELLE_STAT_MODE stat_mode);
uint64_t get_current_time(void);
void lstack_get_low_power_info(struct gazelle_stat_low_power_info *low_power_info);
void unregister_wakeup(struct protocol_stack *stack, struct wakeup_poll *wakeup);
+void lstack_calculate_aggregate(int type, uint32_t len);
#endif /* GAZELLE_STACK_STAT_H */
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 051787e..331a23c 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -86,6 +86,8 @@ static void gazelle_print_lstack_stat_latency(void *buf, const struct gazelle_st
static void gazelle_print_lstack_stat_lpm(void *buf, const struct gazelle_stat_msg_request *req_msg);
static void gazelle_print_ltran_sock(void *buf, const struct gazelle_stat_msg_request *req_msg);
static void gazelle_print_ltran_conn(void *buf, const struct gazelle_stat_msg_request *req_msg);
+static void gazelle_print_lstack_xstats(void *buf, const struct gazelle_stat_msg_request *req_msg);
+static void gazelle_print_lstack_aggregate(void *buf, const struct gazelle_stat_msg_request *req_msg);
static struct gazelle_dfx_list g_gazelle_dfx_tbl[] = {
{GAZELLE_STAT_LTRAN_SHOW, sizeof(struct gazelle_stat_ltran_total), gazelle_print_ltran_stat_total},
@@ -106,10 +108,51 @@ static struct gazelle_dfx_list g_gazelle_dfx_tbl[] = {
{GAZELLE_STAT_LSTACK_SHOW_CONN, sizeof(struct gazelle_stack_dfx_data), gazelle_print_lstack_stat_conn},
{GAZELLE_STAT_LSTACK_SHOW_LATENCY, sizeof(struct gazelle_stack_dfx_data), gazelle_print_lstack_stat_latency},
{GAZELLE_STAT_LSTACK_LOW_POWER_MDF, sizeof(struct gazelle_stack_dfx_data), gazelle_print_lstack_stat_lpm},
+ {GAZELLE_STAT_LSTACK_SHOW_XSTATS, sizeof(struct gazelle_stack_dfx_data), gazelle_print_lstack_xstats},
+ {GAZELLE_STAT_LSTACK_SHOW_AGGREGATE, sizeof(struct gazelle_stack_dfx_data), gazelle_print_lstack_aggregate},
};
static int32_t g_wait_reply = 1;
+static double rate_convert_type(uint64_t bytes, char **type)
+{
+ static char *rate_type[] = {"b/s", "Kb/s", "Mb/s"};
+ const uint32_t per_unit = 1024; // 1KB=1024B
+ double now = bytes * 8;
+ uint32_t type_max = sizeof(rate_type) / sizeof(char *);
+ uint32_t index = 0;
+
+ while (now > per_unit && index < type_max - 1) {
+ now /= per_unit;
+ index++;
+ }
+
+ *type = rate_type[index];
+ return now;
+}
+
+static void gazelle_print_lstack_xstats(void *buf, const struct gazelle_stat_msg_request *req_msg)
+{
+ struct gazelle_stack_dfx_data *stat = (struct gazelle_stack_dfx_data *)buf;
+ struct nic_eth_xstats *xstats = &stat->data.nic_xstats;
+ static const char *nic_stats_border = "########################";
+
+ printf("###### NIC extended statistics for port %-2d #########\n", 0);
+ printf("%s############################\n",nic_stats_border);
+ if (xstats->len <= 0) {
+ printf("Cannot get xstats\n");
+ return;
+ }
+
+ for (uint32_t i = 0; i < xstats->len; i++) {
+ printf("%s: %"PRIu64"\n", xstats->xstats_name[i].name,
+ xstats->values[i]);
+ }
+
+ printf("%s############################\n",
+ nic_stats_border);
+}
+
static void gazelle_print_ltran_conn(void *buf, const struct gazelle_stat_msg_request *req_msg)
{
struct gazelle_stat_forward_table *table = (struct gazelle_stat_forward_table *)buf;
@@ -331,23 +374,6 @@ static void gazelle_print_ltran_stat_total(void *buf, const struct gazelle_stat_
}
}
-static double rate_convert_type(uint64_t bytes, char **type)
-{
- static char *rate_type[] = {"B/s", "KB/s", "MB/s", "GB/s"};
- const uint32_t per_unit = 1024; // 1KB=1024B
- double now = bytes;
- uint32_t type_max = sizeof(rate_type) / sizeof(char *);
- uint32_t index = 0;
-
- while (now > per_unit && index < type_max - 1) {
- now /= per_unit;
- index++;
- }
-
- *type = rate_type[index];
- return now;
-}
-
static void gazelle_print_ltran_stat_rate(void *buf, const struct gazelle_stat_msg_request *req_msg)
{
uint32_t i;
@@ -949,6 +975,8 @@ static void show_usage(void)
" -s, snmp show lstack snmp \n"
" -c, connect show lstack connect \n"
" -l, latency [time] show lstack latency \n"
+ " -x, xstats show lstack xstats \n"
+ " -a, aggregatin [time] show lstack send/recv aggregation \n"
" set: \n"
" loglevel {error | info | debug} set lstack loglevel \n"
" lowpower {0 | 1} set lowpower enable \n"
@@ -1037,6 +1065,46 @@ static int32_t parse_dfx_ltran_show_args(int32_t argc, char *argv[], struct gaze
return cmd_index;
}
+static void gazelle_print_lstack_aggregate(void *buf, const struct gazelle_stat_msg_request *req_msg)
+{
+ struct gazelle_stack_dfx_data *dfx = (struct gazelle_stack_dfx_data *)buf;
+ struct gazelle_stack_aggregate_stats *stats = &dfx->data.aggregate_stats;
+ char *rate_type = NULL;
+ double rate;
+ int32_t ret = 0;
+
+ do {
+ printf("\n================Stack(%d) Aggregate===============\n", dfx->tid);
+ rate = rate_convert_type(stats->rx_bytes / g_wait_reply, &rate_type);
+ printf("rx throught: %f%s\n", rate, rate_type);
+ rate = rate_convert_type(stats->tx_bytes / g_wait_reply, &rate_type);
+ printf("tx throught: %f%s\n", rate, rate_type);
+
+ printf("rx_szie_1_64: %u\n", stats->size_1_64[0]);
+ printf("rx_size_65_512: %u\n", stats->size_65_512[0]);
+ printf("rx_size_513_1460 byte: %u\n", stats->size_513_1460[0]);
+ printf("rx_size_1461_8192 byte: %u\n", stats->size_1461_8192[0]);
+ printf("rx_size_8193_max byte: %u\n", stats->size_8193_max[0]);
+
+ printf("tx_szie_1_64: %u\n", stats->size_1_64[1]);
+ printf("tx_size_65_512: %u\n", stats->size_65_512[1]);
+ printf("tx_size_513_1460 byte: %u\n", stats->size_513_1460[1]);
+ printf("tx_size_1461_8192 byte: %u\n", stats->size_1461_8192[1]);
+ printf("tx_size_8193_max byte: %u\n", stats->size_8193_max[1]);
+
+ printf("app_tx_szie_1_64: %u\n", stats->size_1_64[2]);
+ printf("app_tx_size_65_512: %u\n", stats->size_65_512[2]);
+ printf("app_tx_size_513_1460 byte: %u\n", stats->size_513_1460[2]);
+ printf("app_tx_size_1461_8192 byte: %u\n", stats->size_1461_8192[2]);
+ printf("app_tx_size_8193_max byte: %u\n", stats->size_8193_max[2]);
+
+ if ((dfx->eof != 0) || (ret != GAZELLE_OK)) {
+ break;
+ }
+ ret = dfx_stat_read_from_ltran(buf, sizeof(struct gazelle_stack_dfx_data), req_msg->stat_mode);
+ } while(true);
+}
+
static int32_t parse_dfx_ltran_args(int32_t argc, char *argv[], struct gazelle_stat_msg_request *req_msg)
{
int32_t num_cmd = 0;
@@ -1098,6 +1166,23 @@ static int32_t parse_dfx_lstack_set_args(int32_t argc, char *argv[], struct gaze
return cmd_index;
}
+static int parse_delay_arg(int32_t argc, char *argv[], long int delay)
+{
+ if (argc > GAZELLE_OPTIONS2_ARG_IDX) {
+ char *end = NULL;
+ delay = strtol(argv[GAZELLE_OPTIONS2_ARG_IDX], &end, GAZELLE_DECIMAL);
+ if (delay <= 0 || (end == NULL) || (*end != '\0')) {
+ return -1;
+ }
+ if (delay > GAZELLE_MAX_LATENCY_TIME) {
+ printf("Exceeds the maximum(30mins) latency time, will be set to maximum(30mins)\n");
+ delay = GAZELLE_MAX_LATENCY_TIME;
+ }
+ }
+ g_wait_reply = delay;
+ return 0;
+}
+
static int32_t parse_dfx_lstack_show_args(int32_t argc, char *argv[], struct gazelle_stat_msg_request *req_msg)
{
int32_t cmd_index = 0;
@@ -1119,6 +1204,9 @@ static int32_t parse_dfx_lstack_show_args(int32_t argc, char *argv[], struct gaz
if (strcmp(param, "connect") == 0 || strcmp(param, "-c") == 0) {
req_msg[cmd_index++].stat_mode = GAZELLE_STAT_LSTACK_SHOW_CONN;
}
+ if (strcmp(param, "xstats") == 0 || strcmp(param, "-x") == 0) {
+ req_msg[cmd_index++].stat_mode = GAZELLE_STAT_LSTACK_SHOW_XSTATS;
+ }
if (strcmp(param, "latency") == 0 || strcmp(param, "-l") == 0) {
req_msg[cmd_index++].stat_mode = GAZELLE_STAT_LTRAN_START_LATENCY;
req_msg[cmd_index++].stat_mode = GAZELLE_STAT_LTRAN_STOP_LATENCY;
@@ -1127,18 +1215,17 @@ static int32_t parse_dfx_lstack_show_args(int32_t argc, char *argv[], struct gaz
req_msg[cmd_index++].stat_mode = GAZELLE_STAT_LTRAN_SHOW_LATENCY;
}
- if (argc > GAZELLE_OPTIONS2_ARG_IDX) {
- char *end = NULL;
- delay = strtol(argv[GAZELLE_OPTIONS2_ARG_IDX], &end, GAZELLE_DECIMAL);
- if (delay <= 0 || (end == NULL) || (*end != '\0')) {
- return -1;
- }
- if (delay > GAZELLE_MAX_LATENCY_TIME) {
- printf("Exceeds the maximum(30mins) latency time, will be set to maximum(30mins)\n");
- delay = GAZELLE_MAX_LATENCY_TIME;
- }
+ if (parse_delay_arg(argc, argv, delay) != 0) {
+ return 0;
+ }
+ }
+ if (strcmp(param, "aggragate") == 0 || strcmp(param, "-a") == 0) {
+ req_msg[cmd_index++].stat_mode = GAZELLE_STAT_LTRAN_START_LATENCY;
+ req_msg[cmd_index++].stat_mode = GAZELLE_STAT_LTRAN_STOP_LATENCY;
+ req_msg[cmd_index++].stat_mode = GAZELLE_STAT_LSTACK_SHOW_AGGREGATE;
+ if (parse_delay_arg(argc, argv, delay) != 0) {
+ return 0;
}
- g_wait_reply = delay;
}
return cmd_index;
@@ -1212,10 +1299,13 @@ static int32_t check_cmd_support(struct gazelle_stat_msg_request *req_msg, int32
case GAZELLE_STAT_LSTACK_SHOW_CONN:
case GAZELLE_STAT_LSTACK_SHOW_LATENCY:
case GAZELLE_STAT_LSTACK_LOW_POWER_MDF:
+ case GAZELLE_STAT_LSTACK_SHOW_XSTATS:
+ case GAZELLE_STAT_LSTACK_SHOW_AGGREGATE:
return 0;
default:
if (req_msg[0].stat_mode == GAZELLE_STAT_LTRAN_START_LATENCY &&
- req_msg[req_msg_num - 1].stat_mode == GAZELLE_STAT_LSTACK_SHOW_LATENCY) {
+ (req_msg[req_msg_num - 1].stat_mode == GAZELLE_STAT_LSTACK_SHOW_LATENCY ||
+ req_msg[req_msg_num - 1].stat_mode == GAZELLE_STAT_LSTACK_SHOW_AGGREGATE)) {
return 0;
}
/* keep output consistency */
--
2.23.0

View File

@ -0,0 +1,590 @@
From a8fbb2cc4f9367e4a83c3611e7a7bdb821504015 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Mon, 13 Mar 2023 16:08:13 +0800
Subject: [PATCH] add same node ring for inter-proces communication
---
src/lstack/api/lstack_wrap.c | 21 +-
src/lstack/core/lstack_lwip.c | 377 +++++++++++++++++++--
src/lstack/core/lstack_protocol_stack.c | 5 +
src/lstack/include/lstack_ethdev.h | 1 +
src/lstack/include/lstack_lwip.h | 3 +-
src/lstack/include/lstack_protocol_stack.h | 1 +
src/lstack/netif/lstack_ethdev.c | 2 +-
7 files changed, 381 insertions(+), 29 deletions(-)
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 561c6e4..46cbcec 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -200,6 +200,16 @@ static int32_t do_bind(int32_t s, const struct sockaddr *name, socklen_t namelen
return rpc_call_bind(s, name, namelen);
}
+bool is_dst_ip_localhost(const struct sockaddr *addr)
+{
+ struct cfg_params *global_params = get_global_cfg_params();
+ struct sockaddr_in *servaddr = (struct sockaddr_in *) addr;
+ if(global_params->host_addr.addr == servaddr->sin_addr.s_addr){
+ return true;
+ }
+ return false;
+}
+
static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t namelen)
{
if (name == NULL) {
@@ -224,9 +234,14 @@ static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t name
return ret;
}
- ret = posix_api->connect_fn(s, name, namelen);
- if (ret == 0) {
- return ret;
+ char listen_ring_name[RING_NAME_LEN];
+ int remote_port = htons(((struct sockaddr_in *)name)->sin_port);
+ snprintf(listen_ring_name, sizeof(listen_ring_name), "listen_rx_ring_%u", remote_port);
+ if (!is_dst_ip_localhost(name) || rte_ring_lookup(listen_ring_name) == NULL) {
+ ret = posix_api->connect_fn(s, name, namelen);
+ if (ret == 0) {
+ return ret;
+ }
}
return -1;
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 063eea4..60abfe8 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -19,6 +19,7 @@
#include <lwip/pbuf.h>
#include <lwip/priv/tcp_priv.h>
#include <lwip/posix_api.h>
+#include <lwip/tcp.h>
#include <securec.h>
#include <rte_errno.h>
#include <rte_malloc.h>
@@ -106,7 +107,6 @@ static void reset_sock_data(struct lwip_sock *sock)
static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, uint16_t length, pbuf_type type)
{
struct pbuf_custom *pbuf_custom = mbuf_to_pbuf(mbuf);
- pbuf_custom->custom_free_function = gazelle_free_pbuf;
void *data = rte_pktmbuf_mtod(mbuf, void *);
struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ);
@@ -229,18 +229,11 @@ void gazelle_free_pbuf(struct pbuf *pbuf)
int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num)
{
- struct pbuf_custom *pbuf_custom = NULL;
-
int32_t ret = rte_pktmbuf_alloc_bulk(pool, mbufs, num);
if (ret != 0) {
return ret;
}
- for (uint32_t i = 0; i < num; i++) {
- pbuf_custom = mbuf_to_pbuf(mbufs[i]);
- pbuf_custom->custom_free_function = gazelle_free_pbuf;
- }
-
return 0;
}
@@ -802,6 +795,93 @@ static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t
}
}
+static inline void del_data_in_event(struct lwip_sock *sock)
+{
+ pthread_spin_lock(&sock->wakeup->event_list_lock);
+
+ /* check again avoid cover event add in stack thread */
+ if (!NETCONN_IS_DATAIN(sock)) {
+ sock->events &= ~EPOLLIN;
+
+ if (sock->events == 0) {
+ list_del_node_null(&sock->event_list);
+ }
+ }
+
+ pthread_spin_unlock(&sock->wakeup->event_list_lock);
+}
+
+/* process on same node use ring to recv data */
+ssize_t gazelle_same_node_ring_recv(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags)
+{
+ unsigned long long cur_begin = sock->same_node_rx_ring->sndbegin;
+ unsigned long long cur_end;
+ unsigned long long index = cur_begin + 1;
+ size_t act_len = 0;
+
+ cur_end = __atomic_load_n(&sock->same_node_rx_ring->sndend, __ATOMIC_ACQUIRE);
+ if (cur_begin == cur_end) {
+ errno = EAGAIN;
+ act_len = -1;
+ goto END;
+ }
+
+ act_len = cur_end - index + 1;
+ act_len = RTE_MIN(act_len, len);
+
+ if ((index & SAME_NODE_RING_MASK) + act_len > SAME_NODE_RING_LEN) {
+ size_t act_len1 = SAME_NODE_RING_LEN - (index & SAME_NODE_RING_MASK);
+ size_t act_len2 = act_len - act_len1;
+ rte_memcpy((char *)buf, (char *)sock->same_node_rx_ring->mz->addr + (index & SAME_NODE_RING_MASK), act_len1);
+ rte_memcpy((char *)buf + act_len1, (char *)sock->same_node_rx_ring->mz->addr, act_len2);
+ } else {
+ rte_memcpy((char *)buf, (char *)sock->same_node_rx_ring->mz->addr + (index & SAME_NODE_RING_MASK), act_len);
+ }
+
+ index += act_len;
+ __atomic_store_n(&sock->same_node_rx_ring->sndbegin, index - 1, __ATOMIC_RELEASE);
+
+END:
+ /* rte_ring_count reduce lock */
+ if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLIN)) {
+ del_data_in_event(sock);
+ }
+ return act_len;
+}
+
+/* processes on same node use ring to send data */
+ssize_t gazelle_same_node_ring_send(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags)
+{
+ unsigned long long cur_begin = __atomic_load_n(&sock->same_node_tx_ring->sndbegin, __ATOMIC_ACQUIRE);
+ unsigned long long cur_end = sock->same_node_tx_ring->sndend;
+ if (cur_end >= cur_begin + SAME_NODE_RING_LEN) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ unsigned long long index = cur_end + 1;
+ size_t act_len = SAME_NODE_RING_LEN - (cur_end - cur_begin);
+ act_len = RTE_MIN(act_len, len);
+
+ if ((index & SAME_NODE_RING_MASK) + act_len > SAME_NODE_RING_LEN) {
+ size_t act_len1 = SAME_NODE_RING_LEN - (index & SAME_NODE_RING_MASK);
+ size_t act_len2 = act_len - act_len1;
+ rte_memcpy((char *)sock->same_node_tx_ring->mz->addr + (index & SAME_NODE_RING_MASK), buf, act_len1);
+ rte_memcpy((char *)sock->same_node_tx_ring->mz->addr, (char *)buf + act_len1, act_len2);
+ } else {
+ rte_memcpy((char *)sock->same_node_tx_ring->mz->addr + (index & SAME_NODE_RING_MASK), buf, act_len);
+ }
+
+ index += act_len;
+ __atomic_store_n(&sock->same_node_tx_ring->sndend, index - 1, __ATOMIC_RELEASE);
+ if (act_len == 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return act_len;
+}
+
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
{
if (buf == NULL) {
@@ -813,6 +893,9 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
}
struct lwip_sock *sock = get_socket_by_fd(fd);
+ if (sock->same_node_tx_ring != NULL) {
+ return gazelle_same_node_ring_send(sock, buf, len, flags);
+ }
ssize_t send = write_stack_data(sock, buf, len);
if (send <= 0) {
return send;
@@ -857,22 +940,6 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
return buflen;
}
-static inline void del_data_in_event(struct lwip_sock *sock)
-{
- pthread_spin_lock(&sock->wakeup->event_list_lock);
-
- /* check again avoid cover event add in stack thread */
- if (!NETCONN_IS_DATAIN(sock)) {
- sock->events &= ~EPOLLIN;
-
- if (sock->events == 0) {
- list_del_node_null(&sock->event_list);
- }
- }
-
- pthread_spin_unlock(&sock->wakeup->event_list_lock);
-}
-
static struct pbuf *pbuf_free_partial(struct pbuf *pbuf, uint16_t free_len)
{
uint32_t tot_len = pbuf->tot_len - free_len;
@@ -906,6 +973,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
return 0;
}
+ if (sock->same_node_rx_ring != NULL) {
+ return gazelle_same_node_ring_recv(sock, buf, len, flags);
+ }
+
while (recv_left > 0) {
if (sock->recv_lastdata) {
pbuf = sock->recv_lastdata;
@@ -962,6 +1033,21 @@ void add_recv_list(int32_t fd)
}
}
+void read_same_node_recv_list(struct protocol_stack *stack)
+{
+ struct list_node *list = &(stack->same_node_recv_list);
+ struct list_node *node, *temp;
+ struct lwip_sock *sock;
+
+ list_for_each_safe(node, temp, list) {
+ sock = container_of(node, struct lwip_sock, recv_list);
+
+ if (sock->same_node_rx_ring != NULL && same_node_ring_count(sock)) {
+ add_sock_event(sock, EPOLLIN);
+ }
+ }
+}
+
void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
{
struct list_node *list = &(stack->recv_list);
@@ -1216,3 +1302,246 @@ void stack_recvlist_count(struct rpc_msg *msg)
msg->result = get_list_count(&stack->recv_list);
}
+
+void netif_poll(struct netif *netif)
+{
+ struct tcp_pcb *pcb = NULL;
+ struct tcp_pcb_listen *pcbl = NULL;
+
+ for (pcb = tcp_active_pcbs; pcb != NULL; pcb = pcb->next) {
+#define NETIF_POLL_READ_COUNT 32
+ struct pbuf *pbufs[NETIF_POLL_READ_COUNT];
+ int ret;
+
+ if (pcb->client_rx_ring != NULL) {
+ ret = rte_ring_sc_dequeue_burst(pcb->client_rx_ring, (void **)pbufs, NETIF_POLL_READ_COUNT, NULL);
+ for (int i = 0; i < ret; i++) {
+ if (ip_input(pbufs[i], netif) != 0) {
+ LSTACK_LOG(INFO, LSTACK, "netif_poll: ip_input return err\n");
+ pbuf_free(pbufs[i]);
+ }
+ }
+ }
+ }
+ for (pcbl = tcp_listen_pcbs.listen_pcbs; pcbl != NULL; pcbl = pcbl->next) {
+ if (pcbl->listen_rx_ring != NULL) {
+ struct pbuf *pbuf;
+ if (rte_ring_sc_dequeue(pcbl->listen_rx_ring, (void **)&pbuf) == 0) {
+ if (ip_input(pbuf, netif) != ERR_OK) {
+ pbuf_free(pbuf);
+ }
+ }
+ }
+ }
+}
+
+/* processes on same node handshake packet use this function */
+err_t netif_loop_output(struct netif *netif, struct pbuf *p)
+{
+ struct tcp_pcb *pcb = p->pcb;
+ struct pbuf *head = NULL;
+
+ if (pcb == NULL || pcb->client_tx_ring == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "netif_loop_output: pcb is null\n");
+ return ERR_ARG;
+ }
+
+ if (p->next != NULL) {
+ LSTACK_LOG(ERR, LSTACK, "netif_loop_output: not support chained pbuf\n");
+ return ERR_ARG;
+ }
+
+ struct tcp_hdr *tcp_hdr = (struct tcp_hdr *)((char *)p->payload + sizeof(struct ip_hdr));
+ uint8_t flags = TCPH_FLAGS(tcp_hdr);
+
+ head = pbuf_alloc(0, p->len, PBUF_RAM);
+ if (head == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "netif_loop_output: pbuf_alloc failed\n");
+ return ERR_MEM;
+ }
+ head->ol_flags = p->ol_flags;
+ memcpy(head->payload, p->payload, p->len);
+
+ if ((flags & TCP_SYN) && !(flags & TCP_ACK)) {
+ /* SYN packet, send to listen_ring */
+ char ring_name[RING_NAME_LEN] = {0};
+ snprintf(ring_name, sizeof(ring_name), "listen_rx_ring_%d", pcb->remote_port);
+ struct rte_ring *ring = rte_ring_lookup(ring_name);
+ if (ring == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "netif_loop_output: cant find listen_rx_ring %d\n", pcb->remote_port);
+ pbuf_free(head);
+ } else {
+ if (rte_ring_mp_enqueue(ring, head) != 0) {
+ LSTACK_LOG(INFO, LSTACK, "enqueue sync packet failed\n");
+ pbuf_free(head);
+ }
+ }
+ } else {
+ /* send other type packet to tx_ring */
+ if (rte_ring_sp_enqueue(pcb->client_tx_ring, head) != 0) {
+ LSTACK_LOG(INFO, LSTACK, "client tx ring full\n");
+ pbuf_free(head);
+ }
+ }
+
+ return ERR_OK;
+}
+
+err_t find_same_node_memzone(struct tcp_pcb *pcb, struct lwip_sock *nsock)
+{
+ char name[RING_NAME_LEN];
+ snprintf(name, sizeof(name), "rte_mz_rx_%u", pcb->remote_port);
+ if ((nsock->same_node_tx_ring_mz = rte_memzone_lookup(name)) == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+ return -1;
+ } else {
+ LSTACK_LOG(INFO, LSTACK, "lookup %s success\n", name);
+ }
+ nsock->same_node_tx_ring = (struct same_node_ring *)nsock->same_node_tx_ring_mz->addr;
+
+ snprintf(name, sizeof(name), "rte_mz_buf_rx_%u", pcb->remote_port);
+ if ((nsock->same_node_tx_ring->mz = rte_memzone_lookup(name)) == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+ return -1;
+ }
+
+ snprintf(name, sizeof(name), "rte_mz_tx_%u", pcb->remote_port);
+ if ((nsock->same_node_rx_ring_mz = rte_memzone_lookup(name)) == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+ return -1;
+ } else {
+ LSTACK_LOG(INFO, LSTACK, "lookup %s success\n", name);
+ }
+ nsock->same_node_rx_ring = (struct same_node_ring *)nsock->same_node_rx_ring_mz->addr;
+
+ snprintf(name, sizeof(name), "rte_mz_buf_tx_%u", pcb->remote_port);
+ if ((nsock->same_node_rx_ring->mz = rte_memzone_lookup(name)) == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+ return -1;
+ }
+
+ /* rcvlink init in alloc_socket() */
+ /* remove from g_rcv_process_list in free_socket */
+ list_add_node(&nsock->stack->same_node_recv_list, &nsock->recv_list);
+ return 0;
+}
+
+err_t same_node_memzone_create(const struct rte_memzone **zone, int size, int port, char *name, char *rx)
+{
+ char mem_name[RING_NAME_LEN] = {0};
+ snprintf(mem_name, sizeof(mem_name), "%s_%s_%u", name, rx, port);
+
+ *zone = rte_memzone_reserve_aligned(mem_name, size, rte_socket_id(), 0, RTE_CACHE_LINE_SIZE);
+ if (*zone == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "cannot reserve memzone:%s, errno is %d\n", mem_name, rte_errno);
+ return ERR_MEM;
+ }
+
+ LSTACK_LOG(INFO, LSTACK, "lstack id %d, reserve %s(%p) success, addr is %p, size is %u\n", rte_socket_id(), mem_name, *zone, (*zone)->addr, size);
+
+ return ERR_OK;
+}
+
+err_t same_node_ring_create(struct rte_ring **ring, int size, int port, char *name, char *rx)
+{
+ unsigned flags;
+ char ring_name[RING_NAME_LEN] = {0};
+ if (strcmp(name, "listen") == 0) {
+ flags = RING_F_SC_DEQ;
+ } else {
+ flags = RING_F_SP_ENQ | RING_F_SC_DEQ;
+ }
+
+ snprintf(ring_name, sizeof(ring_name), "%s_%s_ring_%u", name, rx, port);
+ *ring = rte_ring_create(ring_name, size, rte_socket_id(), flags);
+ if (*ring == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "cannot create rte_ring %s, errno is %d\n", ring_name, rte_errno);
+ return ERR_MEM;
+ }
+ LSTACK_LOG(INFO, LSTACK, "lstack socket id:%d, create %s(%p) success\n", rte_socket_id(), ring_name, *ring);
+ return ERR_OK;
+}
+
+static void init_same_node_ring(struct tcp_pcb *pcb)
+{
+ struct netconn *netconn = (struct netconn *)pcb->callback_arg;
+ struct lwip_sock *sock = get_socket(netconn->socket);
+
+ pcb->client_rx_ring = NULL;
+ pcb->client_tx_ring = NULL;
+ pcb->free_ring = 0;
+ sock->same_node_rx_ring = NULL;
+ sock->same_node_rx_ring_mz = NULL;
+ sock->same_node_tx_ring = NULL;
+ sock->same_node_tx_ring_mz = NULL;
+}
+
+#define CLIENT_RING_SIZE 512
+err_t create_same_node_ring(struct tcp_pcb *pcb)
+{
+ struct netconn *netconn = (struct netconn *)pcb->callback_arg;
+ struct lwip_sock *sock = get_socket(netconn->socket);
+
+ if (same_node_ring_create(&pcb->client_rx_ring, CLIENT_RING_SIZE, pcb->local_port, "client", "rx") != 0) {
+ goto END;
+ }
+ if (same_node_ring_create(&pcb->client_tx_ring, CLIENT_RING_SIZE, pcb->local_port, "client", "tx") != 0) {
+ goto END;
+ }
+ pcb->free_ring = 1;
+
+ if (same_node_memzone_create(&sock->same_node_rx_ring_mz, sizeof(struct same_node_ring), pcb->local_port, "rte_mz", "rx") != 0) {
+ goto END;
+ }
+ sock->same_node_rx_ring = (struct same_node_ring*)sock->same_node_rx_ring_mz->addr;
+
+ if (same_node_memzone_create(&sock->same_node_rx_ring->mz, SAME_NODE_RING_LEN, pcb->local_port, "rte_mz_buf", "rx") != 0) {
+ goto END;
+ }
+
+ sock->same_node_rx_ring->sndbegin = 0;
+ sock->same_node_rx_ring->sndend = 0;
+
+ if (same_node_memzone_create(&sock->same_node_tx_ring_mz, sizeof(struct same_node_ring), pcb->local_port, "rte_mz", "tx") != 0) {
+ goto END;
+ }
+ sock->same_node_tx_ring = (struct same_node_ring*)sock->same_node_tx_ring_mz->addr;
+
+ if (same_node_memzone_create(&sock->same_node_tx_ring->mz, SAME_NODE_RING_LEN, pcb->local_port, "rte_mz_buf", "tx") != 0) {
+ goto END;
+ }
+
+ sock->same_node_tx_ring->sndbegin = 0;
+ sock->same_node_tx_ring->sndend = 0;
+
+ return 0;
+END:
+ rte_ring_free(pcb->client_rx_ring);
+ rte_ring_free(pcb->client_tx_ring);
+ rte_memzone_free(sock->same_node_rx_ring->mz);
+ rte_memzone_free(sock->same_node_rx_ring_mz);
+ rte_memzone_free(sock->same_node_tx_ring->mz);
+ rte_memzone_free(sock->same_node_tx_ring_mz);
+ init_same_node_ring(pcb);
+ return ERR_BUF;
+}
+
+err_t find_same_node_ring(struct tcp_pcb *npcb)
+{
+ char name[RING_NAME_LEN] = {0};
+ snprintf(name, sizeof(name), "client_tx_ring_%u", npcb->remote_port);
+ npcb->client_rx_ring = rte_ring_lookup(name);
+ memset(name, 0, sizeof(name));
+ snprintf(name, sizeof(name), "client_rx_ring_%u", npcb->remote_port);
+ npcb->client_tx_ring = rte_ring_lookup(name);
+ npcb->free_ring = 0;
+ if (npcb->client_tx_ring == NULL ||
+ npcb->client_rx_ring == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lookup client rxtx ring failed, port is %d\n", npcb->remote_port);
+ tcp_abandon(npcb, 0);
+ return ERR_CONN;
+ } else {
+ LSTACK_LOG(INFO, LSTACK, "find client_tx_ring_%u and client_rx_ring_%u\n", npcb->remote_port, npcb->remote_port);
+ }
+ return 0;
+}
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 48eff1d..0d7b7f0 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -314,6 +314,7 @@ static int32_t init_stack_value(struct protocol_stack *stack, void *arg)
stack->lwip_stats = &lwip_stats;
init_list_node(&stack->recv_list);
+ init_list_node(&stack->same_node_recv_list);
init_list_node(&stack->wakeup_list);
sys_calibrate_tsc();
@@ -497,6 +498,10 @@ static void* gazelle_stack_thread(void *arg)
gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number);
+ /* reduce traversal times */
+ if ((wakeup_tick & 0xff) == 0) {
+ read_same_node_recv_list(stack);
+ }
read_recv_list(stack, read_connect_number);
if ((wakeup_tick & 0xf) == 0) {
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
index a690adb..55cf769 100644
--- a/src/lstack/include/lstack_ethdev.h
+++ b/src/lstack/include/lstack_ethdev.h
@@ -45,5 +45,6 @@ void delete_user_process_port(uint16_t dst_port, enum port_type type);
void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void netif_poll(struct netif *netif);
#endif /* __GAZELLE_ETHDEV_H__ */
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index 02110e0..d52a06d 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -14,7 +14,7 @@
#define __GAZELLE_LWIP_H__
#define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox))
-#define NETCONN_IS_DATAIN(sock) ((gazelle_ring_readable_count((sock)->recv_ring) || (sock)->recv_lastdata))
+#define NETCONN_IS_DATAIN(sock) ((gazelle_ring_readable_count((sock)->recv_ring) || (sock)->recv_lastdata) || (sock->same_node_rx_ring != NULL && same_node_ring_count(sock)))
#define NETCONN_IS_DATAOUT(sock) (gazelle_ring_readover_count((sock)->send_ring) || (sock)->send_lastdata || (sock)->send_pre_del)
#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
@@ -33,6 +33,7 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len);
ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags);
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
void read_recv_list(struct protocol_stack *stack, uint32_t max_num);
+void read_same_node_recv_list(struct protocol_stack *stack);
void send_stack_list(struct protocol_stack *stack, uint32_t send_max);
void add_recv_list(int32_t fd);
void get_lwip_conntable(struct rpc_msg *msg);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index d58c98a..3691250 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -71,6 +71,7 @@ struct protocol_stack {
struct rte_mbuf *pkts[RTE_TEST_RX_DESC_DEFAULT];
struct list_node recv_list;
+ struct list_node same_node_recv_list; /* used for same node processes communication */
struct list_node wakeup_list;
volatile uint16_t conn_num;
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 60ea897..01b1280 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -82,7 +82,6 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
len = (uint16_t)rte_pktmbuf_data_len(m);
payload = rte_pktmbuf_mtod(m, void *);
pc = mbuf_to_pbuf(m);
- pc->custom_free_function = gazelle_free_pbuf;
next = pbuf_alloced_custom(PBUF_RAW, (uint16_t)len, PBUF_RAM, pc, payload, (uint16_t)len);
if (next == NULL) {
stack->stats.rx_allocmbuf_fail++;
@@ -653,6 +652,7 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_fla
{
uint32_t nr_pkts;
+ netif_poll(&stack->netif);
nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, nic_read_number);
if (nr_pkts == 0) {
return 0;
--
2.23.0

View File

@ -0,0 +1,404 @@
From 40aba0460aade8bb3dd775f459a177487a549384 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Mon, 13 Mar 2023 16:49:25 +0800
Subject: [PATCH] fix send reset by peer when not sleep after connect fix
duplicate port alloc fix poll(NULL, 0, timeout) error fix multi nic, portid
not correct fix add connect failed when transfer pkt to other process fix
kni=0x0 coredump in kni_handle_rx when num_cpus > 1 fix accidental connect
error or slowly
---
src/common/gazelle_opt.h | 2 +
src/lstack/api/lstack_wrap.c | 90 +++++++++++++++++++------
src/lstack/core/lstack_dpdk.c | 6 +-
src/lstack/core/lstack_protocol_stack.c | 34 ++++++----
src/lstack/core/lstack_stack_stat.c | 3 +
src/lstack/include/lstack_ethdev.h | 6 ++
src/lstack/netif/lstack_ethdev.c | 28 ++++----
7 files changed, 124 insertions(+), 45 deletions(-)
diff --git a/src/common/gazelle_opt.h b/src/common/gazelle_opt.h
index e278107..4c0eef3 100644
--- a/src/common/gazelle_opt.h
+++ b/src/common/gazelle_opt.h
@@ -94,4 +94,6 @@
#define LSTACK_RECV_THREAD_NAME "lstack_recv"
#define LSTACK_THREAD_NAME "gazellelstack"
+#define SLEEP_US_BEFORE_LINK_UP 10000
+
#endif /* _GAZELLE_OPT_H_ */
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 46cbcec..ecde391 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -12,6 +12,7 @@
#define _GNU_SOURCE
#include <dlfcn.h>
+#include <string.h>
#include <signal.h>
#include <sys/socket.h>
@@ -22,9 +23,11 @@
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <unistd.h>
+#include <net/if.h>
#include <lwip/posix_api.h>
#include <lwip/lwipsock.h>
+#include <lwip/tcp.h>
#include "posix/lstack_epoll.h"
#include "posix/lstack_unistd.h"
@@ -84,6 +87,11 @@ static enum KERNEL_LWIP_PATH select_path(int fd)
return PATH_KERNEL;
}
+ struct tcp_pcb *pcb = sock->conn->pcb.tcp;
+ if (pcb != NULL && pcb->state <= ESTABLISHED) {
+ return PATH_LWIP;
+ }
+
return PATH_UNKNOW;
}
@@ -179,6 +187,28 @@ static int32_t do_accept4(int32_t s, struct sockaddr *addr, socklen_t *addrlen,
return posix_api->accept4_fn(s, addr, addrlen, flags);
}
+#define SIOCGIFADDR 0x8915
+static int get_addr(struct sockaddr_in *sin, char *interface)
+{
+ int sockfd = 0;
+ struct ifreq ifr;
+
+ if ((sockfd = posix_api->socket_fn(AF_INET, SOCK_STREAM, 0)) < 0) return -1;
+
+ memset(&ifr, 0, sizeof(ifr));
+ snprintf(ifr.ifr_name, (sizeof(ifr.ifr_name) - 1), "%s", interface);
+
+ if(posix_api->ioctl_fn(sockfd, SIOCGIFADDR, &ifr) < 0){
+ posix_api->close_fn(sockfd);
+ return -1;
+ }
+ posix_api->close_fn(sockfd);
+
+ memcpy(sin, &ifr.ifr_addr, sizeof(struct sockaddr_in));
+
+ return 0;
+}
+
static int32_t do_bind(int32_t s, const struct sockaddr *name, socklen_t namelen)
{
if (name == NULL) {
@@ -202,12 +232,36 @@ static int32_t do_bind(int32_t s, const struct sockaddr *name, socklen_t namelen
bool is_dst_ip_localhost(const struct sockaddr *addr)
{
- struct cfg_params *global_params = get_global_cfg_params();
struct sockaddr_in *servaddr = (struct sockaddr_in *) addr;
- if(global_params->host_addr.addr == servaddr->sin_addr.s_addr){
- return true;
+ FILE *ifh = fopen("/proc/net/dev", "r");
+ char *line = NULL;
+ char *p;
+ size_t linel = 0;
+ int linenum = 0;
+ struct sockaddr_in* sin = malloc(sizeof(struct sockaddr_in));
+
+ while (getdelim(&line, &linel, '\n', ifh) > 0) {
+ if (linenum++ < 2) continue;
+
+ p = line;
+ while (isspace(*p))
+ ++p;
+ int n = strcspn(p, ": \t");
+
+ char interface[20] = {0};
+ strncpy(interface, p, n);
+
+ memset(sin, 0, sizeof(struct sockaddr_in));
+ int ret = get_addr(sin, interface);
+ if (ret == 0) {
+ if(sin->sin_addr.s_addr == servaddr->sin_addr.s_addr){
+ return 1;
+ }
+ }
}
- return false;
+ free(sin);
+
+ return 0;
}
static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t namelen)
@@ -229,22 +283,17 @@ static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t name
GAZELLE_RETURN(EINVAL);
}
- int32_t ret = rpc_call_connect(s, name, namelen);
- if (ret == 0 || errno == EISCONN) {
- return ret;
- }
-
+ int32_t ret = 0;
char listen_ring_name[RING_NAME_LEN];
int remote_port = htons(((struct sockaddr_in *)name)->sin_port);
snprintf(listen_ring_name, sizeof(listen_ring_name), "listen_rx_ring_%u", remote_port);
- if (!is_dst_ip_localhost(name) || rte_ring_lookup(listen_ring_name) == NULL) {
+ if (is_dst_ip_localhost(name) && rte_ring_lookup(listen_ring_name) == NULL) {
ret = posix_api->connect_fn(s, name, namelen);
- if (ret == 0) {
- return ret;
- }
+ } else {
+ ret = rpc_call_connect(s, name, namelen);
}
- return -1;
+ return ret;
}
static inline int32_t do_listen(int32_t s, int32_t backlog)
@@ -253,7 +302,12 @@ static inline int32_t do_listen(int32_t s, int32_t backlog)
return posix_api->listen_fn(s, backlog);
}
- int32_t ret = stack_broadcast_listen(s, backlog);
+ int32_t ret;
+ if (use_ltran() && get_global_cfg_params()->listen_shadow == 0) {
+ ret = stack_single_listen(s, backlog);
+ } else {
+ ret = stack_broadcast_listen(s, backlog);
+ }
if (ret != 0) {
return ret;
}
@@ -467,11 +521,7 @@ static inline int32_t do_close(int32_t s)
static int32_t do_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
{
- if (fds == NULL) {
- GAZELLE_RETURN(EINVAL);
- }
-
- if (unlikely(posix_api->ues_posix) || nfds == 0 || !select_thread_path()) {
+ if (unlikely(posix_api->ues_posix) || fds == NULL || nfds == 0 || !select_thread_path()) {
return posix_api->poll_fn(fds, nfds, timeout);
}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index c6d6290..2463d3e 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -116,6 +116,7 @@ int32_t dpdk_eal_init(void)
ret = 0;
} else {
LSTACK_PRE_LOG(LSTACK_ERR, "rte_eal_init failed init, rte_errno %d\n", rte_errno);
+ return ret;
}
} else {
LSTACK_PRE_LOG(LSTACK_INFO, "dpdk_eal_init success\n");
@@ -450,6 +451,7 @@ int32_t dpdk_ethdev_init(void)
if (port_id < 0) {
return port_id;
}
+ get_global_cfg_params()->port_id = port_id;
struct rte_eth_dev_info dev_info;
int32_t ret = rte_eth_dev_info_get(port_id, &dev_info);
@@ -477,6 +479,9 @@ int32_t dpdk_ethdev_init(void)
stack_group->port_id = eth_params->port_id;
stack_group->rx_offload = eth_params->conf.rxmode.offloads;
stack_group->tx_offload = eth_params->conf.txmode.offloads;
+ /* used for tcp port alloc */
+ stack_group->reta_mask = dev_info.reta_size - 1;
+ stack_group->nb_queues = nb_queues;
if (get_global_cfg_params()->is_primary) {
for (uint32_t i = 0; i < stack_group->stack_num; i++) {
@@ -511,7 +516,6 @@ int32_t dpdk_ethdev_init(void)
rss_setup(port_id, nb_queues);
stack_group->reta_mask = dev_info.reta_size - 1;
}
- stack_group->nb_queues = nb_queues;
}
return 0;
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 0d7b7f0..5811b26 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -132,20 +132,28 @@ struct protocol_stack *get_bind_protocol_stack(void)
int min_conn_num = GAZELLE_MAX_CLIENTS;
/* close listen shadow, per app communication thread select only one stack */
- for (uint16_t i = 0; i < stack_group->stack_num; i++) {
- struct protocol_stack* stack = stack_group->stacks[i];
- if (get_global_cfg_params()->seperate_send_recv) {
- if (stack->is_send_thread && stack->conn_num < min_conn_num) {
- index = i;
- min_conn_num = stack->conn_num;
- }
- }else {
- if (stack->conn_num < min_conn_num) {
- index = i;
- min_conn_num = stack->conn_num;
+ if (use_ltran() && get_global_cfg_params()->listen_shadow == 0) {
+ static _Atomic uint16_t stack_index = 0;
+ index = atomic_fetch_add(&stack_index, 1);
+ if (index >= stack_group->stack_num) {
+ LSTACK_LOG(ERR, LSTACK, "thread =%hu larger than stack num = %hu\n", index, stack_group->stack_num);
+ return NULL;
+ }
+ } else {
+ for (uint16_t i = 0; i < stack_group->stack_num; i++) {
+ struct protocol_stack* stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (stack->is_send_thread && stack->conn_num < min_conn_num) {
+ index = i;
+ min_conn_num = stack->conn_num;
+ }
+ }else {
+ if (stack->conn_num < min_conn_num) {
+ index = i;
+ min_conn_num = stack->conn_num;
+ }
}
}
-
}
bind_stack = stack_group->stacks[index];
@@ -426,6 +434,8 @@ static struct protocol_stack *stack_thread_init(void *arg)
wait_sem_value(&stack_group->ethdev_init, 1);
}
+ usleep(SLEEP_US_BEFORE_LINK_UP);
+
if (ethdev_init(stack) != 0) {
free(stack);
return NULL;
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 92d7a39..489b267 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -51,6 +51,9 @@ uint64_t get_current_time(void)
void calculate_lstack_latency(struct gazelle_stack_latency *stack_latency, const struct pbuf *pbuf,
enum GAZELLE_LATENCY_TYPE type)
{
+ if (pbuf == NULL) {
+ return;
+ }
const uint64_t *priv = (uint64_t *)((uint8_t *)(pbuf) + LATENCY_OFFSET);
if (*priv != ~(*(priv + 1)) || *priv < stack_latency->start_time) {
return;
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
index 55cf769..25f5b8e 100644
--- a/src/lstack/include/lstack_ethdev.h
+++ b/src/lstack/include/lstack_ethdev.h
@@ -26,6 +26,12 @@ enum PACKET_TRANSFER_TYPE{
TRANSFER_CURRENT_THREAD,
};
+enum TRANSFER_MESSAGE_RESULT {
+ CONNECT_ERROR = -2,
+ REPLY_ERROR = -1,
+ TRANSFER_SUCESS = 0,
+};
+
struct protocol_stack;
struct rte_mbuf;
struct lstack_dev_ops {
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 01b1280..5032b5b 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -160,25 +160,23 @@ int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, b
int sockfd;
int ret = 0;
- if ((sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
- return -1;
- }
+ sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0);
memset(&serun, 0, sizeof(serun));
serun.sun_family = AF_UNIX;
sprintf_s(serun.sun_path, PATH_MAX,"%s%d", server_path, process_index);
int len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0){
- return -1;
+ return CONNECT_ERROR;
}
posix_api->write_fn(sockfd, buf, write_len);
if (need_reply) {
char reply_message[REPLY_LEN];
posix_api->read_fn(sockfd, reply_message, REPLY_LEN);
if (strcmp(reply_message, SUCCESS_REPLY) == 0) {
- ret = 0;
+ ret = TRANSFER_SUCESS;
}else {
- ret = -1;
+ ret = REPLY_ERROR;
}
}
posix_api->close_fn(sockfd);
@@ -291,7 +289,7 @@ void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, u
char process_server_path[DELETE_FLOWS_PARAMS_LENGTH];
sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u", dst_ip,split_delim, src_port,split_delim,dst_port);
int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false);
- if(ret != 0){
+ if(ret != TRANSFER_SUCESS){
LSTACK_LOG(ERR, LSTACK,"transfer_delete_rule_info_to_process0 error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n",
rte_gettid(), dst_ip, src_port, dst_port);
}
@@ -310,7 +308,7 @@ void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, u
sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u",
dst_ip,split_delim,src_ip,split_delim, dst_port,split_delim,src_port, split_delim,queue_id,split_delim,process_idx);
int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true);
- if(ret != 0){
+ if(ret != TRANSFER_SUCESS){
LSTACK_LOG(ERR, LSTACK,"transfer_create_rule_info_to_process0 error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u, queue_id %u, process_idx %u\n",
rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx);
}
@@ -321,7 +319,7 @@ void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_
char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH];
sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, "%u%s%u%s%u", listen_port,split_delim,process_idx, split_delim, is_add);
int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true);
- if(ret != 0){
+ if(ret != TRANSFER_SUCESS) {
LSTACK_LOG(ERR, LSTACK,"transfer_add_or_delete_listen_port_to_process0 error. tid %d. listen_port %u, process_idx %u\n",
rte_gettid(), listen_port, process_idx);
}
@@ -416,8 +414,10 @@ void transfer_arp_to_other_process(struct rte_mbuf *mbuf)
char arp_mbuf[LSTACK_MBUF_LEN] = {0};
sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf);
int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false);
- if(result < 0){
- LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. \n", i);
+ if(result == CONNECT_ERROR){
+ LSTACK_LOG(INFO, LSTACK,"connect process %d failed, ensure the process is started.\n", i);
+ }else if (result == REPLY_ERROR) {
+ LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. %m\n", i);
}
}
}
@@ -616,7 +616,11 @@ int distribute_pakages(struct rte_mbuf *mbuf)
void kni_handle_rx(uint16_t port_id)
{
struct rte_mbuf *pkts_burst[PACKET_READ_SIZE];
- uint32_t nb_kni_rx = rte_kni_rx_burst(get_gazelle_kni(), pkts_burst, PACKET_READ_SIZE);
+ struct rte_kni* kni = get_gazelle_kni();
+ uint32_t nb_kni_rx = 0;
+ if (kni) {
+ nb_kni_rx = rte_kni_rx_burst(kni, pkts_burst, PACKET_READ_SIZE);
+ }
if (nb_kni_rx > 0) {
uint16_t nb_rx = rte_eth_tx_burst(port_id, 0, pkts_burst, nb_kni_rx);
for (uint16_t i = nb_rx; i < nb_kni_rx; ++i) {
--
2.23.0

View File

@ -0,0 +1,299 @@
From 9437f113443158cb3bd8aa31c69e40de5ff6c3dc Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Mon, 5 Sep 2022 01:33:52 +0800
Subject: [PATCH] add tuple filter in conf to diff rss rule and tuple filter
rule
---
src/lstack/api/lstack_wrap.c | 2 +-
src/lstack/core/lstack_cfg.c | 13 +++++
src/lstack/core/lstack_dpdk.c | 70 ++++++++++++-------------
src/lstack/core/lstack_protocol_stack.c | 8 ++-
src/lstack/include/lstack_cfg.h | 1 +
src/lstack/include/lstack_ethdev.h | 1 -
src/lstack/include/lstack_vdev.h | 1 -
src/lstack/lstack.conf | 10 +++-
src/lstack/netif/lstack_ethdev.c | 6 +--
src/lstack/netif/lstack_vdev.c | 5 +-
10 files changed, 68 insertions(+), 49 deletions(-)
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index ecde391..1fba81c 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -303,7 +303,7 @@ static inline int32_t do_listen(int32_t s, int32_t backlog)
}
int32_t ret;
- if (use_ltran() && get_global_cfg_params()->listen_shadow == 0) {
+ if (get_global_cfg_params()->listen_shadow == 0) {
ret = stack_single_listen(s, backlog);
} else {
ret = stack_broadcast_listen(s, backlog);
diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c
index 72a3292..88f69e1 100644
--- a/src/lstack/core/lstack_cfg.c
+++ b/src/lstack/core/lstack_cfg.c
@@ -69,6 +69,7 @@ static int32_t parse_num_process(void);
static int32_t parse_process_numa(void);
static int32_t parse_process_index(void);
static int32_t parse_seperate_sendrecv_args(void);
+static int32_t parse_tuple_filter(void);
static inline int32_t parse_int(void *arg, char * arg_string, int32_t default_val,
int32_t min_val, int32_t max_val)
@@ -120,6 +121,7 @@ static struct config_vector_t g_config_tbl[] = {
{ "num_process", parse_num_process },
{ "process_numa", parse_process_numa },
{ "process_idx", parse_process_index },
+ { "tuple_filter", parse_tuple_filter },
{ NULL, NULL }
};
@@ -1030,3 +1032,14 @@ static int parse_process_index(void)
return 0;
}
+static int parse_tuple_filter(void)
+{
+ parse_int(&g_config_params.tuple_filter, "tuple_filter", 0, 0, 1);
+ if (g_config_params.tuple_filter == 0) {
+ return 0;
+ }
+ if (g_config_params.use_ltran || g_config_params.listen_shadow) {
+ return -EINVAL;
+ }
+ return 0;
+}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index 2463d3e..2ecfd1d 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -445,44 +445,44 @@ int32_t dpdk_ethdev_init(void)
nb_queues = get_global_cfg_params()->tot_queue_num;
}
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
- int32_t port_id = ethdev_port_id(get_global_cfg_params()->mac_addr);
- if (port_id < 0) {
- return port_id;
- }
- get_global_cfg_params()->port_id = port_id;
+ int32_t port_id = ethdev_port_id(get_global_cfg_params()->mac_addr);
+ if (port_id < 0) {
+ return port_id;
+ }
+ get_global_cfg_params()->port_id = port_id;
- struct rte_eth_dev_info dev_info;
- int32_t ret = rte_eth_dev_info_get(port_id, &dev_info);
- if (ret != 0) {
- LSTACK_LOG(ERR, LSTACK, "get dev info ret=%d\n", ret);
- return ret;
- }
+ struct rte_eth_dev_info dev_info;
+ int32_t ret = rte_eth_dev_info_get(port_id, &dev_info);
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "get dev info ret=%d\n", ret);
+ return ret;
+ }
- int32_t max_queues = LWIP_MIN(dev_info.max_rx_queues, dev_info.max_tx_queues);
- if (max_queues < nb_queues) {
- LSTACK_LOG(ERR, LSTACK, "port_id %d max_queues=%d\n", port_id, max_queues);
- return -EINVAL;
- }
+ int32_t max_queues = LWIP_MIN(dev_info.max_rx_queues, dev_info.max_tx_queues);
+ if (max_queues < nb_queues) {
+ LSTACK_LOG(ERR, LSTACK, "port_id %d max_queues=%d\n", port_id, max_queues);
+ return -EINVAL;
+ }
+
+ struct eth_params *eth_params = alloc_eth_params(port_id, nb_queues);
+ if (eth_params == NULL) {
+ return -ENOMEM;
+ }
+ eth_params_checksum(&eth_params->conf, &dev_info);
+ int32_t rss_enable = 0;
+ if (!get_global_cfg_params()->tuple_filter) {
+ rss_enable = eth_params_rss(&eth_params->conf, &dev_info);
+ }
+ stack_group->eth_params = eth_params;
+ stack_group->port_id = eth_params->port_id;
+ stack_group->rx_offload = eth_params->conf.rxmode.offloads;
+ stack_group->tx_offload = eth_params->conf.txmode.offloads;
+ /* used for tcp port alloc */
+ stack_group->reta_mask = dev_info.reta_size - 1;
+ stack_group->nb_queues = nb_queues;
- struct eth_params *eth_params = alloc_eth_params(port_id, nb_queues);
- if (eth_params == NULL) {
- return -ENOMEM;
- }
- eth_params_checksum(&eth_params->conf, &dev_info);
- int32_t rss_enable = 0;
- if (use_ltran()) {
- rss_enable = eth_params_rss(&eth_params->conf, &dev_info);
- }
- stack_group->eth_params = eth_params;
- stack_group->port_id = eth_params->port_id;
- stack_group->rx_offload = eth_params->conf.rxmode.offloads;
- stack_group->tx_offload = eth_params->conf.txmode.offloads;
- /* used for tcp port alloc */
- stack_group->reta_mask = dev_info.reta_size - 1;
- stack_group->nb_queues = nb_queues;
-
if (get_global_cfg_params()->is_primary) {
for (uint32_t i = 0; i < stack_group->stack_num; i++) {
struct protocol_stack *stack = stack_group->stacks[i];
@@ -512,7 +512,7 @@ int32_t dpdk_ethdev_init(void)
return ret;
}
- if (rss_enable && use_ltran()) {
+ if (rss_enable && !get_global_cfg_params()->tuple_filter) {
rss_setup(port_id, nb_queues);
stack_group->reta_mask = dev_info.reta_size - 1;
}
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 5811b26..4be981a 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -132,10 +132,10 @@ struct protocol_stack *get_bind_protocol_stack(void)
int min_conn_num = GAZELLE_MAX_CLIENTS;
/* close listen shadow, per app communication thread select only one stack */
- if (use_ltran() && get_global_cfg_params()->listen_shadow == 0) {
+ if (get_global_cfg_params()->listen_shadow == 0) {
static _Atomic uint16_t stack_index = 0;
- index = atomic_fetch_add(&stack_index, 1);
- if (index >= stack_group->stack_num) {
+ index = atomic_fetch_add(&stack_index, 1);
+ if (index >= stack_group->stack_num) {
LSTACK_LOG(ERR, LSTACK, "thread =%hu larger than stack num = %hu\n", index, stack_group->stack_num);
return NULL;
}
@@ -645,8 +645,6 @@ int32_t init_protocol_stack(void)
struct sys_thread *thread = sys_thread_new(name, libnet_listen_thread, (void*)(&stack_group->sem_listen_thread), 0, 0);
free(thread);
sem_wait(&stack_group->sem_listen_thread);
-
- create_flow_rule_map();
}
if (get_init_fail()) {
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 942c0b7..5f8e6b3 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -103,6 +103,7 @@ struct cfg_params {
char unix_socket_filename[NAME_MAX];
uint16_t send_ring_size;
bool expand_send_ring;
+ bool tuple_filter;
};
struct cfg_params *get_global_cfg_params(void);
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
index 25f5b8e..39a972d 100644
--- a/src/lstack/include/lstack_ethdev.h
+++ b/src/lstack/include/lstack_ethdev.h
@@ -45,7 +45,6 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_fla
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack);
int recv_pkts_from_other_process(int process_index, void* arg);
-void create_flow_rule_map();
void kni_handle_rx(uint16_t port_id);
void delete_user_process_port(uint16_t dst_port, enum port_type type);
void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h
index 0995277..540a31a 100644
--- a/src/lstack/include/lstack_vdev.h
+++ b/src/lstack/include/lstack_vdev.h
@@ -22,7 +22,6 @@ void vdev_dev_ops_init(struct lstack_dev_ops *dev_ops);
int vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple);
int recv_pkts_from_other_process(int process_index, void* arg);
-void create_flow_rule_map();
void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add);
diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf
index 389a81c..64a2f42 100644
--- a/src/lstack/lstack.conf
+++ b/src/lstack/lstack.conf
@@ -50,6 +50,14 @@ mask_addr="255.255.255.0"
gateway_addr="192.168.1.1"
devices="aa:bb:cc:dd:ee:ff"
-num_process=2
+#0: use rss rule
+#1: use tcp tuple rule to specify packet to nic queue
+tuple_filter=0
+
+#tuple_filter=1, below cfg valid
+num_process=1
process_numa="0,1"
process_idx=0
+
+#tuple_filer=0, below cfg valid
+listen_shadow=0
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 5032b5b..5ec211d 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -530,7 +530,6 @@ int recv_pkts_from_other_process(int process_index, void* arg){
parse_arp_and_transefer(buf);
}else if(n == TRANSFER_TCP_MUBF_LEN) {
/* tcp. lstack_mbuf_queue_id */
- printf("recv_pkts_from_other_process, process idx %d \n ", process_index);
parse_tcp_and_transefer(buf);
}else if (n == DELETE_FLOWS_PARAMS_LENGTH) {
/* delete rule */
@@ -645,7 +644,6 @@ void kni_handle_tx(struct rte_mbuf *mbuf)
ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
}
- // 发送到内核协议栈
if (!rte_kni_tx_burst(get_gazelle_kni(), &mbuf, 1)) {
rte_pktmbuf_free(mbuf);
}
@@ -680,8 +678,8 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_fla
transfer_arp_to_other_process(stack->pkts[i]);
transfer_type = TRANSFER_KERNEL;
}
- }else {
- if (!use_ltran_flag && stack->queue_id == 0) {
+ } else {
+ if (!use_ltran_flag && get_global_cfg_params()->tuple_filter && stack->queue_id == 0) {
transfer_type = distribute_pakages(stack->pkts[i]);
}
}
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
index 1752853..2a4c6ac 100644
--- a/src/lstack/netif/lstack_vdev.c
+++ b/src/lstack/netif/lstack_vdev.c
@@ -151,7 +151,7 @@ int32_t vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple)
return -1;
}
- if (!use_ltran()) {
+ if (!use_ltran() & get_global_cfg_params()->tuple_filter) {
if(type == REG_RING_TCP_LISTEN_CLOSE){
if (get_global_cfg_params()->is_primary) {
delete_user_process_port(qtuple->src_port, PORT_LISTEN);
@@ -190,6 +190,9 @@ int32_t vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple)
}
return 0;
}
+ if (!use_ltran()) {
+ return 0;
+ }
int32_t ret;
--
2.23.0

View File

@ -0,0 +1,40 @@
From bfc44497f434c275dcaee4e82f7edbbdf30d7eab Mon Sep 17 00:00:00 2001
From: kircher <majun65@huawei.com>
Date: Tue, 14 Mar 2023 21:39:03 +0800
Subject: [PATCH] disable tso without ipv4 checksum
---
src/common/dpdk_common.c | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/src/common/dpdk_common.c b/src/common/dpdk_common.c
index 2938a25..f5a20dd 100644
--- a/src/common/dpdk_common.c
+++ b/src/common/dpdk_common.c
@@ -118,6 +118,11 @@ void eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_info *dev
COMMON_INFO("DEV_TX_OFFLOAD_TCP_CKSUM\n");
}
+ if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_TSO) {
+ tx_ol |= (DEV_TX_OFFLOAD_TCP_TSO | DEV_TX_OFFLOAD_MULTI_SEGS);
+ COMMON_INFO("DEV_TX_OFFLOAD_TCP_TSO\n");
+ }
+
if (!(rx_ol & DEV_RX_OFFLOAD_TCP_CKSUM) || !(rx_ol & DEV_RX_OFFLOAD_IPV4_CKSUM)) {
rx_ol = 0;
}
@@ -125,11 +130,6 @@ void eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_info *dev
tx_ol = 0;
}
- if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_TSO) {
- tx_ol |= (DEV_TX_OFFLOAD_TCP_TSO | DEV_TX_OFFLOAD_MULTI_SEGS);
- COMMON_INFO("DEV_TX_OFFLOAD_TCP_TSO\n");
- }
-
conf->rxmode.offloads = rx_ol;
conf->txmode.offloads = tx_ol;
--
2.23.0

View File

@ -0,0 +1,190 @@
From 9b42f69b52aa335d5be445e7de73dfca242ee66a Mon Sep 17 00:00:00 2001
From: jiangheng12 <jiangheng14@huawei.com>
Date: Tue, 14 Mar 2023 21:07:15 +0800
Subject: [PATCH 210/210] support tuple rule add/delete
---
src/lstack/api/lstack_wrap.c | 3 +-
src/lstack/core/lstack_protocol_stack.c | 2 +-
src/lstack/netif/lstack_ethdev.c | 72 ++++++++++++++++++++++---
src/lstack/netif/lstack_vdev.c | 7 ++-
4 files changed, 74 insertions(+), 10 deletions(-)
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 1fba81c..9a021d7 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -303,7 +303,8 @@ static inline int32_t do_listen(int32_t s, int32_t backlog)
}
int32_t ret;
- if (get_global_cfg_params()->listen_shadow == 0) {
+ if (!get_global_cfg_params()->tuple_filter &&
+ !get_global_cfg_params()->listen_shadow) {
ret = stack_single_listen(s, backlog);
} else {
ret = stack_broadcast_listen(s, backlog);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 4be981a..5e510bd 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -132,7 +132,7 @@ struct protocol_stack *get_bind_protocol_stack(void)
int min_conn_num = GAZELLE_MAX_CLIENTS;
/* close listen shadow, per app communication thread select only one stack */
- if (get_global_cfg_params()->listen_shadow == 0) {
+ if (!get_global_cfg_params()->tuple_filter && !get_global_cfg_params()->listen_shadow) {
static _Atomic uint16_t stack_index = 0;
index = atomic_fetch_add(&stack_index, 1);
if (index >= stack_group->stack_num) {
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 5ec211d..532f006 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -26,6 +26,7 @@
#include <securec.h>
#include <rte_jhash.h>
+#include <uthash.h>
#include "lstack_cfg.h"
#include "lstack_vdev.h"
@@ -114,7 +115,6 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
}
}
-
int32_t eth_dev_poll(void)
{
uint32_t nr_pkts;
@@ -148,9 +148,49 @@ int32_t eth_dev_poll(void)
return nr_pkts;
}
-void init_listen_and_user_ports(){
- memset(g_user_ports, INVAILD_PROCESS_IDX, sizeof(g_user_ports));
- memset(g_listen_ports, INVAILD_PROCESS_IDX, sizeof(g_listen_ports));
+/* flow rule map */
+#define RULE_KEY_LEN 22
+struct flow_rule {
+ char rule_key[RULE_KEY_LEN];
+ struct rte_flow *flow;
+ UT_hash_handle hh;
+};
+
+static uint16_t g_flow_num = 0;
+struct flow_rule *g_flow_rules = NULL;
+struct flow_rule *find_rule(char *rule_key)
+{
+ struct flow_rule *fl;
+ HASH_FIND_STR(g_flow_rules, rule_key, fl);
+ return fl;
+}
+
+void add_rule(char* rule_key, struct rte_flow *flow)
+{
+ struct flow_rule *rule;
+ HASH_FIND_STR(g_flow_rules, rule_key, rule);
+ if (rule == NULL) {
+ rule = (struct flow_rule*)malloc(sizeof(struct flow_rule));
+ strcpy(rule->rule_key, rule_key);
+ HASH_ADD_STR(g_flow_rules, rule_key, rule);
+ }
+ rule->flow = flow;
+}
+
+void delete_rule(char* rule_key)
+{
+ struct flow_rule *rule = NULL;
+ HASH_FIND_STR(g_flow_rules, rule_key, rule);
+ if (rule == NULL) {
+ HASH_DEL(g_flow_rules, rule);
+ free(rule);
+ }
+}
+
+void init_listen_and_user_ports(void)
+{
+ memset_s(g_user_ports, sizeof(g_user_ports), INVAILD_PROCESS_IDX, sizeof(g_user_ports));
+ memset_s(g_listen_ports, sizeof(g_listen_ports), INVAILD_PROCESS_IDX, sizeof(g_listen_ports));
}
int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply)
@@ -258,6 +298,12 @@ create_flow_director(uint16_t port_id, uint16_t queue_id, uint32_t src_ip, uint3
void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port){
uint16_t port_id = get_port_id();
+ char rule_key[RULE_KEY_LEN];
+ sprintf(rule_key,"%u_%u_%u",src_ip,src_port,dst_port);
+ struct flow_rule *fl_exist = find_rule(rule_key);
+ if(fl_exist != NULL){
+ return;
+ }
LSTACK_LOG(INFO, LSTACK, "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs :%u \n",
queue_id, src_ip,ntohs(src_port), ntohs(dst_port) );
@@ -269,12 +315,26 @@ void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, u
queue_id, src_ip,src_port,dst_port,ntohs(dst_port), error.type, error.message ? error.message : "(no stated reason)");
return;
}
+ __sync_fetch_and_add(&g_flow_num, 1);
+ add_rule(rule_key, flow);
}
void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
{
uint16_t port_id = get_port_id();
- (void)port_id;
+ char rule_key[RULE_KEY_LEN];
+ sprintf(rule_key,"%u_%u_%u",dst_ip,dst_port,src_port);
+ struct flow_rule *fl = find_rule(rule_key);
+
+ if(fl != NULL){
+ struct rte_flow_error error;
+ int ret = rte_flow_destroy(port_id, fl->flow, &error);
+ if(ret != 0){
+ LSTACK_LOG(ERR, PORT,"Flow can't be delete %d message: %s\n",error.type,error.message ? error.message : "(no stated reason)");
+ }
+ delete_rule(rule_key);
+ __sync_fetch_and_sub(&g_flow_num, 1);
+ }
}
/*
@@ -679,7 +739,7 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_fla
transfer_type = TRANSFER_KERNEL;
}
} else {
- if (!use_ltran_flag && get_global_cfg_params()->tuple_filter && stack->queue_id == 0) {
+ if (get_global_cfg_params()->tuple_filter && stack->queue_id == 0) {
transfer_type = distribute_pakages(stack->pkts[i]);
}
}
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
index 2a4c6ac..ba0db39 100644
--- a/src/lstack/netif/lstack_vdev.c
+++ b/src/lstack/netif/lstack_vdev.c
@@ -151,7 +151,7 @@ int32_t vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple)
return -1;
}
- if (!use_ltran() & get_global_cfg_params()->tuple_filter) {
+ if (!use_ltran() && get_global_cfg_params()->tuple_filter) {
if(type == REG_RING_TCP_LISTEN_CLOSE){
if (get_global_cfg_params()->is_primary) {
delete_user_process_port(qtuple->src_port, PORT_LISTEN);
@@ -163,7 +163,10 @@ int32_t vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple)
if (type == REG_RING_TCP_CONNECT_CLOSE) {
if (get_global_cfg_params()->is_primary) {
delete_user_process_port(qtuple->src_port, PORT_CONNECT);
- delete_flow_director(qtuple->dst_ip, qtuple->src_port, qtuple->dst_port);
+ uint16_t queue_id = get_protocol_stack()->queue_id;
+ if (queue_id != 0) {
+ delete_flow_director(qtuple->dst_ip, qtuple->src_port, qtuple->dst_port);
+ }
}else{
transfer_delete_rule_info_to_process0(qtuple->dst_ip,qtuple->src_port,qtuple->dst_port);
}
--
2.23.0

View File

@ -0,0 +1,220 @@
From 2c3000bba21c4c636860498530b57e842f56480f Mon Sep 17 00:00:00 2001
From: Lemmy Huang <huangliming5@huawei.com>
Date: Thu, 16 Mar 2023 11:40:29 +0800
Subject: [PATCH] refactor mbuf private data
Signed-off-by: Lemmy Huang <huangliming5@huawei.com>
---
src/common/dpdk_common.h | 49 ++++++++++++++++++-----------
src/lstack/core/lstack_dpdk.c | 2 +-
src/lstack/core/lstack_stack_stat.c | 12 ++++---
src/ltran/CMakeLists.txt | 3 +-
src/ltran/ltran_ethdev.c | 4 +--
src/ltran/ltran_forward.c | 14 ++++-----
6 files changed, 50 insertions(+), 34 deletions(-)
diff --git a/src/common/dpdk_common.h b/src/common/dpdk_common.h
index 1305819..6b107ae 100644
--- a/src/common/dpdk_common.h
+++ b/src/common/dpdk_common.h
@@ -16,32 +16,44 @@
#include <stdbool.h>
#include <rte_mbuf.h>
#include <rte_ring.h>
+#include <lwip/pbuf.h>
#include "gazelle_opt.h"
#define GAZELLE_KNI_NAME "kni" // will be removed during dpdk update
+
/* Layout:
- * | rte_mbuf | pbuf_custom| tcp_seg | gazelle_prive | payload |
- * | 128 | 64 | 32 | 16 |
- * rte_prefetch0 in lwip project,tcp_out.c,tcp_output_segment use constants
- * cacheline is 64, make sure pbuf_custom in same cacheline
+ * | rte_mbuf | mbuf_private | payload |
+ * | 128 | | |
**/
-struct pbuf;
-#define LATENCY_TIMESTAMP_SIZE (sizeof(uint64_t) * 2)
-#define MBUF_PRIVATE_SIZE 128
-#define LATENCY_OFFSET 96
-static __rte_always_inline uint64_t *mbuf_to_private(struct rte_mbuf *mbuf)
+struct latency_timestamp {
+ uint64_t stamp; // time stamp
+ uint64_t check; // just for later vaild check
+};
+struct mbuf_private {
+ /* struct pbuf_custom must at first */
+ struct pbuf_custom pc;
+ /* don't use `struct tcp_seg` directly to avoid conflicts by include lwip tcp header */
+ char ts[32]; // 32 > sizeof(struct tcp_seg)
+ struct latency_timestamp lt;
+};
+
+static __rte_always_inline struct mbuf_private *mbuf_to_private(const struct rte_mbuf *m)
+{
+ return (struct mbuf_private *)RTE_PTR_ADD(m, sizeof(struct rte_mbuf));
+}
+static __rte_always_inline struct pbuf_custom *mbuf_to_pbuf(const struct rte_mbuf *m)
{
- return (uint64_t *)((uint8_t *)(mbuf) + sizeof(struct rte_mbuf) + LATENCY_OFFSET);
+ return &mbuf_to_private(m)->pc;
}
-static __rte_always_inline struct rte_mbuf *pbuf_to_mbuf(struct pbuf *p)
+static __rte_always_inline struct rte_mbuf *pbuf_to_mbuf(const struct pbuf *p)
{
- return ((struct rte_mbuf *)(void *)((uint8_t *)(p) - sizeof(struct rte_mbuf)));
+ return (struct rte_mbuf *)RTE_PTR_SUB(p, sizeof(struct rte_mbuf));
}
-static __rte_always_inline struct pbuf_custom *mbuf_to_pbuf(struct rte_mbuf *m)
+static __rte_always_inline struct mbuf_private *pbuf_to_private(const struct pbuf *p)
{
- return ((struct pbuf_custom *)((uint8_t *)(m) + sizeof(struct rte_mbuf)));
+ return mbuf_to_private(pbuf_to_mbuf(p));
}
/* NOTE!!! magic code, even the order.
@@ -69,15 +81,16 @@ static __rte_always_inline void copy_mbuf(struct rte_mbuf *dst, struct rte_mbuf
// copy private date.
dst_data = (uint8_t *)mbuf_to_private(dst);
src_data = (uint8_t *)mbuf_to_private(src);
- rte_memcpy(dst_data, src_data, LATENCY_TIMESTAMP_SIZE);
+ rte_memcpy(dst_data, src_data, sizeof(struct mbuf_private));
}
static __rte_always_inline void time_stamp_into_mbuf(uint32_t rx_count, struct rte_mbuf *buf[], uint64_t time_stamp)
{
+ struct latency_timestamp *lt;
for (uint32_t i = 0; i < rx_count; i++) {
- uint64_t *priv = mbuf_to_private(buf[i]);
- *priv = time_stamp; // time stamp
- *(priv + 1) = ~(*priv); // just for later vaid check
+ lt = &mbuf_to_private(buf[i])->lt;
+ lt->stamp = time_stamp;
+ lt->check = ~(time_stamp);
}
}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index 2ecfd1d..7ded9e9 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -146,7 +146,7 @@ struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,
}
/* time stamp before pbuf_custom as priv_data */
- uint16_t private_size = RTE_ALIGN(MBUF_PRIVATE_SIZE, RTE_CACHE_LINE_SIZE);
+ uint16_t private_size = RTE_ALIGN(sizeof(struct mbuf_private), RTE_CACHE_LINE_SIZE);
pool = rte_pktmbuf_pool_create(pool_name, nb_mbuf, mbuf_cache_size, private_size, MBUF_SZ, rte_socket_id());
if (pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno);
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 489b267..eed7fbf 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -51,16 +51,18 @@ uint64_t get_current_time(void)
void calculate_lstack_latency(struct gazelle_stack_latency *stack_latency, const struct pbuf *pbuf,
enum GAZELLE_LATENCY_TYPE type)
{
+ uint64_t latency;
+ const struct latency_timestamp *lt;
+
if (pbuf == NULL) {
return;
}
- const uint64_t *priv = (uint64_t *)((uint8_t *)(pbuf) + LATENCY_OFFSET);
- if (*priv != ~(*(priv + 1)) || *priv < stack_latency->start_time) {
+
+ lt = &pbuf_to_private(pbuf)->lt;
+ if (lt->stamp != ~(lt->check) || lt->stamp < stack_latency->start_time) {
return;
}
-
- uint64_t latency = get_current_time();
- latency = latency - *priv;
+ latency = get_current_time() - lt->stamp;
struct stack_latency *latency_stat = (type == GAZELLE_LATENCY_LWIP) ?
&stack_latency->lwip_latency : &stack_latency->read_latency;
diff --git a/src/ltran/CMakeLists.txt b/src/ltran/CMakeLists.txt
index 1c82dae..f37a232 100644
--- a/src/ltran/CMakeLists.txt
+++ b/src/ltran/CMakeLists.txt
@@ -12,6 +12,7 @@ cmake_minimum_required(VERSION 3.12.1)
project(ltran)
set(COMMON_DIR ${PROJECT_SOURCE_DIR}/../common)
+set(LWIP_DIR /usr/include/lwip)
set(CMAKE_VERBOSE_MAKEFILE ON)
if (CMAKE_C_COMPILER_ID STREQUAL "GNU")
@@ -31,7 +32,7 @@ add_executable(ltran main.c ltran_param.c ltran_config.c ltran_ethdev.c ltran_st
ltran_forward.c ltran_timer.c ${COMMON_DIR}/gazelle_dfx_msg.c ${COMMON_DIR}/dpdk_common.c
${COMMON_DIR}/gazelle_parse_config.c)
-target_include_directories(ltran PRIVATE ${COMMON_DIR} ${PROJECT_SOURCE_DIR})
+target_include_directories(ltran PRIVATE ${COMMON_DIR} ${PROJECT_SOURCE_DIR} ${LWIP_DIR})
target_compile_options(ltran PRIVATE -march=native -fno-strict-aliasing -D__ARM_FEATURE_CRC32=1 -DRTE_MACHINE_CPUFLAG_NEON
-DRTE_MACHINE_CPUFLAG_CRC32 -DRTE_MACHINE_CPUFLAG_PMULL -DRTE_MACHINE_CPUFLAG_AES
-DRTE_MACHINE_CPUFLAG_SHA1 -DRTE_MACHINE_CPUFLAG_SHA2 -include rte_config.h
diff --git a/src/ltran/ltran_ethdev.c b/src/ltran/ltran_ethdev.c
index e0c824a..e2eb4a8 100644
--- a/src/ltran/ltran_ethdev.c
+++ b/src/ltran/ltran_ethdev.c
@@ -147,7 +147,7 @@ static struct rte_mempool *ltran_create_rx_mbuf_pool(uint32_t bond_port_index)
return NULL;
}
- uint16_t private_size = RTE_ALIGN(MBUF_PRIVATE_SIZE, RTE_CACHE_LINE_SIZE);
+ uint16_t private_size = RTE_ALIGN(sizeof(struct mbuf_private), RTE_CACHE_LINE_SIZE);
return rte_pktmbuf_pool_create(mbuf_pool_name, num_mbufs, GAZELLE_MBUFS_CACHE_SIZE, private_size,
RTE_MBUF_DEFAULT_BUF_SIZE, (int32_t)rte_socket_id());
}
@@ -166,7 +166,7 @@ static struct rte_mempool *ltran_create_tx_mbuf_pool(uint32_t bond_port_index)
return NULL;
}
- uint16_t private_size = RTE_ALIGN(MBUF_PRIVATE_SIZE, RTE_CACHE_LINE_SIZE);
+ uint16_t private_size = RTE_ALIGN(sizeof(struct mbuf_private), RTE_CACHE_LINE_SIZE);
return rte_pktmbuf_pool_create(mbuf_pool_name, num_mbufs, GAZELLE_MBUFS_CACHE_SIZE, private_size,
RTE_MBUF_DEFAULT_BUF_SIZE, (int32_t)rte_socket_id());
}
diff --git a/src/ltran/ltran_forward.c b/src/ltran/ltran_forward.c
index 4d9c1bb..8629acb 100644
--- a/src/ltran/ltran_forward.c
+++ b/src/ltran/ltran_forward.c
@@ -50,22 +50,22 @@ static __rte_always_inline struct gazelle_stack *get_kni_stack(void)
static void calculate_ltran_latency(struct gazelle_stack *stack, const struct rte_mbuf *mbuf)
{
+ struct latency_timestamp *lt;
uint64_t latency;
- uint64_t *priv = NULL;
- priv = (uint64_t *)RTE_PTR_ADD(mbuf, sizeof(struct rte_mbuf) + LATENCY_OFFSET);
- // priv--time stamp priv+1 --- vaild check
- if (*priv != ~(*(priv + 1))) {
+ lt = &mbuf_to_private(mbuf)->lt;
+ // vaild check
+ if (lt->stamp != ~(lt->check)) {
return;
}
// time stamp must > start time
- if (*priv < get_start_time_stamp()) {
- *priv = 0;
+ if (lt->stamp < get_start_time_stamp()) {
+ lt->stamp = 0;
return;
}
- latency = get_current_time() - *priv;
+ latency = get_current_time() - lt->stamp;
stack->stack_stats.latency_total += latency;
stack->stack_stats.latency_pkts++;
--
2.23.0

View File

@ -2,7 +2,7 @@
Name: gazelle
Version: 1.0.1
Release: 51
Release: 52
Summary: gazelle is a high performance user-mode stack
License: MulanPSL-2.0
URL: https://gitee.com/openeuler/gazelle
@ -10,7 +10,7 @@ Source0: %{name}-%{version}.tar.gz
BuildRequires: cmake gcc-c++ lwip
BuildRequires: dpdk-devel >= 21.11-5
BuildRequires: numactl-devel libpcap-devel libconfig-devel libboundscheck
BuildRequires: numactl-devel libpcap-devel libconfig-devel libboundscheck uthash-devel
Requires: dpdk >= 21.11-5
Requires: numactl libpcap libconfig libboundscheck
@ -217,6 +217,15 @@ Patch9199: 0199-remove-rxtx-driver-cache.patch
Patch9200: 0200-send-ring-size-is-configure.patch
Patch9201: 0201-send-should-return-1-errno-EAGAIN-when-ring-full.patch
Patch9202: 0202-when-send-ring-full-whether-dynamic-alloc-mbuf-is-co.patch
Patch9203: 0203-add-pbuf-lock-when-aggregate-pbuf.patch
Patch9204: 0204-supprot-multi-process.patch
Patch9205: 0205-add-gazellectl-x-to-show-nic-stats.patch
Patch9206: 0206-add-same-node-ring-for-inter-proces-communication.patch
Patch9207: 0207-fix-send-reset-by-peer-when-not-sleep-after-connect.patch
Patch9208: 0208-add-tuple-filter-in-conf-to-diff-rss-rule-and-tuple-.patch
Patch9209: 0210-disable-tso-without-ipv4-checksum.patch
Patch9210: 0210-support-tuple-rule-add-delete.patch
Patch9211: 0211-refactor-mbuf-private-data.patch
%description
%{name} is a high performance user-mode stack.
@ -257,6 +266,17 @@ install -Dpm 0640 %{_builddir}/%{name}-%{version}/src/ltran/ltran.conf %{b
%config(noreplace) %{conf_path}/ltran.conf
%changelog
* Sat Mar 18 2023 jiangheng<jiangheng14@huawei.com> - 1.0.1-52
- add pbuf lock when aggregate pbuf
- support multi process
- add gazellectl -x -a args
- add same node ring for iner-process communication
- fix send reset by peer when not sleep after connect
- add tuple filter in conf to diff rss rule adn tuple filter rule
- support tuple rule add/delete
- disable tso without ipv4 checksum
- refactor mbuf private data
* Sat Mar 11 2023 kircher <majun65@huawei.com> - 1.0.1-51
- when send ring full whether dynamic alloc mbuf is configurable reduce cpu usage when send ring full
- send should return -1, errno EAGAIN when ring full