From 814b66143605ad409be0f8aace468386f4fd891e Mon Sep 17 00:00:00 2001 From: wu-changsheng Date: Mon, 5 Sep 2022 15:39:12 +0800 Subject: [PATCH] sync examples code --- examples/CMakeLists.txt | 2 +- examples/README.md | 178 +++++++++++- examples/inc/bussiness.h | 122 +++++++++ examples/inc/client.h | 121 ++++++++ examples/inc/parameter.h | 11 +- examples/inc/server.h | 231 ++++++++++++++++ examples/inc/utilities.h | 82 +++++- examples/main.c | 8 + examples/src/bussiness.c | 234 ++++++++++++++++ examples/src/client.c | 387 ++++++++++++++++++++++++++ examples/src/parameter.c | 214 ++++++++------- examples/src/server.c | 578 +++++++++++++++++++++++++++++++++++++++ examples/src/utilities.c | 128 +++++++++ 13 files changed, 2185 insertions(+), 111 deletions(-) create mode 100644 examples/inc/bussiness.h create mode 100644 examples/inc/client.h create mode 100644 examples/inc/server.h create mode 100644 examples/src/bussiness.c create mode 100644 examples/src/client.c create mode 100644 examples/src/server.c create mode 100644 examples/src/utilities.c diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index b1c2b07..c38e6cb 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -18,7 +18,7 @@ project(${PROJECT_NAME}) message(STATUS "PROJECT_SOURCE_DIR: " ${PROJECT_SOURCE_DIR}) message(STATUS "PROJECT_BINARY_DIR: " ${PROJECT_BINARY_DIR}) -set(CMAKE_C_FLAGS "-O2 -g -fstack-protector-strong -Wall -Werror -pthread") +set(CMAKE_C_FLAGS "-O2 -g -fstack-protector-strong -Wall -Werror -pthread -lboundscheck") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D EXAMPLE_COMPILE") include_directories(${PROJECT_SOURCE_DIR}/inc) diff --git a/examples/README.md b/examples/README.md index 6f82bb2..4e165a4 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,4 +1,6 @@ -# gazzle 示例程序 +# gazzlle 示例程序 + +## 功能 * 支持 TCP 、 unix 非阻塞通讯。 * 支持多线程网络 IO 复用模型,线程之间相互独立。TCP 的 `listen` 、`epoll` 、`read` 、`write` 、`connect` 等接口都在同一线程内。`connect` 连接数可配。 @@ -9,9 +11,93 @@ ## 网络模型 * **单线程非阻塞**:采用同步非阻塞 IO 模型,在单线程中采用非阻塞的方式监听并发起 IO 请求,当内核中数据到达后读取数据、执行业务逻辑并发送。 + +``` + 单线程非阻塞模型 + | + 创建套接字并监听 + | <-------+ + 读取数据 | + | | + 业务逻辑 | + | | + 发送数据 | + | | + +---------+ +``` + * **多线程非阻塞IO复用**:基于 `epoll` 实现多线程非阻塞 IO 模型。每个线程之间互不干涉。通过 `epoll` 监控多个当前线程负责的 fd ,当任何一个数据状态准备就绪时,返回并执行读写操作和对应的业务逻辑。 + +``` + 多线程非阻塞IO复用模型 + | + 创建套接字并监听 + | + 创建若干个线程 + | + +------------+------------+------------+------------+ + | | | | | + 创建套接字并监听 ... ... 创建套接字并监听 ... + | | + 线程内部初始化 线程内部初始化 + epoll,并注册 epoll,并注册 + 套接字监听事件 套接字监听事件 + | <---------+ | <----------+ + +-----+-----+ | +-----+-----+ | + | | | | | | + (新连接) (新报文) | (新连接) (新报文) | + 建连并注 读取数据 | 建连并注 读取数据 | + 册新连接 业务逻辑 | 册新连接 业务逻辑 | + 监听事件 发送数据 | 监听事件 发送数据 | + | | | | | | + +-----+-----+ | +-----+-----+ | + | | | | + +-----------+ +-----------+ +``` + * **多线程非阻塞非对称**:采用基于 `epoll` 的单线程多路 IO 复用监听连接事件,并采用多线程的方式完成后续读写监听业务。 server 在启动监听之前,开辟一定数量的线程,用线程池管理。主线程创建监听 `fd` 之后,采用多路 IO 复用机制 (`epoll`) 进行 IO 状态监控。当监听到客户端的连接请求时,建立连接并将相关 `fd` 分发给线程池的某个线程进行监听。线程池中的每个线程都采用多路 IO 复用机制 (`epoll`) ,用来监听主线程中建立成功并分发下来的 `socket` 。 +``` + 多线程非阻塞非对称模型 +------------------------+ + | | | + 创建监听线程 | +-------------+--- ... -----+ + | | | | | + 创建套接字,初始化 | 初始化epoll ... ... 初始化epoll + eopll并且并注册套 | 并注册事件 并注册事件 + 接字监听事件 | | <-- + | <-- + + | | 读取数据 | 读取数据 | + 当有新连接时,创建工作线程 | 业务逻辑 | 业务逻辑 | + | | 发送数据 | 发送数据 | + +----------------+ | | | | + +-----+ +-----+ +``` + +* **客户端**:创建若干线程,每个线程创建若干 `socket` 与客户端建立连接,并使用 `epoll` 进行状态监控,建连后向服务端发送数据并等待服务端数据传回,当接受到服务端传回数据后进行校验,校验无误再次发送数据。 + +``` + 多线程非阻塞IO复用模型 + | + 创建若干个线程 + +------------+------------+------------+------------+ + | | | | | + 线程内部初始化 线程内部初始化 + epoll ... ... epoll ... + | | + 依次创建套接字, 依次创建套接字, + 建连并注册事件 建连并注册事件 + | <---------+ | <---------+ + 发送数据 | 发送数据 | + 接收数据并校验 | 接收数据并校验 | + | | | | + +------------+ | +------------+ | + | | | | | | + 成功 失败 | 成功 失败 | + | | | | | | + 发送数据 终止 | 发送数据 终止 | + | | | | + +-----------------+ +-----------------+ +``` + ## 程序接口 * `-a, --as [server | client]`:作为服务端还是客户端。 @@ -24,21 +110,103 @@ * `mud (multi thread, unblock, dissymmetric)`:多线程非阻塞非对称。 * `-t, --threadnum`:线程数设置。 * `-c, --connectnum`:连接数设置。 -* `-A, --api [unix | posix]`:内部实现的接口类型。 - * `unix`:基于 unix 接口实现。 - * `posix`:基于 posix 接口实现。 +* `-D, --domain [unix | posix]`:通信协议。 + * `unix`:基于 unix 协议实现。 + * `posix`:基于 posix 协议实现。 +* `-A, --api [readwrite | recvsend | recvsendmsg]`:内部实现的接口类型。 + * `readwrite` :使用 `read` 和 `write` 接口。 + * `recvsend` :使用 `recv` 和 `send` 接口。 + * `recvsendmsg` :使用 `recvmsg` 和 `sendmsg` 接口。 * `-P, --pktlen [xxxx]`:报文长度配置。 * `-v, --verify`:是否校验报文。 * `-r, --ringpmd`:是否基于dpdk ring PMD 收发环回。 +* `-d, --debug`:是否打印调试信息。 * `-h, --help`:获得帮助信息。 ## 使用 + * **环境配置** + * 参考 https://gitee.com/openeuler/libboundscheck 。 + + * **编译** + ``` cd build mkdir examples cd examples cmake ../../examples make -./examples --help ``` + + * **查看帮助信息** + + ``` + ./examples --help + + -a, --as [server | client]: set programas server or client. + server: as server. + client: as client. +-i, --ip [???.???.???.???]: set ip address. +-p, --port [????]: set port number in range of 1024 - 65535. +-m, --model [mum | mud]: set the network model. + mum: multi thread, unblock, multiplexing IO network model. + mud: multi thread, unblock, dissymmetric network model. +-t, --threadnum [???]: set thread number in range of 1 - 1000. +-c, --connectnum [???]: set connection number of each thread. +-D, --domain [unix | posix]: set domain type is server or client. + unix: use unix's api. + posix: use posix api. +-A, --api [readwrite | recvsend | recvsendmsg]: set api type is server or client. + readwrite: use `read` and `write`. + recvsend: use `recv and `send`. + recvsendmsg: use `recvmsg` and `sendmsg`. +-P, --pktlen [????]: set packet length in range of 2 - 10485760. +-v, --verify: set to verifying the message packet. +-r, --ringpmd: set to use ringpmd. +-d, --debug: set to print the debug information. +-h, --help: see helps. + ``` + + * 创建服务端 + +``` +./example --as server --verify + +[program parameters]: +--> [as]: server +--> [server ip]: 127.0.0.1 +--> [server port]: 5050 +--> [model]: mum +--> [thread number]: 8 +--> [domain]: posix +--> [api]: read & write +--> [packet length]: 1024 +--> [verify]: on +--> [ringpmd]: off +--> [debug]: off + +[program informations]: +--> : [connect num]: 0, [receive]: 0.000 B/s +``` + + * 创建客户端 + +``` +./example --as client --verify + +[program parameters]: +--> [as]: client +--> [server ip]: 127.0.0.1 +--> [server port]: 5050 +--> [thread number]: 8 +--> [connection number]: 10 +--> [domain]: posix +--> [api]: read & write +--> [packet length]: 1024 +--> [verify]: on +--> [ringpmd]: off +--> [debug]: off + +[program informations]: +--> : [connect num]: 80, [send]: 357.959 MB/s +``` \ No newline at end of file diff --git a/examples/inc/bussiness.h b/examples/inc/bussiness.h new file mode 100644 index 0000000..f16d771 --- /dev/null +++ b/examples/inc/bussiness.h @@ -0,0 +1,122 @@ +/* +* Copyright (c) 2022-2023. yyangoO. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + + +#ifndef __EXAMPLES_BUSSINESS_H__ +#define __EXAMPLES_BUSSINESS_H__ + + +#include "utilities.h" +#include "parameter.h" + + +#define BUSSINESS_MESSAGE_SIZE 26 ///< the size of business message + + +/** + * @brief server handler + * The server handler. + */ +struct ServerHandler +{ + int32_t fd; ///< socket file descriptor +}; + +/** + * @brief client handler + * The client handler. + */ +struct ClientHandler +{ + int32_t fd; ///< socket file descriptor + uint32_t msg_idx; ///< the start charactors index of message +}; + + +/** + * @brief read by specify api + * This function processes the reading by specify api. + * @param fd the file descriptor + * @param buffer_in the input buffer + * @param length the length + * @param api the type of api + * @return the result + */ + int32_t read_api(int32_t fd, char *buffer_in, const uint32_t length, const char *api); + +/** + * @brief write by specify api + * This function processes the writing by specify api. + * @param fd the file descriptor + * @param buffer_out the output buffer + * @param length the length + * @param api the type of api + * @return the result + */ + int32_t write_api(int32_t fd, char *buffer_out, const uint32_t length, const char *api); + +/** + * @brief the business processsing of server + * This function processes the business of server. + * @param out the output string + * @param in the input string + * @param size the size of input and output + * @param verify if we verify or not + * @return the result + */ +void server_bussiness(char *out, const char *in, uint32_t size); + +/** + * @brief the business processsing of client + * This function processes the business of client. + * @param out the output string + * @param in the input string + * @param size the size of input and output + * @param verify if we verify or not + * @param msg_idx the start charactors index of message + * @return the result + */ +int32_t client_bussiness(char *out, const char *in, uint32_t size, bool verify, uint32_t *msg_idx); + +/** + * @brief server checks the information and answers + * This function checks the information and answers. + * @param server_handler server handler + * @param pktlen the length of package + * @param api the api + * @return the result + */ +int32_t server_ans(struct ServerHandler *server_handler, uint32_t pktlen, const char* api); + +/** + * @brief client asks server + * This function asks server. + * @param client_handler client handler + * @param pktlen the length of package + * @param api the api + * @return the result + */ +int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const char* api); + +/** + * @brief client checks the information and answers + * This function checks the information and answers. + * @param client_handler client handler + * @param pktlen the length of package + * @param verify verify or not + * @param api the api + * @return the result + */ +int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api); + + +#endif // __EXAMPLES_BUSSINESS_H__ diff --git a/examples/inc/client.h b/examples/inc/client.h new file mode 100644 index 0000000..d3ae017 --- /dev/null +++ b/examples/inc/client.h @@ -0,0 +1,121 @@ +/* +* Copyright (c) 2022-2023. yyangoO. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + + +#ifndef __EXAMPLES_CLIENT_H__ +#define __EXAMPLES_CLIENT_H__ + + +#include "utilities.h" +#include "parameter.h" +#include "bussiness.h" + + +/** + * @brief client unit + * The information of each thread of client. + */ +struct ClientUnit +{ + struct ClientHandler *handlers; ///< the handlers + int32_t epfd; ///< the connect epoll file descriptor + struct epoll_event *epevs; ///< the epoll events + uint32_t curr_connect; ///< current connection number + uint64_t send_bytes; ///< total send bytes + in_addr_t ip; ///< server ip + uint16_t port; ///< server port + uint32_t connect_num; ///< total connection number + uint32_t pktlen; ///< the length of peckage + bool verify; ///< if we verify or not + char* domain; ///< the communication domain + char* api; ///< the type of api + bool debug; ///< if we print the debug information + struct ClientUnit *next; ///< next pointer +}; + +/** + * @brief client + * The information of client. + */ +struct Client +{ + struct ClientUnit *uints; ///< the server mum unit + bool debug; ///< if we print the debug information +}; + + +/** + * @brief the single thread, client prints informations + * The single thread, client prints informations. + * @param ch_str the charactor string + * @param act_str the action string + * @param ip the ip address + * @param port the port + * @param debug if debug or not + * @return the result pointer + */ +void client_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug); + +/** + * @brief the client prints informations + * The client prints informations. + * @param client the client information + */ +void client_info_print(struct Client *client); + +/** + * @brief the single thread, client try to connect to server, register to epoll + * The single thread, client try to connect to server, register to epoll. + * @param client_handler the client handler + * @param epoll_fd the epoll file descriptor + * @param ip ip address + * @param port port + * @param domain domain + * @return the result pointer + */ +int32_t client_thread_try_connect(struct ClientHandler *client_handler, int32_t epoll_fd, in_addr_t ip, uint16_t port, const char *api); + +/** + * @brief the single thread, client retry to connect to server, register to epoll + * The single thread, client retry to connect to server, register to epoll. + * @param client_unit the client unit + * @param client_handler the client handler + * @return the result pointer + */ +int32_t client_thread_retry_connect(struct ClientUnit *client_unit, struct ClientHandler *client_handler); + +/** + * @brief the single thread, client connects and gets epoll feature descriptors + * The single thread, client connects and gets epoll feature descriptors. + * @param client_unit the client unit + * @return the result pointer + */ +int32_t client_thread_create_epfd_and_reg(struct ClientUnit *client_unit); + +/** + * @brief create client of single thread and run + * This function creates client of single thread and run. + * @param arg each thread's information of server + * @return the result pointer + */ +void *client_s_create_and_run(void *arg); + +/** + * @brief create client and run + * This function create the client and run. + * @param params the parameters pointer + * @return the result + */ +int32_t client_create_and_run(struct ProgramParams *params); + + +#endif // __EXAMPLES_CLIENT_H__ diff --git a/examples/inc/parameter.h b/examples/inc/parameter.h index d25a13a..ee8fe4e 100644 --- a/examples/inc/parameter.h +++ b/examples/inc/parameter.h @@ -24,9 +24,11 @@ #define PARAM_DEFAULT_MODEL ("mum") ///< default model type #define PARAM_DEFAULT_CONNECT_NUM (10) ///< default connection number #define PARAM_DEFAULT_THREAD_NUM (8) ///< default thread number -#define PARAM_DEFAULT_API ("posix") ///< default API type +#define PARAM_DEFAULT_DOMAIN ("posix") ///< default communication domain +#define PARAM_DEFAULT_API ("readwrite") ///< default API type #define PARAM_DEFAULT_PKTLEN (1024) ///< default packet length of message #define PARAM_DEFAULT_VERIFY (false) ///< default flag of message verifying +#define PARAM_DEFAULT_DEBUG (false) ///< default flag of debug #define PARAM_DEFAULT_RINGPMD (false) ///< default flag of ring PMD of dpdk enum { @@ -42,6 +44,8 @@ enum { PARAM_NUM_CONNECT_NUM = 'c', #define PARAM_NAME_THREAD_NUM ("threadnum") ///< name of parameter thread number PARAM_NUM_THREAD_NUM = 't', +#define PARAM_NAME_DOMAIN ("domain") ///< name of parameter domain + PARAM_NUM_DOMAIN = 'D', #define PARAM_NAME_API ("api") ///< name of parameter API type PARAM_NUM_API = 'A', #define PARAM_NAME_PKTLEN ("pktlen") ///< name of parameter packet length of message @@ -50,6 +54,8 @@ enum { PARAM_NUM_VERIFY = 'v', #define PARAM_NAME_RINGPMD ("ringpmd") ///< name of parameter flag of ring PMD of dpdk PARAM_NUM_RINGPMD = 'r', +#define PARAM_NAME_DEBUG ("debug") ///< name of parameter flag of debug + PARAM_NUM_DEBUG = 'd', #define PARAM_NAME_HELP ("help") ///< name of parameter help PARAM_NUM_HELP = 'h', }; @@ -81,13 +87,14 @@ struct ProgramParams { char* model; ///< model type uint32_t thread_num; ///< the number of threads uint32_t connect_num; ///< the connection number + char* domain; ///< the communication dimain char* api; ///< the type of api uint32_t pktlen; ///< the packet length bool verify; ///< if we verify the message or not + bool debug; ///< if we print the debug information or not bool ringpmd; ///< if we use ring PMD or not }; - /** * @brief initialize the parameters * This function initializes the parameters of main function. diff --git a/examples/inc/server.h b/examples/inc/server.h new file mode 100644 index 0000000..fa9096b --- /dev/null +++ b/examples/inc/server.h @@ -0,0 +1,231 @@ +/* +* Copyright (c) 2022-2023. yyangoO. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + + +#ifndef __EXAMPLES_SERVER_H__ +#define __EXAMPLES_SERVER_H__ + + +#include "utilities.h" +#include "parameter.h" +#include "bussiness.h" + + +/** + * @brief server unit of model mum + * The information of each thread of server of model mum. + */ +struct ServerMumUnit +{ + struct ServerHandler listener; ///< the listen handler + int32_t epfd; ///< the listen epoll file descriptor + struct epoll_event *epevs; ///< the epoll events + uint32_t curr_connect; ///< current connection number + uint64_t recv_bytes; ///< total receive bytes + in_addr_t ip; ///< server ip + uint16_t port; ///< server port + uint32_t pktlen; ///< the length of peckage + char* domain; ///< communication domain + char* api; ///< the type of api + bool debug; ///< if we print the debug information + struct ServerMumUnit *next; ///< next pointer +}; + +/** + * @brief server model mum + * The information of server model mum. + */ +struct ServerMum +{ + struct ServerMumUnit *uints; ///< the server mum unit + bool debug; ///< if we print the debug information +}; + +/** + * @brief server unit of model mud worker unit + * The information of worker unit of server of model mud. + */ +struct ServerMudWorker +{ + struct ServerHandler worker; ///< the worker handler + int32_t epfd; ///< the worker epoll file descriptor + struct epoll_event *epevs; ///< the epoll events + uint64_t recv_bytes; ///< total receive bytes + uint32_t pktlen; ///< the length of peckage + in_addr_t ip; ///< client ip + uint16_t port; ///< client port + char* api; ///< the type of api + bool debug; ///< if we print the debug information + struct ServerMudWorker *next; ///< next pointer +}; + +/** + * @brief server model mud + * The information of server model mud. + */ +struct ServerMud +{ + struct ServerHandler listener; ///< the listen handler + struct ServerMudWorker *workers; ///< the workers + int32_t epfd; ///< the listen epoll file descriptor + struct epoll_event *epevs; ///< the epoll events + uint32_t curr_connect; ///< current connection number + in_addr_t ip; ///< server ip + uint16_t port; ///< server port + uint32_t pktlen; ///< the length of peckage + char* domain; ///< communication domain + char* api; ///< the type of api + bool debug; ///< if we print the debug information +}; + + +/** + * @brief the worker thread, unblock, dissymmetric server prints debug informations + * The worker thread, unblock, dissymmetric server prints debug informations. + * @param ch_str the charactor string + * @param act_str the action string + * @param ip the ip address + * @param port the port + * @param debug if debug or not + * @return the result pointer + */ +void server_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug); + +/** + * @brief the multi thread, unblock, dissymmetric server prints informations + * The multi thread, unblock, dissymmetric server prints informations. + * @param server_mud the server information + */ +void sermud_info_print(struct ServerMud *server_mud); + +/** + * @brief the worker thread, unblock, dissymmetric server listens and gets epoll feature descriptors + * The worker thread, unblock, dissymmetric server listens and gets epoll feature descriptors. + * @param worker_unit the server worker + * @return the result pointer + */ +int32_t sermud_worker_create_epfd_and_reg(struct ServerMudWorker *worker_unit); + +/** + * @brief the listener thread, unblock, dissymmetric server listens and gets epoll feature descriptors + * The listener thread, unblock, dissymmetric server listens and gets epoll feature descriptors. + * @param server_mud the server unit + * @return the result pointer + */ +int32_t sermud_listener_create_epfd_and_reg(struct ServerMud *server_mud); + +/** + * @brief the listener thread, unblock, dissymmetric server accepts the connections + * The listener thread, unblock, dissymmetric server accepts the connections. + * @param server_mud the server unit + * @return the result pointer + */ +int32_t sermud_listener_accept_connects(struct ServerMud *server_mud); + +/** + * @brief the worker thread, unblock, dissymmetric server processes the events + * The worker thread, unblock, dissymmetric server processes the events. + * @param worker_unit the server worker + * @return the result pointer + */ +int32_t sermud_worker_proc_epevs(struct ServerMudWorker *worker_unit); + +/** + * @brief the listener thread, unblock, dissymmetric server processes the events + * The listener thread, unblock, dissymmetric server processes the events. + * @param server_mud the server unit + * @return the result pointer + */ +int32_t sermud_listener_proc_epevs(struct ServerMud *server_mud); + +/** + * @brief create the worker thread, unblock, dissymmetric server and run + * This function creates the worker thread, unblock, dissymmetric server and run. + * @param arg each thread's information of server + * @return the result pointer + */ +void *sermud_worker_create_and_run(void *arg); + +/** + * @brief create the listener thread, unblock, dissymmetric server and run + * This function creates the listener thread, unblock, dissymmetric server and run. + * @param arg each thread's information of server + * @return the result pointer + */ +void *sermud_listener_create_and_run(void *arg); + +/** + * @brief create the multi thread, unblock, dissymmetric server and run + * This function creates the multi thread, unblock, dissymmetric server and run. + * @param params the parameters pointer + * @return the result + */ +int32_t sermud_create_and_run(struct ProgramParams *params); + +/** + * @brief the multi thread, unblock, mutliplexing IO server prints informations + * The multi thread, unblock, mutliplexing IO server prints informations. + * @param server_mum the server information + */ +void sermum_info_print(struct ServerMum *server_mum); + +/** + * @brief the single thread, unblock, mutliplexing IO server listens and gets epoll feature descriptors + * The single thread, unblock, mutliplexing IO server listens and gets epoll feature descriptors. + * @param server_unit the server unit + * @return the result pointer + */ +int32_t sersum_create_epfd_and_reg(struct ServerMumUnit *server_unit); + +/** + * @brief the single thread, unblock, mutliplexing IO server accepts the connections + * The single thread, unblock, mutliplexing IO server accepts the connections. + * @param server_unit the server unit + * @param server_handler the server handler + * @return the result pointer + */ +int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerHandler *server_handler); + +/** + * @brief the single thread, unblock, mutliplexing IO server processes the events + * The single thread, unblock, mutliplexing IO server processes the events. + * @param server_unit the server unit + * @return the result pointer + */ +int32_t sersum_proc_epevs(struct ServerMumUnit *server_unit); + +/** + * @brief create the single thread, unblock, mutliplexing IO server + * This function creates the single thread, unblock, mutliplexing IO server. + * @param arg each thread's information of server + * @return the result pointer + */ +void *sersum_create_and_run(void *arg); + +/** + * @brief create the multi thread, unblock, mutliplexing IO server and run + * This function creates the multi thread, unblock, mutliplexing IO server and run. + * @param params the parameters pointer + * @return the result + */ +int32_t sermum_create_and_run(struct ProgramParams *params); + +/** + * @brief create server and run + * This function create the specify server and run. + * @param params the parameters pointer + * @return the result + */ +int32_t server_create_and_run(struct ProgramParams *params); + + +#endif // __EXAMPLES_SERVER_H__ diff --git a/examples/inc/utilities.h b/examples/inc/utilities.h index f9064c5..a684d35 100644 --- a/examples/inc/utilities.h +++ b/examples/inc/utilities.h @@ -24,16 +24,23 @@ #include #include +#include #include #include #include #include #include +#include +#include +#include #include #include +#include "securec.h" +#include "securectype.h" + #define PRINT_ERROR(format, ...) do \ { \ @@ -59,19 +66,76 @@ printf(format, ##__VA_ARGS__); \ printf("\n"); \ } while(0) +#define PRINT_SERVER_DATAFLOW(format, ...) do \ + { \ + printf("\033[?25l\033[A\033[K"); \ + printf("--> : "); \ + printf(format, ##__VA_ARGS__); \ + printf("\033[?25h\n"); \ + } while(0) +#define PRINT_CLIENT_DATAFLOW(format, ...) do \ + { \ + printf("\033[?25l\033[A\033[K"); \ + printf("--> : "); \ + printf(format, ##__VA_ARGS__); \ + printf("\033[?25h\n"); \ + } while(0) #define LIMIT_VAL_RANGE(val, min, max) ((val) < (min) ? (min) : ((val) > (max) ? (max) : (val))) #define CHECK_VAL_RANGE(val, min, max) ((val) < (min) ? (false) : ((val) > (max) ? (false) : (true))) -#define PROGRAM_OK (0) ///< program ok flag -#define PROGRAM_ABORT (1) ///< program abort flag -#define PROGRAM_FAULT (-1) ///< program fault flag +#define PROGRAM_OK (0) ///< program ok flag +#define PROGRAM_ABORT (1) ///< program abort flag +#define PROGRAM_FAULT (-1) ///< program fault flag +#define PROGRAM_INPROGRESS (-2) ///< program in progress flag + +#define UNIX_TCP_PORT_MIN (1024) ///< TCP minimum port number in unix +#define UNIX_TCP_PORT_MAX (65535) ///< TCP minimum port number in unix +#define THREAD_NUM_MIN (1) ///< minimum number of thead +#define THREAD_NUM_MAX (1000) ///< maximum number of thead +#define MESSAGE_PKTLEN_MIN (2) ///< minimum length of message (1 byte) +#define MESSAGE_PKTLEN_MAX (1024 * 1024 * 10) ///< maximum length of message (10 Mb) + +#define SERVER_SOCKET_LISTEN_BACKLOG (128) ///< the queue of socket +#define SERVER_EPOLL_SIZE_MAX (10000) ///< the max wait event of epoll +#define SERVER_EPOLL_WAIT_TIMEOUT (-1) ///< the timeout value of epoll + +#define CLIENT_EPOLL_SIZE_MAX (10000) ///< the max wait event of epoll +#define CLIENT_EPOLL_WAIT_TIMEOUT (-1) ///< the timeout value of epoll + +#define TERMINAL_REFRESH_MS (100) ///< the time cut off between of terminal refresh + +#define SOCKET_UNIX_DOMAIN_FILE "unix_domain_file" ///< socket unix domain file + + +/** + * @brief create the socket and listen + * Thi function creates the socket and listen. + * @param socket_fd the socket file descriptor + * @param ip ip address + * @param port port number + * @param domain domain + * @return the result + */ +int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain); + +/** + * @brief create the socket and connect + * Thi function creates the socket and connect. + * @param socket_fd the socket file descriptor + * @param ip ip address + * @param port port number + * @param domain domain + * @return the result + */ +int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain); -#define UNIX_TCP_PORT_MIN (1024) ///< TCP minimum port number in unix -#define UNIX_TCP_PORT_MAX (65535) ///< TCP minimum port number in unix -#define THREAD_NUM_MIN (1) ///< minimum number of thead -#define THREAD_NUM_MAX (1000) ///< maximum number of thead -#define MESSAGE_PKTLEN_MIN (1) ///< minimum length of message (1 byte) -#define MESSAGE_PKTLEN_MAX (10485760) ///< maximum length of message (10 Mb) +/** + * @brief set the socket to unblock + * Thi function sets the socket to unblock. + * @param socket_fd the socket file descriptor + * @return the result + */ +int32_t set_socket_unblock(int32_t socket_fd); #endif // __EXAMPLES_UTILITIES_H__ diff --git a/examples/main.c b/examples/main.c index f050dc5..5338572 100644 --- a/examples/main.c +++ b/examples/main.c @@ -12,6 +12,8 @@ #include "utilities.h" #include "parameter.h" +#include "server.h" +#include "client.h" static struct ProgramParams prog_params; @@ -27,5 +29,11 @@ int32_t main(int argc, char *argv[]) } program_params_print(&prog_params); + if (strcmp(prog_params.as, "server") == 0) { + server_create_and_run(&prog_params); + } else { + client_create_and_run(&prog_params); + } + return ret; } diff --git a/examples/src/bussiness.c b/examples/src/bussiness.c new file mode 100644 index 0000000..f55a37b --- /dev/null +++ b/examples/src/bussiness.c @@ -0,0 +1,234 @@ +/* +* Copyright (c) 2022-2023. yyangoO. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + + +#include "bussiness.h" + + +static const char bussiness_messages_low[] = "abcdefghijklmnopqrstuvwxyz"; // the lower charactors of business message +static const char bussiness_messages_cap[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; // the capital charactors of business message + + +// read by specify api + int32_t read_api(int32_t fd, char *buffer_in, const uint32_t length, const char *api) + { + if (strcmp(api, "readwrite") == 0) { + return read(fd, buffer_in, length); + } else if (strcmp(api, "recvsend") == 0) { + return recv(fd, buffer_in, length, 0); + } else { + struct msghdr msg_recv; + struct iovec iov; + + msg_recv.msg_name = NULL; + msg_recv.msg_namelen = 0; + msg_recv.msg_iov = &iov; + msg_recv.msg_iovlen = 1; + msg_recv.msg_iov->iov_base = buffer_in; + msg_recv.msg_iov->iov_len = length; + msg_recv.msg_control = 0; + msg_recv.msg_controllen = 0; + msg_recv.msg_flags = 0; + + return recvmsg(fd, &msg_recv, 0); + } + } + +// write by specify api + int32_t write_api(int32_t fd, char *buffer_out, const uint32_t length, const char *api) + { + if (strcmp(api, "readwrite") == 0) { + return write(fd, buffer_out, length); + } else if (strcmp(api, "recvsend") == 0) { + return send(fd, buffer_out, length, 0); + } else { + struct msghdr msg_send; + struct iovec iov; + + msg_send.msg_name = NULL; + msg_send.msg_namelen = 0; + msg_send.msg_iov = &iov; + msg_send.msg_iovlen = 1; + msg_send.msg_iov->iov_base = buffer_out; + msg_send.msg_iov->iov_len = length; + msg_send.msg_control = 0; + msg_send.msg_controllen = 0; + msg_send.msg_flags = 0; + + return sendmsg(fd, &msg_send, 0); + } + } + +// the business processsing of server +void server_bussiness(char *out, const char *in, uint32_t size) +{ + char diff = 'a' - 'A'; + for (uint32_t i = 0; i < size; ++i) { + if (in[i] != '\0') { + out[i] = in[i] - diff; + } else { + out[i] = '\0'; + } + } +} + +// the business processsing of client +int32_t client_bussiness(char *out, const char *in, uint32_t size, bool verify, uint32_t *msg_idx) +{ + if (verify == false) { + for (uint32_t i = 0; i < (size - 1); ++i) { + out[i] = bussiness_messages_low[(*msg_idx + i) % BUSSINESS_MESSAGE_SIZE]; + } + } else { + uint32_t verify_start_idx = (*msg_idx == 0) ? (BUSSINESS_MESSAGE_SIZE - 1) : (*msg_idx - 1); + for (uint32_t i = 0; i < (size - 1); ++i) { + if (in[i] != bussiness_messages_cap[(verify_start_idx + i) % BUSSINESS_MESSAGE_SIZE]) { + return PROGRAM_FAULT; + } + out[i] = bussiness_messages_low[(*msg_idx + i) % BUSSINESS_MESSAGE_SIZE]; + } + } + out[size - 1] = '\0'; + + ++(*msg_idx); + *msg_idx = (*msg_idx) % BUSSINESS_MESSAGE_SIZE; + + return PROGRAM_OK; +} + +// server answers +int32_t server_ans(struct ServerHandler *server_handler, uint32_t pktlen, const char* api) +{ + const uint32_t length = pktlen; + char *buffer_in = (char *)malloc(length * sizeof(char)); + char *buffer_out = (char *)malloc(length * sizeof(char)); + + int32_t cread = 0; + int32_t sread = length; + while (cread < sread) { + int32_t nread = read_api(server_handler->fd, buffer_in, length, api); + if (nread == 0) { + return PROGRAM_ABORT; + } else if (nread < 0) { + if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { + return PROGRAM_FAULT; + } + } else { + cread += nread; + continue; + } + } + + server_bussiness(buffer_out, buffer_in, length); + + int32_t cwrite = 0; + int32_t swrite = length; + while (cwrite < swrite) { + int32_t nwrite = write_api(server_handler->fd, buffer_out, length, api); + if (nwrite == 0) { + return PROGRAM_ABORT; + } else if (nwrite < 0) { + if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { + return PROGRAM_FAULT; + } + } else { + cwrite += nwrite; + continue; + } + } + + free(buffer_in); + free(buffer_out); + + return PROGRAM_OK; +} + +// client asks +int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const char* api) +{ + const uint32_t length = pktlen; + char *buffer_in = (char *)malloc(length * sizeof(char)); + char *buffer_out = (char *)malloc(length * sizeof(char)); + + client_bussiness(buffer_out, buffer_in, length, false, &(client_handler->msg_idx)); + + int32_t cwrite = 0; + int32_t swrite = length; + while (cwrite < swrite) { + int32_t nwrite = write_api(client_handler->fd, buffer_out, length, api); + if (nwrite == 0) { + return PROGRAM_ABORT; + } else if (nwrite < 0) { + if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { + return PROGRAM_FAULT; + } + } else { + cwrite += nwrite; + continue; + } + } + + free(buffer_in); + free(buffer_out); + + return PROGRAM_OK; +} + +// client checks +int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api) +{ + const uint32_t length = pktlen; + char *buffer_in = (char *)malloc(length * sizeof(char)); + char *buffer_out = (char *)malloc(length * sizeof(char)); + + int32_t cread = 0; + int32_t sread = length; + while (cread < sread) { + int32_t nread = read_api(client_handler->fd, buffer_in, length, api); + if (nread == 0) { + return PROGRAM_ABORT; + } else if (nread < 0) { + if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { + return PROGRAM_FAULT; + } + } else { + cread += nread; + continue; + } + } + + if (client_bussiness(buffer_out, buffer_in, length, verify, &(client_handler->msg_idx)) < 0) { + PRINT_ERROR("message verify fault! "); + getchar(); + } + + int32_t cwrite = 0; + int32_t swrite = length; + while (cwrite < swrite) { + int32_t nwrite = write_api(client_handler->fd, buffer_out, length, api); + if (nwrite == 0) { + return PROGRAM_ABORT; + } else if (nwrite < 0) { + if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { + return PROGRAM_FAULT; + } + } else { + cwrite += nwrite; + continue; + } + } + + free(buffer_in); + free(buffer_out); + + return PROGRAM_OK; +} \ No newline at end of file diff --git a/examples/src/client.c b/examples/src/client.c new file mode 100644 index 0000000..aafcd00 --- /dev/null +++ b/examples/src/client.c @@ -0,0 +1,387 @@ +/* +* Copyright (c) 2022-2023. yyangoO. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + + +#include "client.h" + + +static pthread_mutex_t client_debug_mutex; // the client mutex for printf + + +// the single thread, client prints informations +void client_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug) +{ + if (debug == true) { + pthread_mutex_lock(&client_debug_mutex); + struct in_addr sin_addr; + sin_addr.s_addr = ip; + PRINT_CLIENT("[%s] [pid: %d] [tid: %ld] [%s <- %s:%d]. ", \ + ch_str, \ + getpid(), \ + pthread_self(), \ + act_str, \ + inet_ntoa(sin_addr), \ + ntohs(port)); + pthread_mutex_unlock(&client_debug_mutex); + } +} + +// the client prints informations +void client_info_print(struct Client *client) +{ + if (client->debug == false) { + struct timeval begin; + gettimeofday(&begin, NULL); + uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000; + + uint32_t curr_connect = 0; + double bytes_ps = 0; + uint64_t begin_send_bytes = 0; + struct ClientUnit *begin_uint = client->uints; + while (begin_uint != NULL) { + curr_connect += begin_uint->curr_connect; + begin_send_bytes += begin_uint->send_bytes; + begin_uint = begin_uint->next; + } + + struct timeval delay; + delay.tv_sec = 0; + delay.tv_usec = TERMINAL_REFRESH_MS * 1000; + select(0, NULL, NULL, NULL, &delay); + + uint64_t end_send_bytes = 0; + struct ClientUnit *end_uint = client->uints; + while (end_uint != NULL) { + end_send_bytes += end_uint->send_bytes; + end_uint = end_uint->next; + } + + struct timeval end; + gettimeofday(&end, NULL); + uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000; + + double bytes_sub = end_send_bytes > begin_send_bytes ? (double)(end_send_bytes - begin_send_bytes) : 0; + double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0; + + bytes_ps = bytes_sub / time_sub; + + if (bytes_ps < 1024) { + PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f B/s", curr_connect, bytes_ps); + } else if (bytes_ps < (1024 * 1024)) { + PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f KB/s", curr_connect, bytes_ps / 1024); + } else { + PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024)); + } + } +} + +// the single thread, client try to connect to server, register to epoll +int32_t client_thread_try_connect(struct ClientHandler *client_handler, int32_t epoll_fd, in_addr_t ip, uint16_t port, const char *domain) +{ + int32_t create_socket_and_connect_ret = create_socket_and_connect(&(client_handler->fd), ip, port, domain); + if (create_socket_and_connect_ret == PROGRAM_INPROGRESS) { + struct epoll_event ep_ev; + ep_ev.events = EPOLLOUT; + ep_ev.data.ptr = (void *)client_handler; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_handler->fd, &ep_ev) < 0) { + PRINT_ERROR("client cant't set epoll %d! ", errno); + return PROGRAM_FAULT; + } + } + return PROGRAM_OK; +} + +// the single thread, client retry to connect to server, register to epoll +int32_t client_thread_retry_connect(struct ClientUnit *client_unit, struct ClientHandler *client_handler) +{ + int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_handler, client_unit->epfd, client_unit->ip, client_unit->port, client_unit->domain); + if (clithd_try_cnntask_ret < 0) { + if (clithd_try_cnntask_ret == PROGRAM_INPROGRESS) { + return PROGRAM_OK; + } + return PROGRAM_FAULT; + } + struct epoll_event ep_ev; + ep_ev.events = EPOLLIN | EPOLLET; + ep_ev.data.ptr = (void *)client_handler; + if (epoll_ctl(client_unit->epfd, EPOLL_CTL_ADD, client_handler->fd, &ep_ev) < 0) { + PRINT_ERROR("client cant't set epoll %d! ", errno); + return PROGRAM_FAULT; + } + + ++(client_unit->curr_connect); + + struct sockaddr_in server_addr; + socklen_t server_addr_len = sizeof(server_addr); + if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) { + PRINT_ERROR("client can't socket peername %d! ", errno); + return PROGRAM_FAULT; + } + client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + + int32_t client_ask_ret = client_ask(client_handler, client_unit->pktlen, client_unit->api); + if (client_ask_ret == PROGRAM_FAULT) { + --client_unit->curr_connect; + struct epoll_event ep_ev; + if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, client_handler->fd, &ep_ev) < 0) { + PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", client_handler->fd, errno); + return PROGRAM_FAULT; + } + } else if (client_ask_ret == PROGRAM_ABORT) { + --client_unit->curr_connect; + if (close(client_handler->fd) < 0) { + PRINT_ERROR("client can't close the socket %d! ", errno); + return PROGRAM_FAULT; + } + client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + } else { + client_unit->send_bytes += client_unit->pktlen; + client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + } + + return PROGRAM_OK; +} + +// the single thread, client connects and gets epoll feature descriptors +int32_t client_thread_create_epfd_and_reg(struct ClientUnit *client_unit) +{ + const uint32_t connect_num = client_unit->connect_num; + + client_unit->epfd = epoll_create(CLIENT_EPOLL_SIZE_MAX); + if (client_unit->epfd < 0) { + PRINT_ERROR("client can't create epoll %d! ", errno); + return PROGRAM_FAULT; + } + + for (uint32_t i = 0; i < connect_num; ++i) { + int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_unit->handlers + i, client_unit->epfd, client_unit->ip, client_unit->port, client_unit->domain); + if (clithd_try_cnntask_ret < 0) { + if (clithd_try_cnntask_ret == PROGRAM_INPROGRESS) { + continue; + } + return PROGRAM_FAULT; + } else { + struct epoll_event ep_ev; + ep_ev.events = EPOLLIN | EPOLLET; + ep_ev.data.ptr = (struct ClientHandler *)(client_unit->handlers + i); + if (epoll_ctl(client_unit->epfd, EPOLL_CTL_ADD, (client_unit->handlers + i)->fd, &ep_ev) < 0) { + PRINT_ERROR("client cant't set epoll %d! ", errno); + return PROGRAM_FAULT; + } + + ++(client_unit->curr_connect); + + struct sockaddr_in server_addr; + socklen_t server_addr_len = sizeof(server_addr); + if (getpeername((client_unit->handlers + i)->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) { + PRINT_ERROR("client can't socket peername %d! ", errno); + return PROGRAM_FAULT; + } + client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + + int32_t client_ask_ret = client_ask(client_unit->handlers + i, client_unit->pktlen, client_unit->api); + if (client_ask_ret == PROGRAM_FAULT) { + --client_unit->curr_connect; + struct epoll_event ep_ev; + if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, (client_unit->handlers + i)->fd, &ep_ev) < 0) { + PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", client_unit->epevs[i].data.fd, errno); + return PROGRAM_FAULT; + } + } else if (client_ask_ret == PROGRAM_ABORT) { + --client_unit->curr_connect; + if (close((client_unit->handlers + i)->fd) < 0) { + PRINT_ERROR("client can't close the socket! "); + return PROGRAM_FAULT; + } + client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + } else { + client_unit->send_bytes += client_unit->pktlen; + client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + } + } + } + + return PROGRAM_OK; +} + +// the single thread, client processes epoll events +int32_t clithd_proc_epevs(struct ClientUnit *client_unit) +{ + int32_t epoll_nfds = epoll_wait(client_unit->epfd, client_unit->epevs, CLIENT_EPOLL_SIZE_MAX, CLIENT_EPOLL_WAIT_TIMEOUT); + if (epoll_nfds < 0) { + PRINT_ERROR("client epoll wait error %d! ", errno); + return PROGRAM_FAULT; + } + + for (int32_t i = 0; i < epoll_nfds; ++i) { + struct epoll_event *curr_epev = client_unit->epevs + i; + + if (curr_epev->events == EPOLLERR) { + PRINT_ERROR("client epoll wait error! %d", curr_epev->events); + return PROGRAM_FAULT; + } else if (curr_epev->events == EPOLLOUT) { + int32_t connect_error = 0; + socklen_t connect_error_len = sizeof(connect_error); + struct ClientHandler *client_handler = (struct ClientHandler *)curr_epev->data.ptr; + if (getsockopt(client_handler->fd, SOL_SOCKET, SO_ERROR, (void *)(&connect_error), &connect_error_len) < 0) { + PRINT_ERROR("client can't get socket option %d! ", errno); + return PROGRAM_FAULT; + } + if (connect_error < 0) { + if (connect_error == ETIMEDOUT) { + if (client_thread_retry_connect(client_unit, client_handler) < 0) { + return PROGRAM_FAULT; + } + continue; + } + PRINT_ERROR("client connect error %d! ", connect_error); + return PROGRAM_FAULT; + } else { + ++(client_unit->curr_connect); + + struct sockaddr_in server_addr; + socklen_t server_addr_len = sizeof(server_addr); + if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) { + PRINT_ERROR("client can't socket peername %d! ", errno); + return PROGRAM_FAULT; + } + client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + + int32_t client_ask_ret = client_ask(client_handler, client_unit->pktlen, client_unit->api); + if (client_ask_ret == PROGRAM_FAULT) { + --client_unit->curr_connect; + struct epoll_event ep_ev; + if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, curr_epev->data.fd, &ep_ev) < 0) { + PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", curr_epev->data.fd, errno); + return PROGRAM_FAULT; + } + } else if (client_ask_ret == PROGRAM_ABORT) { + --client_unit->curr_connect; + if (close(curr_epev->data.fd) < 0) { + PRINT_ERROR("client can't close the socket! "); + return PROGRAM_FAULT; + } + client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + } else { + client_unit->send_bytes += client_unit->pktlen; + client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + } + } + } else if (curr_epev->events == EPOLLIN) { + struct sockaddr_in server_addr; + socklen_t server_addr_len = sizeof(server_addr); + struct ClientHandler *client_handler = (struct ClientHandler *)curr_epev->data.ptr; + if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) { + PRINT_ERROR("client can't socket peername %d! ", errno); + return PROGRAM_FAULT; + } + int32_t client_chkans_ret = client_chkans((struct ClientHandler *)curr_epev->data.ptr, client_unit->pktlen, client_unit->verify, client_unit->api); + if (client_chkans_ret == PROGRAM_FAULT) { + --client_unit->curr_connect; + struct epoll_event ep_ev; + if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, curr_epev->data.fd, &ep_ev) < 0) { + PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", curr_epev->data.fd, errno); + return PROGRAM_FAULT; + } + } else if (client_chkans_ret == PROGRAM_ABORT) { + --client_unit->curr_connect; + if (close(curr_epev->data.fd) < 0) { + PRINT_ERROR("client can't close the socket %d! ", errno); + return PROGRAM_FAULT; + } + client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + } else { + client_unit->send_bytes += client_unit->pktlen; + client_debug_print("client unit", "receive", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + } + } + } + + return PROGRAM_OK; +} + +// create client of single thread and run +void *client_s_create_and_run(void *arg) +{ + struct ClientUnit *client_unit = (struct ClientUnit *)arg; + + if (client_thread_create_epfd_and_reg(client_unit) < 0) { + exit(PROGRAM_FAULT); + } + while (true) { + if (clithd_proc_epevs(client_unit) < 0) { + exit(PROGRAM_FAULT); + } + } + for (int i = 0; i < client_unit->connect_num; ++i) { + close((client_unit->handlers + i)->fd); + } + close(client_unit->epfd); + + return (void *)PROGRAM_OK; +} + +// create client and run +int32_t client_create_and_run(struct ProgramParams *params) +{ + const uint32_t connect_num = params->connect_num; + const uint32_t thread_num = params->thread_num; + pthread_t *tids = (pthread_t *)malloc(thread_num * sizeof(pthread_t)); + struct Client *client = (struct Client *)malloc(sizeof(struct Client)); + struct ClientUnit *client_unit = (struct ClientUnit *)malloc(sizeof(struct ClientUnit)); + + if (pthread_mutex_init(&client_debug_mutex, NULL) < 0) { + PRINT_ERROR("client can't init posix mutex %d! ", errno); + return PROGRAM_FAULT; + } + + client->uints = client_unit; + client->debug = params->debug; + + for (uint32_t i = 0; i < thread_num; ++i) { + client_unit->handlers = (struct ClientHandler *)malloc(connect_num * sizeof(struct ClientHandler)); + for (uint32_t j = 0; j < connect_num; ++j) { + client_unit->handlers[j].fd = -1; + client_unit->handlers[j].msg_idx = 0; + } + client_unit->epfd = -1; + client_unit->epevs = (struct epoll_event *)malloc(CLIENT_EPOLL_SIZE_MAX * sizeof(struct epoll_event)); + client_unit->curr_connect = 0; + client_unit->send_bytes = 0; + client_unit->ip = inet_addr(params->ip); + client_unit->port = htons(params->port); + client_unit->connect_num = params->connect_num; + client_unit->pktlen = params->pktlen; + client_unit->verify = params->verify; + client_unit->domain = params->domain; + client_unit->api = params->api; + client_unit->debug = params->debug; + client_unit->next = (struct ClientUnit *)malloc(sizeof(struct ClientUnit)); + + if (pthread_create((tids + i), NULL, client_s_create_and_run, client_unit) < 0) { + PRINT_ERROR("client can't create thread of poisx %d! ", errno); + return PROGRAM_FAULT; + } + client_unit = client_unit->next; + } + + if (client->debug == false) { + printf("[program informations]: \n\n"); + } + while (true) { + client_info_print(client); + } + + pthread_mutex_destroy(&client_debug_mutex); + + return PROGRAM_OK; +} diff --git a/examples/src/parameter.c b/examples/src/parameter.c index 996188b..100ee11 100644 --- a/examples/src/parameter.c +++ b/examples/src/parameter.c @@ -22,148 +22,141 @@ const char prog_short_opts[] = \ "m:" // model "t:" // thread number "c:" // connect number + "D:" // communication domain "A:" // api "P:" // pktlen "v" // verify "r" // ringpmd + "d" // debug "h" // help ; // program long options -const struct ProgramOption prog_long_opts[] = { \ +const struct ProgramOption prog_long_opts[] = \ +{ {PARAM_NAME_AS, REQUIRED_ARGUMETN, NULL, PARAM_NUM_AS}, {PARAM_NAME_IP, REQUIRED_ARGUMETN, NULL, PARAM_NUM_IP}, {PARAM_NAME_PORT, REQUIRED_ARGUMETN, NULL, PARAM_NUM_PORT}, {PARAM_NAME_MODEL, REQUIRED_ARGUMETN, NULL, PARAM_NUM_MODEL}, {PARAM_NAME_THREAD_NUM, REQUIRED_ARGUMETN, NULL, PARAM_NUM_THREAD_NUM}, {PARAM_NAME_CONNECT_NUM, REQUIRED_ARGUMETN, NULL, PARAM_NUM_CONNECT_NUM}, + {PARAM_NAME_DOMAIN, REQUIRED_ARGUMETN, NULL, PARAM_NUM_DOMAIN}, {PARAM_NAME_API, REQUIRED_ARGUMETN, NULL, PARAM_NUM_API}, {PARAM_NAME_PKTLEN, REQUIRED_ARGUMETN, NULL, PARAM_NUM_PKTLEN}, {PARAM_NAME_VERIFY, NO_ARGUMENT, NULL, PARAM_NUM_VERIFY}, {PARAM_NAME_RINGPMD, NO_ARGUMENT, NULL, PARAM_NUM_RINGPMD}, + {PARAM_NAME_DEBUG, NO_ARGUMENT, NULL, PARAM_NUM_DEBUG}, {PARAM_NAME_HELP, NO_ARGUMENT, NULL, PARAM_NUM_HELP}, }; // get long options -int getopt_long(int argc, char * const argv[], const char *optstring, const struct ProgramOption *long_opts, - int *long_idx); +int getopt_long(int argc, char * const argv[], const char *optstring, const struct ProgramOption *long_opts, int *long_idx); // set `as` parameter -int32_t program_param_prase_as(struct ProgramParams *params, char *arg, const char *name) +void program_param_parse_as(struct ProgramParams *params) { - if (strcmp(arg, "server") == 0 || strcmp(arg, "client") == 0) { - params->as = arg; - } - else { - PRINT_ERROR("illigal argument -- %s \n", name); - return PROGRAM_ABORT; + if (strcmp(optarg, "server") == 0 || strcmp(optarg, "client") == 0) { + params->as = optarg; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); } - - return PROGRAM_OK; } // set `ip` parameter -int32_t program_param_prase_ip(struct ProgramParams *params, char *arg, const char *name) +void program_param_parse_ip(struct ProgramParams *params) { - if (inet_addr(arg) != INADDR_NONE) { - params->ip = arg; - } - else { - PRINT_ERROR("illigal argument -- %s \n", name); - return PROGRAM_ABORT; + if (inet_addr(optarg) != INADDR_NONE) { + params->ip = optarg; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); } - - return PROGRAM_OK; } // set `port` parameter -int32_t program_param_prase_port(struct ProgramParams *params, char *arg, const char *name) +void program_param_parse_port(struct ProgramParams *params) { - int32_t port_arg = atoi(optarg); + int32_t port_arg = strtol(optarg, NULL, 0); + printf("%d\n", port_arg); if (CHECK_VAL_RANGE(port_arg, UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX) == true) { params->port = (uint32_t)port_arg; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); } - else { - PRINT_ERROR("illigal argument -- %s \n", name); - return PROGRAM_ABORT; - } - - return PROGRAM_OK; } // set `model` parameter -int32_t program_param_prase_model(struct ProgramParams *params, char *arg, const char *name) +void program_param_parse_model(struct ProgramParams *params) { if (strcmp(optarg, "mum") == 0 || strcmp(optarg, "mud") == 0) { params->model = optarg; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); } - else { - PRINT_ERROR("illigal argument -- %s \n", name); - return PROGRAM_ABORT; - } - - return PROGRAM_OK; } // set `connect_num` parameter -int32_t program_param_prase_connectnum(struct ProgramParams *params, char *arg, const char *name) +void program_param_parse_connectnum(struct ProgramParams *params) { - int32_t connectnum_arg = atoi(optarg); + int32_t connectnum_arg = strtol(optarg, NULL, 0); if (connectnum_arg > 0) { params->connect_num = (uint32_t)connectnum_arg; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); } - else { - PRINT_ERROR("illigal argument -- %s \n", name); - return PROGRAM_ABORT; - } - - return PROGRAM_OK; } // set `thread_num` parameter -int32_t program_param_prase_threadnum(struct ProgramParams *params, char *arg, const char *name) +void program_param_parse_threadnum(struct ProgramParams *params) { - int32_t threadnum_arg = atoi(optarg); + int32_t threadnum_arg = strtol(optarg, NULL, 0); if (CHECK_VAL_RANGE(threadnum_arg, THREAD_NUM_MIN, THREAD_NUM_MAX) == true) { params->thread_num = (uint32_t)threadnum_arg; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); } - else { - PRINT_ERROR("illigal argument -- %s \n", name); - return PROGRAM_ABORT; - } +} - return PROGRAM_OK; +// set `domain` parameter +void program_param_parse_domain(struct ProgramParams *params) +{ + if (strcmp(optarg, "unix") == 0 || strcmp(optarg, "posix") == 0) { + params->domain = optarg; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); + } } // set `api` parameter -int32_t program_param_prase_api(struct ProgramParams *params, char *arg, const char *name) +void program_param_parse_api(struct ProgramParams *params) { - if (strcmp(optarg, "unix") == 0 || strcmp(optarg, "posix") == 0) { + printf("aaaaaa %s\n", optarg); + if (strcmp(optarg, "readwrite") == 0 || strcmp(optarg, "recvsend") == 0 || strcmp(optarg, "recvsendmsg") == 0) { params->api = optarg; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); } - else { - PRINT_ERROR("illigal argument -- %s \n", name); - return PROGRAM_ABORT; - } - - return PROGRAM_OK; } // set `pktlen` parameter -int32_t program_param_prase_pktlen(struct ProgramParams *params, char *arg, const char *name) +void program_param_parse_pktlen(struct ProgramParams *params) { - int32_t pktlen_arg = atoi(optarg); + int32_t pktlen_arg = strtol(optarg, NULL, 0); if (CHECK_VAL_RANGE(pktlen_arg, MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX) == true) { params->pktlen = (uint32_t)pktlen_arg; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); } - else { - PRINT_ERROR("illigal argument -- %s \n", name); - return PROGRAM_ABORT; - } - - return PROGRAM_OK; } // initialize the parameters @@ -175,10 +168,12 @@ void program_params_init(struct ProgramParams *params) params->model = PARAM_DEFAULT_MODEL; params->thread_num = PARAM_DEFAULT_THREAD_NUM; params->connect_num = PARAM_DEFAULT_CONNECT_NUM; + params->domain = PARAM_DEFAULT_DOMAIN; params->api = PARAM_DEFAULT_API; params->pktlen = PARAM_DEFAULT_PKTLEN; params->verify = PARAM_DEFAULT_VERIFY; params->ringpmd = PARAM_DEFAULT_RINGPMD; + params->debug = PARAM_DEFAULT_DEBUG; } // print program helps @@ -188,19 +183,24 @@ void program_params_help(void) printf("-a, --as [server | client]: set programas server or client. \n"); printf(" server: as server. \n"); printf(" client: as client. \n"); - printf("-i, --ip [xxx.xxx.xxx.xxx]: set ip address. \n"); - printf("-p, --port [xxxx]: set port number in range of %d - %d. \n", UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX); + printf("-i, --ip [???.???.???.???]: set ip address. \n"); + printf("-p, --port [????]: set port number in range of %d - %d. \n", UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX); printf("-m, --model [mum | mud]: set the network model. \n"); printf(" mum: multi thread, unblock, multiplexing IO network model. \n"); printf(" mud: multi thread, unblock, dissymmetric network model. \n"); - printf("-t, --threadnum [xxxx]: set thread number in range of %d - %d. \n", THREAD_NUM_MIN, THREAD_NUM_MAX); - printf("-c, --connectnum [xxxx]: set thread number of connection. \n"); - printf("-A, --api [unix | posix]: set api type is server or client. \n"); + printf("-t, --threadnum [???]: set thread number in range of %d - %d. \n", THREAD_NUM_MIN, THREAD_NUM_MAX); + printf("-c, --connectnum [???]: set connection number of each thread. \n"); + printf("-D, --domain [unix | posix]: set domain type is server or client. \n"); printf(" unix: use unix's api. \n"); printf(" posix: use posix api. \n"); - printf("-P, --pktlen [xxxx]: set packet length in range of %d - %d. \n", MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX); + printf("-A, --api [readwrite | recvsend | recvsendmsg]: set api type is server or client. \n"); + printf(" readwrite: use `read` and `write`. \n"); + printf(" recvsend: use `recv and `send`. \n"); + printf(" recvsendmsg: use `recvmsg` and `sendmsg`. \n"); + printf("-P, --pktlen [????]: set packet length in range of %d - %d. \n", MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX); printf("-v, --verify: set to verifying the message packet. \n"); - printf("-r, --ringpmd: set use ringpmd. \n"); + printf("-r, --ringpmd: set to use ringpmd. \n"); + printf("-d, --debug: set to print the debug information. \n"); printf("-h, --help: see helps. \n"); printf("\n"); } @@ -208,40 +208,44 @@ void program_params_help(void) // parse the parameters int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char *argv[]) { - int32_t ret = PROGRAM_OK; + int32_t c; - while (ret == PROGRAM_OK) { + while (true) { int32_t opt_idx = 0; - int32_t c = getopt_long(argc, argv, prog_short_opts, prog_long_opts, &opt_idx); + c = getopt_long(argc, argv, prog_short_opts, prog_long_opts, &opt_idx); + if (c == -1) { break; } switch (c) { case (PARAM_NUM_AS): - ret = program_param_prase_as(params, optarg, prog_long_opts[opt_idx].name); + program_param_parse_as(params); break; case (PARAM_NUM_IP): - ret = program_param_prase_ip(params, optarg, prog_long_opts[opt_idx].name); + program_param_parse_ip(params); break; case (PARAM_NUM_PORT): - ret = program_param_prase_port(params, optarg, prog_long_opts[opt_idx].name); + program_param_parse_port(params); break; case (PARAM_NUM_MODEL): - ret = program_param_prase_model(params, optarg, prog_long_opts[opt_idx].name); + program_param_parse_model(params); break; case (PARAM_NUM_CONNECT_NUM): - ret = program_param_prase_connectnum(params, optarg, prog_long_opts[opt_idx].name); + program_param_parse_connectnum(params); break; case (PARAM_NUM_THREAD_NUM): - ret = program_param_prase_threadnum(params, optarg, prog_long_opts[opt_idx].name); + program_param_parse_threadnum(params); + break; + case (PARAM_NUM_DOMAIN): + program_param_parse_domain(params); break; case (PARAM_NUM_API): - ret = program_param_prase_api(params, optarg, prog_long_opts[opt_idx].name); + program_param_parse_api(params); break; case (PARAM_NUM_PKTLEN): - ret = program_param_prase_pktlen(params, optarg, prog_long_opts[opt_idx].name); + program_param_parse_pktlen(params); break; case (PARAM_NUM_VERIFY): params->verify = true; @@ -249,6 +253,9 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char * case (PARAM_NUM_RINGPMD): params->ringpmd = true; break; + case (PARAM_NUM_DEBUG): + params->debug = true; + break; case (PARAM_NUM_HELP): program_params_help(); return PROGRAM_ABORT; @@ -260,7 +267,12 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char * } } - return ret; + if (strcmp(params->domain, "unix") == 0) { + params->thread_num = 1; + params->connect_num = 1; + } + + return PROGRAM_OK; } // print the parameters @@ -269,14 +281,28 @@ void program_params_print(struct ProgramParams *params) printf("\n"); printf("[program parameters]: \n"); printf("--> [as]: %s \n", params->as); - printf("--> [ip]: %s \n", params->ip); - printf("--> [port]: %u \n", params->port); - printf("--> [model]: %s \n", params->model); - printf("--> [thread number]: %u \n", params->thread_num); - printf("--> [connection number]: %u \n", params->connect_num); - printf("--> [api]: %s \n", params->api); + printf("--> [server ip]: %s \n", params->ip); + printf("--> [server port]: %u \n", params->port); + if (strcmp(params->as, "server") == 0) { + printf("--> [model]: %s \n", params->model); + } + if ((strcmp(params->as, "server") == 0 && strcmp(params->model, "mum") == 0) || strcmp(params->as, "client") == 0) { + printf("--> [thread number]: %u \n", params->thread_num); + } + if (strcmp(params->as, "client") == 0) { + printf("--> [connection number]: %u \n", params->connect_num); + } + printf("--> [domain]: %s \n", params->domain); + if (strcmp(params->api, "readwrite") == 0) { + printf("--> [api]: read & write \n"); + } else if (strcmp(params->api, "recvsend") == 0) { + printf("--> [api]: recv & send \n"); + } else { + printf("--> [api]: recvmsg & sendmsg \n"); + } printf("--> [packet length]: %u \n", params->pktlen); - printf("--> [verify]: %s \n", (true == params->verify) ? "on" : "off"); - printf("--> [ringpmd]: %s \n", (true == params->ringpmd) ? "on" : "off"); + printf("--> [verify]: %s \n", (params->verify == true) ? "on" : "off"); + printf("--> [ringpmd]: %s \n", (params->ringpmd == true) ? "on" : "off"); + printf("--> [debug]: %s \n", (params->debug == true) ? "on" : "off"); printf("\n"); } diff --git a/examples/src/server.c b/examples/src/server.c new file mode 100644 index 0000000..d1dab72 --- /dev/null +++ b/examples/src/server.c @@ -0,0 +1,578 @@ +/* +* Copyright (c) 2022-2023. yyangoO. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + + +#include "server.h" + + +static pthread_mutex_t server_debug_mutex; // the server mutex for debug + +// server debug information print +void server_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug) +{ + if (debug == true) { + pthread_mutex_lock(&server_debug_mutex); + struct in_addr sin_addr; + sin_addr.s_addr = ip; + PRINT_SERVER("[%s] [pid: %d] [tid: %ld] [%s <- %s:%d]. ", \ + ch_str, \ + getpid(), \ + pthread_self(), \ + act_str, \ + inet_ntoa(sin_addr), \ + ntohs(port)); + pthread_mutex_unlock(&server_debug_mutex); + } +} + +// the multi thread, unblock, dissymmetric server prints informations +void sermud_info_print(struct ServerMud *server_mud) +{ + if (server_mud->debug == false) { + uint32_t curr_connect = server_mud->curr_connect; + + struct timeval begin; + gettimeofday(&begin, NULL); + uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000; + + double bytes_ps = 0; + uint64_t begin_recv_bytes = 0; + struct ServerMudWorker *begin_uint = server_mud->workers; + while (begin_uint != NULL) { + begin_recv_bytes += begin_uint->recv_bytes; + begin_uint = begin_uint->next; + } + + struct timeval delay; + delay.tv_sec = 0; + delay.tv_usec = TERMINAL_REFRESH_MS * 1000; + select(0, NULL, NULL, NULL, &delay); + + uint64_t end_recv_bytes = 0; + struct ServerMudWorker *end_uint = server_mud->workers; + while (end_uint != NULL) { + end_recv_bytes += end_uint->recv_bytes; + end_uint = end_uint->next; + } + + struct timeval end; + gettimeofday(&end, NULL); + uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000; + + double bytes_sub = end_recv_bytes > begin_recv_bytes ? (double)(end_recv_bytes - begin_recv_bytes) : 0; + double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0; + + bytes_ps = bytes_sub / time_sub; + + if (bytes_ps < 1024) { + PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f B/s", curr_connect, bytes_ps); + } else if (bytes_ps < (1024 * 1024)) { + PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f KB/s", curr_connect, bytes_ps / 1024); + } else { + PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024)); + } + } +} + +// the worker thread, unblock, dissymmetric server listens and gets epoll feature descriptors +int32_t sermud_worker_create_epfd_and_reg(struct ServerMudWorker *worker_unit) +{ + worker_unit->epfd = epoll_create(SERVER_EPOLL_SIZE_MAX); + if (worker_unit->epfd < 0) { + PRINT_ERROR("server can't create epoll %d! ", worker_unit->epfd); + return PROGRAM_FAULT; + } + + struct epoll_event ep_ev; + ep_ev.data.ptr = (void *)&(worker_unit->worker); + ep_ev.events = EPOLLIN | EPOLLET; + if (epoll_ctl(worker_unit->epfd, EPOLL_CTL_ADD, worker_unit->worker.fd, &ep_ev) < 0) { + PRINT_ERROR("server can't control epoll %d! ", errno); + return PROGRAM_FAULT; + } + + return PROGRAM_OK; +} + +// the listener thread, unblock, dissymmetric server listens and gets epoll feature descriptors +int32_t sermud_listener_create_epfd_and_reg(struct ServerMud *server_mud) +{ + server_mud->epfd = epoll_create(SERVER_EPOLL_SIZE_MAX); + if (server_mud->epfd < 0) { + PRINT_ERROR("server can't create epoll %d! ", server_mud->epfd); + return PROGRAM_FAULT; + } + + struct epoll_event ep_ev; + ep_ev.data.ptr = (void *)&(server_mud->listener); + ep_ev.events = EPOLLIN | EPOLLET; + if (epoll_ctl(server_mud->epfd, EPOLL_CTL_ADD, server_mud->listener.fd, &ep_ev) < 0) { + PRINT_ERROR("server can't control epoll %d! ", errno); + return PROGRAM_FAULT; + } + + server_debug_print("server mud listener", "waiting", server_mud->ip, server_mud->port, server_mud->debug); + + return PROGRAM_OK; +} + +// the listener thread, unblock, dissymmetric server accepts the connections +int32_t sermud_listener_accept_connects(struct ServerMud *server_mud) +{ + while (true) { + struct sockaddr_in accept_addr; + uint32_t sockaddr_in_len = sizeof(struct sockaddr_in); + int32_t accept_fd = accept(server_mud->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len); + if (accept_fd < 0) { + break; + } + + if (set_socket_unblock(accept_fd) < 0) { + PRINT_ERROR("server can't set the connect socket to unblock! "); + return PROGRAM_FAULT; + } + + ++(server_mud->curr_connect); + + pthread_t *tid = (pthread_t *)malloc(sizeof(pthread_t)); + struct ServerMudWorker *worker = (struct ServerMudWorker *)malloc(sizeof(struct ServerMudWorker)); + worker->worker.fd = accept_fd; + worker->epfd = -1; + worker->epevs = (struct epoll_event *)malloc(sizeof(struct epoll_event)); + worker->recv_bytes = 0; + worker->pktlen = server_mud->pktlen; + worker->ip = accept_addr.sin_addr.s_addr; + worker->port = accept_addr.sin_port; + worker->api = server_mud->api; + worker->debug = server_mud->debug; + worker->next = server_mud->workers; + + server_mud->workers = worker; + + if (pthread_create(tid, NULL, sermud_worker_create_and_run, server_mud->workers) < 0) { + PRINT_ERROR("server can't create poisx thread %d! ", errno); + return PROGRAM_FAULT; + } + + server_debug_print("server mud listener", "accept", accept_addr.sin_addr.s_addr, accept_addr.sin_port, server_mud->debug); + } + + return PROGRAM_OK; +} + +// the worker thread, unblock, dissymmetric server processes the events +int32_t sermud_worker_proc_epevs(struct ServerMudWorker *worker_unit) +{ + int32_t epoll_nfds = epoll_wait(worker_unit->epfd, worker_unit->epevs, SERVER_EPOLL_SIZE_MAX, SERVER_EPOLL_WAIT_TIMEOUT); + if (epoll_nfds < 0) { + PRINT_ERROR("server epoll wait error %d! ", errno); + return PROGRAM_FAULT; + } + + for (int32_t i = 0; i < epoll_nfds; ++i) { + struct epoll_event *curr_epev = worker_unit->epevs + i; + + if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) { + PRINT_ERROR("server epoll wait error %d! ", curr_epev->events); + return PROGRAM_FAULT; + } + + if (curr_epev->events == EPOLLIN) { + struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr; + + int32_t server_ans_ret = server_ans(server_handler, worker_unit->pktlen, worker_unit->api); + if (server_ans_ret == PROGRAM_FAULT) { + struct epoll_event ep_ev; + if (epoll_ctl(worker_unit->epfd, EPOLL_CTL_DEL, server_handler->fd, &ep_ev) < 0) { + PRINT_ERROR("server can't delete socket '%d' to control epoll %d! ", server_handler->fd, errno); + return PROGRAM_FAULT; + } + } else if (server_ans_ret == PROGRAM_ABORT) { + if (close(server_handler->fd) < 0) { + PRINT_ERROR("server can't close the socket %d! ", errno); + return PROGRAM_FAULT; + } + server_debug_print("server mud worker", "close", worker_unit->ip, worker_unit->port, worker_unit->debug); + } else { + worker_unit->recv_bytes += worker_unit->pktlen; + server_debug_print("server mud worker", "receive", worker_unit->ip, worker_unit->port, worker_unit->debug); + } + } + } + + return PROGRAM_OK; +} + +// the listener thread, unblock, dissymmetric server processes the events +int32_t sermud_listener_proc_epevs(struct ServerMud *server_mud) +{ + int32_t epoll_nfds = epoll_wait(server_mud->epfd, server_mud->epevs, SERVER_EPOLL_SIZE_MAX, SERVER_EPOLL_WAIT_TIMEOUT); + if (epoll_nfds < 0) { + PRINT_ERROR("server epoll wait error %d! ", errno); + return PROGRAM_FAULT; + } + + for (int32_t i = 0; i < epoll_nfds; ++i) { + struct epoll_event *curr_epev = server_mud->epevs + i; + + if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) { + PRINT_ERROR("server epoll wait error %d! ", curr_epev->events); + return PROGRAM_FAULT; + } + + if (curr_epev->events == EPOLLIN) { + int32_t sermud_listener_accept_connects_ret = sermud_listener_accept_connects(server_mud); + if (sermud_listener_accept_connects_ret < 0) { + PRINT_ERROR("server try accept error %d! ", sermud_listener_accept_connects_ret); + return PROGRAM_FAULT; + } + } + } + + return PROGRAM_OK; +} + +// create the worker thread, unblock, dissymmetric server and run +void *sermud_worker_create_and_run(void *arg) +{ + pthread_detach(pthread_self()); + + struct ServerMudWorker *worker_unit = (struct ServerMudWorker *)arg; + + if (sermud_worker_create_epfd_and_reg(worker_unit) < 0) { + exit(PROGRAM_FAULT); + } + while (true) { + if (sermud_worker_proc_epevs(worker_unit) < 0) { + exit(PROGRAM_FAULT); + } + } + + close(worker_unit->worker.fd); + close(worker_unit->epfd); + + return (void *)PROGRAM_OK; +} + +// create the listener thread, unblock, dissymmetric server and run +void *sermud_listener_create_and_run(void *arg) +{ + struct ServerMud *server_mud = (struct ServerMud *)arg; + + if (create_socket_and_listen(&(server_mud->listener.fd), server_mud->ip, server_mud->port, server_mud->domain) < 0) { + exit(PROGRAM_FAULT); + } + if (sermud_listener_create_epfd_and_reg(server_mud) < 0) { + exit(PROGRAM_FAULT); + } + while (true) { + if (sermud_listener_proc_epevs(server_mud) < 0) { + exit(PROGRAM_FAULT); + } + } + if (close(server_mud->listener.fd) < 0 || close(server_mud->epfd) < 0) { + exit(PROGRAM_FAULT); + } + + return (void *)PROGRAM_OK; +} + +// create the multi thread, unblock, dissymmetric server and run +int32_t sermud_create_and_run(struct ProgramParams *params) +{ + pthread_t *tid = (pthread_t *)malloc(sizeof(pthread_t)); + struct ServerMud *server_mud = (struct ServerMud *)malloc(sizeof(struct ServerMud)); + + if (pthread_mutex_init(&server_debug_mutex, NULL) < 0) { + PRINT_ERROR("server can't init posix mutex %d! ", errno); + return PROGRAM_FAULT; + } + + server_mud->listener.fd = -1; + server_mud->workers = NULL; + server_mud->epfd = -1; + server_mud->epevs = (struct epoll_event *)malloc(SERVER_EPOLL_SIZE_MAX * sizeof(struct epoll_event)); + server_mud->curr_connect = 0; + server_mud->ip = inet_addr(params->ip); + server_mud->port = htons(params->port); + server_mud->pktlen = params->pktlen; + server_mud->domain = params->domain; + server_mud->api = params->api; + server_mud->debug = params->debug; + + if (pthread_create(tid, NULL, sermud_listener_create_and_run, server_mud) < 0) { + PRINT_ERROR("server can't create poisx thread %d! ", errno); + return PROGRAM_FAULT; + } + + if (server_mud->debug == false) { + printf("[program informations]: \n\n"); + } + while (true) { + sermud_info_print(server_mud); + } + + pthread_mutex_destroy(&server_debug_mutex); + + return PROGRAM_OK; +} + +// the multi thread, unblock, mutliplexing IO server prints informations +void sermum_info_print(struct ServerMum *server_mum) +{ + if (server_mum->debug == false) { + struct timeval begin; + gettimeofday(&begin, NULL); + uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000; + + uint32_t curr_connect = 0; + double bytes_ps = 0; + uint64_t begin_recv_bytes = 0; + struct ServerMumUnit *begin_uint = server_mum->uints; + while (begin_uint != NULL) { + curr_connect += begin_uint->curr_connect; + begin_recv_bytes += begin_uint->recv_bytes; + begin_uint = begin_uint->next; + } + + struct timeval delay; + delay.tv_sec = 0; + delay.tv_usec = TERMINAL_REFRESH_MS * 1000; + select(0, NULL, NULL, NULL, &delay); + + uint64_t end_recv_bytes = 0; + struct ServerMumUnit *end_uint = server_mum->uints; + while (end_uint != NULL) { + end_recv_bytes += end_uint->recv_bytes; + end_uint = end_uint->next; + } + + struct timeval end; + gettimeofday(&end, NULL); + uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000; + + double bytes_sub = end_recv_bytes > begin_recv_bytes ? (double)(end_recv_bytes - begin_recv_bytes) : 0; + double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0; + + bytes_ps = bytes_sub / time_sub; + + if (bytes_ps < 1024) { + PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f B/s", curr_connect, bytes_ps); + } else if (bytes_ps < (1024 * 1024)) { + PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f KB/s", curr_connect, bytes_ps / 1024); + } else { + PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024)); + } + } +} + +// the single thread, unblock, mutliplexing IO server listens and gets epoll feature descriptors +int32_t sersum_create_epfd_and_reg(struct ServerMumUnit *server_unit) +{ + server_unit->epfd = epoll_create(SERVER_EPOLL_SIZE_MAX); + if (server_unit->epfd < 0) { + PRINT_ERROR("server can't create epoll %d! ", server_unit->epfd); + return PROGRAM_FAULT; + } + + struct epoll_event ep_ev; + ep_ev.data.ptr = (void *)&(server_unit->listener); + ep_ev.events = EPOLLIN | EPOLLET; + if (epoll_ctl(server_unit->epfd, EPOLL_CTL_ADD, server_unit->listener.fd, &ep_ev) < 0) { + PRINT_ERROR("server can't control epoll %d! ", errno); + return PROGRAM_FAULT; + } + + server_debug_print("server mum unit", "waiting", server_unit->ip, server_unit->port, server_unit->debug); + + return PROGRAM_OK; +} + +// the single thread, unblock, mutliplexing IO server accepts the connections +int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerHandler *server_handler) +{ + while (true) { + struct sockaddr_in accept_addr; + uint32_t sockaddr_in_len = sizeof(struct sockaddr_in); + int32_t accept_fd = accept(server_unit->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len); + if (accept_fd < 0) { + break; + } + + if (set_socket_unblock(accept_fd) < 0) { + PRINT_ERROR("server can't set the connect socket to unblock! "); + return PROGRAM_FAULT; + } + + struct ServerHandler *server_handler = (struct ServerHandler *)malloc(sizeof(struct ServerHandler)); + server_handler->fd = accept_fd; + struct epoll_event ep_ev; + ep_ev.data.ptr = (void *)server_handler; + ep_ev.events = EPOLLIN | EPOLLET; + if (epoll_ctl(server_unit->epfd, EPOLL_CTL_ADD, accept_fd, &ep_ev) < 0) { + PRINT_ERROR("server can't add socket '%d' to control epoll %d! ", accept_fd, errno); + return PROGRAM_FAULT; + } + + ++server_unit->curr_connect; + + server_debug_print("server mum unit", "accept", accept_addr.sin_addr.s_addr, accept_addr.sin_port, server_unit->debug); + } + + return PROGRAM_OK; +} + +// the single thread, unblock, mutliplexing IO server processes the events +int32_t sersum_proc_epevs(struct ServerMumUnit *server_unit) +{ + int32_t epoll_nfds = epoll_wait(server_unit->epfd, server_unit->epevs, SERVER_EPOLL_SIZE_MAX, SERVER_EPOLL_WAIT_TIMEOUT); + if (epoll_nfds < 0) { + PRINT_ERROR("server epoll wait error %d! ", errno); + return PROGRAM_FAULT; + } + + for (int32_t i = 0; i < epoll_nfds; ++i) { + struct epoll_event *curr_epev = server_unit->epevs + i; + + if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) { + PRINT_ERROR("server epoll wait error %d! ", curr_epev->events); + return PROGRAM_FAULT; + } + + if (curr_epev->events == EPOLLIN) { + if (curr_epev->data.ptr == (void *)&(server_unit->listener)) { + int32_t sersum_accept_connects_ret = sersum_accept_connects(server_unit, &(server_unit->listener)); + if (sersum_accept_connects_ret < 0) { + PRINT_ERROR("server try accept error %d! ", sersum_accept_connects_ret); + return PROGRAM_FAULT; + } + continue; + } else { + struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr; + struct sockaddr_in connect_addr; + socklen_t connect_addr_len = sizeof(connect_addr); + if (getpeername(server_handler->fd, (struct sockaddr *)&connect_addr, &connect_addr_len) < 0) { + PRINT_ERROR("server can't socket peername %d! ", errno); + return PROGRAM_FAULT; + } + + int32_t server_ans_ret = server_ans(server_handler, server_unit->pktlen, server_unit->api); + if (server_ans_ret == PROGRAM_FAULT) { + --server_unit->curr_connect; + struct epoll_event ep_ev; + if (epoll_ctl(server_unit->epfd, EPOLL_CTL_DEL, server_handler->fd, &ep_ev) < 0) { + PRINT_ERROR("server can't delete socket '%d' to control epoll %d! ", server_handler->fd, errno); + return PROGRAM_FAULT; + } + } else if (server_ans_ret == PROGRAM_ABORT) { + --server_unit->curr_connect; + if (close(server_handler->fd) < 0) { + PRINT_ERROR("server can't close the socket %d! ", errno); + return PROGRAM_FAULT; + } + server_debug_print("server mum unit", "close", connect_addr.sin_addr.s_addr, connect_addr.sin_port, server_unit->debug); + } else { + server_unit->recv_bytes += server_unit->pktlen; + server_debug_print("server mum unit", "receive", connect_addr.sin_addr.s_addr, connect_addr.sin_port, server_unit->debug); + } + } + } + } + + return PROGRAM_OK; +} + +// create the single thread, unblock, mutliplexing IO server +void *sersum_create_and_run(void *arg) +{ + struct ServerMumUnit *server_unit = (struct ServerMumUnit *)arg; + + if (create_socket_and_listen(&(server_unit->listener.fd), server_unit->ip, server_unit->port, server_unit->domain) < 0) { + exit(PROGRAM_FAULT); + } + if (sersum_create_epfd_and_reg(server_unit) < 0) { + exit(PROGRAM_FAULT); + } + while (true) { + if (sersum_proc_epevs(server_unit) < 0) { + exit(PROGRAM_FAULT); + } + } + + close(server_unit->listener.fd); + close(server_unit->epfd); + + return (void *)PROGRAM_OK; +} + +// create the multi thread, unblock, mutliplexing IO server +int32_t sermum_create_and_run(struct ProgramParams *params) +{ + const uint32_t thread_num = params->thread_num; + pthread_t *tids = (pthread_t *)malloc(thread_num * sizeof(pthread_t)); + struct ServerMum *server_mum = (struct ServerMum *)malloc(sizeof(struct ServerMum)); + struct ServerMumUnit *server_unit = (struct ServerMumUnit *)malloc(sizeof(struct ServerMumUnit)); + + if (pthread_mutex_init(&server_debug_mutex, NULL) < 0) { + PRINT_ERROR("server can't init posix mutex %d! ", errno); + return PROGRAM_FAULT; + } + + server_mum->uints = server_unit; + server_mum->debug = params->debug; + + for (uint32_t i = 0; i < thread_num; ++i) { + server_unit->listener.fd = -1; + server_unit->epfd = -1; + server_unit->epevs = (struct epoll_event *)malloc(SERVER_EPOLL_SIZE_MAX * sizeof(struct epoll_event)); + server_unit->curr_connect = 0; + server_unit->recv_bytes = 0; + server_unit->ip = inet_addr(params->ip); + server_unit->port = htons(params->port); + server_unit->pktlen = params->pktlen; + server_unit->domain = params->domain; + server_unit->api = params->api; + server_unit->debug = params->debug; + server_unit->next = (struct ServerMumUnit *)malloc(sizeof(struct ServerMumUnit)); + + if (pthread_create((tids + i), NULL, sersum_create_and_run, server_unit) < 0) { + PRINT_ERROR("server can't create poisx thread %d! ", errno); + return PROGRAM_FAULT; + } + server_unit = server_unit->next; + } + + if (server_mum->debug == false) { + printf("[program informations]: \n\n"); + } + while (true) { + sermum_info_print(server_mum); + } + + pthread_mutex_destroy(&server_debug_mutex); + + return PROGRAM_OK; +} + +// create server and run +int32_t server_create_and_run(struct ProgramParams *params) +{ + int32_t ret = PROGRAM_OK; + + if (strcmp(params->model, "mum") == 0) { + ret = sermum_create_and_run(params); + } else { + ret = sermud_create_and_run(params); + } + + return ret; +} diff --git a/examples/src/utilities.c b/examples/src/utilities.c new file mode 100644 index 0000000..b6ed269 --- /dev/null +++ b/examples/src/utilities.c @@ -0,0 +1,128 @@ +/* +* Copyright (c) 2022-2023. yyangoO. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + + +#include "utilities.h" + + +// create the socket and listen +int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain) +{ + if (strcmp(domain, "posix") == 0) { + *socket_fd = socket(AF_INET, SOCK_STREAM, 0); + if (*socket_fd < 0) { + PRINT_ERROR("can't create socket %d! ", errno); + return PROGRAM_FAULT; + } + } else { + *socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (*socket_fd < 0) { + PRINT_ERROR("can't create socket %d! ", errno); + return PROGRAM_FAULT; + } + } + + int32_t port_multi = 1; + if (setsockopt(*socket_fd, SOL_SOCKET, SO_REUSEPORT, (void *)&port_multi, sizeof(int32_t)) < 0) { + PRINT_ERROR("can't set the option of socket %d! ", errno); + return PROGRAM_FAULT; + } + + if (set_socket_unblock(*socket_fd) < 0) { + PRINT_ERROR("can't set the socket to unblock! "); + return PROGRAM_FAULT; + } + + if (strcmp(domain, "posix") == 0) { + struct sockaddr_in socket_addr; + memset_s(&socket_addr, sizeof(socket_addr), 0, sizeof(socket_addr)); + socket_addr.sin_family = AF_INET; + socket_addr.sin_addr.s_addr = ip; + socket_addr.sin_port = port; + if (bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_in)) < 0) { + PRINT_ERROR("can't bind the address to socket %d! ", errno); + return PROGRAM_FAULT; + } + + if (listen(*socket_fd, SERVER_SOCKET_LISTEN_BACKLOG) < 0) { + PRINT_ERROR("server socket can't lisiten %d! ", errno); + return PROGRAM_FAULT; + } + } else { + struct sockaddr_un socket_addr; + unlink(SOCKET_UNIX_DOMAIN_FILE); + socket_addr.sun_family = AF_UNIX; + strcpy_s(socket_addr.sun_path, sizeof(socket_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE); + if (bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_un)) < 0) { + PRINT_ERROR("can't bind the address to socket %d! ", errno); + return PROGRAM_FAULT; + } + + if (listen(*socket_fd, SERVER_SOCKET_LISTEN_BACKLOG) < 0) { + PRINT_ERROR("server socket can't lisiten %d! ", errno); + return PROGRAM_FAULT; + } + } + + return PROGRAM_OK; +} + +// create the socket and connect +int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain) +{ + if (strcmp(domain, "posix") == 0) { + *socket_fd = socket(AF_INET, SOCK_STREAM, 0); + if (*socket_fd < 0) { + PRINT_ERROR("client can't create socket %d! ", errno); + return PROGRAM_FAULT; + } + + struct sockaddr_in server_addr; + memset_s(&server_addr, sizeof(server_addr), 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = ip; + server_addr.sin_port = port; + if (connect(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in)) < 0) { + if (errno == EINPROGRESS) { + return PROGRAM_INPROGRESS; + } else { + PRINT_ERROR("client can't connect to the server %d! ", errno); + return PROGRAM_FAULT; + } + } + } else { + *socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (*socket_fd < 0) { + PRINT_ERROR("client can't create socket %d! ", errno); + return PROGRAM_FAULT; + } + + struct sockaddr_un server_addr; + server_addr.sun_family = AF_UNIX; + strcpy_s(server_addr.sun_path, sizeof(server_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE); + if (connect(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_un)) < 0) { + if (errno == EINPROGRESS) { + return PROGRAM_INPROGRESS; + } else { + PRINT_ERROR("client can't connect to the server %d! ", errno); + return PROGRAM_FAULT; + } + } + } + return PROGRAM_OK; +} + +// set the socket to unblock +int32_t set_socket_unblock(int32_t socket_fd) +{ + return fcntl(socket_fd, F_SETFL, fcntl(socket_fd, F_GETFD, 0) | O_NONBLOCK); +} -- 2.23.0