gazelle/0084-add-examples.patch
wu-changsheng 076a4017d3 backport bugifx and doc
(cherry picked from commit e7c415a3eb5695fd1f2c2baadc77f4480765375c)
2022-09-05 16:26:12 +08:00

2669 lines
105 KiB
Diff
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

From 814b66143605ad409be0f8aace468386f4fd891e Mon Sep 17 00:00:00 2001
From: wu-changsheng <wuchangsheng2@huawei.com>
Date: Mon, 5 Sep 2022 15:39:12 +0800
Subject: [PATCH] sync examples code
---
examples/CMakeLists.txt | 2 +-
examples/README.md | 178 +++++++++++-
examples/inc/bussiness.h | 122 +++++++++
examples/inc/client.h | 121 ++++++++
examples/inc/parameter.h | 11 +-
examples/inc/server.h | 231 ++++++++++++++++
examples/inc/utilities.h | 82 +++++-
examples/main.c | 8 +
examples/src/bussiness.c | 234 ++++++++++++++++
examples/src/client.c | 387 ++++++++++++++++++++++++++
examples/src/parameter.c | 214 ++++++++-------
examples/src/server.c | 578 +++++++++++++++++++++++++++++++++++++++
examples/src/utilities.c | 128 +++++++++
13 files changed, 2185 insertions(+), 111 deletions(-)
create mode 100644 examples/inc/bussiness.h
create mode 100644 examples/inc/client.h
create mode 100644 examples/inc/server.h
create mode 100644 examples/src/bussiness.c
create mode 100644 examples/src/client.c
create mode 100644 examples/src/server.c
create mode 100644 examples/src/utilities.c
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index b1c2b07..c38e6cb 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -18,7 +18,7 @@ project(${PROJECT_NAME})
message(STATUS "PROJECT_SOURCE_DIR: " ${PROJECT_SOURCE_DIR})
message(STATUS "PROJECT_BINARY_DIR: " ${PROJECT_BINARY_DIR})
-set(CMAKE_C_FLAGS "-O2 -g -fstack-protector-strong -Wall -Werror -pthread")
+set(CMAKE_C_FLAGS "-O2 -g -fstack-protector-strong -Wall -Werror -pthread -lboundscheck")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D EXAMPLE_COMPILE")
include_directories(${PROJECT_SOURCE_DIR}/inc)
diff --git a/examples/README.md b/examples/README.md
index 6f82bb2..4e165a4 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -1,4 +1,6 @@
-# gazzle 示例程序
+# gazelle 示例程序
+
+## 功能
* 支持 TCP 、 unix 非阻塞通讯。
* 支持多线程网络 IO 复用模型线程之间相互独立。TCP 的 `listen` 、`epoll` 、`read` 、`write` 、`connect` 等接口都在同一线程内。`connect` 连接数可配。
@@ -9,9 +11,93 @@
## 网络模型
* **单线程非阻塞**:采用同步非阻塞 IO 模型,在单线程中采用非阻塞的方式监听并发起 IO 请求,当内核中数据到达后读取数据、执行业务逻辑并发送。
+
+```
+ 单线程非阻塞模型
+ |
+ 创建套接字并监听
+ | <-------+
+ 读取数据 |
+ | |
+ 业务逻辑 |
+ | |
+ 发送数据 |
+ | |
+ +---------+
+```
+
* **多线程非阻塞IO复用**:基于 `epoll` 实现多线程非阻塞 IO 模型。每个线程之间互不干涉。通过 `epoll` 监控多个当前线程负责的 fd 当任何一个数据状态准备就绪时返回并执行读写操作和对应的业务逻辑。
+
+```
+ 多线程非阻塞IO复用模型
+ |
+ 创建套接字并监听
+ |
+ 创建若干个线程
+ |
+ +------------+------------+------------+------------+
+ | | | | |
+ 创建套接字并监听 ... ... 创建套接字并监听 ...
+ | |
+ 线程内部初始化 线程内部初始化
+ epoll并注册 epoll并注册
+ 套接字监听事件 套接字监听事件
+ | <---------+ | <----------+
+ +-----+-----+ | +-----+-----+ |
+ | | | | | |
+ (新连接) (新报文) | (新连接) (新报文) |
+ 建连并注 读取数据 | 建连并注 读取数据 |
+ 册新连接 业务逻辑 | 册新连接 业务逻辑 |
+ 监听事件 发送数据 | 监听事件 发送数据 |
+ | | | | | |
+ +-----+-----+ | +-----+-----+ |
+ | | | |
+ +-----------+ +-----------+
+```
+
* **多线程非阻塞非对称**:采用基于 `epoll` 的单线程多路 IO 复用监听连接事件,并采用多线程的方式完成后续读写监听业务。 server 在启动监听之前,开辟一定数量的线程,用线程池管理。主线程创建监听 `fd` 之后,采用多路 IO 复用机制 (`epoll`) 进行 IO 状态监控。当监听到客户端的连接请求时,建立连接并将相关 `fd` 分发给线程池的某个线程进行监听。线程池中的每个线程都采用多路 IO 复用机制 (`epoll`) ,用来监听主线程中建立成功并分发下来的 `socket` 。
+```
+ 多线程非阻塞非对称模型 +------------------------+
+ | | |
+ 创建监听线程 | +-------------+--- ... -----+
+ | | | | |
+ 创建套接字,初始化 | 初始化epoll ... ... 初始化epoll
+ epoll并且并注册套 | 并注册事件 并注册事件
+ 接字监听事件 | | <-- + | <-- +
+ | | 读取数据 | 读取数据 |
+ 当有新连接时,创建工作线程 业务逻辑 | 业务逻辑 |
+ | | 发送数据 | 发送数据 |
+ +----------------+ | | | |
+ +-----+ +-----+
+```
+
+* **客户端**:创建若干线程,每个线程创建若干 `socket` 与服务端建立连接,并使用 `epoll` 进行状态监控,建连后向服务端发送数据并等待服务端数据传回,当接收到服务端传回数据后进行校验,校验无误再次发送数据。
+
+```
+ 多线程非阻塞IO复用模型
+ |
+ 创建若干个线程
+ +------------+------------+------------+------------+
+ | | | | |
+ 线程内部初始化 线程内部初始化
+ epoll ... ... epoll ...
+ | |
+ 依次创建套接字, 依次创建套接字,
+ 建连并注册事件 建连并注册事件
+ | <---------+ | <---------+
+ 发送数据 | 发送数据 |
+ 接收数据并校验 | 接收数据并校验 |
+ | | | |
+ +------------+ | +------------+ |
+ | | | | | |
+ 成功 失败 | 成功 失败 |
+ | | | | | |
+ 发送数据 终止 | 发送数据 终止 |
+ | | | |
+ +-----------------+ +-----------------+
+```
+
## 程序接口
* `-a, --as [server | client]`:作为服务端还是客户端。
@@ -24,21 +110,103 @@
* `mud (multi thread, unblock, dissymmetric)`:多线程非阻塞非对称。
* `-t, --threadnum`:线程数设置。
* `-c, --connectnum`:连接数设置。
-* `-A, --api [unix | posix]`:内部实现的接口类型。
- * `unix`:基于 unix 接口实现。
- * `posix`:基于 posix 接口实现。
+* `-D, --domain [unix | posix]`:通信协议。
+ * `unix`:基于 unix 协议实现。
+ * `posix`:基于 posix 协议实现。
+* `-A, --api [readwrite | recvsend | recvsendmsg]`:内部实现的接口类型。
+ * `readwrite` :使用 `read` 和 `write` 接口。
+ * `recvsend` :使用 `recv` 和 `send` 接口。
+ * `recvsendmsg` :使用 `recvmsg` 和 `sendmsg` 接口。
* `-P, --pktlen [xxxx]`:报文长度配置。
* `-v, --verify`:是否校验报文。
* `-r, --ringpmd`是否基于dpdk ring PMD 收发环回。
+* `-d, --debug`:是否打印调试信息。
* `-h, --help`:获得帮助信息。
## 使用
+ * **环境配置**
+ * 参考 https://gitee.com/openeuler/libboundscheck 。
+
+ * **编译**
+
```
cd build
mkdir examples
cd examples
cmake ../../examples
make
-./examples --help
```
+
+ * **查看帮助信息**
+
+ ```
+ ./examples --help
+
+ -a, --as [server | client]: set programas server or client.
+ server: as server.
+ client: as client.
+-i, --ip [???.???.???.???]: set ip address.
+-p, --port [????]: set port number in range of 1024 - 65535.
+-m, --model [mum | mud]: set the network model.
+ mum: multi thread, unblock, multiplexing IO network model.
+ mud: multi thread, unblock, dissymmetric network model.
+-t, --threadnum [???]: set thread number in range of 1 - 1000.
+-c, --connectnum [???]: set connection number of each thread.
+-D, --domain [unix | posix]: set domain type is server or client.
+ unix: use unix's api.
+ posix: use posix api.
+-A, --api [readwrite | recvsend | recvsendmsg]: set api type is server or client.
+ readwrite: use `read` and `write`.
+ recvsend: use `recv and `send`.
+ recvsendmsg: use `recvmsg` and `sendmsg`.
+-P, --pktlen [????]: set packet length in range of 2 - 10485760.
+-v, --verify: set to verifying the message packet.
+-r, --ringpmd: set to use ringpmd.
+-d, --debug: set to print the debug information.
+-h, --help: see helps.
+ ```
+
+ * 创建服务端
+
+```
+./examples --as server --verify
+
+[program parameters]:
+--> [as]: server
+--> [server ip]: 127.0.0.1
+--> [server port]: 5050
+--> [model]: mum
+--> [thread number]: 8
+--> [domain]: posix
+--> [api]: read & write
+--> [packet length]: 1024
+--> [verify]: on
+--> [ringpmd]: off
+--> [debug]: off
+
+[program informations]:
+--> <server>: [connect num]: 0, [receive]: 0.000 B/s
+```
+
+ * 创建客户端
+
+```
+./examples --as client --verify
+
+[program parameters]:
+--> [as]: client
+--> [server ip]: 127.0.0.1
+--> [server port]: 5050
+--> [thread number]: 8
+--> [connection number]: 10
+--> [domain]: posix
+--> [api]: read & write
+--> [packet length]: 1024
+--> [verify]: on
+--> [ringpmd]: off
+--> [debug]: off
+
+[program informations]:
+--> <client>: [connect num]: 80, [send]: 357.959 MB/s
+```
\ No newline at end of file
diff --git a/examples/inc/bussiness.h b/examples/inc/bussiness.h
new file mode 100644
index 0000000..f16d771
--- /dev/null
+++ b/examples/inc/bussiness.h
@@ -0,0 +1,122 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#ifndef __EXAMPLES_BUSSINESS_H__
+#define __EXAMPLES_BUSSINESS_H__
+
+
+#include "utilities.h"
+#include "parameter.h"
+
+
+#define BUSSINESS_MESSAGE_SIZE 26 ///< the size of business message
+
+
+/**
+ * @brief server handler
+ * Per-socket state on the server side; at present it only wraps the socket file descriptor.
+ */
+struct ServerHandler
+{
+ int32_t fd; ///< socket file descriptor
+};
+
+/**
+ * @brief client handler
+ * Per-socket state on the client side: the socket and the start index into the business message.
+ */
+struct ClientHandler
+{
+ int32_t fd; ///< socket file descriptor
+ uint32_t msg_idx; ///< the start character index of the message
+};
+
+
+/**
+ * @brief read by specify api
+ * This function processes the reading by specify api.
+ * @param fd the file descriptor
+ * @param buffer_in the input buffer
+ * @param length the length
+ * @param api the type of api
+ * @return the result
+ */
+ int32_t read_api(int32_t fd, char *buffer_in, const uint32_t length, const char *api);
+
+/**
+ * @brief write by specify api
+ * This function processes the writing by specify api.
+ * @param fd the file descriptor
+ * @param buffer_out the output buffer
+ * @param length the length
+ * @param api the type of api
+ * @return the result
+ */
+ int32_t write_api(int32_t fd, char *buffer_out, const uint32_t length, const char *api);
+
+/**
+ * @brief the business processing of server
+ * This function processes the business of server.
+ * @param out the output string
+ * @param in the input string
+ * @param size the size of input and output
+ * @note this function takes no verify flag
+ * @return none (the function is void)
+ */
+void server_bussiness(char *out, const char *in, uint32_t size);
+
+/**
+ * @brief the business processing of client
+ * This function processes the business of client.
+ * @param out the output string
+ * @param in the input string
+ * @param size the size of input and output
+ * @param verify if we verify or not
+ * @param msg_idx the start character index of the message
+ * @return the result
+ */
+int32_t client_bussiness(char *out, const char *in, uint32_t size, bool verify, uint32_t *msg_idx);
+
+/**
+ * @brief server checks the information and answers
+ * This function checks the information and answers.
+ * @param server_handler server handler
+ * @param pktlen the length of package
+ * @param api the api
+ * @return the result
+ */
+int32_t server_ans(struct ServerHandler *server_handler, uint32_t pktlen, const char* api);
+
+/**
+ * @brief client asks server
+ * This function asks server.
+ * @param client_handler client handler
+ * @param pktlen the length of package
+ * @param api the api
+ * @return the result
+ */
+int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const char* api);
+
+/**
+ * @brief client checks the information and answers
+ * This function checks the information and answers.
+ * @param client_handler client handler
+ * @param pktlen the length of package
+ * @param verify verify or not
+ * @param api the api
+ * @return the result
+ */
+int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api);
+
+
+#endif // __EXAMPLES_BUSSINESS_H__
diff --git a/examples/inc/client.h b/examples/inc/client.h
new file mode 100644
index 0000000..d3ae017
--- /dev/null
+++ b/examples/inc/client.h
@@ -0,0 +1,121 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#ifndef __EXAMPLES_CLIENT_H__
+#define __EXAMPLES_CLIENT_H__
+
+
+#include "utilities.h"
+#include "parameter.h"
+#include "bussiness.h"
+
+
+/**
+ * @brief client unit
+ * The information of each thread of client; units are chained through `next`.
+ */
+struct ClientUnit
+{
+ struct ClientHandler *handlers; ///< the handlers
+ int32_t epfd; ///< the connect epoll file descriptor
+ struct epoll_event *epevs; ///< the epoll events
+ uint32_t curr_connect; ///< current connection number
+ uint64_t send_bytes; ///< total send bytes
+ in_addr_t ip; ///< server ip
+ uint16_t port; ///< server port
+ uint32_t connect_num; ///< total connection number
+ uint32_t pktlen; ///< the packet length
+ bool verify; ///< if we verify or not
+ char* domain; ///< the communication domain
+ char* api; ///< the type of api
+ bool debug; ///< if we print the debug information
+ struct ClientUnit *next; ///< next pointer
+};
+
+/**
+ * @brief client
+ * The information of client.
+ */
+struct Client
+{
+ struct ClientUnit *uints; ///< the client units (the field name is spelled `uints`)
+ bool debug; ///< if we print the debug information
+};
+
+
+/**
+ * @brief the single thread, client prints debug information
+ * The single thread, client prints debug information.
+ * @param ch_str the character string
+ * @param act_str the action string
+ * @param ip the ip address
+ * @param port the port
+ * @param debug if debug or not
+ * @return none (the function is void)
+ */
+void client_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug);
+
+/**
+ * @brief the client prints informations
+ * The client prints informations.
+ * @param client the client information
+ */
+void client_info_print(struct Client *client);
+
+/**
+ * @brief the single thread, client try to connect to server, register to epoll
+ * The single thread, client try to connect to server, register to epoll.
+ * @param client_handler the client handler
+ * @param epoll_fd the epoll file descriptor
+ * @param ip ip address
+ * @param port port
+ * @param domain domain
+ * @return the result pointer
+ */
+int32_t client_thread_try_connect(struct ClientHandler *client_handler, int32_t epoll_fd, in_addr_t ip, uint16_t port, const char *api);
+
+/**
+ * @brief the single thread, client retry to connect to server, register to epoll
+ * The single thread, client retry to connect to server, register to epoll.
+ * @param client_unit the client unit
+ * @param client_handler the client handler
+ * @return the result pointer
+ */
+int32_t client_thread_retry_connect(struct ClientUnit *client_unit, struct ClientHandler *client_handler);
+
+/**
+ * @brief the single thread, client connects and gets epoll feature descriptors
+ * The single thread, client connects and gets epoll feature descriptors.
+ * @param client_unit the client unit
+ * @return the result pointer
+ */
+int32_t client_thread_create_epfd_and_reg(struct ClientUnit *client_unit);
+
+/**
+ * @brief create client of single thread and run
+ * This function creates client of single thread and run.
+ * @param arg each thread's information of server
+ * @return the result pointer
+ */
+void *client_s_create_and_run(void *arg);
+
+/**
+ * @brief create client and run
+ * This function create the client and run.
+ * @param params the parameters pointer
+ * @return the result
+ */
+int32_t client_create_and_run(struct ProgramParams *params);
+
+
+#endif // __EXAMPLES_CLIENT_H__
diff --git a/examples/inc/parameter.h b/examples/inc/parameter.h
index d25a13a..ee8fe4e 100644
--- a/examples/inc/parameter.h
+++ b/examples/inc/parameter.h
@@ -24,9 +24,11 @@
#define PARAM_DEFAULT_MODEL ("mum") ///< default model type
#define PARAM_DEFAULT_CONNECT_NUM (10) ///< default connection number
#define PARAM_DEFAULT_THREAD_NUM (8) ///< default thread number
-#define PARAM_DEFAULT_API ("posix") ///< default API type
+#define PARAM_DEFAULT_DOMAIN ("posix") ///< default communication domain
+#define PARAM_DEFAULT_API ("readwrite") ///< default API type
#define PARAM_DEFAULT_PKTLEN (1024) ///< default packet length of message
#define PARAM_DEFAULT_VERIFY (false) ///< default flag of message verifying
+#define PARAM_DEFAULT_DEBUG (false) ///< default flag of debug
#define PARAM_DEFAULT_RINGPMD (false) ///< default flag of ring PMD of dpdk
enum {
@@ -42,6 +44,8 @@ enum {
PARAM_NUM_CONNECT_NUM = 'c',
#define PARAM_NAME_THREAD_NUM ("threadnum") ///< name of parameter thread number
PARAM_NUM_THREAD_NUM = 't',
+#define PARAM_NAME_DOMAIN ("domain") ///< name of parameter domain
+ PARAM_NUM_DOMAIN = 'D',
#define PARAM_NAME_API ("api") ///< name of parameter API type
PARAM_NUM_API = 'A',
#define PARAM_NAME_PKTLEN ("pktlen") ///< name of parameter packet length of message
@@ -50,6 +54,8 @@ enum {
PARAM_NUM_VERIFY = 'v',
#define PARAM_NAME_RINGPMD ("ringpmd") ///< name of parameter flag of ring PMD of dpdk
PARAM_NUM_RINGPMD = 'r',
+#define PARAM_NAME_DEBUG ("debug") ///< name of parameter flag of debug
+ PARAM_NUM_DEBUG = 'd',
#define PARAM_NAME_HELP ("help") ///< name of parameter help
PARAM_NUM_HELP = 'h',
};
@@ -81,13 +87,14 @@ struct ProgramParams {
char* model; ///< model type
uint32_t thread_num; ///< the number of threads
uint32_t connect_num; ///< the connection number
+ char* domain; ///< the communication dimain
char* api; ///< the type of api
uint32_t pktlen; ///< the packet length
bool verify; ///< if we verify the message or not
+ bool debug; ///< if we print the debug information or not
bool ringpmd; ///< if we use ring PMD or not
};
-
/**
* @brief initialize the parameters
* This function initializes the parameters of main function.
diff --git a/examples/inc/server.h b/examples/inc/server.h
new file mode 100644
index 0000000..fa9096b
--- /dev/null
+++ b/examples/inc/server.h
@@ -0,0 +1,231 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#ifndef __EXAMPLES_SERVER_H__
+#define __EXAMPLES_SERVER_H__
+
+
+#include "utilities.h"
+#include "parameter.h"
+#include "bussiness.h"
+
+
+/**
+ * @brief server unit of model mum
+ * The information of each thread of server of model mum; units are chained through `next`.
+ */
+struct ServerMumUnit
+{
+ struct ServerHandler listener; ///< the listen handler
+ int32_t epfd; ///< the listen epoll file descriptor
+ struct epoll_event *epevs; ///< the epoll events
+ uint32_t curr_connect; ///< current connection number
+ uint64_t recv_bytes; ///< total receive bytes
+ in_addr_t ip; ///< server ip
+ uint16_t port; ///< server port
+ uint32_t pktlen; ///< the packet length
+ char* domain; ///< communication domain
+ char* api; ///< the type of api
+ bool debug; ///< if we print the debug information
+ struct ServerMumUnit *next; ///< next pointer
+};
+
+/**
+ * @brief server model mum
+ * The information of server model mum.
+ */
+struct ServerMum
+{
+ struct ServerMumUnit *uints; ///< the server mum units (the field name is spelled `uints`)
+ bool debug; ///< if we print the debug information
+};
+
+/**
+ * @brief server unit of model mud worker unit
+ * The information of worker unit of server of model mud; workers are chained through `next`.
+ */
+struct ServerMudWorker
+{
+ struct ServerHandler worker; ///< the worker handler
+ int32_t epfd; ///< the worker epoll file descriptor
+ struct epoll_event *epevs; ///< the epoll events
+ uint64_t recv_bytes; ///< total receive bytes
+ uint32_t pktlen; ///< the packet length
+ in_addr_t ip; ///< client ip
+ uint16_t port; ///< client port
+ char* api; ///< the type of api
+ bool debug; ///< if we print the debug information
+ struct ServerMudWorker *next; ///< next pointer
+};
+
+/**
+ * @brief server model mud
+ * The information of server model mud: one listener plus its workers.
+ */
+struct ServerMud
+{
+ struct ServerHandler listener; ///< the listen handler
+ struct ServerMudWorker *workers; ///< the workers
+ int32_t epfd; ///< the listen epoll file descriptor
+ struct epoll_event *epevs; ///< the epoll events
+ uint32_t curr_connect; ///< current connection number
+ in_addr_t ip; ///< server ip
+ uint16_t port; ///< server port
+ uint32_t pktlen; ///< the packet length
+ char* domain; ///< communication domain
+ char* api; ///< the type of api
+ bool debug; ///< if we print the debug information
+};
+
+
+/**
+ * @brief the worker thread, unblock, dissymmetric server prints debug information
+ * The worker thread, unblock, dissymmetric server prints debug information.
+ * @param ch_str the character string
+ * @param act_str the action string
+ * @param ip the ip address
+ * @param port the port
+ * @param debug if debug or not
+ * @return none (the function is void)
+ */
+void server_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug);
+
+/**
+ * @brief the multi thread, unblock, dissymmetric server prints informations
+ * The multi thread, unblock, dissymmetric server prints informations.
+ * @param server_mud the server information
+ */
+void sermud_info_print(struct ServerMud *server_mud);
+
+/**
+ * @brief the worker thread, unblock, dissymmetric server listens and gets epoll feature descriptors
+ * The worker thread, unblock, dissymmetric server listens and gets epoll feature descriptors.
+ * @param worker_unit the server worker
+ * @return the result pointer
+ */
+int32_t sermud_worker_create_epfd_and_reg(struct ServerMudWorker *worker_unit);
+
+/**
+ * @brief the listener thread, unblock, dissymmetric server listens and gets epoll feature descriptors
+ * The listener thread, unblock, dissymmetric server listens and gets epoll feature descriptors.
+ * @param server_mud the server unit
+ * @return the result pointer
+ */
+int32_t sermud_listener_create_epfd_and_reg(struct ServerMud *server_mud);
+
+/**
+ * @brief the listener thread, unblock, dissymmetric server accepts the connections
+ * The listener thread, unblock, dissymmetric server accepts the connections.
+ * @param server_mud the server unit
+ * @return the result pointer
+ */
+int32_t sermud_listener_accept_connects(struct ServerMud *server_mud);
+
+/**
+ * @brief the worker thread, unblock, dissymmetric server processes the events
+ * The worker thread, unblock, dissymmetric server processes the events.
+ * @param worker_unit the server worker
+ * @return the result pointer
+ */
+int32_t sermud_worker_proc_epevs(struct ServerMudWorker *worker_unit);
+
+/**
+ * @brief the listener thread, unblock, dissymmetric server processes the events
+ * The listener thread, unblock, dissymmetric server processes the events.
+ * @param server_mud the server unit
+ * @return the result pointer
+ */
+int32_t sermud_listener_proc_epevs(struct ServerMud *server_mud);
+
+/**
+ * @brief create the worker thread, unblock, dissymmetric server and run
+ * This function creates the worker thread, unblock, dissymmetric server and run.
+ * @param arg each thread's information of server
+ * @return the result pointer
+ */
+void *sermud_worker_create_and_run(void *arg);
+
+/**
+ * @brief create the listener thread, unblock, dissymmetric server and run
+ * This function creates the listener thread, unblock, dissymmetric server and run.
+ * @param arg each thread's information of server
+ * @return the result pointer
+ */
+void *sermud_listener_create_and_run(void *arg);
+
+/**
+ * @brief create the multi thread, unblock, dissymmetric server and run
+ * This function creates the multi thread, unblock, dissymmetric server and run.
+ * @param params the parameters pointer
+ * @return the result
+ */
+int32_t sermud_create_and_run(struct ProgramParams *params);
+
+/**
+ * @brief the multi thread, unblock, mutliplexing IO server prints informations
+ * The multi thread, unblock, mutliplexing IO server prints informations.
+ * @param server_mum the server information
+ */
+void sermum_info_print(struct ServerMum *server_mum);
+
+/**
+ * @brief the single thread, unblock, mutliplexing IO server listens and gets epoll feature descriptors
+ * The single thread, unblock, mutliplexing IO server listens and gets epoll feature descriptors.
+ * @param server_unit the server unit
+ * @return the result pointer
+ */
+int32_t sersum_create_epfd_and_reg(struct ServerMumUnit *server_unit);
+
+/**
+ * @brief the single thread, unblock, mutliplexing IO server accepts the connections
+ * The single thread, unblock, mutliplexing IO server accepts the connections.
+ * @param server_unit the server unit
+ * @param server_handler the server handler
+ * @return the result pointer
+ */
+int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerHandler *server_handler);
+
+/**
+ * @brief the single thread, unblock, mutliplexing IO server processes the events
+ * The single thread, unblock, mutliplexing IO server processes the events.
+ * @param server_unit the server unit
+ * @return the result pointer
+ */
+int32_t sersum_proc_epevs(struct ServerMumUnit *server_unit);
+
+/**
+ * @brief create the single thread, unblock, mutliplexing IO server
+ * This function creates the single thread, unblock, mutliplexing IO server.
+ * @param arg each thread's information of server
+ * @return the result pointer
+ */
+void *sersum_create_and_run(void *arg);
+
+/**
+ * @brief create the multi thread, unblock, mutliplexing IO server and run
+ * This function creates the multi thread, unblock, mutliplexing IO server and run.
+ * @param params the parameters pointer
+ * @return the result
+ */
+int32_t sermum_create_and_run(struct ProgramParams *params);
+
+/**
+ * @brief create server and run
+ * This function create the specify server and run.
+ * @param params the parameters pointer
+ * @return the result
+ */
+int32_t server_create_and_run(struct ProgramParams *params);
+
+
+#endif // __EXAMPLES_SERVER_H__
diff --git a/examples/inc/utilities.h b/examples/inc/utilities.h
index f9064c5..a684d35 100644
--- a/examples/inc/utilities.h
+++ b/examples/inc/utilities.h
@@ -24,16 +24,23 @@
#include <unistd.h>
#include <ctype.h>
+#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
+#include "securec.h"
+#include "securectype.h"
+
#define PRINT_ERROR(format, ...) do \
{ \
@@ -59,19 +66,76 @@
printf(format, ##__VA_ARGS__); \
printf("\n"); \
} while(0)
+#define PRINT_SERVER_DATAFLOW(format, ...) do \
+ { \
+ printf("\033[?25l\033[A\033[K"); /* ANSI: hide cursor, move up one line, erase to end of line */ \
+ printf("--> <server>: "); /* fixed prefix of the server status line */ \
+ printf(format, ##__VA_ARGS__); /* caller-supplied printf-style text */ \
+ printf("\033[?25h\n"); /* ANSI: show cursor again, then newline */ \
+ } while(0)
+#define PRINT_CLIENT_DATAFLOW(format, ...) do \
+ { \
+ printf("\033[?25l\033[A\033[K"); /* ANSI: hide cursor, move up one line, erase to end of line */ \
+ printf("--> <client>: "); /* fixed prefix of the client status line */ \
+ printf(format, ##__VA_ARGS__); /* caller-supplied printf-style text */ \
+ printf("\033[?25h\n"); /* ANSI: show cursor again, then newline */ \
+ } while(0)
#define LIMIT_VAL_RANGE(val, min, max) ((val) < (min) ? (min) : ((val) > (max) ? (max) : (val)))
#define CHECK_VAL_RANGE(val, min, max) ((val) < (min) ? (false) : ((val) > (max) ? (false) : (true)))
-#define PROGRAM_OK (0) ///< program ok flag
-#define PROGRAM_ABORT (1) ///< program abort flag
-#define PROGRAM_FAULT (-1) ///< program fault flag
+#define PROGRAM_OK (0) ///< program ok flag
+#define PROGRAM_ABORT (1) ///< program abort flag
+#define PROGRAM_FAULT (-1) ///< program fault flag
+#define PROGRAM_INPROGRESS (-2) ///< program in progress flag
+
+#define UNIX_TCP_PORT_MIN (1024) ///< TCP minimum port number in unix
+#define UNIX_TCP_PORT_MAX (65535) ///< TCP maximum port number in unix
+#define THREAD_NUM_MIN (1) ///< minimum number of threads
+#define THREAD_NUM_MAX (1000) ///< maximum number of threads
+#define MESSAGE_PKTLEN_MIN (2) ///< minimum length of message (2 bytes)
+#define MESSAGE_PKTLEN_MAX (1024 * 1024 * 10) ///< maximum length of message (10 MiB)
+
+#define SERVER_SOCKET_LISTEN_BACKLOG (128) ///< the listen backlog (queue length) of socket
+#define SERVER_EPOLL_SIZE_MAX (10000) ///< the max wait event of epoll
+#define SERVER_EPOLL_WAIT_TIMEOUT (-1) ///< the timeout value of epoll
+
+#define CLIENT_EPOLL_SIZE_MAX (10000) ///< the max wait event of epoll
+#define CLIENT_EPOLL_WAIT_TIMEOUT (-1) ///< the timeout value of epoll
+
+#define TERMINAL_REFRESH_MS (100) ///< the interval between terminal refreshes, in milliseconds
+
+#define SOCKET_UNIX_DOMAIN_FILE "unix_domain_file" ///< socket unix domain file
+
+
+/**
+ * @brief create the socket and listen
+ * This function creates the socket and listens.
+ * @param socket_fd the socket file descriptor
+ * @param ip ip address
+ * @param port port number
+ * @param domain domain
+ * @return the result
+ */
+int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain);
+
+/**
+ * @brief create the socket and connect
+ * This function creates the socket and connects.
+ * @param socket_fd the socket file descriptor
+ * @param ip ip address
+ * @param port port number
+ * @param domain domain
+ * @return the result
+ */
+int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain);
-#define UNIX_TCP_PORT_MIN (1024) ///< TCP minimum port number in unix
-#define UNIX_TCP_PORT_MAX (65535) ///< TCP minimum port number in unix
-#define THREAD_NUM_MIN (1) ///< minimum number of thead
-#define THREAD_NUM_MAX (1000) ///< maximum number of thead
-#define MESSAGE_PKTLEN_MIN (1) ///< minimum length of message (1 byte)
-#define MESSAGE_PKTLEN_MAX (10485760) ///< maximum length of message (10 Mb)
+/**
+ * @brief set the socket to unblock
+ * This function sets the socket to unblock.
+ * @param socket_fd the socket file descriptor
+ * @return the result
+ */
+int32_t set_socket_unblock(int32_t socket_fd);
#endif // __EXAMPLES_UTILITIES_H__
diff --git a/examples/main.c b/examples/main.c
index f050dc5..5338572 100644
--- a/examples/main.c
+++ b/examples/main.c
@@ -12,6 +12,8 @@
#include "utilities.h"
#include "parameter.h"
+#include "server.h"
+#include "client.h"
static struct ProgramParams prog_params;
@@ -27,5 +29,11 @@ int32_t main(int argc, char *argv[])
}
program_params_print(&prog_params);
+ if (strcmp(prog_params.as, "server") == 0) {
+ server_create_and_run(&prog_params);
+ } else {
+ client_create_and_run(&prog_params);
+ }
+
return ret;
}
diff --git a/examples/src/bussiness.c b/examples/src/bussiness.c
new file mode 100644
index 0000000..f55a37b
--- /dev/null
+++ b/examples/src/bussiness.c
@@ -0,0 +1,234 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#include "bussiness.h"
+
+
+static const char bussiness_messages_low[] = "abcdefghijklmnopqrstuvwxyz"; // the lowercase characters of business message
+static const char bussiness_messages_cap[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; // the capital characters of business message
+
+
+// read by the specified api
+// Reads up to `length` bytes from `fd` into `buffer_in` through the call family named by `api`:
+// "readwrite" -> read(), "recvsend" -> recv(), any other value -> recvmsg() with a single iovec.
+// Returns whatever the underlying call returns.
+int32_t read_api(int32_t fd, char *buffer_in, const uint32_t length, const char *api)
+{
+    if (strcmp(api, "readwrite") == 0) {
+        return read(fd, buffer_in, length);
+    }
+    if (strcmp(api, "recvsend") == 0) {
+        return recv(fd, buffer_in, length, 0);
+    }
+
+    // message variant: describe the caller's buffer as a one-element iovec array
+    struct iovec vec = { .iov_base = buffer_in, .iov_len = length };
+    struct msghdr hdr = {
+        .msg_name = NULL,
+        .msg_namelen = 0,
+        .msg_iov = &vec,
+        .msg_iovlen = 1,
+        .msg_control = 0,
+        .msg_controllen = 0,
+        .msg_flags = 0,
+    };
+
+    return recvmsg(fd, &hdr, 0);
+}
+
+// write by the specified api
+// Writes up to `length` bytes from `buffer_out` to `fd` through the call family named by `api`:
+// "readwrite" -> write(), "recvsend" -> send(), any other value -> sendmsg() with a single iovec.
+// Returns whatever the underlying call returns.
+int32_t write_api(int32_t fd, char *buffer_out, const uint32_t length, const char *api)
+{
+    if (strcmp(api, "readwrite") == 0) {
+        return write(fd, buffer_out, length);
+    }
+    if (strcmp(api, "recvsend") == 0) {
+        return send(fd, buffer_out, length, 0);
+    }
+
+    // message variant: describe the caller's buffer as a one-element iovec array
+    struct iovec vec = { .iov_base = buffer_out, .iov_len = length };
+    struct msghdr hdr = {
+        .msg_name = NULL,
+        .msg_namelen = 0,
+        .msg_iov = &vec,
+        .msg_iovlen = 1,
+        .msg_control = 0,
+        .msg_controllen = 0,
+        .msg_flags = 0,
+    };
+
+    return sendmsg(fd, &hdr, 0);
+}
+
+// the business processing of server
+// Copies `size` bytes from `in` to `out`, shifting every non-NUL byte down by the
+// lowercase-to-uppercase distance ('a' - 'A'); NUL bytes are copied through unchanged.
+void server_bussiness(char *out, const char *in, uint32_t size)
+{
+    const char case_gap = 'a' - 'A';
+    const char *src = in;
+    char *dst = out;
+
+    for (uint32_t left = size; left > 0; --left, ++src, ++dst) {
+        *dst = (*src == '\0') ? '\0' : (char)(*src - case_gap);
+    }
+}
+
+// the business processing of client
+// Fills `out` with the next (size - 1) lowercase message characters, starting at `*msg_idx` and
+// cycling through the message table, NUL-terminates it, then advances `*msg_idx` by one
+// (modulo BUSSINESS_MESSAGE_SIZE).
+// When `verify` is true, each byte of `in` is first compared with the capital table starting one
+// position before `*msg_idx`; on the first mismatch the function returns immediately, leaving
+// `out` partially written and `*msg_idx` unchanged.
+// Returns PROGRAM_OK on success, PROGRAM_FAULT on a verify mismatch or when `size` is 0.
+int32_t client_bussiness(char *out, const char *in, uint32_t size, bool verify, uint32_t *msg_idx)
+{
+    // `size - 1` would wrap around for an empty packet and overrun both buffers
+    if (size == 0) {
+        return PROGRAM_FAULT;
+    }
+
+    const uint32_t payload = size - 1;
+    const uint32_t verify_start_idx = (*msg_idx == 0) ? (BUSSINESS_MESSAGE_SIZE - 1) : (*msg_idx - 1);
+
+    for (uint32_t i = 0; i < payload; ++i) {
+        if (verify == true && in[i] != bussiness_messages_cap[(verify_start_idx + i) % BUSSINESS_MESSAGE_SIZE]) {
+            return PROGRAM_FAULT;
+        }
+        out[i] = bussiness_messages_low[(*msg_idx + i) % BUSSINESS_MESSAGE_SIZE];
+    }
+    out[payload] = '\0';
+
+    *msg_idx = (*msg_idx + 1) % BUSSINESS_MESSAGE_SIZE;
+
+    return PROGRAM_OK;
+}
+
+// server answers
+// Reads one `pktlen`-byte request from the connection in `server_handler`, transforms it with
+// server_bussiness() and writes the `pktlen`-byte reply back, using the call family named by `api`.
+// EINTR / EWOULDBLOCK / EAGAIN are retried immediately.
+// Returns PROGRAM_OK on success, PROGRAM_ABORT when a read/write call returns 0, and PROGRAM_FAULT
+// on an allocation failure or any other I/O error. Both buffers are released on every path.
+int32_t server_ans(struct ServerHandler *server_handler, uint32_t pktlen, const char* api)
+{
+    const uint32_t length = pktlen;
+    int32_t ret = PROGRAM_OK;
+    char *buffer_in = (char *)malloc(length * sizeof(char));
+    char *buffer_out = (char *)malloc(length * sizeof(char));
+
+    if (buffer_in == NULL || buffer_out == NULL) {
+        ret = PROGRAM_FAULT;
+        goto cleanup;
+    }
+
+    uint32_t cread = 0;
+    while (cread < length) {
+        // continue after the bytes already received instead of overwriting the start of the buffer
+        int32_t nread = read_api(server_handler->fd, buffer_in + cread, length - cread, api);
+        if (nread == 0) {
+            ret = PROGRAM_ABORT;
+            goto cleanup;
+        } else if (nread < 0) {
+            if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+                ret = PROGRAM_FAULT;
+                goto cleanup;
+            }
+        } else {
+            cread += (uint32_t)nread;
+        }
+    }
+
+    server_bussiness(buffer_out, buffer_in, length);
+
+    uint32_t cwrite = 0;
+    while (cwrite < length) {
+        // send only the part of the reply that has not been written yet
+        int32_t nwrite = write_api(server_handler->fd, buffer_out + cwrite, length - cwrite, api);
+        if (nwrite == 0) {
+            ret = PROGRAM_ABORT;
+            goto cleanup;
+        } else if (nwrite < 0) {
+            if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+                ret = PROGRAM_FAULT;
+                goto cleanup;
+            }
+        } else {
+            cwrite += (uint32_t)nwrite;
+        }
+    }
+
+cleanup:
+    // free(NULL) is a no-op, so this is safe even when an allocation failed
+    free(buffer_in);
+    free(buffer_out);
+
+    return ret;
+}
+
+// client asks
+// Generates the next `pktlen`-byte request with client_bussiness() (no verification) and writes it
+// to the connection in `client_handler` using the call family named by `api`.
+// EINTR / EWOULDBLOCK / EAGAIN are retried immediately.
+// Returns PROGRAM_OK on success, PROGRAM_ABORT when a write call returns 0, and PROGRAM_FAULT on an
+// allocation failure or any other I/O error. The buffer is released on every path.
+int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const char* api)
+{
+    const uint32_t length = pktlen;
+    int32_t ret = PROGRAM_OK;
+    char *buffer_out = (char *)malloc(length * sizeof(char));
+
+    if (buffer_out == NULL) {
+        return PROGRAM_FAULT;
+    }
+
+    // with verify == false client_bussiness() never reads its input buffer, so none is allocated
+    client_bussiness(buffer_out, NULL, length, false, &(client_handler->msg_idx));
+
+    uint32_t cwrite = 0;
+    while (cwrite < length) {
+        // send only the part of the request that has not been written yet
+        int32_t nwrite = write_api(client_handler->fd, buffer_out + cwrite, length - cwrite, api);
+        if (nwrite == 0) {
+            ret = PROGRAM_ABORT;
+            break;
+        } else if (nwrite < 0) {
+            if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+                ret = PROGRAM_FAULT;
+                break;
+            }
+        } else {
+            cwrite += (uint32_t)nwrite;
+        }
+    }
+
+    free(buffer_out);
+
+    return ret;
+}
+
+// client checks
+// Reads the `pktlen`-byte answer from the connection in `client_handler`, passes it to
+// client_bussiness() (which checks it when `verify` is true and produces the next request), and
+// writes that next request, using the call family named by `api`.
+// A verify failure is reported and the program waits on getchar() before continuing.
+// EINTR / EWOULDBLOCK / EAGAIN are retried immediately.
+// Returns PROGRAM_OK on success, PROGRAM_ABORT when a read/write call returns 0, and PROGRAM_FAULT
+// on an allocation failure or any other I/O error. Both buffers are released on every path.
+int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api)
+{
+    const uint32_t length = pktlen;
+    int32_t ret = PROGRAM_OK;
+    char *buffer_in = (char *)malloc(length * sizeof(char));
+    char *buffer_out = (char *)malloc(length * sizeof(char));
+
+    if (buffer_in == NULL || buffer_out == NULL) {
+        ret = PROGRAM_FAULT;
+        goto cleanup;
+    }
+
+    uint32_t cread = 0;
+    while (cread < length) {
+        // continue after the bytes already received instead of overwriting the start of the buffer
+        int32_t nread = read_api(client_handler->fd, buffer_in + cread, length - cread, api);
+        if (nread == 0) {
+            ret = PROGRAM_ABORT;
+            goto cleanup;
+        } else if (nread < 0) {
+            if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+                ret = PROGRAM_FAULT;
+                goto cleanup;
+            }
+        } else {
+            cread += (uint32_t)nread;
+        }
+    }
+
+    if (client_bussiness(buffer_out, buffer_in, length, verify, &(client_handler->msg_idx)) < 0) {
+        PRINT_ERROR("message verify fault! ");
+        getchar();
+    }
+
+    uint32_t cwrite = 0;
+    while (cwrite < length) {
+        // send only the part of the request that has not been written yet
+        int32_t nwrite = write_api(client_handler->fd, buffer_out + cwrite, length - cwrite, api);
+        if (nwrite == 0) {
+            ret = PROGRAM_ABORT;
+            goto cleanup;
+        } else if (nwrite < 0) {
+            if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+                ret = PROGRAM_FAULT;
+                goto cleanup;
+            }
+        } else {
+            cwrite += (uint32_t)nwrite;
+        }
+    }
+
+cleanup:
+    // free(NULL) is a no-op, so this is safe even when an allocation failed
+    free(buffer_in);
+    free(buffer_out);
+
+    return ret;
+}
\ No newline at end of file
diff --git a/examples/src/client.c b/examples/src/client.c
new file mode 100644
index 0000000..aafcd00
--- /dev/null
+++ b/examples/src/client.c
@@ -0,0 +1,387 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#include "client.h"
+
+
+static pthread_mutex_t client_debug_mutex; // the client mutex for printf
+
+
+// the single thread, client prints informations
+// Emits one PRINT_CLIENT line (channel, pid, tid, action, peer ip:port) when `debug` is true and
+// does nothing otherwise. The print is done while holding client_debug_mutex. `ip` is stored
+// directly into in_addr.s_addr and `port` is converted with ntohs() before printing.
+void client_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug)
+{
+    if (debug != true) {
+        return;
+    }
+
+    struct in_addr peer = { .s_addr = ip };
+
+    pthread_mutex_lock(&client_debug_mutex);
+    PRINT_CLIENT("[%s] [pid: %d] [tid: %ld] [%s <- %s:%d]. ",
+        ch_str, getpid(), pthread_self(), act_str, inet_ntoa(peer), ntohs(port));
+    pthread_mutex_unlock(&client_debug_mutex);
+}
+
+// the client prints informations
+// When debug output is off, sums `send_bytes` over all client units twice, one
+// TERMINAL_REFRESH_MS delay apart, and prints the connection count together with the resulting
+// send rate in B/s, KB/s or MB/s. Returns immediately when `client->debug` is true.
+void client_info_print(struct Client *client)
+{
+    if (client->debug == true) {
+        return;
+    }
+
+    struct timeval begin;
+    gettimeofday(&begin, NULL);
+    uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000;
+
+    uint32_t curr_connect = 0;
+    uint64_t begin_send_bytes = 0;
+    for (struct ClientUnit *unit = client->uints; unit != NULL; unit = unit->next) {
+        curr_connect += unit->curr_connect;
+        begin_send_bytes += unit->send_bytes;
+    }
+
+    // select() with no descriptor sets is used purely as a sub-second sleep
+    struct timeval delay;
+    delay.tv_sec = 0;
+    delay.tv_usec = TERMINAL_REFRESH_MS * 1000;
+    select(0, NULL, NULL, NULL, &delay);
+
+    uint64_t end_send_bytes = 0;
+    for (struct ClientUnit *unit = client->uints; unit != NULL; unit = unit->next) {
+        end_send_bytes += unit->send_bytes;
+    }
+
+    struct timeval end;
+    gettimeofday(&end, NULL);
+    uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000;
+
+    double bytes_sub = end_send_bytes > begin_send_bytes ? (double)(end_send_bytes - begin_send_bytes) : 0;
+    double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0;
+
+    // a zero interval would otherwise divide by zero and print inf/nan
+    double bytes_ps = (time_sub > 0) ? (bytes_sub / time_sub) : 0;
+
+    if (bytes_ps < 1024) {
+        PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f B/s", curr_connect, bytes_ps);
+    } else if (bytes_ps < (1024 * 1024)) {
+        PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f KB/s", curr_connect, bytes_ps / 1024);
+    } else {
+        PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024));
+    }
+}
+
+// the single thread, client try to connect to server, register to epoll
+// Calls create_socket_and_connect() for `client_handler` and returns its result, so callers can
+// tell an established connection from one still in progress. When the connect is in progress the
+// socket is registered on `epoll_fd` for EPOLLOUT with `client_handler` as the event data.
+// Returns PROGRAM_FAULT if that registration fails.
+int32_t client_thread_try_connect(struct ClientHandler *client_handler, int32_t epoll_fd, in_addr_t ip, uint16_t port, const char *domain)
+{
+    int32_t create_socket_and_connect_ret = create_socket_and_connect(&(client_handler->fd), ip, port, domain);
+    if (create_socket_and_connect_ret == PROGRAM_INPROGRESS) {
+        struct epoll_event ep_ev;
+        ep_ev.events = EPOLLOUT;
+        ep_ev.data.ptr = (void *)client_handler;
+        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_handler->fd, &ep_ev) < 0) {
+            PRINT_ERROR("client cant't set epoll %d! ", errno);
+            return PROGRAM_FAULT;
+        }
+    }
+
+    // propagate the connect status: both callers branch on `< 0` and on PROGRAM_INPROGRESS, which
+    // they could never observe while this function returned PROGRAM_OK unconditionally
+    return create_socket_and_connect_ret;
+}
+
+// the single thread, client retries to connect to server and registers the socket to epoll
+// Starts a new connection for `client_handler` through client_thread_try_connect(). When that call
+// reports success, the socket is registered for EPOLLIN | EPOLLET, the unit's connection counter is
+// incremented and the first request is sent with client_ask().
+// Returns PROGRAM_OK, or PROGRAM_FAULT when connecting, epoll registration, getpeername(),
+// epoll removal or close() fails.
+int32_t client_thread_retry_connect(struct ClientUnit *client_unit, struct ClientHandler *client_handler)
+{
+ int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_handler, client_unit->epfd, client_unit->ip, client_unit->port, client_unit->domain);
+ if (clithd_try_cnntask_ret < 0) {
+ // a connect that is still in progress is not an error; nothing more is done for it here
+ if (clithd_try_cnntask_ret == PROGRAM_INPROGRESS) {
+ return PROGRAM_OK;
+ }
+ return PROGRAM_FAULT;
+ }
+ struct epoll_event ep_ev;
+ ep_ev.events = EPOLLIN | EPOLLET;
+ ep_ev.data.ptr = (void *)client_handler;
+ if (epoll_ctl(client_unit->epfd, EPOLL_CTL_ADD, client_handler->fd, &ep_ev) < 0) {
+ PRINT_ERROR("client cant't set epoll %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ ++(client_unit->curr_connect);
+
+ // the peer address is looked up only for the debug prints below
+ struct sockaddr_in server_addr;
+ socklen_t server_addr_len = sizeof(server_addr);
+ if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) {
+ PRINT_ERROR("client can't socket peername %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+
+ int32_t client_ask_ret = client_ask(client_handler, client_unit->pktlen, client_unit->api);
+ if (client_ask_ret == PROGRAM_FAULT) {
+ --client_unit->curr_connect;
+ // this inner ep_ev shadows the one above and is passed uninitialized to EPOLL_CTL_DEL
+ // NOTE(review): the socket is removed from epoll here but not closed -- confirm this is intended
+ struct epoll_event ep_ev;
+ if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, client_handler->fd, &ep_ev) < 0) {
+ PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", client_handler->fd, errno);
+ return PROGRAM_FAULT;
+ }
+ } else if (client_ask_ret == PROGRAM_ABORT) {
+ --client_unit->curr_connect;
+ if (close(client_handler->fd) < 0) {
+ PRINT_ERROR("client can't close the socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ } else {
+ // the request went out: account for the bytes sent
+ client_unit->send_bytes += client_unit->pktlen;
+ client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ }
+
+ return PROGRAM_OK;
+}
+
+// the single thread, client connects and gets epoll file descriptors
+// Creates the unit's epoll instance and starts `connect_num` connections through
+// client_thread_try_connect(). A connection reported as established is registered for
+// EPOLLIN | EPOLLET, counted in `curr_connect` and sent its first request with client_ask();
+// a connection reported as PROGRAM_INPROGRESS is skipped here.
+// Returns PROGRAM_OK, or PROGRAM_FAULT when epoll creation, connecting, epoll registration,
+// getpeername(), epoll removal or close() fails.
+int32_t client_thread_create_epfd_and_reg(struct ClientUnit *client_unit)
+{
+    const uint32_t connect_num = client_unit->connect_num;
+
+    client_unit->epfd = epoll_create(CLIENT_EPOLL_SIZE_MAX);
+    if (client_unit->epfd < 0) {
+        PRINT_ERROR("client can't create epoll %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    for (uint32_t i = 0; i < connect_num; ++i) {
+        struct ClientHandler *client_handler = client_unit->handlers + i;
+
+        int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_handler, client_unit->epfd, client_unit->ip, client_unit->port, client_unit->domain);
+        if (clithd_try_cnntask_ret < 0) {
+            if (clithd_try_cnntask_ret == PROGRAM_INPROGRESS) {
+                continue;
+            }
+            return PROGRAM_FAULT;
+        }
+
+        struct epoll_event ep_ev;
+        ep_ev.events = EPOLLIN | EPOLLET;
+        ep_ev.data.ptr = client_handler;
+        if (epoll_ctl(client_unit->epfd, EPOLL_CTL_ADD, client_handler->fd, &ep_ev) < 0) {
+            PRINT_ERROR("client cant't set epoll %d! ", errno);
+            return PROGRAM_FAULT;
+        }
+
+        ++(client_unit->curr_connect);
+
+        // the peer address is looked up only for the debug prints below
+        struct sockaddr_in server_addr;
+        socklen_t server_addr_len = sizeof(server_addr);
+        if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) {
+            PRINT_ERROR("client can't socket peername %d! ", errno);
+            return PROGRAM_FAULT;
+        }
+        client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+
+        int32_t client_ask_ret = client_ask(client_handler, client_unit->pktlen, client_unit->api);
+        if (client_ask_ret == PROGRAM_FAULT) {
+            --client_unit->curr_connect;
+            if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, client_handler->fd, &ep_ev) < 0) {
+                // report the socket being removed; epevs[] is the epoll_wait() output buffer and
+                // is neither filled yet nor indexed by connection number
+                PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", client_handler->fd, errno);
+                return PROGRAM_FAULT;
+            }
+        } else if (client_ask_ret == PROGRAM_ABORT) {
+            --client_unit->curr_connect;
+            if (close(client_handler->fd) < 0) {
+                PRINT_ERROR("client can't close the socket! ");
+                return PROGRAM_FAULT;
+            }
+            client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+        } else {
+            // the request went out: account for the bytes sent
+            client_unit->send_bytes += client_unit->pktlen;
+            client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+        }
+    }
+
+    return PROGRAM_OK;
+}
+
+// Accounts for the result of client_ask() / client_chkans() on one connection:
+// PROGRAM_FAULT removes the socket from epoll, PROGRAM_ABORT closes it, anything else adds the
+// packet to `send_bytes` and prints `ok_act` in debug mode.
+// Returns PROGRAM_OK, or PROGRAM_FAULT when the epoll removal or close() itself fails.
+static int32_t clithd_settle_io_ret(struct ClientUnit *client_unit, struct ClientHandler *client_handler, int32_t io_ret, const struct sockaddr_in *server_addr, const char *ok_act)
+{
+    if (io_ret == PROGRAM_FAULT) {
+        --client_unit->curr_connect;
+        struct epoll_event ep_ev = {0};
+        if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, client_handler->fd, &ep_ev) < 0) {
+            PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", client_handler->fd, errno);
+            return PROGRAM_FAULT;
+        }
+    } else if (io_ret == PROGRAM_ABORT) {
+        --client_unit->curr_connect;
+        if (close(client_handler->fd) < 0) {
+            PRINT_ERROR("client can't close the socket %d! ", errno);
+            return PROGRAM_FAULT;
+        }
+        client_debug_print("client unit", "close", server_addr->sin_addr.s_addr, server_addr->sin_port, client_unit->debug);
+    } else {
+        client_unit->send_bytes += client_unit->pktlen;
+        client_debug_print("client unit", ok_act, server_addr->sin_addr.s_addr, server_addr->sin_port, client_unit->debug);
+    }
+
+    return PROGRAM_OK;
+}
+
+// the single thread, client processes epoll events
+// Waits once on the unit's epoll instance and handles every reported event:
+//   EPOLLOUT - a pending connect finished; SO_ERROR decides between success (switch the socket to
+//              EPOLLIN | EPOLLET and send the first request), a retry (ETIMEDOUT) and a fault
+//   EPOLLERR - fault
+//   EPOLLIN  - an answer arrived; check it and send the next request with client_chkans()
+// Returns PROGRAM_OK, or PROGRAM_FAULT on any failure.
+int32_t clithd_proc_epevs(struct ClientUnit *client_unit)
+{
+    int32_t epoll_nfds = epoll_wait(client_unit->epfd, client_unit->epevs, CLIENT_EPOLL_SIZE_MAX, CLIENT_EPOLL_WAIT_TIMEOUT);
+    if (epoll_nfds < 0) {
+        PRINT_ERROR("client epoll wait error %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    for (int32_t i = 0; i < epoll_nfds; ++i) {
+        struct epoll_event *curr_epev = client_unit->epevs + i;
+        // sockets are registered with the handler in data.ptr; data.fd shares the same union
+        // storage and therefore must not be read as a descriptor
+        struct ClientHandler *client_handler = (struct ClientHandler *)curr_epev->data.ptr;
+        struct sockaddr_in server_addr;
+        socklen_t server_addr_len = sizeof(server_addr);
+
+        // `events` is a bit mask, so flags are tested with `&`; EPOLLOUT is looked at first so that
+        // a failed connect is classified through SO_ERROR and the ETIMEDOUT retry stays reachable
+        if (curr_epev->events & EPOLLOUT) {
+            int32_t connect_error = 0;
+            socklen_t connect_error_len = sizeof(connect_error);
+            if (getsockopt(client_handler->fd, SOL_SOCKET, SO_ERROR, (void *)(&connect_error), &connect_error_len) < 0) {
+                PRINT_ERROR("client can't get socket option %d! ", errno);
+                return PROGRAM_FAULT;
+            }
+            // SO_ERROR yields 0 on success or a positive errno value, never a negative one
+            if (connect_error != 0) {
+                if (connect_error == ETIMEDOUT) {
+                    // the retry creates a new socket; closing the old one also drops it from epoll
+                    close(client_handler->fd);
+                    if (client_thread_retry_connect(client_unit, client_handler) < 0) {
+                        return PROGRAM_FAULT;
+                    }
+                    continue;
+                }
+                PRINT_ERROR("client connect error %d! ", connect_error);
+                return PROGRAM_FAULT;
+            }
+
+            // connected: stop watching writability (it would fire on every wait) and wait for
+            // the server's answer instead
+            struct epoll_event ep_ev;
+            ep_ev.events = EPOLLIN | EPOLLET;
+            ep_ev.data.ptr = (void *)client_handler;
+            if (epoll_ctl(client_unit->epfd, EPOLL_CTL_MOD, client_handler->fd, &ep_ev) < 0) {
+                PRINT_ERROR("client cant't set epoll %d! ", errno);
+                return PROGRAM_FAULT;
+            }
+
+            ++(client_unit->curr_connect);
+
+            if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) {
+                PRINT_ERROR("client can't socket peername %d! ", errno);
+                return PROGRAM_FAULT;
+            }
+            client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+
+            int32_t client_ask_ret = client_ask(client_handler, client_unit->pktlen, client_unit->api);
+            if (clithd_settle_io_ret(client_unit, client_handler, client_ask_ret, &server_addr, "send") < 0) {
+                return PROGRAM_FAULT;
+            }
+        } else if (curr_epev->events & EPOLLERR) {
+            PRINT_ERROR("client epoll wait error! %d", curr_epev->events);
+            return PROGRAM_FAULT;
+        } else if (curr_epev->events & EPOLLIN) {
+            if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) {
+                PRINT_ERROR("client can't socket peername %d! ", errno);
+                return PROGRAM_FAULT;
+            }
+
+            int32_t client_chkans_ret = client_chkans(client_handler, client_unit->pktlen, client_unit->verify, client_unit->api);
+            if (clithd_settle_io_ret(client_unit, client_handler, client_chkans_ret, &server_addr, "receive") < 0) {
+                return PROGRAM_FAULT;
+            }
+        }
+    }
+
+    return PROGRAM_OK;
+}
+
+// create client of single thread and run
+// Thread entry point: `arg` is the thread's ClientUnit. Sets up the unit's epoll instance and
+// connections, then processes epoll events until a round fails; any failure terminates the whole
+// process with exit(PROGRAM_FAULT).
+void *client_s_create_and_run(void *arg)
+{
+    struct ClientUnit *unit = (struct ClientUnit *)arg;
+
+    if (client_thread_create_epfd_and_reg(unit) < 0) {
+        exit(PROGRAM_FAULT);
+    }
+
+    while (clithd_proc_epevs(unit) >= 0) {
+        // keep serving epoll events until a round reports a failure
+    }
+    exit(PROGRAM_FAULT);
+
+    // not reached: teardown counterpart of the setup above
+    for (int idx = 0; idx < unit->connect_num; ++idx) {
+        close(unit->handlers[idx].fd);
+    }
+    close(unit->epfd);
+
+    return (void *)PROGRAM_OK;
+}
+
+// create client and run
+// Builds one ClientUnit per worker thread, linked through `next` with the head stored in
+// `client->uints` and the last unit's `next` set to NULL, starts `params->thread_num` threads
+// running client_s_create_and_run(), then prints the traffic summary in an endless loop.
+// Returns PROGRAM_FAULT when an allocation, the debug mutex or a thread cannot be set up.
+int32_t client_create_and_run(struct ProgramParams *params)
+{
+    const uint32_t connect_num = params->connect_num;
+    const uint32_t thread_num = params->thread_num;
+    pthread_t *tids = (pthread_t *)malloc(thread_num * sizeof(pthread_t));
+    struct Client *client = (struct Client *)malloc(sizeof(struct Client));
+
+    if (tids == NULL || client == NULL) {
+        PRINT_ERROR("client can't allocate memory! ");
+        return PROGRAM_FAULT;
+    }
+
+    // pthread functions report failure through a non-zero return value, not through errno
+    int32_t mutex_init_ret = pthread_mutex_init(&client_debug_mutex, NULL);
+    if (mutex_init_ret != 0) {
+        PRINT_ERROR("client can't init posix mutex %d! ", mutex_init_ret);
+        return PROGRAM_FAULT;
+    }
+
+    client->uints = NULL;
+    client->debug = params->debug;
+
+    // `link` always points at the pointer that must receive the next unit, so the list never
+    // contains an uninitialised element and always ends in NULL
+    struct ClientUnit **link = &(client->uints);
+    for (uint32_t i = 0; i < thread_num; ++i) {
+        struct ClientUnit *client_unit = (struct ClientUnit *)malloc(sizeof(struct ClientUnit));
+        if (client_unit == NULL) {
+            PRINT_ERROR("client can't allocate memory! ");
+            return PROGRAM_FAULT;
+        }
+
+        client_unit->handlers = (struct ClientHandler *)malloc(connect_num * sizeof(struct ClientHandler));
+        client_unit->epevs = (struct epoll_event *)malloc(CLIENT_EPOLL_SIZE_MAX * sizeof(struct epoll_event));
+        if (client_unit->handlers == NULL || client_unit->epevs == NULL) {
+            PRINT_ERROR("client can't allocate memory! ");
+            return PROGRAM_FAULT;
+        }
+        for (uint32_t j = 0; j < connect_num; ++j) {
+            client_unit->handlers[j].fd = -1;
+            client_unit->handlers[j].msg_idx = 0;
+        }
+        client_unit->epfd = -1;
+        client_unit->curr_connect = 0;
+        client_unit->send_bytes = 0;
+        client_unit->ip = inet_addr(params->ip);
+        client_unit->port = htons(params->port);
+        client_unit->connect_num = params->connect_num;
+        client_unit->pktlen = params->pktlen;
+        client_unit->verify = params->verify;
+        client_unit->domain = params->domain;
+        client_unit->api = params->api;
+        client_unit->debug = params->debug;
+        client_unit->next = NULL;
+
+        // publish the fully initialised unit before its thread starts
+        *link = client_unit;
+        link = &(client_unit->next);
+
+        int32_t thread_create_ret = pthread_create((tids + i), NULL, client_s_create_and_run, client_unit);
+        if (thread_create_ret != 0) {
+            PRINT_ERROR("client can't create thread of poisx %d! ", thread_create_ret);
+            return PROGRAM_FAULT;
+        }
+    }
+
+    if (client->debug == false) {
+        printf("[program informations]: \n\n");
+    }
+    while (true) {
+        client_info_print(client);
+    }
+
+    // not reached: the loop above never terminates
+    pthread_mutex_destroy(&client_debug_mutex);
+
+    return PROGRAM_OK;
+}
diff --git a/examples/src/parameter.c b/examples/src/parameter.c
index 996188b..100ee11 100644
--- a/examples/src/parameter.c
+++ b/examples/src/parameter.c
@@ -22,148 +22,141 @@ const char prog_short_opts[] = \
"m:" // model
"t:" // thread number
"c:" // connect number
+ "D:" // communication domain
"A:" // api
"P:" // pktlen
"v" // verify
"r" // ringpmd
+ "d" // debug
"h" // help
;
// program long options
-const struct ProgramOption prog_long_opts[] = { \
+const struct ProgramOption prog_long_opts[] = \
+{
{PARAM_NAME_AS, REQUIRED_ARGUMETN, NULL, PARAM_NUM_AS},
{PARAM_NAME_IP, REQUIRED_ARGUMETN, NULL, PARAM_NUM_IP},
{PARAM_NAME_PORT, REQUIRED_ARGUMETN, NULL, PARAM_NUM_PORT},
{PARAM_NAME_MODEL, REQUIRED_ARGUMETN, NULL, PARAM_NUM_MODEL},
{PARAM_NAME_THREAD_NUM, REQUIRED_ARGUMETN, NULL, PARAM_NUM_THREAD_NUM},
{PARAM_NAME_CONNECT_NUM, REQUIRED_ARGUMETN, NULL, PARAM_NUM_CONNECT_NUM},
+ {PARAM_NAME_DOMAIN, REQUIRED_ARGUMETN, NULL, PARAM_NUM_DOMAIN},
{PARAM_NAME_API, REQUIRED_ARGUMETN, NULL, PARAM_NUM_API},
{PARAM_NAME_PKTLEN, REQUIRED_ARGUMETN, NULL, PARAM_NUM_PKTLEN},
{PARAM_NAME_VERIFY, NO_ARGUMENT, NULL, PARAM_NUM_VERIFY},
{PARAM_NAME_RINGPMD, NO_ARGUMENT, NULL, PARAM_NUM_RINGPMD},
+ {PARAM_NAME_DEBUG, NO_ARGUMENT, NULL, PARAM_NUM_DEBUG},
{PARAM_NAME_HELP, NO_ARGUMENT, NULL, PARAM_NUM_HELP},
+ {NULL, 0, NULL, 0}, // getopt_long() scans the table until it reaches an all-zero entry
};
// get long options
-int getopt_long(int argc, char * const argv[], const char *optstring, const struct ProgramOption *long_opts,
- int *long_idx);
+int getopt_long(int argc, char * const argv[], const char *optstring, const struct ProgramOption *long_opts, int *long_idx);
// set `as` parameter
-int32_t program_param_prase_as(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_as(struct ProgramParams *params)
{
- if (strcmp(arg, "server") == 0 || strcmp(arg, "client") == 0) {
- params->as = arg;
- }
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
+ if (strcmp(optarg, "server") == 0 || strcmp(optarg, "client") == 0) {
+ params->as = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
-
- return PROGRAM_OK;
}
// set `ip` parameter
-int32_t program_param_prase_ip(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_ip(struct ProgramParams *params)
{
- if (inet_addr(arg) != INADDR_NONE) {
- params->ip = arg;
- }
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
+ if (inet_addr(optarg) != INADDR_NONE) {
+ params->ip = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
-
- return PROGRAM_OK;
}
// set `port` parameter
-int32_t program_param_prase_port(struct ProgramParams *params, char *arg, const char *name)
+// parses `optarg` with strtol() base 0 (decimal, 0-prefixed octal or 0x-prefixed hex); a value outside
+// UNIX_TCP_PORT_MIN..UNIX_TCP_PORT_MAX prints an error and terminates the program
+void program_param_parse_port(struct ProgramParams *params)
{
- int32_t port_arg = atoi(optarg);
+ int32_t port_arg = strtol(optarg, NULL, 0);
if (CHECK_VAL_RANGE(port_arg, UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX) == true) {
params->port = (uint32_t)port_arg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// set `model` parameter
-int32_t program_param_prase_model(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_model(struct ProgramParams *params)
{
if (strcmp(optarg, "mum") == 0 || strcmp(optarg, "mud") == 0) {
params->model = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// set `connect_num` parameter
-int32_t program_param_prase_connectnum(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_connectnum(struct ProgramParams *params)
{
- int32_t connectnum_arg = atoi(optarg);
+ int32_t connectnum_arg = strtol(optarg, NULL, 0);
if (connectnum_arg > 0) {
params->connect_num = (uint32_t)connectnum_arg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// set `thread_num` parameter
-int32_t program_param_prase_threadnum(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_threadnum(struct ProgramParams *params)
{
- int32_t threadnum_arg = atoi(optarg);
+ int32_t threadnum_arg = strtol(optarg, NULL, 0);
if (CHECK_VAL_RANGE(threadnum_arg, THREAD_NUM_MIN, THREAD_NUM_MAX) == true) {
params->thread_num = (uint32_t)threadnum_arg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
+}
- return PROGRAM_OK;
+// set `domain` parameter
+void program_param_parse_domain(struct ProgramParams *params)
+{
+ if (strcmp(optarg, "unix") == 0 || strcmp(optarg, "posix") == 0) {
+ params->domain = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
+ }
}
// set `api` parameter
-int32_t program_param_prase_api(struct ProgramParams *params, char *arg, const char *name)
+// accepts "readwrite", "recvsend" or "recvsendmsg" from `optarg`; any other value prints an error
+// and terminates the program
+void program_param_parse_api(struct ProgramParams *params)
{
- if (strcmp(optarg, "unix") == 0 || strcmp(optarg, "posix") == 0) {
+ if (strcmp(optarg, "readwrite") == 0 || strcmp(optarg, "recvsend") == 0 || strcmp(optarg, "recvsendmsg") == 0) {
params->api = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// set `pktlen` parameter
-int32_t program_param_prase_pktlen(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_pktlen(struct ProgramParams *params)
{
- int32_t pktlen_arg = atoi(optarg);
+ int32_t pktlen_arg = strtol(optarg, NULL, 0);
if (CHECK_VAL_RANGE(pktlen_arg, MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX) == true) {
params->pktlen = (uint32_t)pktlen_arg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// initialize the parameters
@@ -175,10 +168,12 @@ void program_params_init(struct ProgramParams *params)
params->model = PARAM_DEFAULT_MODEL;
params->thread_num = PARAM_DEFAULT_THREAD_NUM;
params->connect_num = PARAM_DEFAULT_CONNECT_NUM;
+ params->domain = PARAM_DEFAULT_DOMAIN;
params->api = PARAM_DEFAULT_API;
params->pktlen = PARAM_DEFAULT_PKTLEN;
params->verify = PARAM_DEFAULT_VERIFY;
params->ringpmd = PARAM_DEFAULT_RINGPMD;
+ params->debug = PARAM_DEFAULT_DEBUG;
}
// print program helps
@@ -188,19 +183,24 @@ void program_params_help(void)
printf("-a, --as [server | client]: set programas server or client. \n");
printf(" server: as server. \n");
printf(" client: as client. \n");
- printf("-i, --ip [xxx.xxx.xxx.xxx]: set ip address. \n");
- printf("-p, --port [xxxx]: set port number in range of %d - %d. \n", UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX);
+ printf("-i, --ip [???.???.???.???]: set ip address. \n");
+ printf("-p, --port [????]: set port number in range of %d - %d. \n", UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX);
printf("-m, --model [mum | mud]: set the network model. \n");
printf(" mum: multi thread, unblock, multiplexing IO network model. \n");
printf(" mud: multi thread, unblock, dissymmetric network model. \n");
- printf("-t, --threadnum [xxxx]: set thread number in range of %d - %d. \n", THREAD_NUM_MIN, THREAD_NUM_MAX);
- printf("-c, --connectnum [xxxx]: set thread number of connection. \n");
- printf("-A, --api [unix | posix]: set api type is server or client. \n");
+ printf("-t, --threadnum [???]: set thread number in range of %d - %d. \n", THREAD_NUM_MIN, THREAD_NUM_MAX);
+ printf("-c, --connectnum [???]: set connection number of each thread. \n");
+ printf("-D, --domain [unix | posix]: set domain type is server or client. \n");
printf(" unix: use unix's api. \n");
printf(" posix: use posix api. \n");
- printf("-P, --pktlen [xxxx]: set packet length in range of %d - %d. \n", MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX);
+ printf("-A, --api [readwrite | recvsend | recvsendmsg]: set api type is server or client. \n");
+ printf(" readwrite: use `read` and `write`. \n");
+ printf(" recvsend: use `recv and `send`. \n");
+ printf(" recvsendmsg: use `recvmsg` and `sendmsg`. \n");
+ printf("-P, --pktlen [????]: set packet length in range of %d - %d. \n", MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX);
printf("-v, --verify: set to verifying the message packet. \n");
- printf("-r, --ringpmd: set use ringpmd. \n");
+ printf("-r, --ringpmd: set to use ringpmd. \n");
+ printf("-d, --debug: set to print the debug information. \n");
printf("-h, --help: see helps. \n");
printf("\n");
}
@@ -208,40 +208,44 @@ void program_params_help(void)
// parse the parameters
int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char *argv[])
{
- int32_t ret = PROGRAM_OK;
+ int32_t c;
- while (ret == PROGRAM_OK) {
+ while (true) {
int32_t opt_idx = 0;
- int32_t c = getopt_long(argc, argv, prog_short_opts, prog_long_opts, &opt_idx);
+ c = getopt_long(argc, argv, prog_short_opts, prog_long_opts, &opt_idx);
+
if (c == -1) {
break;
}
switch (c) {
case (PARAM_NUM_AS):
- ret = program_param_prase_as(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_as(params);
break;
case (PARAM_NUM_IP):
- ret = program_param_prase_ip(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_ip(params);
break;
case (PARAM_NUM_PORT):
- ret = program_param_prase_port(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_port(params);
break;
case (PARAM_NUM_MODEL):
- ret = program_param_prase_model(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_model(params);
break;
case (PARAM_NUM_CONNECT_NUM):
- ret = program_param_prase_connectnum(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_connectnum(params);
break;
case (PARAM_NUM_THREAD_NUM):
- ret = program_param_prase_threadnum(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_threadnum(params);
+ break;
+ case (PARAM_NUM_DOMAIN):
+ program_param_parse_domain(params);
break;
case (PARAM_NUM_API):
- ret = program_param_prase_api(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_api(params);
break;
case (PARAM_NUM_PKTLEN):
- ret = program_param_prase_pktlen(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_pktlen(params);
break;
case (PARAM_NUM_VERIFY):
params->verify = true;
@@ -249,6 +253,9 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char *
case (PARAM_NUM_RINGPMD):
params->ringpmd = true;
break;
+ case (PARAM_NUM_DEBUG):
+ params->debug = true;
+ break;
case (PARAM_NUM_HELP):
program_params_help();
return PROGRAM_ABORT;
@@ -260,7 +267,12 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char *
}
}
- return ret;
+ if (strcmp(params->domain, "unix") == 0) {
+ params->thread_num = 1;
+ params->connect_num = 1;
+ }
+
+ return PROGRAM_OK;
}
// print the parameters
@@ -269,14 +281,28 @@ void program_params_print(struct ProgramParams *params)
printf("\n");
printf("[program parameters]: \n");
printf("--> [as]: %s \n", params->as);
- printf("--> [ip]: %s \n", params->ip);
- printf("--> [port]: %u \n", params->port);
- printf("--> [model]: %s \n", params->model);
- printf("--> [thread number]: %u \n", params->thread_num);
- printf("--> [connection number]: %u \n", params->connect_num);
- printf("--> [api]: %s \n", params->api);
+ printf("--> [server ip]: %s \n", params->ip);
+ printf("--> [server port]: %u \n", params->port);
+ if (strcmp(params->as, "server") == 0) {
+ printf("--> [model]: %s \n", params->model);
+ }
+ if ((strcmp(params->as, "server") == 0 && strcmp(params->model, "mum") == 0) || strcmp(params->as, "client") == 0) {
+ printf("--> [thread number]: %u \n", params->thread_num);
+ }
+ if (strcmp(params->as, "client") == 0) {
+ printf("--> [connection number]: %u \n", params->connect_num);
+ }
+ printf("--> [domain]: %s \n", params->domain);
+ if (strcmp(params->api, "readwrite") == 0) {
+ printf("--> [api]: read & write \n");
+ } else if (strcmp(params->api, "recvsend") == 0) {
+ printf("--> [api]: recv & send \n");
+ } else {
+ printf("--> [api]: recvmsg & sendmsg \n");
+ }
printf("--> [packet length]: %u \n", params->pktlen);
- printf("--> [verify]: %s \n", (true == params->verify) ? "on" : "off");
- printf("--> [ringpmd]: %s \n", (true == params->ringpmd) ? "on" : "off");
+ printf("--> [verify]: %s \n", (params->verify == true) ? "on" : "off");
+ printf("--> [ringpmd]: %s \n", (params->ringpmd == true) ? "on" : "off");
+ printf("--> [debug]: %s \n", (params->debug == true) ? "on" : "off");
printf("\n");
}
diff --git a/examples/src/server.c b/examples/src/server.c
new file mode 100644
index 0000000..d1dab72
--- /dev/null
+++ b/examples/src/server.c
@@ -0,0 +1,578 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#include "server.h"
+
+
+static pthread_mutex_t server_debug_mutex; // the server mutex for debug
+
+// Print one debug trace line for a server event; does nothing unless `debug` is set.
+void server_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug)
+{
+    if (!debug) {
+        return;
+    }
+
+    // `ip` is stored as-is into in_addr and `port` goes through ntohs() below
+    struct in_addr peer_addr;
+    peer_addr.s_addr = ip;
+
+    // the whole print, including inet_ntoa(), runs under the debug mutex so callers are serialized
+    pthread_mutex_lock(&server_debug_mutex);
+    PRINT_SERVER("[%s] [pid: %d] [tid: %ld] [%s <- %s:%d]. ",
+        ch_str, getpid(), pthread_self(), act_str, inet_ntoa(peer_addr), ntohs(port));
+    pthread_mutex_unlock(&server_debug_mutex);
+}
+
+// Sample the multi-thread, dissymmetric server over one refresh interval and print its receive rate.
+// In debug mode nothing is printed, but the interval is still slept so the caller's endless loop does not spin.
+void sermud_info_print(struct ServerMud *server_mud)
+{
+    struct timeval delay;
+    delay.tv_sec = 0;
+    delay.tv_usec = TERMINAL_REFRESH_MS * 1000;
+    if (server_mud->debug == true) {
+        select(0, NULL, NULL, NULL, &delay);
+        return;
+    }
+
+    uint32_t curr_connect = server_mud->curr_connect;
+
+    struct timeval begin;
+    gettimeofday(&begin, NULL);
+    uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000;
+
+    // total bytes received by all workers before the delay
+    uint64_t begin_recv_bytes = 0;
+    for (struct ServerMudWorker *unit = server_mud->workers; unit != NULL; unit = unit->next) {
+        begin_recv_bytes += unit->recv_bytes;
+    }
+    // select() without descriptors only waits for the timeout
+    select(0, NULL, NULL, NULL, &delay);
+    // total bytes received by all workers after the delay
+    uint64_t end_recv_bytes = 0;
+    for (struct ServerMudWorker *unit = server_mud->workers; unit != NULL; unit = unit->next) {
+        end_recv_bytes += unit->recv_bytes;
+    }
+
+    struct timeval end;
+    gettimeofday(&end, NULL);
+    uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000;
+
+    // a zero-length interval yields a rate of 0 instead of a division by zero
+    double bytes_sub = end_recv_bytes > begin_recv_bytes ? (double)(end_recv_bytes - begin_recv_bytes) : 0;
+    double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0;
+    double bytes_ps = (time_sub > 0) ? bytes_sub / time_sub : 0;
+
+    if (bytes_ps < 1024) {
+        PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f B/s", curr_connect, bytes_ps);
+    } else if (bytes_ps < (1024 * 1024)) {
+        PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f KB/s", curr_connect, bytes_ps / 1024);
+    } else {
+        PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024));
+    }
+}
+
+// Create the worker's epoll instance and register its connection socket for edge-triggered input events.
+// Returns PROGRAM_OK on success, PROGRAM_FAULT when the epoll instance can't be created or updated.
+int32_t sermud_worker_create_epfd_and_reg(struct ServerMudWorker *worker_unit)
+{
+    worker_unit->epfd = epoll_create(SERVER_EPOLL_SIZE_MAX);
+    if (worker_unit->epfd < 0) {
+        // the failed descriptor carries no information; errno holds the reason
+        PRINT_ERROR("server can't create epoll %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    // data.ptr carries the worker's handler so the event loop can get back to its socket
+    struct epoll_event ep_ev = { .events = EPOLLIN | EPOLLET, .data.ptr = (void *)&(worker_unit->worker) };
+    if (epoll_ctl(worker_unit->epfd, EPOLL_CTL_ADD, worker_unit->worker.fd, &ep_ev) < 0) {
+        PRINT_ERROR("server can't control epoll %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+    return PROGRAM_OK;
+}
+
+// Create the listener's epoll instance and register the listening socket for edge-triggered input events.
+// Returns PROGRAM_OK on success, PROGRAM_FAULT when the epoll instance can't be created or updated.
+int32_t sermud_listener_create_epfd_and_reg(struct ServerMud *server_mud)
+{
+    server_mud->epfd = epoll_create(SERVER_EPOLL_SIZE_MAX);
+    if (server_mud->epfd < 0) {
+        // the failed descriptor carries no information; errno holds the reason
+        PRINT_ERROR("server can't create epoll %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    struct epoll_event ep_ev = { .events = EPOLLIN | EPOLLET, .data.ptr = (void *)&(server_mud->listener) };
+    if (epoll_ctl(server_mud->epfd, EPOLL_CTL_ADD, server_mud->listener.fd, &ep_ev) < 0) {
+        PRINT_ERROR("server can't control epoll %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    server_debug_print("server mud listener", "waiting", server_mud->ip, server_mud->port, server_mud->debug);
+
+    return PROGRAM_OK;
+}
+
+// Listener thread: drain the accept queue of the listening socket and start one worker thread per
+// accepted connection. Returns PROGRAM_OK once accept() fails, PROGRAM_FAULT on a setup error.
+int32_t sermud_listener_accept_connects(struct ServerMud *server_mud)
+{
+    while (true) {
+        struct sockaddr_in accept_addr;
+        uint32_t sockaddr_in_len = sizeof(struct sockaddr_in);
+        int32_t accept_fd = accept(server_mud->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len);
+        if (accept_fd < 0) {
+            break;
+        }
+        if (set_socket_unblock(accept_fd) < 0) {
+            PRINT_ERROR("server can't set the connect socket to unblock! ");
+            return PROGRAM_FAULT;
+        }
+
+        struct ServerMudWorker *worker = (struct ServerMudWorker *)malloc(sizeof(struct ServerMudWorker));
+        struct epoll_event *epevs = (struct epoll_event *)malloc(sizeof(struct epoll_event));
+        if (worker == NULL || epevs == NULL) {
+            PRINT_ERROR("server can't allocate memory for the connection! ");
+            return PROGRAM_FAULT;
+        }
+        *worker = (struct ServerMudWorker) {
+            .worker.fd = accept_fd, .epfd = -1, .epevs = epevs, .recv_bytes = 0,
+            .pktlen = server_mud->pktlen, .api = server_mud->api, .debug = server_mud->debug,
+            .ip = accept_addr.sin_addr.s_addr, .port = accept_addr.sin_port, .next = server_mud->workers,
+        };
+        server_mud->workers = worker;
+        ++(server_mud->curr_connect);
+
+        // pthread_create() returns the error number itself (never a negative value) instead of setting errno
+        pthread_t tid;
+        int32_t create_ret = pthread_create(&tid, NULL, sermud_worker_create_and_run, worker);
+        if (create_ret != 0) {
+            PRINT_ERROR("server can't create poisx thread %d! ", create_ret);
+            return PROGRAM_FAULT;
+        }
+
+        server_debug_print("server mud listener", "accept", accept_addr.sin_addr.s_addr, accept_addr.sin_port, server_mud->debug);
+    }
+
+    return PROGRAM_OK;
+}
+
+// Worker thread: wait once on the worker's epoll instance and serve every reported event.
+// Returns PROGRAM_OK when the batch is handled, PROGRAM_FAULT on an epoll or socket error.
+int32_t sermud_worker_proc_epevs(struct ServerMudWorker *worker_unit)
+{
+    // NOTE(review): up to SERVER_EPOLL_SIZE_MAX events may be stored, but the accept path appears to size epevs for one
+    int32_t epoll_nfds = epoll_wait(worker_unit->epfd, worker_unit->epevs, SERVER_EPOLL_SIZE_MAX, SERVER_EPOLL_WAIT_TIMEOUT);
+    if (epoll_nfds < 0) {
+        PRINT_ERROR("server epoll wait error %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    for (int32_t i = 0; i < epoll_nfds; ++i) {
+        struct epoll_event *curr_epev = worker_unit->epevs + i;
+        if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) {
+            PRINT_ERROR("server epoll wait error %d! ", curr_epev->events);
+            return PROGRAM_FAULT;
+        }
+        // events is a bit mask: EPOLLIN can arrive together with other flags and must still be served
+        if ((curr_epev->events & EPOLLIN) == 0) {
+            continue;
+        }
+        struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr;
+        int32_t server_ans_ret = server_ans(server_handler, worker_unit->pktlen, worker_unit->api);
+        if (server_ans_ret == PROGRAM_FAULT) {
+            struct epoll_event ep_ev = {0};
+            if (epoll_ctl(worker_unit->epfd, EPOLL_CTL_DEL, server_handler->fd, &ep_ev) < 0) {
+                PRINT_ERROR("server can't delete socket '%d' to control epoll %d! ", server_handler->fd, errno);
+                return PROGRAM_FAULT;
+            }
+        } else if (server_ans_ret == PROGRAM_ABORT) {
+            if (close(server_handler->fd) < 0) {
+                PRINT_ERROR("server can't close the socket %d! ", errno);
+                return PROGRAM_FAULT;
+            }
+            server_debug_print("server mud worker", "close", worker_unit->ip, worker_unit->port, worker_unit->debug);
+        } else {
+            worker_unit->recv_bytes += worker_unit->pktlen;
+            server_debug_print("server mud worker", "receive", worker_unit->ip, worker_unit->port, worker_unit->debug);
+        }
+    }
+    return PROGRAM_OK;
+}
+
+// Listener thread: wait once on the listener's epoll instance and accept connections for every
+// readable event. Returns PROGRAM_OK when the batch is handled, PROGRAM_FAULT on any error.
+int32_t sermud_listener_proc_epevs(struct ServerMud *server_mud)
+{
+    int32_t epoll_nfds = epoll_wait(server_mud->epfd, server_mud->epevs, SERVER_EPOLL_SIZE_MAX, SERVER_EPOLL_WAIT_TIMEOUT);
+    if (epoll_nfds < 0) {
+        PRINT_ERROR("server epoll wait error %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    for (int32_t i = 0; i < epoll_nfds; ++i) {
+        struct epoll_event *curr_epev = server_mud->epevs + i;
+        if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) {
+            PRINT_ERROR("server epoll wait error %d! ", curr_epev->events);
+            return PROGRAM_FAULT;
+        }
+        // events is a bit mask: EPOLLIN can arrive together with other flags and must still be served
+        if ((curr_epev->events & EPOLLIN) == 0) {
+            continue;
+        }
+        int32_t accept_ret = sermud_listener_accept_connects(server_mud);
+        if (accept_ret < 0) {
+            PRINT_ERROR("server try accept error %d! ", accept_ret);
+            return PROGRAM_FAULT;
+        }
+    }
+    return PROGRAM_OK;
+}
+
+// Start routine of a worker thread; `arg` is the worker's struct ServerMudWorker.
+// Any setup or event-processing failure ends the whole process through exit().
+void *sermud_worker_create_and_run(void *arg)
+{
+    struct ServerMudWorker *worker_unit = (struct ServerMudWorker *)arg;
+    pthread_detach(pthread_self());
+
+    if (sermud_worker_create_epfd_and_reg(worker_unit) < 0) {
+        exit(PROGRAM_FAULT);
+    }
+    for (;;) {
+        if (sermud_worker_proc_epevs(worker_unit) < 0) {
+            exit(PROGRAM_FAULT);
+        }
+    }
+
+    // not reached: the loop above is only left through exit()
+    close(worker_unit->worker.fd);
+    close(worker_unit->epfd);
+    return (void *)PROGRAM_OK;
+}
+
+// Start routine of the listener thread; `arg` is the struct ServerMud to serve.
+// Any setup or event-processing failure ends the whole process through exit().
+void *sermud_listener_create_and_run(void *arg)
+{
+    struct ServerMud *server_mud = (struct ServerMud *)arg;
+    if (create_socket_and_listen(&(server_mud->listener.fd), server_mud->ip, server_mud->port, server_mud->domain) < 0 ||
+        sermud_listener_create_epfd_and_reg(server_mud) < 0) {
+        exit(PROGRAM_FAULT);
+    }
+
+    for (;;) {
+        if (sermud_listener_proc_epevs(server_mud) < 0) {
+            exit(PROGRAM_FAULT);
+        }
+    }
+
+    // not reached: the loop above is only left through exit()
+    if (close(server_mud->listener.fd) < 0 || close(server_mud->epfd) < 0) {
+        exit(PROGRAM_FAULT);
+    }
+    return (void *)PROGRAM_OK;
+}
+
+// Create the multi-thread, dissymmetric server: start the listener thread, then refresh the statistics forever.
+// Returns PROGRAM_FAULT when setup fails (pthread_* report errors by return value, not through errno).
+int32_t sermud_create_and_run(struct ProgramParams *params)
+{
+    int32_t ret = pthread_mutex_init(&server_debug_mutex, NULL);
+    if (ret != 0) {
+        PRINT_ERROR("server can't init posix mutex %d! ", ret);
+        return PROGRAM_FAULT;
+    }
+
+    struct ServerMud *server_mud = (struct ServerMud *)malloc(sizeof(struct ServerMud));
+    struct epoll_event *epevs = (struct epoll_event *)malloc(SERVER_EPOLL_SIZE_MAX * sizeof(struct epoll_event));
+    if (server_mud == NULL || epevs == NULL) {
+        PRINT_ERROR("server can't allocate memory! ");
+        free(server_mud);
+        free(epevs);
+        return PROGRAM_FAULT;
+    }
+    *server_mud = (struct ServerMud) {
+        .listener.fd = -1, .workers = NULL, .epfd = -1, .epevs = epevs, .curr_connect = 0,
+        .ip = inet_addr(params->ip), .port = htons(params->port), .pktlen = params->pktlen,
+        .domain = params->domain, .api = params->api, .debug = params->debug,
+    };
+
+    pthread_t tid;
+    ret = pthread_create(&tid, NULL, sermud_listener_create_and_run, server_mud);
+    if (ret != 0) {
+        PRINT_ERROR("server can't create poisx thread %d! ", ret);
+        return PROGRAM_FAULT;
+    }
+
+    if (server_mud->debug == false) {
+        printf("[program informations]: \n\n");
+    }
+    while (true) {
+        sermud_info_print(server_mud);
+    }
+    return PROGRAM_OK;  // not reached
+}
+
+// Sample the multi-thread, multiplexing IO server over one refresh interval and print its receive rate.
+// In debug mode nothing is printed, but the interval is still slept so the caller's endless loop does not spin.
+void sermum_info_print(struct ServerMum *server_mum)
+{
+    struct timeval delay;
+    delay.tv_sec = 0;
+    delay.tv_usec = TERMINAL_REFRESH_MS * 1000;
+    if (server_mum->debug == true) {
+        select(0, NULL, NULL, NULL, &delay);
+        return;
+    }
+
+    struct timeval begin;
+    gettimeofday(&begin, NULL);
+    uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000;
+
+    // connection count and total received bytes of all units before the delay
+    uint32_t curr_connect = 0;
+    uint64_t begin_recv_bytes = 0;
+    for (struct ServerMumUnit *unit = server_mum->uints; unit != NULL; unit = unit->next) {
+        curr_connect += unit->curr_connect;
+        begin_recv_bytes += unit->recv_bytes;
+    }
+    // select() without descriptors only waits for the timeout
+    select(0, NULL, NULL, NULL, &delay);
+    // total received bytes of all units after the delay
+    uint64_t end_recv_bytes = 0;
+    for (struct ServerMumUnit *unit = server_mum->uints; unit != NULL; unit = unit->next) {
+        end_recv_bytes += unit->recv_bytes;
+    }
+
+    struct timeval end;
+    gettimeofday(&end, NULL);
+    uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000;
+
+    // a zero-length interval yields a rate of 0 instead of a division by zero
+    double bytes_sub = end_recv_bytes > begin_recv_bytes ? (double)(end_recv_bytes - begin_recv_bytes) : 0;
+    double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0;
+    double bytes_ps = (time_sub > 0) ? bytes_sub / time_sub : 0;
+
+    if (bytes_ps < 1024) {
+        PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f B/s", curr_connect, bytes_ps);
+    } else if (bytes_ps < (1024 * 1024)) {
+        PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f KB/s", curr_connect, bytes_ps / 1024);
+    } else {
+        PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024));
+    }
+}
+
+// Create the unit's epoll instance and register its listening socket for edge-triggered input events.
+// Returns PROGRAM_OK on success, PROGRAM_FAULT when the epoll instance can't be created or updated.
+int32_t sersum_create_epfd_and_reg(struct ServerMumUnit *server_unit)
+{
+    server_unit->epfd = epoll_create(SERVER_EPOLL_SIZE_MAX);
+    if (server_unit->epfd < 0) {
+        // the failed descriptor carries no information; errno holds the reason
+        PRINT_ERROR("server can't create epoll %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    struct epoll_event ep_ev = { .events = EPOLLIN | EPOLLET, .data.ptr = (void *)&(server_unit->listener) };
+    if (epoll_ctl(server_unit->epfd, EPOLL_CTL_ADD, server_unit->listener.fd, &ep_ev) < 0) {
+        PRINT_ERROR("server can't control epoll %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    server_debug_print("server mum unit", "waiting", server_unit->ip, server_unit->port, server_unit->debug);
+
+    return PROGRAM_OK;
+}
+
+// Drain the accept queue of the unit's listening socket and register every new connection with the
+// unit's epoll instance. The `server_handler` parameter is not used; each connection gets its own handler.
+int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerHandler *server_handler)
+{
+    while (true) {
+        struct sockaddr_in accept_addr;
+        uint32_t sockaddr_in_len = sizeof(struct sockaddr_in);
+        int32_t accept_fd = accept(server_unit->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len);
+        if (accept_fd < 0) {
+            break;
+        }
+        if (set_socket_unblock(accept_fd) < 0) {
+            PRINT_ERROR("server can't set the connect socket to unblock! ");
+            return PROGRAM_FAULT;
+        }
+
+        struct ServerHandler *conn_handler = (struct ServerHandler *)malloc(sizeof(struct ServerHandler));
+        if (conn_handler == NULL) {
+            PRINT_ERROR("server can't allocate memory for the connection! ");
+            return PROGRAM_FAULT;
+        }
+        conn_handler->fd = accept_fd;
+        struct epoll_event ep_ev = { .events = EPOLLIN | EPOLLET, .data.ptr = (void *)conn_handler };
+        if (epoll_ctl(server_unit->epfd, EPOLL_CTL_ADD, accept_fd, &ep_ev) < 0) {
+            PRINT_ERROR("server can't add socket '%d' to control epoll %d! ", accept_fd, errno);
+            free(conn_handler);
+            return PROGRAM_FAULT;
+        }
+        ++server_unit->curr_connect;
+        server_debug_print("server mum unit", "accept", accept_addr.sin_addr.s_addr, accept_addr.sin_port, server_unit->debug);
+    }
+    return PROGRAM_OK;
+}
+
+// Wait once on the unit's epoll instance and serve every reported event: accept on the listener,
+// answer on connections. Returns PROGRAM_OK when the batch is handled, PROGRAM_FAULT on any error.
+int32_t sersum_proc_epevs(struct ServerMumUnit *server_unit)
+{
+    int32_t epoll_nfds = epoll_wait(server_unit->epfd, server_unit->epevs, SERVER_EPOLL_SIZE_MAX, SERVER_EPOLL_WAIT_TIMEOUT);
+    if (epoll_nfds < 0) {
+        PRINT_ERROR("server epoll wait error %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    for (int32_t i = 0; i < epoll_nfds; ++i) {
+        struct epoll_event *curr_epev = server_unit->epevs + i;
+        if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) {
+            PRINT_ERROR("server epoll wait error %d! ", curr_epev->events);
+            return PROGRAM_FAULT;
+        }
+        // events is a bit mask: EPOLLIN can arrive together with other flags and must still be served
+        if ((curr_epev->events & EPOLLIN) == 0) {
+            continue;
+        }
+
+        if (curr_epev->data.ptr == (void *)&(server_unit->listener)) {
+            int32_t accept_ret = sersum_accept_connects(server_unit, &(server_unit->listener));
+            if (accept_ret < 0) {
+                PRINT_ERROR("server try accept error %d! ", accept_ret);
+                return PROGRAM_FAULT;
+            }
+            continue;
+        }
+
+        struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr;
+        // the peer address only feeds the debug output; if the lookup fails the zeroed address is printed
+        struct sockaddr_in connect_addr = {0};
+        socklen_t connect_addr_len = sizeof(connect_addr);
+        (void)getpeername(server_handler->fd, (struct sockaddr *)&connect_addr, &connect_addr_len);
+
+        int32_t server_ans_ret = server_ans(server_handler, server_unit->pktlen, server_unit->api);
+        if (server_ans_ret == PROGRAM_FAULT) {
+            --server_unit->curr_connect;
+            struct epoll_event ep_ev = {0};
+            if (epoll_ctl(server_unit->epfd, EPOLL_CTL_DEL, server_handler->fd, &ep_ev) < 0) {
+                PRINT_ERROR("server can't delete socket '%d' to control epoll %d! ", server_handler->fd, errno);
+                return PROGRAM_FAULT;
+            }
+        } else if (server_ans_ret == PROGRAM_ABORT) {
+            --server_unit->curr_connect;
+            if (close(server_handler->fd) < 0) {
+                PRINT_ERROR("server can't close the socket %d! ", errno);
+                return PROGRAM_FAULT;
+            }
+            server_debug_print("server mum unit", "close", connect_addr.sin_addr.s_addr, connect_addr.sin_port, server_unit->debug);
+        } else {
+            server_unit->recv_bytes += server_unit->pktlen;
+            server_debug_print("server mum unit", "receive", connect_addr.sin_addr.s_addr, connect_addr.sin_port, server_unit->debug);
+        }
+    }
+
+    return PROGRAM_OK;
+}
+
+// Start routine of one multiplexing server unit thread; `arg` is the unit's struct ServerMumUnit.
+// Any setup or event-processing failure ends the whole process through exit().
+void *sersum_create_and_run(void *arg)
+{
+    struct ServerMumUnit *server_unit = (struct ServerMumUnit *)arg;
+    // listen first, then register the listening socket with the unit's epoll instance
+    if (create_socket_and_listen(&(server_unit->listener.fd), server_unit->ip, server_unit->port, server_unit->domain) < 0 ||
+        sersum_create_epfd_and_reg(server_unit) < 0) {
+        exit(PROGRAM_FAULT);
+    }
+
+    for (;;) {
+        if (sersum_proc_epevs(server_unit) < 0) {
+            exit(PROGRAM_FAULT);
+        }
+    }
+
+    // not reached: the loop above is only left through exit()
+    close(server_unit->listener.fd);
+    close(server_unit->epfd);
+    return (void *)PROGRAM_OK;
+}
+
+// Create the multi-thread, multiplexing IO server: start params->thread_num unit threads, then refresh the
+// statistics forever. Returns PROGRAM_FAULT when setup fails (pthread_* report errors by return value, not errno).
+int32_t sermum_create_and_run(struct ProgramParams *params)
+{
+    int32_t ret = pthread_mutex_init(&server_debug_mutex, NULL);
+    if (ret != 0) {
+        PRINT_ERROR("server can't init posix mutex %d! ", ret);
+        return PROGRAM_FAULT;
+    }
+    struct ServerMum *server_mum = (struct ServerMum *)malloc(sizeof(struct ServerMum));
+    if (server_mum == NULL) {
+        PRINT_ERROR("server can't allocate memory! ");
+        return PROGRAM_FAULT;
+    }
+    server_mum->uints = NULL;
+    server_mum->debug = params->debug;
+
+    // units are appended at the tail with next == NULL, so the list read by the statistics is always terminated
+    struct ServerMumUnit **tail = &(server_mum->uints);
+    for (uint32_t i = 0; i < params->thread_num; ++i) {
+        struct ServerMumUnit *unit = (struct ServerMumUnit *)malloc(sizeof(struct ServerMumUnit));
+        struct epoll_event *epevs = (struct epoll_event *)malloc(SERVER_EPOLL_SIZE_MAX * sizeof(struct epoll_event));
+        if (unit == NULL || epevs == NULL) {
+            PRINT_ERROR("server can't allocate memory! ");
+            return PROGRAM_FAULT;
+        }
+        *unit = (struct ServerMumUnit) {
+            .listener.fd = -1, .epfd = -1, .epevs = epevs, .curr_connect = 0, .recv_bytes = 0,
+            .ip = inet_addr(params->ip), .port = htons(params->port), .pktlen = params->pktlen,
+            .domain = params->domain, .api = params->api, .debug = params->debug, .next = NULL,
+        };
+        *tail = unit;
+        tail = &(unit->next);
+        pthread_t tid;
+        ret = pthread_create(&tid, NULL, sersum_create_and_run, unit);
+        if (ret != 0) {
+            PRINT_ERROR("server can't create poisx thread %d! ", ret);
+            return PROGRAM_FAULT;
+        }
+    }
+    if (server_mum->debug == false) {
+        printf("[program informations]: \n\n");
+    }
+    while (true) {
+        sermum_info_print(server_mum);
+    }
+    return PROGRAM_OK;  // not reached
+}
+
+// Run the server selected by params->model: "mum" starts the multi-thread, multiplexing IO server,
+// any other value starts the multi-thread, dissymmetric server.
+// Returns whatever the selected server's create-and-run function returns.
+int32_t server_create_and_run(struct ProgramParams *params)
+{
+    const bool use_mum = (strcmp(params->model, "mum") == 0);
+
+    if (use_mum) {
+        return sermum_create_and_run(params);
+    }
+
+    return sermud_create_and_run(params);
+}
diff --git a/examples/src/utilities.c b/examples/src/utilities.c
new file mode 100644
index 0000000..b6ed269
--- /dev/null
+++ b/examples/src/utilities.c
@@ -0,0 +1,128 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#include "utilities.h"
+
+
+// Create a non-blocking listening stream socket and store its descriptor in *socket_fd.
+// domain "posix" binds an AF_INET socket to ip:port (both stored into the address as given);
+// any other value binds an AF_UNIX socket to SOCKET_UNIX_DOMAIN_FILE, unlinking that path first.
+// Returns PROGRAM_OK, or PROGRAM_FAULT after closing the descriptor and resetting *socket_fd to -1.
+int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain)
+{
+    const int32_t family = (strcmp(domain, "posix") == 0) ? AF_INET : AF_UNIX;
+    int32_t bind_ret;
+
+    *socket_fd = socket(family, SOCK_STREAM, 0);
+    if (*socket_fd < 0) {
+        PRINT_ERROR("can't create socket %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    // presumably so that several server threads can listen on the same address -- confirm against the callers
+    int32_t port_multi = 1;
+    if (setsockopt(*socket_fd, SOL_SOCKET, SO_REUSEPORT, (void *)&port_multi, sizeof(int32_t)) < 0) {
+        PRINT_ERROR("can't set the option of socket %d! ", errno);
+        goto fault;
+    }
+
+    if (set_socket_unblock(*socket_fd) < 0) {
+        PRINT_ERROR("can't set the socket to unblock! ");
+        goto fault;
+    }
+
+    if (family == AF_INET) {
+        struct sockaddr_in socket_addr;
+        memset_s(&socket_addr, sizeof(socket_addr), 0, sizeof(socket_addr));
+        socket_addr.sin_family = AF_INET;
+        socket_addr.sin_addr.s_addr = ip;
+        socket_addr.sin_port = port;
+        bind_ret = bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_in));
+    } else {
+        struct sockaddr_un socket_addr;
+        // clear the whole structure so no uninitialized bytes of sun_path reach bind()
+        memset_s(&socket_addr, sizeof(socket_addr), 0, sizeof(socket_addr));
+        unlink(SOCKET_UNIX_DOMAIN_FILE);
+        socket_addr.sun_family = AF_UNIX;
+        strcpy_s(socket_addr.sun_path, sizeof(socket_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE);
+        bind_ret = bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_un));
+    }
+    if (bind_ret < 0) {
+        PRINT_ERROR("can't bind the address to socket %d! ", errno);
+        goto fault;
+    }
+
+    if (listen(*socket_fd, SERVER_SOCKET_LISTEN_BACKLOG) < 0) {
+        PRINT_ERROR("server socket can't lisiten %d! ", errno);
+        goto fault;
+    }
+
+    return PROGRAM_OK;
+
+fault:
+    // release the half-configured descriptor instead of leaking it to the caller
+    close(*socket_fd);
+    *socket_fd = -1;
+    return PROGRAM_FAULT;
+}
+
+// Create a stream socket, store its descriptor in *socket_fd and connect it to the server.
+// domain "posix" connects an AF_INET socket to ip:port (both stored into the address as given);
+// any other value connects an AF_UNIX socket to SOCKET_UNIX_DOMAIN_FILE.
+// Returns PROGRAM_OK, PROGRAM_INPROGRESS when connect() reports EINPROGRESS (descriptor kept open),
+// or PROGRAM_FAULT after closing the descriptor and resetting *socket_fd to -1.
+int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain)
+{
+    const int32_t family = (strcmp(domain, "posix") == 0) ? AF_INET : AF_UNIX;
+    int32_t connect_ret;
+
+    *socket_fd = socket(family, SOCK_STREAM, 0);
+    if (*socket_fd < 0) {
+        PRINT_ERROR("client can't create socket %d! ", errno);
+        return PROGRAM_FAULT;
+    }
+
+    if (family == AF_INET) {
+        struct sockaddr_in server_addr;
+        memset_s(&server_addr, sizeof(server_addr), 0, sizeof(server_addr));
+        server_addr.sin_family = AF_INET;
+        server_addr.sin_addr.s_addr = ip;
+        server_addr.sin_port = port;
+        connect_ret = connect(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in));
+    } else {
+        struct sockaddr_un server_addr;
+        memset_s(&server_addr, sizeof(server_addr), 0, sizeof(server_addr));
+        server_addr.sun_family = AF_UNIX;
+        strcpy_s(server_addr.sun_path, sizeof(server_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE);
+        connect_ret = connect(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_un));
+    }
+
+    if (connect_ret >= 0) {
+        return PROGRAM_OK;
+    }
+    if (errno == EINPROGRESS) {
+        return PROGRAM_INPROGRESS;
+    }
+
+    PRINT_ERROR("client can't connect to the server %d! ", errno);
+    // release the unconnected descriptor instead of leaking it to the caller
+    close(*socket_fd);
+    *socket_fd = -1;
+    return PROGRAM_FAULT;
+}
+
+// set the socket to unblock: add O_NONBLOCK to the file status flags (F_GETFL, not the descriptor flags of F_GETFD)
+int32_t set_socket_unblock(int32_t socket_fd)
+{
+    return fcntl(socket_fd, F_SETFL, fcntl(socket_fd, F_GETFL, 0) | O_NONBLOCK);
+}
--
2.23.0