591 lines
22 KiB
Diff
591 lines
22 KiB
Diff
From a8fbb2cc4f9367e4a83c3611e7a7bdb821504015 Mon Sep 17 00:00:00 2001
|
|
From: jiangheng <jiangheng14@huawei.com>
|
|
Date: Mon, 13 Mar 2023 16:08:13 +0800
|
|
Subject: [PATCH] add same node ring for inter-process communication
|
|
|
|
---
|
|
src/lstack/api/lstack_wrap.c | 21 +-
|
|
src/lstack/core/lstack_lwip.c | 377 +++++++++++++++++++--
|
|
src/lstack/core/lstack_protocol_stack.c | 5 +
|
|
src/lstack/include/lstack_ethdev.h | 1 +
|
|
src/lstack/include/lstack_lwip.h | 3 +-
|
|
src/lstack/include/lstack_protocol_stack.h | 1 +
|
|
src/lstack/netif/lstack_ethdev.c | 2 +-
|
|
7 files changed, 381 insertions(+), 29 deletions(-)
|
|
|
|
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
|
|
index 561c6e4..46cbcec 100644
|
|
--- a/src/lstack/api/lstack_wrap.c
|
|
+++ b/src/lstack/api/lstack_wrap.c
|
|
@@ -200,6 +200,16 @@ static int32_t do_bind(int32_t s, const struct sockaddr *name, socklen_t namelen
|
|
return rpc_call_bind(s, name, namelen);
|
|
}
|
|
|
|
+/*
+ * Return true when the IPv4 address carried in addr equals host_addr from the
+ * global configuration. addr is read as a struct sockaddr_in; sa_family is not
+ * inspected and addr must not be NULL.
+ */
+bool is_dst_ip_localhost(const struct sockaddr *addr)
+{
+    const struct cfg_params *cfg = get_global_cfg_params();
+    const struct sockaddr_in *dst = (const struct sockaddr_in *)addr;
+
+    return cfg->host_addr.addr == dst->sin_addr.s_addr;
+}
|
|
+
|
|
static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t namelen)
|
|
{
|
|
if (name == NULL) {
|
|
@@ -224,9 +234,14 @@ static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t name
|
|
return ret;
|
|
}
|
|
|
|
- ret = posix_api->connect_fn(s, name, namelen);
|
|
- if (ret == 0) {
|
|
- return ret;
|
|
+ char listen_ring_name[RING_NAME_LEN];
|
|
+ int remote_port = htons(((struct sockaddr_in *)name)->sin_port);
|
|
+ snprintf(listen_ring_name, sizeof(listen_ring_name), "listen_rx_ring_%u", remote_port);
|
|
+ if (!is_dst_ip_localhost(name) || rte_ring_lookup(listen_ring_name) == NULL) {
|
|
+ ret = posix_api->connect_fn(s, name, namelen);
|
|
+ if (ret == 0) {
|
|
+ return ret;
|
|
+ }
|
|
}
|
|
|
|
return -1;
|
|
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
|
|
index 063eea4..60abfe8 100644
|
|
--- a/src/lstack/core/lstack_lwip.c
|
|
+++ b/src/lstack/core/lstack_lwip.c
|
|
@@ -19,6 +19,7 @@
|
|
#include <lwip/pbuf.h>
|
|
#include <lwip/priv/tcp_priv.h>
|
|
#include <lwip/posix_api.h>
|
|
+#include <lwip/tcp.h>
|
|
#include <securec.h>
|
|
#include <rte_errno.h>
|
|
#include <rte_malloc.h>
|
|
@@ -106,7 +107,6 @@ static void reset_sock_data(struct lwip_sock *sock)
|
|
static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, uint16_t length, pbuf_type type)
|
|
{
|
|
struct pbuf_custom *pbuf_custom = mbuf_to_pbuf(mbuf);
|
|
- pbuf_custom->custom_free_function = gazelle_free_pbuf;
|
|
|
|
void *data = rte_pktmbuf_mtod(mbuf, void *);
|
|
struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ);
|
|
@@ -229,18 +229,11 @@ void gazelle_free_pbuf(struct pbuf *pbuf)
|
|
|
|
int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num)
|
|
{
|
|
- struct pbuf_custom *pbuf_custom = NULL;
|
|
-
|
|
int32_t ret = rte_pktmbuf_alloc_bulk(pool, mbufs, num);
|
|
if (ret != 0) {
|
|
return ret;
|
|
}
|
|
|
|
- for (uint32_t i = 0; i < num; i++) {
|
|
- pbuf_custom = mbuf_to_pbuf(mbufs[i]);
|
|
- pbuf_custom->custom_free_function = gazelle_free_pbuf;
|
|
- }
|
|
-
|
|
return 0;
|
|
}
|
|
|
|
@@ -802,6 +795,93 @@ static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t
|
|
}
|
|
}
|
|
|
|
+/*
+ * Clear EPOLLIN on sock when no data is pending, and unlink the socket from
+ * the event list once no event bit remains. Runs under the wakeup's
+ * event_list_lock.
+ */
+static inline void del_data_in_event(struct lwip_sock *sock)
+{
+    pthread_spin_lock(&sock->wakeup->event_list_lock);
+
+    /* Re-check under the lock so an event added by the stack thread is not overwritten. */
+    if (NETCONN_IS_DATAIN(sock)) {
+        pthread_spin_unlock(&sock->wakeup->event_list_lock);
+        return;
+    }
+
+    sock->events &= ~EPOLLIN;
+    if (sock->events == 0) {
+        list_del_node_null(&sock->event_list);
+    }
+
+    pthread_spin_unlock(&sock->wakeup->event_list_lock);
+}
|
|
+
|
|
+/*
+ * Receive path for processes on the same node: copy up to len bytes from
+ * sock->same_node_rx_ring into buf.
+ *
+ * Returns the number of bytes copied, or -1 with errno set to EAGAIN when
+ * sndbegin equals sndend (nothing to read). flags is not used.
+ * NOTE(review): buf is declared const but is written through a cast.
+ */
+ssize_t gazelle_same_node_ring_recv(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags)
+{
+    struct same_node_ring *ring = sock->same_node_rx_ring;
+    unsigned long long head = ring->sndbegin;
+    unsigned long long tail = __atomic_load_n(&ring->sndend, __ATOMIC_ACQUIRE);
+    ssize_t ret;
+
+    if (head == tail) {
+        errno = EAGAIN;
+        ret = -1;
+    } else {
+        /* sndbegin names the last byte already consumed; unread data starts one past it. */
+        unsigned long long offset = (head + 1) & SAME_NODE_RING_MASK;
+        size_t copy_len = tail - head;
+        char *dst = (char *)buf;
+        char *base = (char *)ring->mz->addr;
+
+        copy_len = RTE_MIN(copy_len, len);
+        if (offset + copy_len > SAME_NODE_RING_LEN) {
+            /* The span crosses the end of the buffer: copy up to the end, then the wrapped remainder. */
+            size_t first = SAME_NODE_RING_LEN - offset;
+            rte_memcpy(dst, base + offset, first);
+            rte_memcpy(dst + first, base, copy_len - first);
+        } else {
+            rte_memcpy(dst, base + offset, copy_len);
+        }
+
+        /* Publish the new read position only after the bytes have been copied out. */
+        __atomic_store_n(&ring->sndbegin, head + copy_len, __ATOMIC_RELEASE);
+        ret = (ssize_t)copy_len;
+    }
+
+    /* Test sock->events without the lock first, so the spinlock is only taken when EPOLLIN is set. */
+    if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLIN)) {
+        del_data_in_event(sock);
+    }
+    return ret;
+}
|
|
+
|
|
+/*
+ * Send path for processes on the same node: copy up to len bytes from buf
+ * into sock->same_node_tx_ring.
+ *
+ * Returns the number of bytes copied, or -1 with errno set to EAGAIN when the
+ * ring already holds SAME_NODE_RING_LEN bytes or when nothing was copied.
+ * flags is not used.
+ */
+ssize_t gazelle_same_node_ring_send(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags)
+{
+    struct same_node_ring *ring = sock->same_node_tx_ring;
+    unsigned long long head = __atomic_load_n(&ring->sndbegin, __ATOMIC_ACQUIRE);
+    unsigned long long tail = ring->sndend;
+
+    if (tail >= head + SAME_NODE_RING_LEN) {
+        errno = EAGAIN;
+        return -1;
+    }
+
+    /* sndend names the last byte written; new data goes one past it. */
+    unsigned long long offset = (tail + 1) & SAME_NODE_RING_MASK;
+    size_t copy_len = SAME_NODE_RING_LEN - (tail - head);
+    const char *src = (const char *)buf;
+    char *base = (char *)ring->mz->addr;
+
+    copy_len = RTE_MIN(copy_len, len);
+    if (offset + copy_len > SAME_NODE_RING_LEN) {
+        /* The free span crosses the end of the buffer: fill up to the end, then wrap to the start. */
+        size_t first = SAME_NODE_RING_LEN - offset;
+        rte_memcpy(base + offset, src, first);
+        rte_memcpy(base, src + first, copy_len - first);
+    } else {
+        rte_memcpy(base + offset, src, copy_len);
+    }
+
+    /* Publish the new write position after the payload is in place (also done when copy_len is 0). */
+    __atomic_store_n(&ring->sndend, tail + copy_len, __ATOMIC_RELEASE);
+
+    if (copy_len == 0) {
+        errno = EAGAIN;
+        return -1;
+    }
+    return copy_len;
+}
|
|
+
|
|
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
|
|
{
|
|
if (buf == NULL) {
|
|
@@ -813,6 +893,9 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
|
|
}
|
|
|
|
struct lwip_sock *sock = get_socket_by_fd(fd);
|
|
+ if (sock->same_node_tx_ring != NULL) {
|
|
+ return gazelle_same_node_ring_send(sock, buf, len, flags);
|
|
+ }
|
|
ssize_t send = write_stack_data(sock, buf, len);
|
|
if (send <= 0) {
|
|
return send;
|
|
@@ -857,22 +940,6 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
|
|
return buflen;
|
|
}
|
|
|
|
-static inline void del_data_in_event(struct lwip_sock *sock)
|
|
-{
|
|
- pthread_spin_lock(&sock->wakeup->event_list_lock);
|
|
-
|
|
- /* check again avoid cover event add in stack thread */
|
|
- if (!NETCONN_IS_DATAIN(sock)) {
|
|
- sock->events &= ~EPOLLIN;
|
|
-
|
|
- if (sock->events == 0) {
|
|
- list_del_node_null(&sock->event_list);
|
|
- }
|
|
- }
|
|
-
|
|
- pthread_spin_unlock(&sock->wakeup->event_list_lock);
|
|
-}
|
|
-
|
|
static struct pbuf *pbuf_free_partial(struct pbuf *pbuf, uint16_t free_len)
|
|
{
|
|
uint32_t tot_len = pbuf->tot_len - free_len;
|
|
@@ -906,6 +973,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
|
|
return 0;
|
|
}
|
|
|
|
+ if (sock->same_node_rx_ring != NULL) {
|
|
+ return gazelle_same_node_ring_recv(sock, buf, len, flags);
|
|
+ }
|
|
+
|
|
while (recv_left > 0) {
|
|
if (sock->recv_lastdata) {
|
|
pbuf = sock->recv_lastdata;
|
|
@@ -962,6 +1033,21 @@ void add_recv_list(int32_t fd)
|
|
}
|
|
}
|
|
|
|
+/*
+ * Walk stack->same_node_recv_list and raise EPOLLIN on every socket that has a
+ * same-node rx ring for which same_node_ring_count() is non-zero.
+ */
+void read_same_node_recv_list(struct protocol_stack *stack)
+{
+    struct list_node *head = &stack->same_node_recv_list;
+    struct list_node *cur, *next;
+
+    list_for_each_safe(cur, next, head) {
+        struct lwip_sock *sock = container_of(cur, struct lwip_sock, recv_list);
+
+        if (sock->same_node_rx_ring == NULL || !same_node_ring_count(sock)) {
+            continue;
+        }
+        add_sock_event(sock, EPOLLIN);
+    }
+}
|
|
+
|
|
void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
|
|
{
|
|
struct list_node *list = &(stack->recv_list);
|
|
@@ -1216,3 +1302,246 @@ void stack_recvlist_count(struct rpc_msg *msg)
|
|
|
|
msg->result = get_list_count(&stack->recv_list);
|
|
}
|
|
+
|
|
+#define NETIF_POLL_READ_COUNT 32
+
+/*
+ * Drain the same-node rings attached to TCP pcbs and feed the pbufs to
+ * ip_input(): up to NETIF_POLL_READ_COUNT pbufs per active pcb that has a
+ * client_rx_ring, and at most one pbuf per listening pcb that has a
+ * listen_rx_ring. A pbuf is freed here when ip_input() reports an error.
+ * NOTE(review): the pcb lists are walked while ip_input() runs; confirm that
+ * input processing cannot unlink the pcb currently being visited.
+ */
+void netif_poll(struct netif *netif)
+{
+    struct pbuf *burst[NETIF_POLL_READ_COUNT];
+
+    for (struct tcp_pcb *conn = tcp_active_pcbs; conn != NULL; conn = conn->next) {
+        if (conn->client_rx_ring == NULL) {
+            continue;
+        }
+
+        unsigned int nb = rte_ring_sc_dequeue_burst(conn->client_rx_ring, (void **)burst, NETIF_POLL_READ_COUNT, NULL);
+        for (unsigned int i = 0; i < nb; i++) {
+            if (ip_input(burst[i], netif) != 0) {
+                LSTACK_LOG(INFO, LSTACK, "netif_poll: ip_input return err\n");
+                pbuf_free(burst[i]);
+            }
+        }
+    }
+
+    for (struct tcp_pcb_listen *lpcb = tcp_listen_pcbs.listen_pcbs; lpcb != NULL; lpcb = lpcb->next) {
+        struct pbuf *pkt;
+
+        if (lpcb->listen_rx_ring == NULL) {
+            continue;
+        }
+        if (rte_ring_sc_dequeue(lpcb->listen_rx_ring, (void **)&pkt) != 0) {
+            continue;
+        }
+        if (ip_input(pkt, netif) != ERR_OK) {
+            pbuf_free(pkt);
+        }
+    }
+}
|
|
+
|
|
+/*
+ * Output hook for packets exchanged between processes on the same node.
+ *
+ * The pbuf is copied into a freshly allocated PBUF_RAM pbuf. A pure SYN (SYN
+ * set, ACK clear) is enqueued on the ring named "listen_rx_ring_<remote port>";
+ * every other packet is enqueued on the pcb's client_tx_ring. When the target
+ * ring is missing or full the copy is freed and ERR_OK is still returned.
+ *
+ * Returns ERR_ARG when p has no pcb, the pcb has no client_tx_ring, or p is
+ * chained; ERR_MEM when the copy cannot be allocated; ERR_OK otherwise.
+ * The TCP header is located at a fixed sizeof(struct ip_hdr) offset.
+ * netif is not used.
+ */
+err_t netif_loop_output(struct netif *netif, struct pbuf *p)
+{
+    struct tcp_pcb *pcb = p->pcb;
+
+    if (pcb == NULL || pcb->client_tx_ring == NULL) {
+        LSTACK_LOG(ERR, LSTACK, "netif_loop_output: pcb is null\n");
+        return ERR_ARG;
+    }
+    if (p->next != NULL) {
+        LSTACK_LOG(ERR, LSTACK, "netif_loop_output: not support chained pbuf\n");
+        return ERR_ARG;
+    }
+
+    struct tcp_hdr *tcph = (struct tcp_hdr *)((char *)p->payload + sizeof(struct ip_hdr));
+    uint8_t tcp_flags = TCPH_FLAGS(tcph);
+    int is_pure_syn = (tcp_flags & TCP_SYN) && !(tcp_flags & TCP_ACK);
+
+    struct pbuf *copy = pbuf_alloc(0, p->len, PBUF_RAM);
+    if (copy == NULL) {
+        LSTACK_LOG(ERR, LSTACK, "netif_loop_output: pbuf_alloc failed\n");
+        return ERR_MEM;
+    }
+    copy->ol_flags = p->ol_flags;
+    memcpy(copy->payload, p->payload, p->len);
+
+    if (!is_pure_syn) {
+        /* Everything except the initial SYN goes to this connection's tx ring. */
+        if (rte_ring_sp_enqueue(pcb->client_tx_ring, copy) != 0) {
+            LSTACK_LOG(INFO, LSTACK, "client tx ring full\n");
+            pbuf_free(copy);
+        }
+        return ERR_OK;
+    }
+
+    /* Initial SYN: hand it to the listener's ring, looked up by remote port. */
+    char ring_name[RING_NAME_LEN] = {0};
+    snprintf(ring_name, sizeof(ring_name), "listen_rx_ring_%d", pcb->remote_port);
+    struct rte_ring *listen_ring = rte_ring_lookup(ring_name);
+    if (listen_ring == NULL) {
+        LSTACK_LOG(INFO, LSTACK, "netif_loop_output: cant find listen_rx_ring %d\n", pcb->remote_port);
+        pbuf_free(copy);
+    } else if (rte_ring_mp_enqueue(listen_ring, copy) != 0) {
+        LSTACK_LOG(INFO, LSTACK, "enqueue sync packet failed\n");
+        pbuf_free(copy);
+    }
+
+    return ERR_OK;
+}
|
|
+
|
|
+/*
+ * Attach nsock to the four memzones named after pcb->remote_port:
+ *   "rte_mz_rx_<port>" / "rte_mz_buf_rx_<port>" become this socket's tx ring
+ *   header and buffer, and
+ *   "rte_mz_tx_<port>" / "rte_mz_buf_tx_<port>" become its rx ring header and
+ *   buffer.
+ * On success the socket is also linked into its stack's same_node_recv_list.
+ *
+ * All lookups are completed before any field of nsock (or of the ring
+ * headers) is written, so a failed lookup leaves nsock untouched instead of
+ * exposing a ring whose ->mz is NULL to the send/recv paths, which only test
+ * the ring pointer against NULL.
+ *
+ * Returns 0 on success, -1 when any memzone cannot be found.
+ */
+err_t find_same_node_memzone(struct tcp_pcb *pcb, struct lwip_sock *nsock)
+{
+    char name[RING_NAME_LEN];
+    const struct rte_memzone *tx_hdr_mz;
+    const struct rte_memzone *tx_buf_mz;
+    const struct rte_memzone *rx_hdr_mz;
+    const struct rte_memzone *rx_buf_mz;
+
+    snprintf(name, sizeof(name), "rte_mz_rx_%u", pcb->remote_port);
+    tx_hdr_mz = rte_memzone_lookup(name);
+    if (tx_hdr_mz == NULL) {
+        LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+        return -1;
+    }
+    LSTACK_LOG(INFO, LSTACK, "lookup %s success\n", name);
+
+    snprintf(name, sizeof(name), "rte_mz_buf_rx_%u", pcb->remote_port);
+    tx_buf_mz = rte_memzone_lookup(name);
+    if (tx_buf_mz == NULL) {
+        LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+        return -1;
+    }
+
+    snprintf(name, sizeof(name), "rte_mz_tx_%u", pcb->remote_port);
+    rx_hdr_mz = rte_memzone_lookup(name);
+    if (rx_hdr_mz == NULL) {
+        LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+        return -1;
+    }
+    LSTACK_LOG(INFO, LSTACK, "lookup %s success\n", name);
+
+    snprintf(name, sizeof(name), "rte_mz_buf_tx_%u", pcb->remote_port);
+    rx_buf_mz = rte_memzone_lookup(name);
+    if (rx_buf_mz == NULL) {
+        LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+        return -1;
+    }
+
+    /*
+     * Everything was found: commit.
+     * NOTE(review): ->mz is stored inside the looked-up header memzone itself,
+     * so these stores write into memory that was reserved elsewhere; confirm
+     * that every process maps the memzones at the same virtual address.
+     */
+    nsock->same_node_tx_ring_mz = tx_hdr_mz;
+    nsock->same_node_tx_ring = (struct same_node_ring *)tx_hdr_mz->addr;
+    nsock->same_node_tx_ring->mz = tx_buf_mz;
+
+    nsock->same_node_rx_ring_mz = rx_hdr_mz;
+    nsock->same_node_rx_ring = (struct same_node_ring *)rx_hdr_mz->addr;
+    nsock->same_node_rx_ring->mz = rx_buf_mz;
+
+    /* NOTE(review): recv_list is presumably initialised in alloc_socket() and unlinked in free_socket() - confirm. */
+    list_add_node(&nsock->stack->same_node_recv_list, &nsock->recv_list);
+    return 0;
+}
|
|
+
|
|
+/*
+ * Reserve a cache-line aligned memzone of size bytes on the caller's socket,
+ * named "<name>_<rx>_<port>", and store it in *zone (NULL on failure).
+ *
+ * Returns ERR_OK on success, ERR_MEM when the reservation fails.
+ */
+err_t same_node_memzone_create(const struct rte_memzone **zone, int size, int port, char *name, char *rx)
+{
+    char mem_name[RING_NAME_LEN] = {0};
+    const struct rte_memzone *mz;
+
+    snprintf(mem_name, sizeof(mem_name), "%s_%s_%u", name, rx, port);
+
+    mz = rte_memzone_reserve_aligned(mem_name, size, rte_socket_id(), 0, RTE_CACHE_LINE_SIZE);
+    *zone = mz;
+    if (mz == NULL) {
+        LSTACK_LOG(ERR, LSTACK, "cannot reserve memzone:%s, errno is %d\n", mem_name, rte_errno);
+        return ERR_MEM;
+    }
+
+    LSTACK_LOG(INFO, LSTACK, "lstack id %d, reserve %s(%p) success, addr is %p, size is %u\n", rte_socket_id(), mem_name, mz, mz->addr, size);
+
+    return ERR_OK;
+}
|
|
+
|
|
+/*
+ * Create an rte_ring of the given size on the caller's socket, named
+ * "<name>_<rx>_ring_<port>", and store it in *ring (NULL on failure).
+ *
+ * A ring whose name is "listen" is created with RING_F_SC_DEQ only; any other
+ * ring also gets RING_F_SP_ENQ.
+ *
+ * Returns ERR_OK on success, ERR_MEM when the ring cannot be created.
+ */
+err_t same_node_ring_create(struct rte_ring **ring, int size, int port, char *name, char *rx)
+{
+    char ring_name[RING_NAME_LEN] = {0};
+    unsigned flags = (strcmp(name, "listen") == 0) ? RING_F_SC_DEQ : (RING_F_SP_ENQ | RING_F_SC_DEQ);
+    struct rte_ring *created;
+
+    snprintf(ring_name, sizeof(ring_name), "%s_%s_ring_%u", name, rx, port);
+
+    created = rte_ring_create(ring_name, size, rte_socket_id(), flags);
+    *ring = created;
+    if (created == NULL) {
+        LSTACK_LOG(ERR, LSTACK, "cannot create rte_ring %s, errno is %d\n", ring_name, rte_errno);
+        return ERR_MEM;
+    }
+
+    LSTACK_LOG(INFO, LSTACK, "lstack socket id:%d, create %s(%p) success\n", rte_socket_id(), ring_name, created);
+    return ERR_OK;
+}
|
|
+
|
|
+/*
+ * Reset every same-node ring field of pcb and of the socket behind its
+ * netconn (taken from pcb->callback_arg) to the "no ring" state.
+ */
+static void init_same_node_ring(struct tcp_pcb *pcb)
+{
+    struct netconn *conn = (struct netconn *)pcb->callback_arg;
+    struct lwip_sock *sock = get_socket(conn->socket);
+
+    /* pcb side */
+    pcb->free_ring = 0;
+    pcb->client_tx_ring = NULL;
+    pcb->client_rx_ring = NULL;
+
+    /* socket side */
+    sock->same_node_tx_ring_mz = NULL;
+    sock->same_node_tx_ring = NULL;
+    sock->same_node_rx_ring_mz = NULL;
+    sock->same_node_rx_ring = NULL;
+}
|
|
+
|
|
+#define CLIENT_RING_SIZE 512
+
+/*
+ * Create the same-node resources for pcb, all named after pcb->local_port:
+ * the client rx/tx rte_rings, and for each direction a header memzone
+ * (struct same_node_ring) plus a SAME_NODE_RING_LEN byte data memzone. Both
+ * ring headers start with sndbegin == sndend == 0, and pcb->free_ring is set
+ * to 1 once both rte_rings exist.
+ *
+ * On failure only the resources created by this call are released, in reverse
+ * order of creation, so a ring header that was never mapped is never
+ * dereferenced. All same-node fields are then reset via init_same_node_ring().
+ *
+ * Returns ERR_OK on success, ERR_BUF on any failure.
+ */
+err_t create_same_node_ring(struct tcp_pcb *pcb)
+{
+    struct netconn *netconn = (struct netconn *)pcb->callback_arg;
+    struct lwip_sock *sock = get_socket(netconn->socket);
+
+    if (same_node_ring_create(&pcb->client_rx_ring, CLIENT_RING_SIZE, pcb->local_port, "client", "rx") != 0) {
+        goto fail;
+    }
+    if (same_node_ring_create(&pcb->client_tx_ring, CLIENT_RING_SIZE, pcb->local_port, "client", "tx") != 0) {
+        goto free_client_rx;
+    }
+    pcb->free_ring = 1;
+
+    if (same_node_memzone_create(&sock->same_node_rx_ring_mz, sizeof(struct same_node_ring), pcb->local_port, "rte_mz", "rx") != 0) {
+        goto free_client_tx;
+    }
+    sock->same_node_rx_ring = (struct same_node_ring*)sock->same_node_rx_ring_mz->addr;
+
+    if (same_node_memzone_create(&sock->same_node_rx_ring->mz, SAME_NODE_RING_LEN, pcb->local_port, "rte_mz_buf", "rx") != 0) {
+        goto free_rx_hdr;
+    }
+    sock->same_node_rx_ring->sndbegin = 0;
+    sock->same_node_rx_ring->sndend = 0;
+
+    if (same_node_memzone_create(&sock->same_node_tx_ring_mz, sizeof(struct same_node_ring), pcb->local_port, "rte_mz", "tx") != 0) {
+        goto free_rx_buf;
+    }
+    sock->same_node_tx_ring = (struct same_node_ring*)sock->same_node_tx_ring_mz->addr;
+
+    if (same_node_memzone_create(&sock->same_node_tx_ring->mz, SAME_NODE_RING_LEN, pcb->local_port, "rte_mz_buf", "tx") != 0) {
+        goto free_tx_hdr;
+    }
+    sock->same_node_tx_ring->sndbegin = 0;
+    sock->same_node_tx_ring->sndend = 0;
+
+    return ERR_OK;
+
+free_tx_hdr:
+    rte_memzone_free(sock->same_node_tx_ring_mz);
+free_rx_buf:
+    /* The rx header is still mapped here, so reading ->mz from it is safe. */
+    rte_memzone_free(sock->same_node_rx_ring->mz);
+free_rx_hdr:
+    rte_memzone_free(sock->same_node_rx_ring_mz);
+free_client_tx:
+    rte_ring_free(pcb->client_tx_ring);
+free_client_rx:
+    rte_ring_free(pcb->client_rx_ring);
+fail:
+    init_same_node_ring(pcb);
+    return ERR_BUF;
+}
|
|
+
|
|
+/*
+ * Look up the rings named after npcb->remote_port and attach them to npcb
+ * with the directions swapped: "client_tx_ring_<port>" becomes npcb's rx ring
+ * and "client_rx_ring_<port>" becomes its tx ring. free_ring is set to 0.
+ *
+ * Returns 0 when both rings are found; otherwise npcb is abandoned via
+ * tcp_abandon() and ERR_CONN is returned.
+ */
+err_t find_same_node_ring(struct tcp_pcb *npcb)
+{
+    char peer_tx_name[RING_NAME_LEN] = {0};
+    char peer_rx_name[RING_NAME_LEN] = {0};
+
+    snprintf(peer_tx_name, sizeof(peer_tx_name), "client_tx_ring_%u", npcb->remote_port);
+    npcb->client_rx_ring = rte_ring_lookup(peer_tx_name);
+
+    snprintf(peer_rx_name, sizeof(peer_rx_name), "client_rx_ring_%u", npcb->remote_port);
+    npcb->client_tx_ring = rte_ring_lookup(peer_rx_name);
+
+    npcb->free_ring = 0;
+
+    if (npcb->client_tx_ring != NULL && npcb->client_rx_ring != NULL) {
+        LSTACK_LOG(INFO, LSTACK, "find client_tx_ring_%u and client_rx_ring_%u\n", npcb->remote_port, npcb->remote_port);
+        return 0;
+    }
+
+    LSTACK_LOG(INFO, LSTACK, "lookup client rxtx ring failed, port is %d\n", npcb->remote_port);
+    tcp_abandon(npcb, 0);
+    return ERR_CONN;
+}
|
|
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
|
|
index 48eff1d..0d7b7f0 100644
|
|
--- a/src/lstack/core/lstack_protocol_stack.c
|
|
+++ b/src/lstack/core/lstack_protocol_stack.c
|
|
@@ -314,6 +314,7 @@ static int32_t init_stack_value(struct protocol_stack *stack, void *arg)
|
|
stack->lwip_stats = &lwip_stats;
|
|
|
|
init_list_node(&stack->recv_list);
|
|
+ init_list_node(&stack->same_node_recv_list);
|
|
init_list_node(&stack->wakeup_list);
|
|
|
|
sys_calibrate_tsc();
|
|
@@ -497,6 +498,10 @@ static void* gazelle_stack_thread(void *arg)
|
|
|
|
gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number);
|
|
|
|
+ /* reduce traversal times */
|
|
+ if ((wakeup_tick & 0xff) == 0) {
|
|
+ read_same_node_recv_list(stack);
|
|
+ }
|
|
read_recv_list(stack, read_connect_number);
|
|
|
|
if ((wakeup_tick & 0xf) == 0) {
|
|
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
|
|
index a690adb..55cf769 100644
|
|
--- a/src/lstack/include/lstack_ethdev.h
|
|
+++ b/src/lstack/include/lstack_ethdev.h
|
|
@@ -45,5 +45,6 @@ void delete_user_process_port(uint16_t dst_port, enum port_type type);
|
|
void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
|
|
void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
|
|
void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
|
|
+void netif_poll(struct netif *netif);
|
|
|
|
#endif /* __GAZELLE_ETHDEV_H__ */
|
|
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
|
|
index 02110e0..d52a06d 100644
|
|
--- a/src/lstack/include/lstack_lwip.h
|
|
+++ b/src/lstack/include/lstack_lwip.h
|
|
@@ -14,7 +14,7 @@
|
|
#define __GAZELLE_LWIP_H__
|
|
|
|
#define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox))
|
|
-#define NETCONN_IS_DATAIN(sock) ((gazelle_ring_readable_count((sock)->recv_ring) || (sock)->recv_lastdata))
|
|
+/* True when sock has readable data: in recv_ring, in recv_lastdata, or in its same-node rx ring. */
+#define NETCONN_IS_DATAIN(sock) ((gazelle_ring_readable_count((sock)->recv_ring) || (sock)->recv_lastdata) || ((sock)->same_node_rx_ring != NULL && same_node_ring_count(sock)))
|
|
#define NETCONN_IS_DATAOUT(sock) (gazelle_ring_readover_count((sock)->send_ring) || (sock)->send_lastdata || (sock)->send_pre_del)
|
|
#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
|
|
|
|
@@ -33,6 +33,7 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len);
|
|
ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags);
|
|
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
|
|
void read_recv_list(struct protocol_stack *stack, uint32_t max_num);
|
|
+void read_same_node_recv_list(struct protocol_stack *stack);
|
|
void send_stack_list(struct protocol_stack *stack, uint32_t send_max);
|
|
void add_recv_list(int32_t fd);
|
|
void get_lwip_conntable(struct rpc_msg *msg);
|
|
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
|
|
index d58c98a..3691250 100644
|
|
--- a/src/lstack/include/lstack_protocol_stack.h
|
|
+++ b/src/lstack/include/lstack_protocol_stack.h
|
|
@@ -71,6 +71,7 @@ struct protocol_stack {
|
|
|
|
struct rte_mbuf *pkts[RTE_TEST_RX_DESC_DEFAULT];
|
|
struct list_node recv_list;
|
|
+ struct list_node same_node_recv_list; /* used for same node processes communication */
|
|
struct list_node wakeup_list;
|
|
|
|
volatile uint16_t conn_num;
|
|
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
|
|
index 60ea897..01b1280 100644
|
|
--- a/src/lstack/netif/lstack_ethdev.c
|
|
+++ b/src/lstack/netif/lstack_ethdev.c
|
|
@@ -82,7 +82,6 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
|
|
len = (uint16_t)rte_pktmbuf_data_len(m);
|
|
payload = rte_pktmbuf_mtod(m, void *);
|
|
pc = mbuf_to_pbuf(m);
|
|
- pc->custom_free_function = gazelle_free_pbuf;
|
|
next = pbuf_alloced_custom(PBUF_RAW, (uint16_t)len, PBUF_RAM, pc, payload, (uint16_t)len);
|
|
if (next == NULL) {
|
|
stack->stats.rx_allocmbuf_fail++;
|
|
@@ -653,6 +652,7 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_fla
|
|
{
|
|
uint32_t nr_pkts;
|
|
|
|
+ netif_poll(&stack->netif);
|
|
nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, nic_read_number);
|
|
if (nr_pkts == 0) {
|
|
return 0;
|
|
--
|
|
2.23.0
|
|
|