From 388b2202a0b248026e77ef2c340144ed547c87b7 Mon Sep 17 00:00:00 2001
From: jiangheng12
Date: Wed, 14 Jun 2023 11:58:01 +0800
Subject: [PATCH] fix udp send/recv in multiple queue

Make UDP sockets usable when several protocol stack queues are running:

- do_bind(): a UDP socket with listen_shadow enabled is bound through
  stack_broadcast_bind(), which binds the fd and then calls
  rpc_call_shadow_fd() on every other protocol stack. Everything else
  keeps the single rpc_call_bind() path via stack_single_bind().
- do_recvfrom(): for UDP sockets, udp_recvfrom() reads from the socket
  and then from each socket linked through listen_next, until one of
  them returns data or fails with an errno other than EAGAIN.
- eth_params_rss(): ETH_RSS_UDP is added to the default RSS hash set.
- create_shadow_fd() / clone_lwip_socket_opt(): the shadow socket of a
  UDP socket is created as SOCK_DGRAM and the UDP pcb fields are copied
  instead of the TCP ones.

udp_recvfrom() reads sock->conn->socket only while the next listen_next
entry is non-NULL, so reaching the end of the chain returns EAGAIN
rather than dereferencing a NULL pointer.
---
 src/lstack/api/lstack_wrap.c               | 37 ++++++++++++++++++--
 src/lstack/core/lstack_dpdk.c              |  2 +-
 src/lstack/core/lstack_lwip.c              | 36 +++++++++++++++-----
 src/lstack/core/lstack_protocol_stack.c    | 39 ++++++++++++++++++++++
 src/lstack/include/lstack_protocol_stack.h |  4 +++
 5 files changed, 106 insertions(+), 12 deletions(-)

diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 7245873..1f33e13 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -236,7 +236,11 @@ static int32_t do_bind(int32_t s, const struct sockaddr *name, socklen_t namelen
         }
     }
 
-    return rpc_call_bind(s, name, namelen);
+    if (NETCONN_IS_UDP(sock) && get_global_cfg_params()->listen_shadow) {
+        return stack_broadcast_bind(s, name, namelen);
+    } else {
+        return stack_single_bind(s, name, namelen);
+    }
 }
 
 bool is_dst_ip_localhost(const struct sockaddr *addr)
@@ -548,6 +552,31 @@ static inline ssize_t do_sendmsg(int32_t s, const struct msghdr *message, int32_
     return posix_api->send_msg(s, message, flags);
 }
 
