From 795c15182d2dbeeb34982f274185b20920be195f Mon Sep 17 00:00:00 2001 From: jiangheng Date: Mon, 5 Sep 2022 09:49:10 +0800 Subject: [PATCH] rpc pool use dpdk mempool replace array --- src/lstack/core/lstack_dpdk.c | 20 ++++++++++++++++++++ src/lstack/core/lstack_thread_rpc.c | 22 +++++++++++++++------- src/lstack/include/lstack_dpdk.h | 2 ++ src/lstack/include/lstack_thread_rpc.h | 7 +++---- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c index 169025c..1dab9a3 100644 --- a/src/lstack/core/lstack_dpdk.c +++ b/src/lstack/core/lstack_dpdk.c @@ -217,6 +217,26 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num) return 0; } +struct rte_mempool *create_mempool(const char *name, uint32_t count, uint32_t size, + uint32_t flags, int32_t idx) +{ + char pool_name [RTE_MEMPOOL_NAMESIZE]; + struct rte_mempool *mempool; + int32_t ret = snprintf_s(pool_name, sizeof(pool_name), RTE_MEMPOOL_NAMESIZE - 1, + "%s_%d", name, idx); + if (ret < 0) { + return NULL; + } + + mempool = rte_mempool_create(pool_name, count, size, + 0, 0, NULL, NULL, NULL, NULL, rte_socket_id(), flags); + if (mempool == NULL) { + LSTACK_LOG(ERR, LSTACK, "%s create failed. errno: %d.\n", name, rte_errno); + } + + return mempool; +} + struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, int32_t queue_id) { char ring_name[RTE_RING_NAMESIZE] = {0}; diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index fe3b757..1234bc6 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -27,15 +27,15 @@ static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL; static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool) { - uint32_t cons = __atomic_load_n(&rpc_pool->cons, __ATOMIC_ACQUIRE); - uint32_t prod = rpc_pool->prod + 1; - - if (prod - cons >= RPC_MSG_MAX) { + int ret; + struct rpc_msg *msg = NULL; + ret = rte_mempool_get(rpc_pool->rpc_pool, (void **)&msg); + if (ret < 0) { + LSTACK_LOG(INFO, LSTACK, "rpc pool empty!\n"); + errno = ENOMEM; return NULL; } - - rpc_pool->prod = prod; - return &rpc_pool->msgs[prod & RPC_MSG_MASK]; + return msg; } static struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func) @@ -49,6 +49,14 @@ static struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func if (g_rpc_pool == NULL) { g_rpc_pool = calloc(1, sizeof(struct rpc_msg_pool)); if (g_rpc_pool == NULL) { + LSTACK_LOG(INFO, LSTACK, "g_rpc_pool calloc failed\n"); + get_protocol_stack_group()->call_alloc_fail++; + return NULL; + } + + g_rpc_pool->rpc_pool = create_mempool("rpc_pool", RPC_MSG_MAX, sizeof(struct rpc_msg), + 0, rte_gettid()); + if (g_rpc_pool->rpc_pool == NULL) { get_protocol_stack_group()->call_alloc_fail++; return NULL; } diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h index a189557..c233120 100644 --- a/src/lstack/include/lstack_dpdk.h +++ b/src/lstack/include/lstack_dpdk.h @@ -44,6 +44,8 @@ int32_t fill_mbuf_to_ring(struct rte_mempool *mempool, struct rte_ring *ring, ui int32_t dpdk_eal_init(void); int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num); struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, int32_t queue_id); +struct rte_mempool *create_mempool(const char *name, uint32_t count, uint32_t size, + uint32_t flags, int32_t idx); int32_t create_shared_ring(struct protocol_stack *stack); void lstack_log_level_init(void); int dpdk_ethdev_init(int port_id, bool bond_port); diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index 657ffa9..bcb40dd 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -15,6 +15,7 @@ #include #include +#include #include "lstack_lockless_queue.h" @@ -53,9 +54,7 @@ struct rpc_msg { }; struct rpc_msg_pool { - struct rpc_msg msgs[RPC_MSG_MAX]; - uint32_t prod __rte_cache_aligned; - uint32_t cons __rte_cache_aligned; + struct rte_mempool *rpc_pool; }; struct protocol_stack; @@ -99,7 +98,7 @@ static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *m msg->self_release = 0; - __atomic_fetch_add((_Atomic uint32_t *)&msg->pool->cons, 1, __ATOMIC_SEQ_CST); + rte_mempool_put(msg->pool->rpc_pool, (void *)msg); } #endif -- 2.27.0