dpu-utilities/0001-rewrite-client-rexec_run.patch
Weifeng Su fe851cd555 Sync patches from source
Changes in these patches:
1. Fix a cache issue when a file is recreated
2. Introduce CMake to build userspace apps
3. Code cleanup

Signed-off-by: Weifeng Su <suweifeng1@huawei.com>
2023-06-12 08:24:22 +00:00

548 lines
19 KiB
Diff
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

From 47fdab7bf180b058f6bbed10dd17e9a4c784eecc Mon Sep 17 00:00:00 2001
From: liqiang <liqiang64@huawei.com>
Date: Thu, 1 Jun 2023 15:46:05 +0800
Subject: rewrite client rexec_run
Signed-off-by: liqiang <liqiang64@huawei.com>
---
qtfs/rexec/rexec.c | 256 +++++++++++++++++++++++++++++---------
qtfs/rexec/rexec.h | 8 ++
qtfs/rexec/rexec_server.c | 51 +++++---
3 files changed, 236 insertions(+), 79 deletions(-)
diff --git a/qtfs/rexec/rexec.c b/qtfs/rexec/rexec.c
index 4dd206d..489ebec 100644
--- a/qtfs/rexec/rexec.c
+++ b/qtfs/rexec/rexec.c
@@ -42,6 +42,21 @@
#define REXEC_MSG_LEN 1024
FILE *rexec_logfile = NULL;
+struct rexec_global_var {
+ int rexec_hs_fd[2];
+};
+
+struct rexec_global_var g_rexec;
+
+
+struct rexec_client_event {
+ int fd;
+ int outfd; // for stdin out err and other pipe
+ int (*handler)(struct rexec_client_event *);
+ int *exit_status;
+ int *pidfd;
+};
+
#define REXEC_PIDMAP_PATH "/var/run/rexec/pids"
#define REXEC_PIDMAP_PATH_LEN 64
#define REXEC_PID_LEN 16
@@ -84,37 +99,39 @@ static int rexec_msg_fill_argv(int argc, char *argv[], char *msg)
return offset;
}
-static int rexec_io(int infd, int outfd, char *buf, int buflen)
+static int rexec_io(struct rexec_client_event *evt)
{
+#define MAX_MSG_LEN 256
+ char buf[MAX_MSG_LEN];
int len;
int ret;
- while ((len = read(infd, buf, buflen)) > 0) {
- ret = write(outfd, buf, len);
+ while ((len = read(evt->fd, buf, MAX_MSG_LEN)) > 0) {
+ ret = write(evt->outfd, buf, len);
if (ret <= 0) {
- rexec_err("Read from fd:%d len:%d write to fd:%d failed ret:%d", infd, len, outfd, ret);
- return -1;
+ rexec_err("Read from fd:%d len:%d write to fd:%d failed ret:%d", evt->fd, len, evt->outfd, ret);
+ return REXEC_EVENT_EXIT;
}
if (ret != len) {
- rexec_err("Read from fd:%d len:%d but write to fd:%d ret:%d", infd, len, outfd, ret);
+ rexec_err("Read from fd:%d len:%d but write to fd:%d ret:%d", evt->fd, len, evt->outfd, ret);
}
}
- return 0;
+ return REXEC_EVENT_OK;
}
// return -1 means process exit.
-static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd)
+static int rexec_conn_msg(struct rexec_client_event *evt)
{
struct rexec_msg head;
- int ret = recv(connfd, &head, sizeof(struct rexec_msg), MSG_WAITALL);
+ int ret = recv(evt->fd, &head, sizeof(struct rexec_msg), MSG_WAITALL);
if (ret <= 0) {
rexec_err("Rexec conn recv err:%d errno:%d", ret, errno);
- return -1;
+ return REXEC_EVENT_EXIT;
}
switch (head.msgtype) {
case REXEC_KILL:
- *exit_status = head.exit_status;
+ *evt->exit_status = head.exit_status;
rexec_err("Rexec conn recv kill msg, exit:%d now.", head.exit_status);
- return -1;
+ return REXEC_EVENT_EXIT;
case REXEC_PIDMAP: {
int mypid = getpid();
int peerpid = head.pid;
@@ -122,9 +139,9 @@ static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd)
char buf[REXEC_PID_LEN] = {0};
int fd;
int err;
- if (*pidfd > 0) {
+ if (*evt->pidfd > 0) {
rexec_err("Rexec pidmap msg > 1 error.");
- return 0;
+ return REXEC_EVENT_OK;
}
sprintf(path, "%s/%d", REXEC_PIDMAP_PATH, mypid);
fd = open(path, O_CREAT|O_WRONLY, 0600);
@@ -133,23 +150,41 @@ static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd)
mypid, peerpid, path, fd);
break;
}
- *pidfd = fd;
+ *evt->pidfd = fd;
if ((err = flock(fd, LOCK_EX)) != 0) {
rexec_err("Rexec flock file:%s failed, errno:%d rexec exit.", path, err);
- return -1;
+ return REXEC_EVENT_EXIT;
}
if ((err = ftruncate(fd, 0)) != 0) {
rexec_err("Rexec pidmap file:%s clear failed errno:%d rexec exit.", path, err);
- return -1;
+ return REXEC_EVENT_EXIT;
}
if ((err = lseek(fd, 0, SEEK_SET)) < 0) {
rexec_err("Rexec pidmap file:%s lseek 0 failed errno:%d rexec exit", path, err);
- return -1;
+ return REXEC_EVENT_EXIT;
}
sprintf(buf, "%d", peerpid);
if ((err = write(fd, buf, strlen(buf))) <= 0) {
rexec_err("Rexec pidmap file:%s write pid:%d failed errno:%d rexec exit.", path, peerpid, err);
- return -1;
+ return REXEC_EVENT_EXIT;
+ }
+ if (g_rexec.rexec_hs_fd[PIPE_WRITE] != -1 && g_rexec.rexec_hs_fd[PIPE_READ] != -1) {
+ err = write(g_rexec.rexec_hs_fd[PIPE_WRITE], "1", 1);
+ if (err <= 0) {
+ rexec_err("rexec handshake write 1 failed, hs write:%d.", g_rexec.rexec_hs_fd[PIPE_WRITE]);
+ return REXEC_EVENT_ERR;
+ }
+ } else {
+ char msg[sizeof(struct rexec_msg) + 1];
+ struct rexec_msg *hs = msg;
+ char *ok = hs->msg;
+ hs->msgtype = REXEC_HANDSHAKE;
+ hs->msglen = 1;
+ *ok = '1';
+ if (write(evt->fd, hs, sizeof(struct rexec_msg) + 1) <= 0) {
+ rexec_err("send handshake failed, remote process will die");
+ return REXEC_EVENT_EXIT;
+ }
}
break;
}
@@ -159,6 +194,35 @@ static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd)
rexec_log("Rexec conn recv msgtype:%d argc:%d stdno:%d msglen:%d",
head.msgtype, head.argc, head.stdno, head.msglen);
+ return REXEC_EVENT_OK;
+}
+
+static struct rexec_client_event *rexec_add_event(int efd, int fd, int outfd, int (*handler)(struct rexec_client_event *))
+{
+ struct rexec_client_event *event = (struct rexec_client_event *)malloc(sizeof(struct rexec_client_event));
+ if (event == NULL) {
+ rexec_err("malloc failed.");
+ return NULL;
+ }
+ event->fd = fd;
+ event->outfd = outfd;
+ event->handler = handler;
+ struct epoll_event evt;
+ evt.data.ptr = (void *)event;
+ evt.events = EPOLLIN;
+ if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, event->fd, &evt)) {
+ rexec_err("epoll ctl add fd:%d event failed.", event->fd);
+ free(event);
+ return NULL;
+ }
+ return event;
+}
+
+static int rexec_del_event(struct rexec_client_event *event)
+{
+ // close will del fd in epoll list
+ close(event->fd);
+ free(event);
return 0;
}
@@ -166,48 +230,48 @@ enum {
REPOL_IN_INDEX = 0,
REPOL_OUT_INDEX,
REPOL_ERR_INDEX,
- REPOL_CONN_INDEX,
REPOL_INV_INDEX,
};
-static int rexec_run(int rstdin, int rstdout, int rstderr, int connfd, char *argv[])
+static int rexec_std_event(int efd, int rstdin, int rstdout, int rstderr)
{
- int exit_status = EXIT_FAILURE;
-#define REXEC_MAX_EVENTS 4
- int infds[4] = {STDIN_FILENO, rstdout, rstderr, connfd};
- int outfds[4] = {rstdin, STDOUT_FILENO, STDERR_FILENO, connfd};
+ #define REXEC_MAX_EVENTS 4
+ int infds[REPOL_INV_INDEX] = {STDIN_FILENO, rstdout, rstderr};
+ int outfds[REPOL_INV_INDEX] = {rstdin, STDOUT_FILENO, STDERR_FILENO};
- int efd = epoll_create1(0);
- if (efd == -1) {
- rexec_err("epoll create1 failed, errno:%d.", errno);
- return exit_status;
- }
- struct epoll_event evt;
for (int i = 0; i < REPOL_INV_INDEX; i++) {
- evt.data.u32 = i;
- evt.events = EPOLLIN;
- if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, infds[i], &evt)) {
+ if (NULL == rexec_add_event(efd, infds[i], outfds[i], rexec_io)) {
rexec_err("epoll ctl add fd:%d event failed and ignore this mistake.", infds[i]);
continue;
} else {
if (rexec_set_nonblock(infds[i], 1) != 0) {
rexec_err("rexec set fd:%d i:%d non block failed.", infds[i], i);
- return exit_status;
+ return -1;
}
}
}
+ return 0;
+}
+
+static int rexec_run(int efd, int connfd, char *argv[])
+{
+ int pidfd = -1;
+ int exit_status = EXIT_FAILURE;
+
+ struct rexec_client_event *connevt = rexec_add_event(efd, connfd, -1, rexec_conn_msg);
+ if (NULL == connevt || rexec_set_nonblock(connfd, 1) != 0) {
+ // process will exit, fd or mem resource will free by kernel soon
+ rexec_err("rexec add connfd event failed");
+ return exit_status;
+ }
+ // 这两个指针只能在当前函数上下文使用,是当前函数栈指针
+ connevt->exit_status = &exit_status;
+ connevt->pidfd = &pidfd;
struct epoll_event *evts = calloc(REXEC_MAX_EVENTS, sizeof(struct epoll_event));
if (evts == NULL) {
rexec_err("init calloc evts failed.");
goto end;
}
- int buflen = REXEC_MSG_LEN;
- char *buf = (char *)malloc(buflen);
- int pidfd = -1;
- if (buf == NULL) {
- rexec_err("Rexec malloc failed.");
- goto free_end;
- }
rexec_log("Rexec process start run, as proxy of remote %s", argv[1]);
while (1) {
int n = epoll_wait(efd, evts, REXEC_MAX_EVENTS, 1000);
@@ -219,23 +283,16 @@ static int rexec_run(int rstdin, int rstdout, int rstderr, int connfd, char *arg
continue;
}
for (int i = 0; i < n; i++) {
- int infd = -1;
- int outfd = -1;
- if (evts[i].data.u32 >= REPOL_INV_INDEX) {
- rexec_err("invalid epoll events index data:%d", evts[i].data.u32);
- continue;
+ struct rexec_client_event *evt = (struct rexec_client_event *)evts[i].data.ptr;
+ int ret = evt->handler(evt);
+ if (evts[i].events & EPOLLHUP || ret == REXEC_EVENT_EXIT) {
+ process_exit = 1;
}
- infd = infds[evts[i].data.u32];
- outfd = outfds[evts[i].data.u32];
- if (infd == connfd) {
- if (evts[i].events & EPOLLHUP || rexec_conn_msg(connfd, &exit_status, &pidfd) == -1)
- process_exit = 1;
- } else {
- if (rexec_io(infd, outfd, buf, buflen) == -1) {
- close(infd);
- }
+ if (ret == REXEC_EVENT_DEL) {
+ rexec_del_event(evt);
}
}
+ // process will exit, and free all resource and exit
if (process_exit) {
rexec_log("Rexec process %s exit.", argv[1]);
break;
@@ -250,8 +307,6 @@ static int rexec_run(int rstdin, int rstdout, int rstderr, int connfd, char *arg
remove(path);
}
- free(buf);
-
free_end:
free(evts);
@@ -319,7 +374,7 @@ struct rexec_fdinfo {
int offset;
};
-static inline int rexec_is_reg_file(int fd)
+static inline unsigned int rexec_fd_mode(int fd)
{
struct stat st;
char path[32] = {0};
@@ -327,9 +382,13 @@ static inline int rexec_is_reg_file(int fd)
rexec_err("get fd:%d fstat failed, errno:%d", fd, errno);
return 0;
}
- if (S_ISREG(st.st_mode)) {
+ return st.st_mode;
+}
+
+static inline int rexec_is_reg_file(int fd)
+{
+ if (S_ISREG(rexec_fd_mode(fd)))
return 1;
- }
return 0;
}
@@ -429,16 +488,85 @@ err_end:
return NULL;
}
+static int rexec_handshake_proc(struct rexec_client_event *evt)
+{
+ char msg[sizeof(struct rexec_msg) + 1];
+ struct rexec_msg *hs = msg;
+ int ret = read(evt->fd, hs->msg, 1);
+ if (ret <= 0) {
+ rexec_err("read from handshake pipe failed, ret:%d err:%d", ret, errno);
+ return REXEC_EVENT_DEL;
+ }
+ hs->msgtype = REXEC_HANDSHAKE;
+ hs->msglen = 1;
+ ret = write(evt->outfd, hs, sizeof(struct rexec_msg) + 1);
+ if (ret < 0) {
+ rexec_err("send handshake failed, connfd:%d.", evt->outfd);
+ }
+ return REXEC_EVENT_OK;
+}
+
+static int rexec_handshake_init(int efd, int connfd)
+{
+ char *hs_read = getenv("REXEC_HANDSHAKE_RD");
+ char *hs_write = getenv("REXEC_HANDSHAKE_WR");
+
+ if (hs_read == NULL || hs_write == NULL) {
+ rexec_log("handshake not in effect, read:%lx write%lx", hs_read, hs_write);
+ return 0;
+ }
+ g_rexec.rexec_hs_fd[PIPE_READ] = atoi(hs_read);
+ g_rexec.rexec_hs_fd[PIPE_WRITE] = atoi(hs_write);
+ if (g_rexec.rexec_hs_fd[PIPE_READ] <= STDERR_FILENO || g_rexec.rexec_hs_fd[PIPE_WRITE] <= STDERR_FILENO) {
+ rexec_log("handshake invalid fd read:%d write:%d", g_rexec.rexec_hs_fd[PIPE_READ], g_rexec.rexec_hs_fd[PIPE_WRITE]);
+ goto err_end;
+ }
+ if (!S_ISFIFO(rexec_fd_mode(g_rexec.rexec_hs_fd[PIPE_READ])) || !S_ISFIFO(rexec_fd_mode(g_rexec.rexec_hs_fd[PIPE_WRITE]))) {
+ rexec_err("handshake fd mode not fifo:%d %d", g_rexec.rexec_hs_fd[PIPE_READ], g_rexec.rexec_hs_fd[PIPE_WRITE]);
+ goto err_end;
+ }
+ if (rexec_add_event(efd, g_rexec.rexec_hs_fd[PIPE_READ], connfd, rexec_handshake_proc) == NULL) {
+ rexec_err("add handshake pipe read fd:%d to epoll failed", g_rexec.rexec_hs_fd[PIPE_READ]);
+ goto err_end;
+ }
+ rexec_log("handshake effect read:%d write:%d", g_rexec.rexec_hs_fd[PIPE_READ], g_rexec.rexec_hs_fd[PIPE_WRITE]);
+ return 0;
+err_end:
+ g_rexec.rexec_hs_fd[PIPE_READ] = -1;
+ g_rexec.rexec_hs_fd[PIPE_WRITE] = -1;
+ return -1;
+}
+
+static void rexec_global_var_init()
+{
+ memset(&g_rexec, 0, sizeof(g_rexec));
+ g_rexec.rexec_hs_fd[PIPE_READ] = -1;
+ g_rexec.rexec_hs_fd[PIPE_WRITE] = -1;
+ return;
+}
+
int main(int argc, char *argv[])
{
rexec_log_init();
rexec_clear_pids();
+ int efd = epoll_create1(0);
+ if (efd == -1) {
+ rexec_err("epoll create1 failed, errno:%d.", errno);
+ return -1;
+ }
+ rexec_global_var_init();
+
int connfd = rexec_conn_to_server();
if (connfd < 0) {
rexec_err("Rexec connect to server failed, errno:%d", errno);
return -1;
}
+
+ if (rexec_handshake_init(efd, connfd) != 0) {
+ rexec_err("Rexec handshake environment set but get error.");
+ return -1;
+ }
rexec_log("Remote exec binary:%s", argv[1]);
int arglen = rexec_calc_argv_len(argc - 1, &argv[1]);
@@ -513,7 +641,11 @@ int main(int argc, char *argv[])
close(rstdin[0]);
close(rstdout[1]);
close(rstderr[1]);
- exit_status = rexec_run(rstdin[1], rstdout[0], rstderr[0], connfd, argv);
+ if (rexec_std_event(efd, rstdin[1], rstdout[0], rstderr[0]) != 0) {
+ rexec_err("add std event failed");
+ goto err_end;
+ }
+ exit_status = rexec_run(efd, connfd, argv);
close(rstdin[1]);
close(rstdout[0]);
close(rstderr[0]);
diff --git a/qtfs/rexec/rexec.h b/qtfs/rexec/rexec.h
index ba7c2be..ce1280a 100644
--- a/qtfs/rexec/rexec.h
+++ b/qtfs/rexec/rexec.h
@@ -24,6 +24,13 @@ enum {
PIPE_WRITE,
};
+enum {
+ REXEC_EVENT_OK,
+ REXEC_EVENT_DEL, // del this event
+ REXEC_EVENT_EXIT, // exit process
+ REXEC_EVENT_ERR,
+};
+
enum {
REXEC_STDIN = 0x5a,
REXEC_STDOUT,
@@ -45,6 +52,7 @@ enum rexec_msgtype {
REXEC_KILL, // kill process
REXEC_PIPE, // client send a pipefd as stdin/out/err to server
REXEC_PIDMAP, // server send remote process's pid to client
+ REXEC_HANDSHAKE,
};
struct rexec_msg {
diff --git a/qtfs/rexec/rexec_server.c b/qtfs/rexec/rexec_server.c
index 686c051..2aa3275 100644
--- a/qtfs/rexec/rexec_server.c
+++ b/qtfs/rexec/rexec_server.c
@@ -65,12 +65,6 @@ struct rexec_event {
int (*handler)(struct rexec_event *);
};
-enum {
- REXEC_EVENT_OK,
- REXEC_EVENT_ERR,
- REXEC_EVENT_DEL,
-};
-
static int rexec_add_event(int efd, int fd, int pid, int (*handler)(struct rexec_event *))
{
struct rexec_event *event = (struct rexec_event *)malloc(sizeof(struct rexec_event));
@@ -86,6 +80,7 @@ static int rexec_add_event(int efd, int fd, int pid, int (*handler)(struct rexec
evt.events = EPOLLIN;
if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, event->fd, &evt)) {
rexec_err("epoll ctl add fd:%d event failed.", event->fd);
+ free(event);
return -1;
}
return 0;
@@ -136,15 +131,6 @@ static int rexec_event_handshake(struct rexec_event *event)
rexec_log("Rexec recv son pid:%d, connfd:%d", sonpid, connfd);
rexec_hash_insert_direct(child_hash, sonpid, connfd);
-
- struct rexec_msg head;
- head.msgtype = REXEC_PIDMAP;
- head.msglen = 0;
- head.pid = sonpid;
- ret = write(connfd, &head, sizeof(struct rexec_msg));
- if (ret <= 0) {
- rexec_err("Rexec send son pid:%d to client failed, ret:%d errno:%d", sonpid, ret, errno);
- }
rexec_add_event(main_epoll_fd, connfd, sonpid, rexec_event_process_manage);
// 成功后同样要删除这个pipe监听事件删除时会close掉fd
@@ -326,7 +312,7 @@ static int rexec_start_new_process(int newconnfd)
int scmfd = -1;
int len = sizeof(struct rexec_msg);
memset(&head, 0, sizeof(struct rexec_msg));
- int ret = rexec_recvmsg(newconnfd, (char *)&head, len, &scmfd, MSG_WAITALL);
+ ret = rexec_recvmsg(newconnfd, (char *)&head, len, &scmfd, MSG_WAITALL);
if (ret <= 0) {
rexec_log("recvmsg ret:%d, errno:%d", ret, errno);
goto err_to_parent;
@@ -375,14 +361,45 @@ static int rexec_start_new_process(int newconnfd)
goto err_free;
}
+ char *ack;
int mypid = getpid();
+ char msg[sizeof(struct rexec_msg) + 1];
+ struct rexec_msg *pm = msg;
+ pm->msgtype = REXEC_PIDMAP;
+ pm->msglen = 0;
+ pm->pid = mypid;
+ ret = write(newconnfd, pm, sizeof(struct rexec_msg));
+ if (ret <= 0) {
+ rexec_err("Rexec send son pid:%d to client failed, ret:%d errno:%d", mypid, ret, errno);
+ } else {
+retry:
+ rexec_log("Waiting for rexec client handshake...");
+ ret = read(newconnfd, pm, sizeof(struct rexec_msg) + 1);
+ if (ret <= 0) {
+ rexec_err("Recv handshake failed, ret:%d err:%d", ret, errno);
+ goto err_to_parent;
+ }
+ if (pm->msgtype != REXEC_HANDSHAKE) {
+ rexec_err("Recv unexpected msg:%d", pm->msgtype);
+ goto retry;
+ }
+ ack = pm->msg;
+ if (*ack != '1') {
+ rexec_err("recv error handshake ack from client:%c, exit now", *ack);
+ goto err_to_parent;
+ }
+ }
// 写会PID必须放在基于newconnfd接收完所有消息之后
// 后面newconnfd的控制权交回父进程rexec server服务进程
- write(pipefd[PIPE_WRITE], &mypid, sizeof(int));
+ if (write(pipefd[PIPE_WRITE], &mypid, sizeof(int)) <= 0) {
+ rexec_err("write pid to parent failed, pipefd:%d.", pipefd[PIPE_WRITE]);
+ }
// 子进程不再使用pipe write和connfd
close(pipefd[PIPE_WRITE]);
close(newconnfd);
+ rexec_log("handshake over normaly, continue to exec new process:%s.", binary);
+
// rexec_shim_entry argv like:
// argv[0]: binary
// argv[1]: -f
--
2.33.0