807 lines
30 KiB
Diff
807 lines
30 KiB
Diff
From fadeb43a653ab5da503b7030b60b4e063f0b3aef Mon Sep 17 00:00:00 2001
|
|
From: wuchangsheng <wuchangsheng2@huawei.com>
|
|
Date: Sun, 13 Mar 2022 22:57:44 +0800
|
|
Subject: [PATCH 24/34] refactor event
|
|
|
|
---
|
|
src/common/gazelle_dfx_msg.h | 10 +++-
|
|
src/lstack/api/lstack_epoll.c | 73 +++++++++++++++++---------
|
|
src/lstack/core/lstack_control_plane.c | 13 +++++
|
|
src/lstack/core/lstack_lwip.c | 76 ++++++++++++++++++++-------
|
|
src/lstack/core/lstack_protocol_stack.c | 36 ++++++++++---
|
|
src/lstack/core/lstack_stack_stat.c | 4 ++
|
|
src/lstack/core/lstack_thread_rpc.c | 42 +++++++++++++++
|
|
src/lstack/include/lstack_lwip.h | 3 ++
|
|
src/lstack/include/lstack_protocol_stack.h | 2 +
|
|
src/lstack/include/lstack_thread_rpc.h | 3 ++
|
|
src/lstack/include/lstack_weakup.h | 84 +++++++++++++++++++++---------
|
|
src/ltran/ltran_dfx.c | 36 ++++++++-----
|
|
12 files changed, 290 insertions(+), 92 deletions(-)
|
|
|
|
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
|
|
index cea4200..ae20436 100644
|
|
--- a/src/common/gazelle_dfx_msg.h
|
|
+++ b/src/common/gazelle_dfx_msg.h
|
|
@@ -64,10 +64,12 @@ struct gazelle_stat_pkts {
|
|
uint64_t rx_drop;
|
|
uint64_t rx_allocmbuf_fail;
|
|
uint64_t tx_allocmbuf_fail;
|
|
- uint16_t weakup_ring_cnt;
|
|
uint64_t call_msg_cnt;
|
|
+ uint16_t weakup_ring_cnt;
|
|
uint16_t conn_num;
|
|
uint16_t send_idle_ring_cnt;
|
|
+ uint64_t event_list;
|
|
+ uint64_t wakeup_list;
|
|
uint64_t read_lwip_drop;
|
|
uint64_t read_lwip_cnt;
|
|
uint64_t write_lwip_drop;
|
|
@@ -89,6 +91,10 @@ struct gazelle_stat_pkts {
|
|
uint64_t send_self_rpc;
|
|
uint64_t call_null;
|
|
uint64_t arp_copy_fail;
|
|
+ uint64_t epoll_pending;
|
|
+ uint64_t epoll_pending_call;
|
|
+ uint64_t epoll_self_call;
|
|
+ uint64_t epoll_self_event;
|
|
};
|
|
|
|
/* same as define in lwip/stats.h - struct stats_mib2 */
|
|
@@ -162,6 +168,8 @@ struct gazelle_stat_lstack_conn_info {
|
|
uint32_t send_ring_cnt;
|
|
uint32_t recv_ring_cnt;
|
|
uint32_t tcp_sub_state;
|
|
+ uint32_t event_ring_cnt;
|
|
+ uint32_t self_ring_cnt;
|
|
};
|
|
|
|
struct gazelle_stat_lstack_conn {
|
|
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
|
|
index bcbb35e..a686ddb 100644
|
|
--- a/src/lstack/api/lstack_epoll.c
|
|
+++ b/src/lstack/api/lstack_epoll.c
|
|
@@ -87,19 +87,27 @@ void add_epoll_event(struct netconn *conn, uint32_t event)
|
|
return;
|
|
}
|
|
|
|
- sock->have_event = true;
|
|
- weakup_enqueue(sock->stack->weakup_ring, sock);
|
|
- sock->stack->stats.weakup_events++;
|
|
+ if (weakup_enqueue(sock->stack->weakup_ring, sock)) {
|
|
+ if (list_is_empty(&sock->event_list)) {
|
|
+ list_add_node(&sock->stack->event_list, &sock->event_list);
|
|
+ }
|
|
+ } else {
|
|
+ sock->have_event = true;
|
|
+ sock->stack->stats.weakup_events++;
|
|
+ }
|
|
}
|
|
|
|
static void raise_pending_events(struct lwip_sock *sock)
|
|
{
|
|
- if (!sock->conn) {
|
|
+ struct weakup_poll *wakeup = sock->weakup;
|
|
+ struct protocol_stack *stack = sock->stack;
|
|
+ struct netconn *conn = sock->conn;
|
|
+ if (wakeup == NULL || stack == NULL || conn == NULL) {
|
|
return;
|
|
}
|
|
|
|
struct lwip_sock *attach_sock = NULL;
|
|
- if (sock->attach_fd > 0 && sock->attach_fd != sock->conn->socket) {
|
|
+ if (sock->attach_fd > 0 && sock->attach_fd != conn->socket) {
|
|
attach_sock = get_socket_by_fd(sock->attach_fd);
|
|
if (attach_sock == NULL) {
|
|
return;
|
|
@@ -108,7 +116,10 @@ static void raise_pending_events(struct lwip_sock *sock)
|
|
attach_sock = sock;
|
|
}
|
|
|
|
- struct netconn *conn = attach_sock->conn;
|
|
+ conn = attach_sock->conn;
|
|
+ if (conn == NULL) {
|
|
+ return;
|
|
+ }
|
|
struct tcp_pcb *tcp = conn->pcb.tcp;
|
|
if ((tcp == NULL) || (tcp->state < ESTABLISHED)) {
|
|
return;
|
|
@@ -132,10 +143,17 @@ static void raise_pending_events(struct lwip_sock *sock)
|
|
event |= POLLERR | POLLIN;
|
|
}
|
|
|
|
- if (event != 0) {
|
|
- sock->events |= event;
|
|
- rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
|
|
- sem_post(&sock->weakup->event_sem);
|
|
+ if (event == 0) {
|
|
+ return;
|
|
+ }
|
|
+ sock->events |= event;
|
|
+ if (rte_ring_mp_enqueue(wakeup->event_ring, (void *)sock) == 0 ||
|
|
+ rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) {
|
|
+ sem_post(&wakeup->event_sem);
|
|
+ stack->stats.epoll_pending++;
|
|
+ } else {
|
|
+ rpc_call_addevent(stack, sock);
|
|
+ stack->stats.epoll_pending_call++;
|
|
}
|
|
}
|
|
|
|
@@ -168,6 +186,12 @@ int32_t lstack_epoll_create(int32_t size)
|
|
GAZELLE_RETURN(ENOMEM);
|
|
}
|
|
|
|
+ weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd);
|
|
+ if (weakup->self_ring == NULL) {
|
|
+ posix_api->close_fn(fd);
|
|
+ GAZELLE_RETURN(ENOMEM);
|
|
+ }
|
|
+
|
|
sock->weakup = weakup;
|
|
|
|
return fd;
|
|
@@ -247,11 +271,6 @@ static inline int32_t save_poll_event(struct pollfd *fds, uint32_t maxevents, st
|
|
|
|
static bool remove_event(enum POLL_TYPE etype, struct lwip_sock **sock_list, int32_t event_num, struct lwip_sock *sock)
|
|
{
|
|
- /* close sock */
|
|
- if (sock->stack == NULL) {
|
|
- return true;
|
|
- }
|
|
-
|
|
/* remove duplicate event */
|
|
for (uint32_t i = 0; i < event_num && etype == TYPE_EPOLL; i++) {
|
|
if (sock_list[i] == sock) {
|
|
@@ -267,29 +286,26 @@ static int32_t get_lwip_events(struct weakup_poll *weakup, void *out, uint32_t m
|
|
struct epoll_event *events = (struct epoll_event *)out;
|
|
struct pollfd *fds = (struct pollfd *)out;
|
|
|
|
- uint32_t events_cnt = rte_ring_count(weakup->event_ring);
|
|
- if (events_cnt == 0) {
|
|
- return 0;
|
|
- }
|
|
|
|
if (etype == TYPE_EPOLL) {
|
|
maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents);
|
|
}
|
|
- events_cnt = LWIP_MIN(events_cnt, maxevents);
|
|
int32_t event_num = 0;
|
|
struct lwip_sock *sock = NULL;
|
|
|
|
- while (event_num < events_cnt) {
|
|
- int32_t ret = rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock);
|
|
- if (ret != 0) {
|
|
+ while (event_num < maxevents) {
|
|
+ if (rte_ring_sc_dequeue(weakup->self_ring, (void **)&sock) &&
|
|
+ rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock)) {
|
|
break;
|
|
}
|
|
+ /* close sock */
|
|
+ if (sock->stack == NULL) {
|
|
+ continue; /* skip a closed sock; 'return true' would report 1 bogus event from this int32_t function and drop events already gathered */
|
|
+ }
|
|
sock->have_event = false;
|
|
|
|
if (remove_event(etype, weakup->sock_list, event_num, sock)) {
|
|
- if (sock->stack) {
|
|
- sock->stack->stats.remove_event++;
|
|
- }
|
|
+ sock->stack->stats.remove_event++;
|
|
continue;
|
|
}
|
|
|
|
@@ -390,6 +406,11 @@ static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *we
|
|
if (weakup->event_ring == NULL) {
|
|
GAZELLE_RETURN(ENOMEM);
|
|
}
|
|
+
|
|
+ weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid());
|
|
+ if (weakup->self_ring == NULL) {
|
|
+ GAZELLE_RETURN(ENOMEM);
|
|
+ }
|
|
}
|
|
|
|
for (uint32_t i = 0; i < nfds; i++) {
|
|
diff --git a/src/lstack/core/lstack_control_plane.c b/src/lstack/core/lstack_control_plane.c
|
|
index a7e084d..c782d51 100644
|
|
--- a/src/lstack/core/lstack_control_plane.c
|
|
+++ b/src/lstack/core/lstack_control_plane.c
|
|
@@ -38,6 +38,7 @@
|
|
#define RECONNECT_TO_LTRAN_DELAY (1)
|
|
#define GAZELLE_BADFD (-1)
|
|
#define GAZELLE_LISTEN_BACKLOG 5
|
|
+#define GAZELLE_10MS (10000)
|
|
|
|
static int32_t g_data_fd = -1;
|
|
static volatile bool g_register_state = true;
|
|
@@ -701,6 +702,12 @@ void control_server_thread(void *arg)
|
|
int32_t num, connfd;
|
|
struct epoll_event evt_array;
|
|
while (1) {
|
|
+ /* wait init finish */
|
|
+ if (posix_api->is_chld) {
|
|
+ usleep(GAZELLE_10MS);
|
|
+ continue;
|
|
+ }
|
|
+
|
|
num = posix_api->epoll_wait_fn(epfd, &evt_array, 1, -1);
|
|
if (num <= 0) {
|
|
continue;
|
|
@@ -741,6 +748,12 @@ void control_client_thread(void *arg)
|
|
}
|
|
|
|
while (1) {
|
|
+ /* wait init finish */
|
|
+ if (posix_api->is_chld) {
|
|
+ usleep(GAZELLE_10MS);
|
|
+ continue;
|
|
+ }
|
|
+
|
|
if (sockfd < 0) {
|
|
set_register_state(false);
|
|
sockfd = client_reg_proc_reconnect(epfd);
|
|
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
|
|
index b157517..d55f1e6 100644
|
|
--- a/src/lstack/core/lstack_lwip.c
|
|
+++ b/src/lstack/core/lstack_lwip.c
|
|
@@ -110,6 +110,8 @@ void gazelle_init_sock(int32_t fd)
|
|
init_list_node(&sock->recv_list);
|
|
init_list_node(&sock->attach_list);
|
|
init_list_node(&sock->listen_list);
|
|
+ init_list_node(&sock->event_list);
|
|
+ init_list_node(&sock->wakeup_list);
|
|
}
|
|
|
|
void gazelle_clean_sock(int32_t fd)
|
|
@@ -126,6 +128,8 @@ void gazelle_clean_sock(int32_t fd)
|
|
list_del_node_init(&sock->recv_list);
|
|
list_del_node_init(&sock->attach_list);
|
|
list_del_node_init(&sock->listen_list);
|
|
+ list_del_node_init(&sock->event_list);
|
|
+ list_del_node_init(&sock->wakeup_list);
|
|
}
|
|
|
|
void gazelle_free_pbuf(struct pbuf *pbuf)
|
|
@@ -266,6 +270,30 @@ ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags)
|
|
return (send_ret < 0) ? send_ret : send_len;
|
|
}
|
|
|
|
+void add_self_event(struct lwip_sock *sock, uint32_t events)
|
|
+{
|
|
+ struct weakup_poll *wakeup = sock->weakup;
|
|
+ struct protocol_stack *stack = sock->stack;
|
|
+ if (wakeup == NULL || stack == NULL) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ sock->events |= events;
|
|
+
|
|
+ if (sock->have_event) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ if (rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) {
|
|
+ sock->have_event = true;
|
|
+ sem_post(&wakeup->event_sem); /* use the pointer NULL-checked above instead of re-reading sock->weakup */
|
|
+ stack->stats.epoll_self_event++;
|
|
+ } else {
|
|
+ rpc_call_addevent(stack, sock);
|
|
+ stack->stats.epoll_self_call++;
|
|
+ }
|
|
+}
|
|
+
|
|
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
|
|
{
|
|
uint32_t free_count = rte_ring_free_count(sock->send_ring);
|
|
@@ -303,14 +331,10 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
|
|
send_pkt++;
|
|
}
|
|
|
|
- if (!sock->have_event && (sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) {
|
|
- sock->have_event = true;
|
|
- sock->events |= EPOLLOUT;
|
|
- rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
|
|
- sem_post(&sock->weakup->event_sem);
|
|
+ if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) {
|
|
+ add_self_event(sock, EPOLLOUT);
|
|
sock->stack->stats.write_events++;
|
|
- }
|
|
- if (!NETCONN_IS_DATAOUT(sock)) {
|
|
+ } else {
|
|
sock->events &= ~EPOLLOUT;
|
|
}
|
|
|
|
@@ -507,14 +531,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
|
|
}
|
|
}
|
|
|
|
- if (!sock->have_event && (sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) {
|
|
- sock->have_event = true;
|
|
- sock->events |= EPOLLIN;
|
|
- rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
|
|
- sem_post(&sock->weakup->event_sem);
|
|
+ if ((sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) {
|
|
+ add_self_event(sock, EPOLLIN);
|
|
sock->stack->stats.read_events++;
|
|
- }
|
|
- if (!NETCONN_IS_DATAIN(sock)) {
|
|
+ } else {
|
|
sock->events &= ~EPOLLIN;
|
|
}
|
|
|
|
@@ -577,9 +597,14 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s
|
|
conn->recv_cnt = rte_ring_count(netconn->recvmbox->ring);
|
|
|
|
struct lwip_sock *sock = get_socket(netconn->socket);
|
|
- if (sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) {
|
|
+ if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) {
|
|
conn->recv_ring_cnt = rte_ring_count(sock->recv_ring);
|
|
conn->send_ring_cnt = rte_ring_count(sock->send_ring);
|
|
+ struct weakup_poll *weakup = sock->weakup;
|
|
+ if (weakup) {
|
|
+ conn->event_ring_cnt = rte_ring_count(weakup->event_ring);
|
|
+ conn->self_ring_cnt = rte_ring_count(weakup->self_ring);
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
@@ -696,10 +721,8 @@ void get_lwip_connnum(struct rpc_msg *msg)
|
|
msg->result = conn_num;
|
|
}
|
|
|
|
-void stack_recvlist_count(struct rpc_msg *msg)
|
|
+static uint32_t get_list_count(struct list_node *list)
|
|
{
|
|
- struct protocol_stack *stack = get_protocol_stack();
|
|
- struct list_node *list = &(stack->recv_list);
|
|
struct list_node *node, *temp;
|
|
uint32_t count = 0;
|
|
|
|
@@ -707,5 +730,20 @@ void stack_recvlist_count(struct rpc_msg *msg)
|
|
count++;
|
|
}
|
|
|
|
- msg->result = count;
|
|
+ return count;
|
|
+}
|
|
+
|
|
+void stack_wakeuplist_count(struct rpc_msg *msg)
|
|
+{
|
|
+ msg->result = get_list_count(get_protocol_stack()->wakeup_list);
|
|
+}
|
|
+
|
|
+void stack_eventlist_count(struct rpc_msg *msg)
|
|
+{
|
|
+ msg->result = get_list_count(&get_protocol_stack()->event_list);
|
|
+}
|
|
+
|
|
+void stack_recvlist_count(struct rpc_msg *msg)
|
|
+{
|
|
+ msg->result = get_list_count(&get_protocol_stack()->recv_list);
|
|
}
|
|
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
|
|
index c88f902..45649fe 100644
|
|
--- a/src/lstack/core/lstack_protocol_stack.c
|
|
+++ b/src/lstack/core/lstack_protocol_stack.c
|
|
@@ -193,6 +193,7 @@ int32_t init_protocol_stack(void)
|
|
|
|
init_list_node(&stack->recv_list);
|
|
init_list_node(&stack->listen_list);
|
|
+ init_list_node(&stack->event_list);
|
|
|
|
stack_group->stacks[i] = stack;
|
|
}
|
|
@@ -261,8 +262,14 @@ static void* gazelle_weakup_thread(void *arg)
|
|
thread_affinity_init(lcore_id);
|
|
LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id);
|
|
|
|
+ struct list_node wakeup_list;
|
|
+ init_list_node(&wakeup_list);
|
|
+ stack->wakeup_list = &wakeup_list;
|
|
+
|
|
for (;;) {
|
|
- weakup_thread(stack->weakup_ring);
|
|
+ wakeup_list_sock(&wakeup_list);
|
|
+
|
|
+ weakup_thread(stack->weakup_ring, &wakeup_list);
|
|
}
|
|
|
|
return NULL;
|
|
@@ -307,6 +314,24 @@ static void stack_thread_init(struct protocol_stack *stack)
|
|
LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id);
|
|
}
|
|
|
|
+static void report_stack_event(struct protocol_stack *stack)
|
|
+{
|
|
+ struct list_node *list = &(stack->event_list);
|
|
+ struct list_node *node, *temp;
|
|
+ struct lwip_sock *sock;
|
|
+
|
|
+ list_for_each_safe(node, temp, list) {
|
|
+ sock = container_of(node, struct lwip_sock, event_list);
|
|
+
|
|
+ if (weakup_enqueue(stack->weakup_ring, sock) == 0) {
|
|
+ list_del_node_init(&sock->event_list);
|
|
+ stack->stats.weakup_events++;
|
|
+ } else {
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+}
|
|
+
|
|
static void* gazelle_stack_thread(void *arg)
|
|
{
|
|
struct protocol_stack *stack = (struct protocol_stack *)arg;
|
|
@@ -321,6 +346,8 @@ static void* gazelle_stack_thread(void *arg)
|
|
read_recv_list();
|
|
|
|
sys_timer_run();
|
|
+
|
|
+ report_stack_event(stack);
|
|
}
|
|
|
|
return NULL;
|
|
@@ -737,11 +764,8 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add
|
|
}
|
|
|
|
struct lwip_sock *sock = get_socket(head_fd);
|
|
- if (!sock->have_event && have_accept_event(head_fd)) {
|
|
- sock->have_event = true;
|
|
- sock->events |= EPOLLIN;
|
|
- rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
|
|
- sem_post(&sock->weakup->event_sem);
|
|
+ if (have_accept_event(head_fd)) {
|
|
+ add_self_event(sock, EPOLLIN);
|
|
sock->stack->stats.accept_events++;
|
|
}
|
|
|
|
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
|
|
index 41fe9bf..b7b94e2 100644
|
|
--- a/src/lstack/core/lstack_stack_stat.c
|
|
+++ b/src/lstack/core/lstack_stack_stat.c
|
|
@@ -109,6 +109,10 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
|
|
dfx->data.pkts.send_idle_ring_cnt = rte_ring_count(stack->send_idle_ring);
|
|
dfx->data.pkts.call_msg_cnt = rpc_call_msgcnt(stack);
|
|
dfx->data.pkts.recv_list = rpc_call_recvlistcnt(stack);
|
|
+ dfx->data.pkts.event_list = rpc_call_eventlistcnt(stack);
|
|
+ if (stack->wakeup_list) {
|
|
+ dfx->data.pkts.wakeup_list = rpc_call_wakeuplistcnt(stack); /* count the wakeup list, not the event list again */
|
|
+ }
|
|
dfx->data.pkts.conn_num = stack->conn_num;
|
|
}
|
|
|
|
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
|
|
index b3665a7..2fb24b4 100644
|
|
--- a/src/lstack/core/lstack_thread_rpc.c
|
|
+++ b/src/lstack/core/lstack_thread_rpc.c
|
|
@@ -177,6 +177,26 @@ int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn)
|
|
return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
|
|
}
|
|
|
|
+int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack)
|
|
+{
|
|
+ struct rpc_msg *msg = rpc_msg_alloc(stack, stack_wakeuplist_count);
|
|
+ if (msg == NULL) {
|
|
+ return -1;
|
|
+ }
|
|
+
|
|
+ return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
|
|
+}
|
|
+
|
|
+int32_t rpc_call_eventlistcnt(struct protocol_stack *stack)
|
|
+{
|
|
+ struct rpc_msg *msg = rpc_msg_alloc(stack, stack_eventlist_count);
|
|
+ if (msg == NULL) {
|
|
+ return -1;
|
|
+ }
|
|
+
|
|
+ return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
|
|
+}
|
|
+
|
|
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
|
|
{
|
|
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count);
|
|
@@ -187,6 +207,28 @@ int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
|
|
return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
|
|
}
|
|
|
|
+void add_epoll_event(struct netconn *conn, uint32_t event);
|
|
+static void rpc_add_event(struct rpc_msg *msg)
|
|
+{
|
|
+ struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
|
|
+ if (sock->conn) {
|
|
+ add_epoll_event(sock->conn, sock->events);
|
|
+ }
|
|
+}
|
|
+
|
|
+void rpc_call_addevent(struct protocol_stack *stack, void *sock)
|
|
+{
|
|
+ struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_add_event);
|
|
+ if (msg == NULL) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ msg->args[MSG_ARG_0].p = sock;
|
|
+
|
|
+ msg->self_release = 0;
|
|
+ rpc_call(&stack->rpc_queue, msg);
|
|
+}
|
|
+
|
|
static void rpc_replenish_idlembuf(struct rpc_msg *msg)
|
|
{
|
|
struct protocol_stack *stack = get_protocol_stack();
|
|
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
|
|
index 581b9fe..87442cd 100644
|
|
--- a/src/lstack/include/lstack_lwip.h
|
|
+++ b/src/lstack/include/lstack_lwip.h
|
|
@@ -33,6 +33,8 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags);
|
|
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags);
|
|
void read_recv_list(void);
|
|
void add_recv_list(int32_t fd);
|
|
+void stack_eventlist_count(struct rpc_msg *msg);
|
|
+void stack_wakeuplist_count(struct rpc_msg *msg);
|
|
void get_lwip_conntable(struct rpc_msg *msg);
|
|
void get_lwip_connnum(struct rpc_msg *msg);
|
|
void stack_recvlist_count(struct rpc_msg *msg);
|
|
@@ -42,5 +44,6 @@ void gazelle_free_pbuf(struct pbuf *pbuf);
|
|
ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags);
|
|
ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags);
|
|
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags);
|
|
+void add_self_event(struct lwip_sock *sock, uint32_t events);
|
|
|
|
#endif
|
|
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
|
|
index f289465..dd7633b 100644
|
|
--- a/src/lstack/include/lstack_protocol_stack.h
|
|
+++ b/src/lstack/include/lstack_protocol_stack.h
|
|
@@ -51,6 +51,8 @@ struct protocol_stack {
|
|
|
|
struct list_node recv_list;
|
|
struct list_node listen_list;
|
|
+ struct list_node event_list;
|
|
+ struct list_node *wakeup_list;
|
|
|
|
struct gazelle_stat_pkts stats;
|
|
struct gazelle_stack_latency latency;
|
|
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
|
|
index 1365234..cffb273 100644
|
|
--- a/src/lstack/include/lstack_thread_rpc.h
|
|
+++ b/src/lstack/include/lstack_thread_rpc.h
|
|
@@ -50,9 +50,12 @@ struct rpc_msg {
|
|
struct protocol_stack;
|
|
void poll_rpc_msg(struct protocol_stack *stack);
|
|
void rpc_call_replenish_idlembuf(struct protocol_stack *stack);
|
|
+void rpc_call_addevent(struct protocol_stack *stack, void *sock);
|
|
int32_t rpc_call_msgcnt(struct protocol_stack *stack);
|
|
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
|
|
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
|
|
+int32_t rpc_call_eventlistcnt(struct protocol_stack *stack);
|
|
+int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack);
|
|
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn);
|
|
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn);
|
|
int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn);
|
|
diff --git a/src/lstack/include/lstack_weakup.h b/src/lstack/include/lstack_weakup.h
|
|
index f334a0f..8f7fca2 100644
|
|
--- a/src/lstack/include/lstack_weakup.h
|
|
+++ b/src/lstack/include/lstack_weakup.h
|
|
@@ -22,55 +22,93 @@ struct weakup_poll {
|
|
sem_t event_sem;
|
|
struct lwip_sock *sock_list[EPOLL_MAX_EVENTS];
|
|
struct rte_ring *event_ring;
|
|
+ struct rte_ring *self_ring;
|
|
};
|
|
|
|
#define WEAKUP_MAX (32)
|
|
|
|
-static inline __attribute__((always_inline)) void weakup_attach_sock(struct lwip_sock *sock)
|
|
+static inline void wakeup_list_sock(struct list_node *wakeup_list)
|
|
{
|
|
- struct list_node *list = &(sock->attach_list);
|
|
struct list_node *node, *temp;
|
|
- struct lwip_sock *attach_sock;
|
|
- int32_t ret;
|
|
|
|
- list_for_each_safe(node, temp, list) {
|
|
- attach_sock = container_of(node, struct lwip_sock, attach_list);
|
|
- if (attach_sock->weakup == NULL) {
|
|
+ list_for_each_safe(node, temp, wakeup_list) {
|
|
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, wakeup_list);
|
|
+
|
|
+ struct weakup_poll *weakup = sock->weakup;
|
|
+ struct protocol_stack *stack = sock->stack;
|
|
+ if (weakup == NULL || stack == NULL) {
|
|
continue;
|
|
}
|
|
|
|
- ret = rte_ring_mp_enqueue(attach_sock->weakup->event_ring, (void *)attach_sock);
|
|
+ int32_t ret = rte_ring_mp_enqueue(weakup->event_ring, (void *)sock);
|
|
if (ret == 0) {
|
|
- sem_post(&attach_sock->weakup->event_sem);
|
|
- attach_sock->stack->stats.lwip_events++;
|
|
+ list_del_node_init(&sock->wakeup_list); /* unlink the node this loop is walking (wakeup_list), not event_list */
|
|
+ sem_post(&weakup->event_sem);
|
|
+ stack->stats.lwip_events++;
|
|
+ } else {
|
|
+ break;
|
|
}
|
|
}
|
|
}
|
|
|
|
-static inline __attribute__((always_inline)) void weakup_thread(struct rte_ring *weakup_ring)
|
|
+static inline int32_t weakup_attach_sock(struct list_node *attach_list)
|
|
+{
|
|
+ struct list_node *node, *temp;
|
|
+ int32_t wakeuped = -1;
|
|
+
|
|
+ list_for_each_safe(node, temp, attach_list) {
|
|
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, attach_list);
|
|
+
|
|
+ struct weakup_poll *weakup = sock->weakup;
|
|
+ struct protocol_stack *stack = sock->stack;
|
|
+ if (weakup == NULL || stack == NULL) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ int32_t ret = rte_ring_mp_enqueue(weakup->event_ring, (void *)sock);
|
|
+ if (ret == 0) {
|
|
+ sem_post(&weakup->event_sem);
|
|
+ stack->stats.lwip_events++;
|
|
+ wakeuped = 0;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return wakeuped;
|
|
+}
|
|
+
|
|
+static inline void weakup_thread(struct rte_ring *weakup_ring, struct list_node *wakeup_list)
|
|
{
|
|
struct lwip_sock *sock;
|
|
- int32_t ret;
|
|
|
|
for (uint32_t i = 0; i < WEAKUP_MAX; ++i) {
|
|
- ret = rte_ring_sc_dequeue(weakup_ring, (void **)&sock);
|
|
+ int32_t ret = rte_ring_sc_dequeue(weakup_ring, (void **)&sock);
|
|
if (ret != 0) {
|
|
break;
|
|
}
|
|
|
|
- ret = rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
|
|
+ struct weakup_poll *weakup = sock->weakup;
|
|
+ struct protocol_stack *stack = sock->stack;
|
|
+ if (weakup == NULL || stack == NULL) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ ret = rte_ring_mp_enqueue(weakup->event_ring, (void *)sock);
|
|
if (ret == 0) {
|
|
- sem_post(&sock->weakup->event_sem);
|
|
- sock->stack->stats.lwip_events++;
|
|
+ sem_post(&weakup->event_sem);
|
|
+ stack->stats.lwip_events++;
|
|
}
|
|
|
|
/* listen notice attach sock */
|
|
+ int32_t wakeuped = -1;
|
|
if (!list_is_empty(&sock->attach_list)) {
|
|
- weakup_attach_sock(sock);
|
|
+ wakeuped = weakup_attach_sock(&sock->attach_list);
|
|
}
|
|
|
|
- /* event_ring of attach sock may have idle elem */
|
|
- if (ret != 0) {
|
|
+ /* notice any epoll enough */
|
|
+ if (ret != 0 && wakeuped != 0) {
|
|
+ if (list_is_empty(&sock->wakeup_list)) {
|
|
+ list_add_node(wakeup_list, &sock->wakeup_list);
|
|
+ }
|
|
break;
|
|
}
|
|
}
|
|
@@ -79,13 +117,7 @@ static inline __attribute__((always_inline)) void weakup_thread(struct rte_ring
|
|
static inline __attribute__((always_inline))
|
|
int weakup_enqueue(struct rte_ring *weakup_ring, struct lwip_sock *sock)
|
|
{
|
|
- int ret = rte_ring_sp_enqueue(weakup_ring, (void *)sock);
|
|
- if (ret < 0) {
|
|
- LSTACK_LOG(ERR, LSTACK, "tid %d, failed\n", gettid());
|
|
- return -1;
|
|
- }
|
|
-
|
|
- return 0;
|
|
+ return rte_ring_sp_enqueue(weakup_ring, (void *)sock);
|
|
}
|
|
|
|
#endif
|
|
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
|
|
index 451f527..66d6053 100644
|
|
--- a/src/ltran/ltran_dfx.c
|
|
+++ b/src/ltran/ltran_dfx.c
|
|
@@ -567,14 +567,20 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
|
|
printf("weakup_events: %-14"PRIu64" ", lstack_stat->data.pkts.weakup_events);
|
|
printf("lwip_events: %-16"PRIu64" ", lstack_stat->data.pkts.lwip_events);
|
|
printf("app_events: %-17"PRIu64"\n", lstack_stat->data.pkts.app_events);
|
|
+ printf("epoll_pending: %-14"PRIu64" ", lstack_stat->data.pkts.epoll_pending);
|
|
+ printf("epoll_self_event: %-11"PRIu64" ", lstack_stat->data.pkts.epoll_self_event);
|
|
+ printf("remove_event: %-15"PRIu64" \n", lstack_stat->data.pkts.remove_event);
|
|
printf("read_events: %-16"PRIu64" ", lstack_stat->data.pkts.read_events);
|
|
printf("write_events: %-15"PRIu64" ", lstack_stat->data.pkts.write_events);
|
|
printf("accept_events: %-14"PRIu64" \n", lstack_stat->data.pkts.accept_events);
|
|
- printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt);
|
|
printf("read_null: %-18"PRIu64" ", lstack_stat->data.pkts.read_null);
|
|
- printf("call_alloc_fail: %-12"PRIu64" \n", lstack_stat->data.pkts.call_alloc_fail);
|
|
- printf("remove_event: %-15"PRIu64" ", lstack_stat->data.pkts.remove_event);
|
|
+ printf("wakeup_list: %-16"PRIu64" ", lstack_stat->data.pkts.wakeup_list);
|
|
+ printf("event_list: %-17"PRIu64" \n", lstack_stat->data.pkts.event_list);
|
|
printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc);
|
|
+ printf("epoll_pending_call: %-9"PRIu64" ", lstack_stat->data.pkts.epoll_pending_call);
|
|
+ printf("epoll_self_call: %-12"PRIu64" \n", lstack_stat->data.pkts.epoll_self_call);
|
|
+ printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt);
|
|
+ printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail);
|
|
printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null);
|
|
}
|
|
|
|
@@ -866,8 +872,7 @@ static void gazelle_print_lstack_stat_snmp(void *buf, const struct gazelle_stat_
|
|
|
|
static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_msg_request *req_msg)
|
|
{
|
|
- int32_t ret;
|
|
- uint32_t i, unread_pkts;
|
|
+ uint32_t i;
|
|
struct in_addr rip;
|
|
struct in_addr lip;
|
|
char str_ip[GAZELLE_SUBNET_LENGTH_MAX] = {0};
|
|
@@ -878,30 +883,33 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
|
|
printf("Active Internet connections (servers and established)\n");
|
|
do {
|
|
printf("\n------ stack tid: %6u ------\n", stat->tid);
|
|
- printf("No. Proto recv_cnt recv_ring in_send send_ring Local Address"
|
|
+ printf("No. Proto recv_cnt recv_ring in_send send_ring event self_event Local Address"
|
|
" Foreign Address State\n");
|
|
- unread_pkts = 0;
|
|
+ uint32_t unread_pkts = 0;
|
|
+ uint32_t unsend_pkts = 0;
|
|
for (i = 0; i < conn->conn_num && i < GAZELLE_LSTACK_MAX_CONN; i++) {
|
|
struct gazelle_stat_lstack_conn_info *conn_info = &conn->conn_list[i];
|
|
|
|
rip.s_addr = conn_info->rip;
|
|
lip.s_addr = conn_info->lip;
|
|
if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) {
|
|
- printf("%-6utcp %-10u%-11u%-9u%-11u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt,
|
|
- conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt,
|
|
- inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, inet_ntop(AF_INET, &rip,
|
|
- str_rip, sizeof(str_rip)), conn_info->r_port, tcp_state_to_str(conn_info->tcp_sub_state));
|
|
+ printf("%-6utcp %-10u%-11u%-9u%-11u%-7u%-12u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt,
|
|
+ conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->event_ring_cnt,
|
|
+ conn_info->self_ring_cnt, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port,
|
|
+ inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port,
|
|
+ tcp_state_to_str(conn_info->tcp_sub_state));
|
|
} else if (conn_info->state == GAZELLE_LISTEN_LIST) {
|
|
- printf("%-6utcp %-41u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt,
|
|
+ printf("%-6utcp %-60u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt,
|
|
inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port);
|
|
} else {
|
|
printf("Got unknow tcp conn::%s:%5hu, state:%u\n",
|
|
inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, conn_info->state);
|
|
}
|
|
unread_pkts += conn_info->recv_ring_cnt;
|
|
+ unsend_pkts += conn_info->send_ring_cnt;
|
|
}
|
|
if (conn->conn_num > 0) {
|
|
- printf("Total unread pkts: %u \n", unread_pkts);
|
|
+ printf("Total unread pkts:%u unsend pkts:%u\n", unread_pkts, unsend_pkts);
|
|
}
|
|
|
|
if (i < conn->total_conn_num) {
|
|
@@ -912,7 +920,7 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
|
|
if (stat->eof != 0) {
|
|
break;
|
|
}
|
|
- ret = dfx_stat_read_from_ltran(buf, sizeof(struct gazelle_stack_dfx_data), req_msg->stat_mode);
|
|
+ int32_t ret = dfx_stat_read_from_ltran(buf, sizeof(struct gazelle_stack_dfx_data), req_msg->stat_mode);
|
|
if (ret != GAZELLE_OK) {
|
|
return;
|
|
}
|
|
--
|
|
1.8.3.1
|
|
|