342 lines
13 KiB
Diff
342 lines
13 KiB
Diff
From c109336bcf860c9dd16ba8becd9de72ecdce4d8f Mon Sep 17 00:00:00 2001
|
|
From: jiangheng <jiangheng14@huawei.com>
|
|
Date: Tue, 11 Jun 2024 14:15:49 +0800
|
|
Subject: [PATCH] refactor udp send distinguish tcp/udp get_from_sendring
|
|
cancel the restriction that a maximum of 2 rpc msgs can be sent over the same
|
|
udp sock
|
|
|
|
---
|
|
src/lstack/core/lstack_lwip.c | 107 ++++++++++++++++--------
|
|
src/lstack/core/lstack_protocol_stack.c | 39 ++++++++-
|
|
src/lstack/core/lstack_thread_rpc.c | 29 ++++++-
|
|
src/lstack/include/lstack_lwip.h | 3 +-
|
|
src/lstack/include/lstack_rpc_proc.h | 3 +-
|
|
src/lstack/include/lstack_thread_rpc.h | 3 +-
|
|
6 files changed, 141 insertions(+), 43 deletions(-)
|
|
|
|
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
|
|
index 153c5cc..db948b0 100644
|
|
--- a/src/lstack/core/lstack_lwip.c
|
|
+++ b/src/lstack/core/lstack_lwip.c
|
|
@@ -101,7 +101,7 @@ static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, u
|
|
void *data = rte_pktmbuf_mtod(mbuf, void *);
|
|
struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ);
|
|
if (pbuf) {
|
|
- pbuf->allow_in = 1;
|
|
+ pbuf->allow_append = 1;
|
|
pbuf->addr = *IP_ANY_TYPE;
|
|
pbuf->port = 0;
|
|
pthread_spin_init(&pbuf->pbuf_lock, PTHREAD_PROCESS_SHARED);
|
|
@@ -227,24 +227,61 @@ struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type typ
|
|
return init_mbuf_to_pbuf(mbuf, layer, length, type);
|
|
}
|
|
|
|
-struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags)
|
|
+static inline bool pbuf_allow_append(struct pbuf *pbuf, uint16_t remain_size)
|
|
+{
|
|
+ pthread_spin_lock(&pbuf->pbuf_lock);
|
|
+ if (pbuf->tot_len > remain_size) {
|
|
+ pthread_spin_unlock(&pbuf->pbuf_lock);
|
|
+ return false;
|
|
+ }
|
|
+ if (pbuf->allow_append == 1) {
|
|
+ __sync_fetch_and_sub(&pbuf->allow_append, 1);
|
|
+ }
|
|
+
|
|
+ pthread_spin_unlock(&pbuf->pbuf_lock);
|
|
+ return true;
|
|
+}
|
|
+
|
|
+struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size)
|
|
+{
|
|
+ int count;
|
|
+ /* when remain_size is 0, fill_sendring write one pbuf to sendring */
|
|
+ if (remain_size == 0) {
|
|
+ count = 1;
|
|
+ } else {
|
|
+ count = (remain_size + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN;
|
|
+ }
|
|
+
|
|
+ struct pbuf *pbufs[count];
|
|
+
|
|
+ int actual_count = gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbufs, count);
|
|
+ if (unlikely(actual_count != count)) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "udp get pbuf from sendring error, expected: %d, actual: %d\n",
|
|
+ count, actual_count);
|
|
+ }
|
|
+
|
|
+ if (unlikely(pbufs[0]->tot_len != remain_size)) {
|
|
+ LSTACK_LOG(ERR, LSTACK, "udp get pbuf size error, expected: %d, actual: %d\n",
|
|
+ remain_size, pbufs[0]->tot_len);
|
|
+ }
|
|
+
|
|
+ for (int i = 0; get_protocol_stack_group()->latency_start && i < count; i++) {
|
|
+ calculate_lstack_latency(&sock->stack->latency, pbufs[i], GAZELLE_LATENCY_WRITE_LWIP, 0);
|
|
+ }
|
|
+
|
|
+ return pbufs[0];
|
|
+}
|
|
+
|
|
+struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size)
|
|
{
|
|
struct pbuf *pbuf = NULL;
|
|
|
|
if (unlikely(sock->send_pre_del)) {
|
|
- pbuf = sock->send_pre_del;
|
|
- pthread_spin_lock(&pbuf->pbuf_lock);
|
|
- if (pbuf->tot_len > remain_size) {
|
|
- pthread_spin_unlock(&pbuf->pbuf_lock);
|
|
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
|
|
+ if (pbuf_allow_append(sock->send_pre_del, remain_size)) {
|
|
+ return sock->send_pre_del;
|
|
+ } else {
|
|
return NULL;
|
|
}
|
|
- if (pbuf->allow_in == 1) {
|
|
- __sync_fetch_and_sub(&pbuf->allow_in, 1);
|
|
- }
|
|
- pthread_spin_unlock(&pbuf->pbuf_lock);
|
|
-
|
|
- return pbuf;
|
|
}
|
|
|
|
gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf, 1);
|
|
@@ -252,17 +289,6 @@ struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_s
|
|
return NULL;
|
|
}
|
|
|
|
- /* udp send a pbuf chain, dequeue all pbufs except head pbuf */
|
|
- if (NETCONN_IS_UDP(sock) && remain_size > MBUF_MAX_DATA_LEN) {
|
|
- int size = (remain_size + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN - 1;
|
|
- struct pbuf *pbuf_used[size];
|
|
- gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf_used, size);
|
|
-
|
|
- for (uint32_t i = 0; get_protocol_stack_group()->latency_start && i < size; i++) {
|
|
- calculate_lstack_latency(&sock->stack->latency, pbuf_used[i], GAZELLE_LATENCY_WRITE_LWIP, 0);
|
|
- }
|
|
- }
|
|
-
|
|
if (get_protocol_stack_group()->latency_start) {
|
|
calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_WRITE_LWIP, 0);
|
|
}
|
|
@@ -270,19 +296,11 @@ struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_s
|
|
sock->send_pre_del = pbuf;
|
|
|
|
if (!gazelle_ring_readover_count(sock->send_ring)) {
|
|
- pthread_spin_lock(&pbuf->pbuf_lock);
|
|
- if (pbuf->tot_len > remain_size) {
|
|
- pthread_spin_unlock(&pbuf->pbuf_lock);
|
|
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
|
|
+ if (!pbuf_allow_append(pbuf, remain_size)) {
|
|
return NULL;
|
|
}
|
|
- if (pbuf->allow_in == 1) {
|
|
- __sync_fetch_and_sub(&pbuf->allow_in, 1);
|
|
- }
|
|
- pthread_spin_unlock(&pbuf->pbuf_lock);
|
|
} else {
|
|
if (pbuf->tot_len > remain_size) {
|
|
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
|
|
return NULL;
|
|
}
|
|
}
|
|
@@ -388,7 +406,7 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r)
|
|
if (pthread_spin_trylock(&last_pbuf->pbuf_lock) != 0) {
|
|
return NULL;
|
|
}
|
|
- if (last_pbuf->allow_in != 1) {
|
|
+ if (last_pbuf->allow_append != 1) {
|
|
pthread_spin_unlock(&last_pbuf->pbuf_lock);
|
|
return NULL;
|
|
}
|
|
@@ -675,17 +693,34 @@ ssize_t do_lwip_recvmsg_from_stack(int32_t s, const struct msghdr *message, int3
|
|
return buflen;
|
|
}
|
|
|
|
-static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
|
|
+static inline void notice_stack_tcp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
|
|
{
|
|
// 2: call_num >= 2, don't need add new rpc send
|
|
if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) < 2) {
|
|
- while (rpc_call_send(&sock->stack->rpc_queue, fd, NULL, len, flags) < 0) {
|
|
+ while (rpc_call_tcp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
|
|
usleep(1000); // 1000: wait 1ms to exec again
|
|
}
|
|
__sync_fetch_and_add(&sock->call_num, 1);
|
|
}
|
|
}
|
|
|
|
+static inline void notice_stack_udp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
|
|
+{
|
|
+ __sync_fetch_and_add(&sock->call_num, 1);
|
|
+ while (rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
|
|
+ usleep(1000); // 1000: wait 1ms to exec again
|
|
+ }
|
|
+}
|
|
+
|
|
+static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
|
|
+{
|
|
+ if (NETCONN_IS_UDP(sock)) {
|
|
+ notice_stack_udp_send(sock, fd, len, flags);
|
|
+ } else {
|
|
+ notice_stack_tcp_send(sock, fd, len, flags);
|
|
+ }
|
|
+}
|
|
+
|
|
/* process on same node use ring to recv data */
|
|
ssize_t gazelle_same_node_ring_recv(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags)
|
|
{
|
|
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
|
|
index f6d381e..d130c91 100644
|
|
--- a/src/lstack/core/lstack_protocol_stack.c
|
|
+++ b/src/lstack/core/lstack_protocol_stack.c
|
|
@@ -877,7 +877,7 @@ void stack_recv(struct rpc_msg *msg)
|
|
msg->args[MSG_ARG_3].i);
|
|
}
|
|
|
|
-void stack_send(struct rpc_msg *msg)
|
|
+void stack_tcp_send(struct rpc_msg *msg)
|
|
{
|
|
int32_t fd = msg->args[MSG_ARG_0].i;
|
|
size_t len = msg->args[MSG_ARG_1].size;
|
|
@@ -913,6 +913,39 @@ void stack_send(struct rpc_msg *msg)
|
|
return;
|
|
}
|
|
|
|
+void stack_udp_send(struct rpc_msg *msg)
|
|
+{
|
|
+ int32_t fd = msg->args[MSG_ARG_0].i;
|
|
+ size_t len = msg->args[MSG_ARG_1].size;
|
|
+ struct protocol_stack *stack = get_protocol_stack();
|
|
+ int replenish_again;
|
|
+ uint32_t call_num;
|
|
+
|
|
+ if (get_protocol_stack_group()->latency_start) {
|
|
+ calculate_rpcmsg_latency(&stack->latency, msg, GAZELLE_LATENCY_WRITE_RPC_MSG);
|
|
+ }
|
|
+
|
|
+ struct lwip_sock *sock = get_socket(fd);
|
|
+ if (sock == NULL) {
|
|
+ msg->result = -1;
|
|
+ LSTACK_LOG(ERR, LSTACK, "get sock error! fd=%d, len=%ld\n", fd, len);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
|
|
+ call_num = __sync_fetch_and_sub(&sock->call_num, 1);
|
|
+ if (replenish_again < 0) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ if ((call_num == 1) && (replenish_again > 0)) {
|
|
+ rpc_call_replenish(&stack->rpc_queue, sock);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ return;
|
|
+}
|
|
+
|
|
/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
|
|
void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack)
|
|
{
|
|
@@ -1040,6 +1073,10 @@ void stack_replenish_sendring(struct rpc_msg *msg)
|
|
struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
|
|
|
|
msg->result = do_lwip_replenish_sendring(stack, sock);
|
|
+ if (msg->result == true) {
|
|
+ msg->recall_flag = 1;
|
|
+ rpc_call(&stack->rpc_queue, msg);
|
|
+ }
|
|
}
|
|
|
|
void stack_get_conntable(struct rpc_msg *msg)
|
|
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
|
|
index 04bdc3a..e438c37 100644
|
|
--- a/src/lstack/core/lstack_thread_rpc.c
|
|
+++ b/src/lstack/core/lstack_thread_rpc.c
|
|
@@ -460,13 +460,36 @@ int32_t rpc_call_replenish(rpc_queue *queue, void *sock)
|
|
}
|
|
|
|
msg->args[MSG_ARG_0].p = sock;
|
|
+ msg->sync_flag = 0;
|
|
|
|
- return rpc_sync_call(queue, msg);
|
|
+ rpc_call(queue, msg);
|
|
+ return 0;
|
|
+}
|
|
+
|
|
+int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
|
|
+{
|
|
+ struct rpc_msg *msg = rpc_msg_alloc(stack_tcp_send);
|
|
+ if (msg == NULL) {
|
|
+ return -1;
|
|
+ }
|
|
+
|
|
+ if (get_protocol_stack_group()->latency_start) {
|
|
+ time_stamp_into_rpcmsg(get_socket_by_fd(fd));
|
|
+ }
|
|
+
|
|
+ msg->args[MSG_ARG_0].i = fd;
|
|
+ msg->args[MSG_ARG_1].size = len;
|
|
+ msg->args[MSG_ARG_2].i = flags;
|
|
+ msg->sync_flag = 0;
|
|
+
|
|
+ rpc_call(queue, msg);
|
|
+
|
|
+ return 0;
|
|
}
|
|
|
|
-int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags)
|
|
+int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags)
|
|
{
|
|
- struct rpc_msg *msg = rpc_msg_alloc(stack_send);
|
|
+ struct rpc_msg *msg = rpc_msg_alloc(stack_udp_send);
|
|
if (msg == NULL) {
|
|
return -1;
|
|
}
|
|
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
|
|
index fa10e3f..85c9c20 100644
|
|
--- a/src/lstack/include/lstack_lwip.h
|
|
+++ b/src/lstack/include/lstack_lwip.h
|
|
@@ -33,7 +33,8 @@ int do_lwip_close(int32_t fd);
|
|
void do_lwip_init_sock(int32_t fd);
|
|
void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
|
|
|
|
-struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags);
|
|
+struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
|
|
+struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
|
|
void do_lwip_get_from_sendring_over(struct lwip_sock *sock);
|
|
bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock);
|
|
ssize_t do_lwip_read_from_lwip(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
|
|
diff --git a/src/lstack/include/lstack_rpc_proc.h b/src/lstack/include/lstack_rpc_proc.h
|
|
index 71f0c58..77b18bd 100644
|
|
--- a/src/lstack/include/lstack_rpc_proc.h
|
|
+++ b/src/lstack/include/lstack_rpc_proc.h
|
|
@@ -30,7 +30,8 @@ void stack_getsockopt(struct rpc_msg *msg);
|
|
void stack_setsockopt(struct rpc_msg *msg);
|
|
void stack_fcntl(struct rpc_msg *msg);
|
|
void stack_ioctl(struct rpc_msg *msg);
|
|
-void stack_send(struct rpc_msg *msg);
|
|
+void stack_tcp_send(struct rpc_msg *msg);
|
|
+void stack_udp_send(struct rpc_msg *msg);
|
|
void stack_mempool_size(struct rpc_msg *msg);
|
|
void stack_rpcpool_size(struct rpc_msg *msg);
|
|
void stack_create_shadow_fd(struct rpc_msg *msg);
|
|
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
|
|
index 276ebb2..fa98b0c 100644
|
|
--- a/src/lstack/include/lstack_thread_rpc.h
|
|
+++ b/src/lstack/include/lstack_thread_rpc.h
|
|
@@ -83,7 +83,8 @@ int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr,
|
|
int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog);
|
|
int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
|
|
int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
|
|
-int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags);
|
|
+int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags);
|
|
+int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags);
|
|
int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
|
|
int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
|
|
int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen);
|
|
--
|
|
2.33.0
|
|
|