276 lines
9.6 KiB
Diff
276 lines
9.6 KiB
Diff
From b87de3f10de1839e8dccc64ba01b3d4b7bd114c3 Mon Sep 17 00:00:00 2001
|
|
From: wu-changsheng <wuchangsheng2@huawei.com>
|
|
Date: Sat, 8 Oct 2022 19:50:42 +0800
|
|
Subject: [PATCH 18/21] merge sendmsg write
|
|
|
|
---
|
|
src/lstack/core/lstack_lwip.c | 92 ++++++++++++++++------
|
|
src/lstack/core/lstack_protocol_stack.c | 16 ----
|
|
src/lstack/core/lstack_thread_rpc.c | 32 +-------
|
|
src/lstack/include/lstack_protocol_stack.h | 2 -
|
|
src/lstack/include/lstack_thread_rpc.h | 2 -
|
|
5 files changed, 68 insertions(+), 76 deletions(-)
|
|
|
|
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
|
|
index bb5a7e5..f4128d7 100644
|
|
--- a/src/lstack/core/lstack_lwip.c
|
|
+++ b/src/lstack/core/lstack_lwip.c
|
|
@@ -259,6 +259,22 @@ static inline void del_data_out_event(struct lwip_sock *sock)
|
|
pthread_spin_unlock(&sock->wakeup->event_list_lock);
|
|
}
|
|
|
|
+void write_stack_over(struct lwip_sock *sock)
|
|
+{
|
|
+ if (sock->send_lastdata) {
|
|
+ sock->send_lastdata->tot_len = sock->send_lastdata->len = sock->send_datalen;
|
|
+ sock->send_lastdata = NULL;
|
|
+ }
|
|
+
|
|
+ gazelle_ring_read_over(sock->send_ring);
|
|
+
|
|
+ if (sock->wakeup) {
|
|
+ if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) {
|
|
+ del_data_out_event(sock);
|
|
+ }
|
|
+ }
|
|
+}
|
|
+
|
|
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
|
|
{
|
|
if (sock->errevent > 0) {
|
|
@@ -272,31 +288,37 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
|
|
|
|
struct pbuf *pbuf = NULL;
|
|
ssize_t send_len = 0;
|
|
- size_t copy_len;
|
|
uint32_t send_pkt = 0;
|
|
|
|
while (send_len < len && send_pkt < free_count) {
|
|
- if (gazelle_ring_read(sock->send_ring, (void **)&pbuf, 1) != 1) {
|
|
- if (sock->wakeup) {
|
|
- sock->wakeup->stat.app_write_idlefail++;
|
|
+ if (sock->send_lastdata) {
|
|
+ pbuf = sock->send_lastdata;
|
|
+ } else {
|
|
+ if (gazelle_ring_read(sock->send_ring, (void **)&pbuf, 1) != 1) {
|
|
+ if (sock->wakeup) {
|
|
+ sock->wakeup->stat.app_write_idlefail++;
|
|
+ }
|
|
+ break;
|
|
}
|
|
- break;
|
|
+ sock->send_lastdata = pbuf;
|
|
+ sock->send_datalen = 0;
|
|
}
|
|
|
|
- copy_len = (len - send_len > pbuf->len) ? pbuf->len : (len - send_len);
|
|
- pbuf_take(pbuf, (char *)buf + send_len, copy_len);
|
|
- pbuf->tot_len = pbuf->len = copy_len;
|
|
+ uint16_t remian_len = pbuf->len - sock->send_datalen;
|
|
+ uint16_t copy_len = (len - send_len > remian_len) ? remian_len : (len - send_len);
|
|
+ pbuf_take_at(pbuf, (char *)buf + send_len, copy_len, sock->send_datalen);
|
|
+ sock->send_datalen += copy_len;
|
|
+ if (sock->send_datalen >= pbuf->len) {
|
|
+ sock->send_lastdata = NULL;
|
|
+ pbuf->tot_len = pbuf->len = sock->send_datalen;
|
|
+ send_pkt++;
|
|
+ }
|
|
|
|
send_len += copy_len;
|
|
- send_pkt++;
|
|
}
|
|
- gazelle_ring_read_over(sock->send_ring);
|
|
|
|
if (sock->wakeup) {
|
|
sock->wakeup->stat.app_write_cnt += send_pkt;
|
|
- if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) {
|
|
- del_data_out_event(sock);
|
|
- }
|
|
}
|
|
|
|
return send_len;
|
|
@@ -500,6 +522,16 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags)
|
|
return buflen;
|
|
}
|
|
|
|
+static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
|
|
+{
|
|
+ if (__atomic_load_n(&sock->in_send, __ATOMIC_ACQUIRE) == 0) {
|
|
+ __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
|
|
+ if (rpc_call_send(fd, NULL, len, flags) != 0) {
|
|
+ __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
|
|
+ }
|
|
+ }
|
|
+}
|
|
+
|
|
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
|
|
{
|
|
if (buf == NULL) {
|
|
@@ -516,18 +548,12 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
|
|
}
|
|
|
|
ssize_t send = write_stack_data(sock, buf, len);
|
|
- if (send < 0) {
|
|
- GAZELLE_RETURN(EAGAIN);
|
|
- } else if (send == 0) {
|
|
- return 0;
|
|
+ if (send <= 0) {
|
|
+ return send;
|
|
}
|
|
+ write_stack_over(sock);
|
|
|
|
- if (__atomic_load_n(&sock->in_send, __ATOMIC_ACQUIRE) == 0) {
|
|
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
|
|
- if (rpc_call_send(fd, NULL, send, flags) != 0) {
|
|
- __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
|
|
- }
|
|
- }
|
|
+ notice_stack_send(sock, fd, send, flags);
|
|
return send;
|
|
}
|
|
|
|
@@ -537,23 +563,37 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
|
|
int32_t i;
|
|
ssize_t buflen = 0;
|
|
|
|
+ struct lwip_sock *sock = get_socket(s);
|
|
+ if (sock == NULL) {
|
|
+ GAZELLE_RETURN(EINVAL);
|
|
+ }
|
|
+
|
|
if (check_msg_vaild(message)) {
|
|
GAZELLE_RETURN(EINVAL);
|
|
}
|
|
|
|
for (i = 0; i < message->msg_iovlen; i++) {
|
|
- ret = gazelle_send(s, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, flags);
|
|
+ if (message->msg_iov[i].iov_len == 0) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ ret = write_stack_data(sock, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len);
|
|
if (ret < 0) {
|
|
- return buflen == 0 ? ret : buflen;
|
|
+ buflen = (buflen == 0) ? ret : buflen;
|
|
+ break;
|
|
}
|
|
|
|
buflen += ret;
|
|
|
|
if (ret < message->msg_iov[i].iov_len) {
|
|
- return buflen;
|
|
+ break;
|
|
}
|
|
}
|
|
|
|
+ if (buflen > 0) {
|
|
+ write_stack_over(sock);
|
|
+ notice_stack_send(sock, s, buflen, flags);
|
|
+ }
|
|
return buflen;
|
|
}
|
|
|
|
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
|
|
index 6119975..fbeca62 100644
|
|
--- a/src/lstack/core/lstack_protocol_stack.c
|
|
+++ b/src/lstack/core/lstack_protocol_stack.c
|
|
@@ -608,22 +608,6 @@ void stack_recv(struct rpc_msg *msg)
|
|
msg->args[MSG_ARG_3].i);
|
|
}
|
|
|
|
-void stack_sendmsg(struct rpc_msg *msg)
|
|
-{
|
|
- msg->result = lwip_sendmsg(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].i);
|
|
- if (msg->result != 0) {
|
|
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
|
|
- }
|
|
-}
|
|
-
|
|
-void stack_recvmsg(struct rpc_msg *msg)
|
|
-{
|
|
- msg->result = lwip_recvmsg(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].i);
|
|
- if (msg->result != 0) {
|
|
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
|
|
- }
|
|
-}
|
|
-
|
|
/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
|
|
void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack)
|
|
{
|
|
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
|
|
index d1f7580..ad967e9 100644
|
|
--- a/src/lstack/core/lstack_thread_rpc.c
|
|
+++ b/src/lstack/core/lstack_thread_rpc.c
|
|
@@ -236,7 +236,8 @@ int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf)
|
|
|
|
msg->self_release = 0;
|
|
msg->args[MSG_ARG_0].p = mbuf;
|
|
- lockless_queue_mpsc_push(&stack->rpc_queue, &msg->queue_node);
|
|
+
|
|
+ rpc_call(&stack->rpc_queue, msg);
|
|
|
|
return 0;
|
|
}
|
|
@@ -446,32 +447,3 @@ int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags)
|
|
return 0;
|
|
}
|
|
|
|
-int32_t rpc_call_sendmsg(int fd, const struct msghdr *msghdr, int flags)
|
|
-{
|
|
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
|
|
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendmsg);
|
|
- if (msg == NULL) {
|
|
- return -1;
|
|
- }
|
|
-
|
|
- msg->args[MSG_ARG_0].i = fd;
|
|
- msg->args[MSG_ARG_1].cp = msghdr;
|
|
- msg->args[MSG_ARG_2].i = flags;
|
|
-
|
|
- return rpc_sync_call(&stack->rpc_queue, msg);
|
|
-}
|
|
-
|
|
-int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags)
|
|
-{
|
|
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
|
|
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvmsg);
|
|
- if (msg == NULL) {
|
|
- return -1;
|
|
- }
|
|
-
|
|
- msg->args[MSG_ARG_0].i = fd;
|
|
- msg->args[MSG_ARG_1].p = msghdr;
|
|
- msg->args[MSG_ARG_2].i = flags;
|
|
-
|
|
- return rpc_sync_call(&stack->rpc_queue, msg);
|
|
-}
|
|
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
|
|
index a791357..f58ae56 100644
|
|
--- a/src/lstack/include/lstack_protocol_stack.h
|
|
+++ b/src/lstack/include/lstack_protocol_stack.h
|
|
@@ -124,8 +124,6 @@ void stack_listen(struct rpc_msg *msg);
|
|
void stack_accept(struct rpc_msg *msg);
|
|
void stack_connect(struct rpc_msg *msg);
|
|
void stack_recv(struct rpc_msg *msg);
|
|
-void stack_sendmsg(struct rpc_msg *msg);
|
|
-void stack_recvmsg(struct rpc_msg *msg);
|
|
void stack_getpeername(struct rpc_msg *msg);
|
|
void stack_getsockname(struct rpc_msg *msg);
|
|
void stack_getsockopt(struct rpc_msg *msg);
|
|
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
|
|
index e1223de..175c8c9 100644
|
|
--- a/src/lstack/include/lstack_thread_rpc.h
|
|
+++ b/src/lstack/include/lstack_thread_rpc.h
|
|
@@ -67,8 +67,6 @@ int32_t rpc_call_listen(int s, int backlog);
|
|
int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen);
|
|
int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen);
|
|
int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags);
|
|
-int32_t rpc_call_sendmsg(int fd, const struct msghdr *msg, int flags);
|
|
-int32_t rpc_call_recvmsg(int fd, struct msghdr *msg, int flags);
|
|
int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen);
|
|
int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen);
|
|
int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen);
|
|
--
|
|
2.23.0
|
|
|