From c109336bcf860c9dd16ba8becd9de72ecdce4d8f Mon Sep 17 00:00:00 2001 From: jiangheng Date: Tue, 11 Jun 2024 14:15:49 +0800 Subject: [PATCH] refactor udp send distinguish tcp/udp get_from_sendring cancel the restriction that maximum of 2 rpc msg can be sent over the same udp sock --- src/lstack/core/lstack_lwip.c | 107 ++++++++++++++++-------- src/lstack/core/lstack_protocol_stack.c | 39 ++++++++- src/lstack/core/lstack_thread_rpc.c | 29 ++++++- src/lstack/include/lstack_lwip.h | 3 +- src/lstack/include/lstack_rpc_proc.h | 3 +- src/lstack/include/lstack_thread_rpc.h | 3 +- 6 files changed, 141 insertions(+), 43 deletions(-) diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index 153c5cc..db948b0 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -101,7 +101,7 @@ static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, u void *data = rte_pktmbuf_mtod(mbuf, void *); struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ); if (pbuf) { - pbuf->allow_in = 1; + pbuf->allow_append = 1; pbuf->addr = *IP_ANY_TYPE; pbuf->port = 0; pthread_spin_init(&pbuf->pbuf_lock, PTHREAD_PROCESS_SHARED); @@ -227,24 +227,61 @@ struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type typ return init_mbuf_to_pbuf(mbuf, layer, length, type); } -struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags) +static inline bool pbuf_allow_append(struct pbuf *pbuf, uint16_t remain_size) +{ + pthread_spin_lock(&pbuf->pbuf_lock); + if (pbuf->tot_len > remain_size) { + pthread_spin_unlock(&pbuf->pbuf_lock); + return false; + } + if (pbuf->allow_append == 1) { + __sync_fetch_and_sub(&pbuf->allow_append, 1); + } + + pthread_spin_unlock(&pbuf->pbuf_lock); + return true; +} + +struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size) +{ + int count; + /* when remain_size is 0, fill_sendring 
write one pbuf to sendring */ + if (remain_size == 0) { + count = 1; + } else { + count = (remain_size + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN; + } + + struct pbuf *pbufs[count]; + + int actual_count = gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbufs, count); + if (unlikely(actual_count != count)) { + LSTACK_LOG(ERR, LSTACK, "udp get pbuf from sendring error, expected: %d, actual: %d\n", + count, actual_count); + } + + if (unlikely(pbufs[0]->tot_len != remain_size)) { + LSTACK_LOG(ERR, LSTACK, "udp get pbuf size error, expected: %d, actual: %d\n", + remain_size, pbufs[0]->tot_len); + } + + for (int i = 0; get_protocol_stack_group()->latency_start && i < count; i++) { + calculate_lstack_latency(&sock->stack->latency, pbufs[i], GAZELLE_LATENCY_WRITE_LWIP, 0); + } + + return pbufs[0]; +} + +struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size) { struct pbuf *pbuf = NULL; if (unlikely(sock->send_pre_del)) { - pbuf = sock->send_pre_del; - pthread_spin_lock(&pbuf->pbuf_lock); - if (pbuf->tot_len > remain_size) { - pthread_spin_unlock(&pbuf->pbuf_lock); - *apiflags &= ~TCP_WRITE_FLAG_MORE; + if (pbuf_allow_append(sock->send_pre_del, remain_size)) { + return sock->send_pre_del; + } else { return NULL; } - if (pbuf->allow_in == 1) { - __sync_fetch_and_sub(&pbuf->allow_in, 1); - } - pthread_spin_unlock(&pbuf->pbuf_lock); - - return pbuf; } gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf, 1); @@ -252,17 +289,6 @@ struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_s return NULL; } - /* udp send a pbuf chain, dequeue all pbufs except head pbuf */ - if (NETCONN_IS_UDP(sock) && remain_size > MBUF_MAX_DATA_LEN) { - int size = (remain_size + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN - 1; - struct pbuf *pbuf_used[size]; - gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf_used, size); - - for (uint32_t i = 0; get_protocol_stack_group()->latency_start && i < size; i++) { - 
calculate_lstack_latency(&sock->stack->latency, pbuf_used[i], GAZELLE_LATENCY_WRITE_LWIP, 0); - } - } - if (get_protocol_stack_group()->latency_start) { calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_WRITE_LWIP, 0); } @@ -270,19 +296,11 @@ struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_s sock->send_pre_del = pbuf; if (!gazelle_ring_readover_count(sock->send_ring)) { - pthread_spin_lock(&pbuf->pbuf_lock); - if (pbuf->tot_len > remain_size) { - pthread_spin_unlock(&pbuf->pbuf_lock); - *apiflags &= ~TCP_WRITE_FLAG_MORE; + if (!pbuf_allow_append(pbuf, remain_size)) { return NULL; } - if (pbuf->allow_in == 1) { - __sync_fetch_and_sub(&pbuf->allow_in, 1); - } - pthread_spin_unlock(&pbuf->pbuf_lock); } else { if (pbuf->tot_len > remain_size) { - *apiflags &= ~TCP_WRITE_FLAG_MORE; return NULL; } } @@ -388,7 +406,7 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r) if (pthread_spin_trylock(&last_pbuf->pbuf_lock) != 0) { return NULL; } - if (last_pbuf->allow_in != 1) { + if (last_pbuf->allow_append != 1) { pthread_spin_unlock(&last_pbuf->pbuf_lock); return NULL; } @@ -675,17 +693,34 @@ ssize_t do_lwip_recvmsg_from_stack(int32_t s, const struct msghdr *message, int3 return buflen; } -static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) +static inline void notice_stack_tcp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) { // 2: call_num >= 2, don't need add new rpc send if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) < 2) { - while (rpc_call_send(&sock->stack->rpc_queue, fd, NULL, len, flags) < 0) { + while (rpc_call_tcp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) { usleep(1000); // 1000: wait 1ms to exec again } __sync_fetch_and_add(&sock->call_num, 1); } } +static inline void notice_stack_udp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) +{ + __sync_fetch_and_add(&sock->call_num, 1); + while 
(rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) { + usleep(1000); // 1000: wait 1ms to exec again + } +} + +static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) +{ + if (NETCONN_IS_UDP(sock)) { + notice_stack_udp_send(sock, fd, len, flags); + } else { + notice_stack_tcp_send(sock, fd, len, flags); + } +} + /* process on same node use ring to recv data */ ssize_t gazelle_same_node_ring_recv(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags) { diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index f6d381e..d130c91 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -877,7 +877,7 @@ void stack_recv(struct rpc_msg *msg) msg->args[MSG_ARG_3].i); } -void stack_send(struct rpc_msg *msg) +void stack_tcp_send(struct rpc_msg *msg) { int32_t fd = msg->args[MSG_ARG_0].i; size_t len = msg->args[MSG_ARG_1].size; @@ -913,6 +913,39 @@ void stack_send(struct rpc_msg *msg) return; } +void stack_udp_send(struct rpc_msg *msg) +{ + int32_t fd = msg->args[MSG_ARG_0].i; + size_t len = msg->args[MSG_ARG_1].size; + struct protocol_stack *stack = get_protocol_stack(); + int replenish_again; + uint32_t call_num; + + if (get_protocol_stack_group()->latency_start) { + calculate_rpcmsg_latency(&stack->latency, msg, GAZELLE_LATENCY_WRITE_RPC_MSG); + } + + struct lwip_sock *sock = get_socket(fd); + if (sock == NULL) { + msg->result = -1; + LSTACK_LOG(ERR, LSTACK, "get sock error! 
fd=%d, len=%ld\n", fd, len); + return; + } + + replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0); + call_num = __sync_fetch_and_sub(&sock->call_num, 1); + if (replenish_again < 0) { + return; + } + + if ((call_num == 1) && (replenish_again > 0)) { + rpc_call_replenish(&stack->rpc_queue, sock); + return; + } + + return; +} + /* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack) { @@ -1040,6 +1073,10 @@ void stack_replenish_sendring(struct rpc_msg *msg) struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p; msg->result = do_lwip_replenish_sendring(stack, sock); + if (msg->result == true) { + msg->recall_flag = 1; + rpc_call(&stack->rpc_queue, msg); + } } void stack_get_conntable(struct rpc_msg *msg) diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index 04bdc3a..e438c37 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -460,13 +460,36 @@ int32_t rpc_call_replenish(rpc_queue *queue, void *sock) } msg->args[MSG_ARG_0].p = sock; + msg->sync_flag = 0; - return rpc_sync_call(queue, msg); + rpc_call(queue, msg); + return 0; +} + +int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags) +{ + struct rpc_msg *msg = rpc_msg_alloc(stack_tcp_send); + if (msg == NULL) { + return -1; + } + + if (get_protocol_stack_group()->latency_start) { + time_stamp_into_rpcmsg(get_socket_by_fd(fd)); + } + + msg->args[MSG_ARG_0].i = fd; + msg->args[MSG_ARG_1].size = len; + msg->args[MSG_ARG_2].i = flags; + msg->sync_flag = 0; + + rpc_call(queue, msg); + + return 0; } -int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags) +int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags) { - struct rpc_msg *msg = rpc_msg_alloc(stack_send); + struct rpc_msg *msg = 
rpc_msg_alloc(stack_udp_send); if (msg == NULL) { return -1; } diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h index fa10e3f..85c9c20 100644 --- a/src/lstack/include/lstack_lwip.h +++ b/src/lstack/include/lstack_lwip.h @@ -33,7 +33,8 @@ int do_lwip_close(int32_t fd); void do_lwip_init_sock(int32_t fd); void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock); -struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags); +struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size); +struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size); void do_lwip_get_from_sendring_over(struct lwip_sock *sock); bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock); ssize_t do_lwip_read_from_lwip(struct lwip_sock *sock, int32_t flags, uint8_t apiflags); diff --git a/src/lstack/include/lstack_rpc_proc.h b/src/lstack/include/lstack_rpc_proc.h index 71f0c58..77b18bd 100644 --- a/src/lstack/include/lstack_rpc_proc.h +++ b/src/lstack/include/lstack_rpc_proc.h @@ -30,7 +30,8 @@ void stack_getsockopt(struct rpc_msg *msg); void stack_setsockopt(struct rpc_msg *msg); void stack_fcntl(struct rpc_msg *msg); void stack_ioctl(struct rpc_msg *msg); -void stack_send(struct rpc_msg *msg); +void stack_tcp_send(struct rpc_msg *msg); +void stack_udp_send(struct rpc_msg *msg); void stack_mempool_size(struct rpc_msg *msg); void stack_rpcpool_size(struct rpc_msg *msg); void stack_create_shadow_fd(struct rpc_msg *msg); diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index 276ebb2..fa98b0c 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -83,7 +83,8 @@ int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog); int32_t 
rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags); int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen); -int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags); +int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags); +int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags); int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen); -- 2.33.0