+static inline ssize_t udp_recvfrom(struct lwip_sock *sock, int32_t sockfd, void *buf, size_t len, int32_t flags,
+                                   struct sockaddr *addr, socklen_t *addrlen)
+{
+    int32_t ret;
+
+    do {
+        ret = read_stack_data(sockfd, buf, len, flags, addr, addrlen);
+        if (ret > 0) {
+            return ret;
+        }
+        if (ret <= 0 && errno != EAGAIN) {
+            return -1;
+        }
+        sock = sock->listen_next;
+        sockfd = (sock != NULL) ? sock->conn->socket : -1;
+    } while (sock != NULL);
+    GAZELLE_RETURN(EAGAIN);
+}
+
+static inline ssize_t tcp_recvfrom(struct lwip_sock *sock, int32_t sockfd, void *buf, size_t len, int32_t flags,
+                                   struct sockaddr *addr, socklen_t *addrlen)
+{
+    return read_stack_data(sockfd, buf, len, flags, addr, addrlen);
+}
+
 static inline ssize_t do_recvfrom(int32_t sockfd, void *buf, size_t len, int32_t flags,
                                   struct sockaddr *addr, socklen_t *addrlen)
 {
@@ -561,7 +590,11 @@ static inline ssize_t do_recvfrom(int32_t sockfd, void *buf, size_t len, int32_t
 
     struct lwip_sock *sock = NULL;
     if (select_path(sockfd, &sock) == PATH_LWIP) {
-        return read_stack_data(sockfd, buf, len, flags, addr, addrlen);
+        if (NETCONN_IS_UDP(sock)) {
+            return udp_recvfrom(sock, sockfd, buf, len, flags, addr, addrlen);
+        } else {
+            return tcp_recvfrom(sock, sockfd, buf, len, flags, addr, addrlen);
+        }
     }
 
     return posix_api->recv_from(sockfd, buf, len, flags, addr, addrlen);
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index b321c18..169025c 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -362,7 +362,7 @@ uint64_t get_eth_params_tx_ol(void)
 static int eth_params_rss(struct rte_eth_conf *conf, struct rte_eth_dev_info *dev_info)
 {
     int rss_enable = 0;
-    uint64_t def_rss_hf = ETH_RSS_TCP | ETH_RSS_IP;
+    uint64_t def_rss_hf = ETH_RSS_TCP | ETH_RSS_UDP | ETH_RSS_IP;
     struct rte_eth_rss_conf rss_conf = {
         g_default_rss_key,
         RSS_HASH_KEY_LEN,
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 2e7a67a..34b4aa7 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -1230,13 +1230,19 @@ static inline void clone_lwip_socket_opt(struct lwip_sock *dst_sock, struct lwip
     dst_sock->conn->pcb.ip->so_options = src_sock->conn->pcb.ip->so_options;
     dst_sock->conn->pcb.ip->ttl = src_sock->conn->pcb.ip->ttl;
     dst_sock->conn->pcb.ip->tos = src_sock->conn->pcb.ip->tos;
-    dst_sock->conn->pcb.tcp->netif_idx = src_sock->conn->pcb.tcp->netif_idx;
-    dst_sock->conn->pcb.tcp->flags = src_sock->conn->pcb.tcp->flags;
-    dst_sock->conn->pcb.tcp->keep_idle = src_sock->conn->pcb.tcp->keep_idle;
-    dst_sock->conn->pcb.tcp->keep_idle = src_sock->conn->pcb.tcp->keep_idle;
-    dst_sock->conn->pcb.tcp->keep_intvl = src_sock->conn->pcb.tcp->keep_intvl;
-    dst_sock->conn->pcb.tcp->keep_cnt = src_sock->conn->pcb.tcp->keep_cnt;
     dst_sock->conn->flags = src_sock->conn->flags;
+    if (NETCONN_IS_UDP(src_sock)) {
+        dst_sock->conn->pcb.udp->flags = src_sock->conn->pcb.udp->flags;
+        dst_sock->conn->pcb.udp->mcast_ifindex = src_sock->conn->pcb.udp->mcast_ifindex;
+        dst_sock->conn->pcb.udp->mcast_ttl = src_sock->conn->pcb.udp->mcast_ttl;
+    } else {
+        dst_sock->conn->pcb.tcp->netif_idx = src_sock->conn->pcb.tcp->netif_idx;
+        dst_sock->conn->pcb.tcp->flags = src_sock->conn->pcb.tcp->flags;
+        dst_sock->conn->pcb.tcp->keep_idle = src_sock->conn->pcb.tcp->keep_idle;
+        dst_sock->conn->pcb.tcp->keep_idle = src_sock->conn->pcb.tcp->keep_idle;
+        dst_sock->conn->pcb.tcp->keep_intvl = src_sock->conn->pcb.tcp->keep_intvl;
+        dst_sock->conn->pcb.tcp->keep_cnt = src_sock->conn->pcb.tcp->keep_cnt;
+    }
 }
 
 int32_t gazelle_socket(int domain, int type, int protocol)
@@ -1265,16 +1271,28 @@ void create_shadow_fd(struct rpc_msg *msg)
     struct sockaddr *addr = msg->args[MSG_ARG_1].p;
     socklen_t addr_len = msg->args[MSG_ARG_2].socklen;
 
-    int32_t clone_fd = gazelle_socket(AF_INET, SOCK_STREAM, 0);
+    int32_t clone_fd = 0;
+    struct lwip_sock *sock = get_socket_by_fd(fd);
+    if (sock == NULL) {
+        LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d\n", fd);
+        msg->result = -1;
+        return;
+    }
+
+    if (NETCONN_IS_UDP(sock)) {
+        clone_fd = gazelle_socket(AF_INET, SOCK_DGRAM, 0);
+    } else {
+        clone_fd = gazelle_socket(AF_INET, SOCK_STREAM, 0);
+    }
+
     if (clone_fd < 0) {
         LSTACK_LOG(ERR, LSTACK, "clone socket failed clone_fd=%d errno=%d\n", clone_fd, errno);
         msg->result = clone_fd;
         return;
     }
 
-    struct lwip_sock *sock = get_socket_by_fd(fd);
     struct lwip_sock *clone_sock = get_socket_by_fd(clone_fd);
-    if (sock == NULL || clone_sock == NULL) {
+    if (clone_sock == NULL) {
         LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d clone_fd=%d\n", fd, clone_fd);
         msg->result = -1;
         return;
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 6c96555..52a0c8f 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -935,6 +935,45 @@ static void inline del_accept_in_event(struct lwip_sock *sock)
     pthread_spin_unlock(&sock->wakeup->event_list_lock);
 }
 
+/* choice one stack bind */
+int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen)
+{
+    return rpc_call_bind(fd, name, namelen);
+}
+
+/* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */
+int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen)
+{
+    struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
+    struct protocol_stack *stack = NULL;
+    int32_t ret, clone_fd;
+
+    struct lwip_sock *sock = get_socket(fd);
+    if (sock == NULL) {
+        LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null\n", get_stack_tid(), fd);
+        GAZELLE_RETURN(EINVAL);
+    }
+
+    ret = rpc_call_bind(fd, name, namelen);
+    if (ret < 0) {
+        close(fd);
+        return ret;
+    }
+
+    struct protocol_stack_group *stack_group = get_protocol_stack_group();
+    for (int32_t i = 0; i < stack_group->stack_num; ++i) {
+        stack = stack_group->stacks[i];
+        if (stack != cur_stack) {
+            clone_fd = rpc_call_shadow_fd(stack, fd, name, namelen);
+            if (clone_fd < 0) {
+                stack_broadcast_close(fd);
+                return clone_fd;
+            }
+        }
+    }
+    return 0;
+}
+
 /* ergodic the protocol stack thread to find the connection, because all threads are listening */
 int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
 {
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 3a447dc..a23ddff 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -128,6 +128,10 @@ int32_t stack_broadcast_close(int32_t fd);
 int32_t stack_broadcast_listen(int32_t fd, int backlog);
 int32_t stack_single_listen(int32_t fd, int32_t backlog);
 
+/* bind sync to all protocol stack thread, only for udp protocol */
+int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen);
+int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen);
+
 /* ergodic the protocol stack thread to find the connection, because all threads are listening */
 int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen);
 int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int32_t flags);
-- 
2.23.0