diff --git a/0196-refactor-udp-send.patch b/0196-refactor-udp-send.patch new file mode 100644 index 0000000..a622143 --- /dev/null +++ b/0196-refactor-udp-send.patch @@ -0,0 +1,341 @@ +From c109336bcf860c9dd16ba8becd9de72ecdce4d8f Mon Sep 17 00:00:00 2001 +From: jiangheng +Date: Tue, 11 Jun 2024 14:15:49 +0800 +Subject: [PATCH] refactor udp send distinguish tcp/udp get_from_sendring + cancel the restrictioin that maximum of 2 rpc msg can be send over the same + udp sock + +--- + src/lstack/core/lstack_lwip.c | 107 ++++++++++++++++-------- + src/lstack/core/lstack_protocol_stack.c | 39 ++++++++- + src/lstack/core/lstack_thread_rpc.c | 29 ++++++- + src/lstack/include/lstack_lwip.h | 3 +- + src/lstack/include/lstack_rpc_proc.h | 3 +- + src/lstack/include/lstack_thread_rpc.h | 3 +- + 6 files changed, 141 insertions(+), 43 deletions(-) + +diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c +index 153c5cc..db948b0 100644 +--- a/src/lstack/core/lstack_lwip.c ++++ b/src/lstack/core/lstack_lwip.c +@@ -101,7 +101,7 @@ static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, u + void *data = rte_pktmbuf_mtod(mbuf, void *); + struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ); + if (pbuf) { +- pbuf->allow_in = 1; ++ pbuf->allow_append = 1; + pbuf->addr = *IP_ANY_TYPE; + pbuf->port = 0; + pthread_spin_init(&pbuf->pbuf_lock, PTHREAD_PROCESS_SHARED); +@@ -227,24 +227,61 @@ struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type typ + return init_mbuf_to_pbuf(mbuf, layer, length, type); + } + +-struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags) ++static inline bool pbuf_allow_append(struct pbuf *pbuf, uint16_t remain_size) ++{ ++ pthread_spin_lock(&pbuf->pbuf_lock); ++ if (pbuf->tot_len > remain_size) { ++ pthread_spin_unlock(&pbuf->pbuf_lock); ++ return false; ++ } ++ if (pbuf->allow_append == 1) { ++ __sync_fetch_and_sub(&pbuf->allow_append, 1); ++ } ++ ++ pthread_spin_unlock(&pbuf->pbuf_lock); ++ return true; ++} ++ ++struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size) ++{ ++ int count; ++ /* when remain_size is 0, fill_sendring write one pbuf to sendring */ ++ if (remain_size == 0) { ++ count = 1; ++ } else { ++ count = (remain_size + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN; ++ } ++ ++ struct pbuf *pbufs[count]; ++ ++ int actual_count = gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbufs, count); ++ if (unlikely(actual_count != count)) { ++ LSTACK_LOG(ERR, LSTACK, "udp get pbuf from sendring error, expected: %d, actual: %d\n", ++ count, actual_count); ++ } ++ ++ if (unlikely(pbufs[0]->tot_len != remain_size)) { ++ LSTACK_LOG(ERR, LSTACK, "udp get pbuf size error, expected: %d, actual: %d\n", ++ remain_size, pbufs[0]->tot_len); ++ } ++ ++ for (int i = 0; get_protocol_stack_group()->latency_start && i < count; i++) { ++ calculate_lstack_latency(&sock->stack->latency, pbufs[i], GAZELLE_LATENCY_WRITE_LWIP, 0); ++ } ++ ++ return pbufs[0]; ++} ++ ++struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size) + { + struct pbuf *pbuf = NULL; + + if (unlikely(sock->send_pre_del)) { +- pbuf = sock->send_pre_del; +- pthread_spin_lock(&pbuf->pbuf_lock); +- if (pbuf->tot_len > remain_size) { +- pthread_spin_unlock(&pbuf->pbuf_lock); +- *apiflags &= ~TCP_WRITE_FLAG_MORE; ++ if (pbuf_allow_append(sock->send_pre_del, remain_size)) { ++ return sock->send_pre_del; ++ } else { + return NULL; + } +- if (pbuf->allow_in == 1) { +- __sync_fetch_and_sub(&pbuf->allow_in, 1); +- } +- pthread_spin_unlock(&pbuf->pbuf_lock); +- +- return pbuf; + } + + gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf, 1); +@@ -252,17 +289,6 @@ struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_s + return NULL; + } + +- /* udp send a pbuf chain, dequeue all pbufs except head pbuf */ +- if (NETCONN_IS_UDP(sock) && remain_size > MBUF_MAX_DATA_LEN) { +- int size = (remain_size + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN - 1; +- struct pbuf *pbuf_used[size]; +- gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf_used, size); +- +- for (uint32_t i = 0; get_protocol_stack_group()->latency_start && i < size; i++) { +- calculate_lstack_latency(&sock->stack->latency, pbuf_used[i], GAZELLE_LATENCY_WRITE_LWIP, 0); +- } +- } +- + if (get_protocol_stack_group()->latency_start) { + calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_WRITE_LWIP, 0); + } +@@ -270,19 +296,11 @@ struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_s + sock->send_pre_del = pbuf; + + if (!gazelle_ring_readover_count(sock->send_ring)) { +- pthread_spin_lock(&pbuf->pbuf_lock); +- if (pbuf->tot_len > remain_size) { +- pthread_spin_unlock(&pbuf->pbuf_lock); +- *apiflags &= ~TCP_WRITE_FLAG_MORE; ++ if (!pbuf_allow_append(pbuf, remain_size)) { + return NULL; + } +- if (pbuf->allow_in == 1) { +- __sync_fetch_and_sub(&pbuf->allow_in, 1); +- } +- pthread_spin_unlock(&pbuf->pbuf_lock); + } else { + if (pbuf->tot_len > remain_size) { +- *apiflags &= ~TCP_WRITE_FLAG_MORE; + return NULL; + } + } +@@ -388,7 +406,7 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r) + if (pthread_spin_trylock(&last_pbuf->pbuf_lock) != 0) { + return NULL; + } +- if (last_pbuf->allow_in != 1) { ++ if (last_pbuf->allow_append != 1) { + pthread_spin_unlock(&last_pbuf->pbuf_lock); + return NULL; + } +@@ -675,17 +693,34 @@ ssize_t do_lwip_recvmsg_from_stack(int32_t s, const struct msghdr *message, int3 + return buflen; + } + +-static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) ++static inline void notice_stack_tcp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) + { + // 2: call_num >= 2, don't need add new rpc send + if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) < 2) { +- while (rpc_call_send(&sock->stack->rpc_queue, fd, NULL, len, flags) < 0) { ++ while (rpc_call_tcp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) { + usleep(1000); // 1000: wait 1ms to exec again + } + __sync_fetch_and_add(&sock->call_num, 1); + } + } + ++static inline void notice_stack_udp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) ++{ ++ __sync_fetch_and_add(&sock->call_num, 1); ++ while (rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) { ++ usleep(1000); // 1000: wait 1ms to exec again ++ } ++} ++ ++static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) ++{ ++ if (NETCONN_IS_UDP(sock)) { ++ notice_stack_udp_send(sock, fd, len, flags); ++ } else { ++ notice_stack_tcp_send(sock, fd, len, flags); ++ } ++} ++ + /* process on same node use ring to recv data */ + ssize_t gazelle_same_node_ring_recv(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags) + { +diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c +index f6d381e..d130c91 100644 +--- a/src/lstack/core/lstack_protocol_stack.c ++++ b/src/lstack/core/lstack_protocol_stack.c +@@ -877,7 +877,7 @@ void stack_recv(struct rpc_msg *msg) + msg->args[MSG_ARG_3].i); + } + +-void stack_send(struct rpc_msg *msg) ++void stack_tcp_send(struct rpc_msg *msg) + { + int32_t fd = msg->args[MSG_ARG_0].i; + size_t len = msg->args[MSG_ARG_1].size; +@@ -913,6 +913,39 @@ void stack_send(struct rpc_msg *msg) + return; + } + ++void stack_udp_send(struct rpc_msg *msg) ++{ ++ int32_t fd = msg->args[MSG_ARG_0].i; ++ size_t len = msg->args[MSG_ARG_1].size; ++ struct protocol_stack *stack = get_protocol_stack(); ++ int replenish_again; ++ uint32_t call_num; ++ ++ if (get_protocol_stack_group()->latency_start) { ++ calculate_rpcmsg_latency(&stack->latency, msg, GAZELLE_LATENCY_WRITE_RPC_MSG); ++ } ++ ++ struct lwip_sock *sock = get_socket(fd); ++ if (sock == NULL) { ++ msg->result = -1; ++ LSTACK_LOG(ERR, LSTACK, "get sock error! fd=%d, len=%ld\n", fd, len); ++ return; ++ } ++ ++ replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0); ++ call_num = __sync_fetch_and_sub(&sock->call_num, 1); ++ if (replenish_again < 0) { ++ return; ++ } ++ ++ if ((call_num == 1) && (replenish_again > 0)) { ++ rpc_call_replenish(&stack->rpc_queue, sock); ++ return; ++ } ++ ++ return; ++} ++ + /* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */ + void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack) + { +@@ -1040,6 +1073,10 @@ void stack_replenish_sendring(struct rpc_msg *msg) + struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p; + + msg->result = do_lwip_replenish_sendring(stack, sock); ++ if (msg->result == true) { ++ msg->recall_flag = 1; ++ rpc_call(&stack->rpc_queue, msg); ++ } + } + + void stack_get_conntable(struct rpc_msg *msg) +diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c +index 04bdc3a..e438c37 100644 +--- a/src/lstack/core/lstack_thread_rpc.c ++++ b/src/lstack/core/lstack_thread_rpc.c +@@ -460,13 +460,36 @@ int32_t rpc_call_replenish(rpc_queue *queue, void *sock) + } + + msg->args[MSG_ARG_0].p = sock; ++ msg->sync_flag = 0; + +- return rpc_sync_call(queue, msg); ++ rpc_call(queue, msg); ++ return 0; ++} ++ ++int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags) ++{ ++ struct rpc_msg *msg = rpc_msg_alloc(stack_tcp_send); ++ if (msg == NULL) { ++ return -1; ++ } ++ ++ if (get_protocol_stack_group()->latency_start) { ++ time_stamp_into_rpcmsg(get_socket_by_fd(fd)); ++ } ++ ++ msg->args[MSG_ARG_0].i = fd; ++ msg->args[MSG_ARG_1].size = len; ++ msg->args[MSG_ARG_2].i = flags; ++ msg->sync_flag = 0; ++ ++ rpc_call(queue, msg); ++ ++ return 0; + } + +-int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags) ++int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags) + { +- struct rpc_msg *msg = rpc_msg_alloc(stack_send); ++ struct rpc_msg *msg = rpc_msg_alloc(stack_udp_send); + if (msg == NULL) { + return -1; + } +diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h +index fa10e3f..85c9c20 100644 +--- a/src/lstack/include/lstack_lwip.h ++++ b/src/lstack/include/lstack_lwip.h +@@ -33,7 +33,8 @@ int do_lwip_close(int32_t fd); + void do_lwip_init_sock(int32_t fd); + void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock); + +-struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags); ++struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size); ++struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size); + void do_lwip_get_from_sendring_over(struct lwip_sock *sock); + bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock); + ssize_t do_lwip_read_from_lwip(struct lwip_sock *sock, int32_t flags, uint8_t apiflags); +diff --git a/src/lstack/include/lstack_rpc_proc.h b/src/lstack/include/lstack_rpc_proc.h +index 71f0c58..77b18bd 100644 +--- a/src/lstack/include/lstack_rpc_proc.h ++++ b/src/lstack/include/lstack_rpc_proc.h +@@ -30,7 +30,8 @@ void stack_getsockopt(struct rpc_msg *msg); + void stack_setsockopt(struct rpc_msg *msg); + void stack_fcntl(struct rpc_msg *msg); + void stack_ioctl(struct rpc_msg *msg); +-void stack_send(struct rpc_msg *msg); ++void stack_tcp_send(struct rpc_msg *msg); ++void stack_udp_send(struct rpc_msg *msg); + void stack_mempool_size(struct rpc_msg *msg); + void stack_rpcpool_size(struct rpc_msg *msg); + void stack_create_shadow_fd(struct rpc_msg *msg); +diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h +index 276ebb2..fa98b0c 100644 +--- a/src/lstack/include/lstack_thread_rpc.h ++++ b/src/lstack/include/lstack_thread_rpc.h +@@ -83,7 +83,8 @@ int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, + int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog); + int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags); + int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen); +-int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags); ++int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags); ++int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags); + int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); + int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); + int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen); +-- +2.33.0 + diff --git a/gazelle.spec b/gazelle.spec index 6da2f74..fd7cee6 100644 --- a/gazelle.spec +++ b/gazelle.spec @@ -212,6 +212,8 @@ Patch9192: 0192-remove-legacy-mem.patch Patch9193: 0193-cfg-bond_slave_mac-support-pci-addr.patch Patch9194: 0194-refactor-tx-cache-module.patch Patch9195: 0195-virtio-create-and-init-virtio_port.patch +Patch9196: 0196-refactor-udp-send.patch + %description %{name} is a high performance user-mode stack. @@ -252,6 +254,9 @@ install -Dpm 0640 %{_builddir}/%{name}-%{version}/src/ltran/ltran.conf %{b %config(noreplace) %{conf_path}/ltran.conf %changelog +* Fri Jun 14 2024 jiangheng - 1.0.2-42 +- refacotr udp send + * Fri Jun 21 2024 yinbin6 - 1.0.2-42 - [virtio]: create and init virtio_port - refactor tx cache module