From b87de3f10de1839e8dccc64ba01b3d4b7bd114c3 Mon Sep 17 00:00:00 2001 From: wu-changsheng Date: Sat, 8 Oct 2022 19:50:42 +0800 Subject: [PATCH 18/21] merge sendmsg write --- src/lstack/core/lstack_lwip.c | 92 ++++++++++++++++------ src/lstack/core/lstack_protocol_stack.c | 16 ---- src/lstack/core/lstack_thread_rpc.c | 32 +------- src/lstack/include/lstack_protocol_stack.h | 2 - src/lstack/include/lstack_thread_rpc.h | 2 - 5 files changed, 68 insertions(+), 76 deletions(-) diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index bb5a7e5..f4128d7 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -259,6 +259,22 @@ static inline void del_data_out_event(struct lwip_sock *sock) pthread_spin_unlock(&sock->wakeup->event_list_lock); } +void write_stack_over(struct lwip_sock *sock) +{ + if (sock->send_lastdata) { + sock->send_lastdata->tot_len = sock->send_lastdata->len = sock->send_datalen; + sock->send_lastdata = NULL; + } + + gazelle_ring_read_over(sock->send_ring); + + if (sock->wakeup) { + if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) { + del_data_out_event(sock); + } + } +} + ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) { if (sock->errevent > 0) { @@ -272,31 +288,37 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len) struct pbuf *pbuf = NULL; ssize_t send_len = 0; - size_t copy_len; uint32_t send_pkt = 0; while (send_len < len && send_pkt < free_count) { - if (gazelle_ring_read(sock->send_ring, (void **)&pbuf, 1) != 1) { - if (sock->wakeup) { - sock->wakeup->stat.app_write_idlefail++; + if (sock->send_lastdata) { + pbuf = sock->send_lastdata; + } else { + if (gazelle_ring_read(sock->send_ring, (void **)&pbuf, 1) != 1) { + if (sock->wakeup) { + sock->wakeup->stat.app_write_idlefail++; + } + break; } - break; + sock->send_lastdata = pbuf; + sock->send_datalen = 0; } - copy_len = (len - send_len > pbuf->len) ? pbuf->len : (len - send_len); - pbuf_take(pbuf, (char *)buf + send_len, copy_len); - pbuf->tot_len = pbuf->len = copy_len; + uint16_t remian_len = pbuf->len - sock->send_datalen; + uint16_t copy_len = (len - send_len > remian_len) ? remian_len : (len - send_len); + pbuf_take_at(pbuf, (char *)buf + send_len, copy_len, sock->send_datalen); + sock->send_datalen += copy_len; + if (sock->send_datalen >= pbuf->len) { + sock->send_lastdata = NULL; + pbuf->tot_len = pbuf->len = sock->send_datalen; + send_pkt++; + } send_len += copy_len; - send_pkt++; } - gazelle_ring_read_over(sock->send_ring); if (sock->wakeup) { sock->wakeup->stat.app_write_cnt += send_pkt; - if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) { - del_data_out_event(sock); - } } return send_len; @@ -500,6 +522,16 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags) return buflen; } +static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags) +{ + if (__atomic_load_n(&sock->in_send, __ATOMIC_ACQUIRE) == 0) { + __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE); + if (rpc_call_send(fd, NULL, len, flags) != 0) { + __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE); + } + } +} + ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags) { if (buf == NULL) { @@ -516,18 +548,12 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags) } ssize_t send = write_stack_data(sock, buf, len); - if (send < 0) { - GAZELLE_RETURN(EAGAIN); - } else if (send == 0) { - return 0; + if (send <= 0) { + return send; } + write_stack_over(sock); - if (__atomic_load_n(&sock->in_send, __ATOMIC_ACQUIRE) == 0) { - __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE); - if (rpc_call_send(fd, NULL, send, flags) != 0) { - __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE); - } - } + notice_stack_send(sock, fd, send, flags); return send; } @@ -537,23 +563,37 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags) int32_t i; ssize_t buflen = 0; + struct lwip_sock *sock = get_socket(s); + if (sock == NULL) { + GAZELLE_RETURN(EINVAL); + } + if (check_msg_vaild(message)) { GAZELLE_RETURN(EINVAL); } for (i = 0; i < message->msg_iovlen; i++) { - ret = gazelle_send(s, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, flags); + if (message->msg_iov[i].iov_len == 0) { + continue; + } + + ret = write_stack_data(sock, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len); if (ret < 0) { - return buflen == 0 ? ret : buflen; + buflen = (buflen == 0) ? ret : buflen; + break; } buflen += ret; if (ret < message->msg_iov[i].iov_len) { - return buflen; + break; } } + if (buflen > 0) { + write_stack_over(sock); + notice_stack_send(sock, s, buflen, flags); + } return buflen; } diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index 6119975..fbeca62 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -608,22 +608,6 @@ void stack_recv(struct rpc_msg *msg) msg->args[MSG_ARG_3].i); } -void stack_sendmsg(struct rpc_msg *msg) -{ - msg->result = lwip_sendmsg(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].i); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); - } -} - -void stack_recvmsg(struct rpc_msg *msg) -{ - msg->result = lwip_recvmsg(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].i); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); - } -} - /* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack) { diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index d1f7580..ad967e9 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -236,7 +236,8 @@ int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf) msg->self_release = 0; msg->args[MSG_ARG_0].p = mbuf; - lockless_queue_mpsc_push(&stack->rpc_queue, &msg->queue_node); + + rpc_call(&stack->rpc_queue, msg); return 0; } @@ -446,32 +447,3 @@ int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags) return 0; } -int32_t rpc_call_sendmsg(int fd, const struct msghdr *msghdr, int flags) -{ - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendmsg); - if (msg == NULL) { - return -1; - } - - msg->args[MSG_ARG_0].i = fd; - msg->args[MSG_ARG_1].cp = msghdr; - msg->args[MSG_ARG_2].i = flags; - - return rpc_sync_call(&stack->rpc_queue, msg); -} - -int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags) -{ - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvmsg); - if (msg == NULL) { - return -1; - } - - msg->args[MSG_ARG_0].i = fd; - msg->args[MSG_ARG_1].p = msghdr; - msg->args[MSG_ARG_2].i = flags; - - return rpc_sync_call(&stack->rpc_queue, msg); -} diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h index a791357..f58ae56 100644 --- a/src/lstack/include/lstack_protocol_stack.h +++ b/src/lstack/include/lstack_protocol_stack.h @@ -124,8 +124,6 @@ void stack_listen(struct rpc_msg *msg); void stack_accept(struct rpc_msg *msg); void stack_connect(struct rpc_msg *msg); void stack_recv(struct rpc_msg *msg); -void stack_sendmsg(struct rpc_msg *msg); -void stack_recvmsg(struct rpc_msg *msg); void stack_getpeername(struct rpc_msg *msg); void stack_getsockname(struct rpc_msg *msg); void stack_getsockopt(struct rpc_msg *msg); diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index e1223de..175c8c9 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -67,8 +67,6 @@ int32_t rpc_call_listen(int s, int backlog); int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen); int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen); int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags); -int32_t rpc_call_sendmsg(int fd, const struct msghdr *msg, int flags); -int32_t rpc_call_recvmsg(int fd, struct msghdr *msg, int flags); int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen); int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen); int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen); -- 2.23.0