From 47fdab7bf180b058f6bbed10dd17e9a4c784eecc Mon Sep 17 00:00:00 2001 From: liqiang Date: Thu, 1 Jun 2023 15:46:05 +0800 Subject: rewrite client rexec_run Signed-off-by: liqiang --- qtfs/rexec/rexec.c | 256 +++++++++++++++++++++++++++++--------- qtfs/rexec/rexec.h | 8 ++ qtfs/rexec/rexec_server.c | 51 +++++--- 3 files changed, 236 insertions(+), 79 deletions(-) diff --git a/qtfs/rexec/rexec.c b/qtfs/rexec/rexec.c index 4dd206d..489ebec 100644 --- a/qtfs/rexec/rexec.c +++ b/qtfs/rexec/rexec.c @@ -42,6 +42,21 @@ #define REXEC_MSG_LEN 1024 FILE *rexec_logfile = NULL; +struct rexec_global_var { + int rexec_hs_fd[2]; +}; + +struct rexec_global_var g_rexec; + + +struct rexec_client_event { + int fd; + int outfd; // for stdin out err and other pipe + int (*handler)(struct rexec_client_event *); + int *exit_status; + int *pidfd; +}; + #define REXEC_PIDMAP_PATH "/var/run/rexec/pids" #define REXEC_PIDMAP_PATH_LEN 64 #define REXEC_PID_LEN 16 @@ -84,37 +99,39 @@ static int rexec_msg_fill_argv(int argc, char *argv[], char *msg) return offset; } -static int rexec_io(int infd, int outfd, char *buf, int buflen) +static int rexec_io(struct rexec_client_event *evt) { +#define MAX_MSG_LEN 256 + char buf[MAX_MSG_LEN]; int len; int ret; - while ((len = read(infd, buf, buflen)) > 0) { - ret = write(outfd, buf, len); + while ((len = read(evt->fd, buf, MAX_MSG_LEN)) > 0) { + ret = write(evt->outfd, buf, len); if (ret <= 0) { - rexec_err("Read from fd:%d len:%d write to fd:%d failed ret:%d", infd, len, outfd, ret); - return -1; + rexec_err("Read from fd:%d len:%d write to fd:%d failed ret:%d", evt->fd, len, evt->outfd, ret); + return REXEC_EVENT_EXIT; } if (ret != len) { - rexec_err("Read from fd:%d len:%d but write to fd:%d ret:%d", infd, len, outfd, ret); + rexec_err("Read from fd:%d len:%d but write to fd:%d ret:%d", evt->fd, len, evt->outfd, ret); } } - return 0; + return REXEC_EVENT_OK; } // return -1 means process exit. 
-static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd) +static int rexec_conn_msg(struct rexec_client_event *evt) { struct rexec_msg head; - int ret = recv(connfd, &head, sizeof(struct rexec_msg), MSG_WAITALL); + int ret = recv(evt->fd, &head, sizeof(struct rexec_msg), MSG_WAITALL); if (ret <= 0) { rexec_err("Rexec conn recv err:%d errno:%d", ret, errno); - return -1; + return REXEC_EVENT_EXIT; } switch (head.msgtype) { case REXEC_KILL: - *exit_status = head.exit_status; + *evt->exit_status = head.exit_status; rexec_err("Rexec conn recv kill msg, exit:%d now.", head.exit_status); - return -1; + return REXEC_EVENT_EXIT; case REXEC_PIDMAP: { int mypid = getpid(); int peerpid = head.pid; @@ -122,9 +139,9 @@ static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd) char buf[REXEC_PID_LEN] = {0}; int fd; int err; - if (*pidfd > 0) { + if (*evt->pidfd > 0) { rexec_err("Rexec pidmap msg > 1 error."); - return 0; + return REXEC_EVENT_OK; } sprintf(path, "%s/%d", REXEC_PIDMAP_PATH, mypid); fd = open(path, O_CREAT|O_WRONLY, 0600); @@ -133,23 +150,41 @@ static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd) mypid, peerpid, path, fd); break; } - *pidfd = fd; + *evt->pidfd = fd; if ((err = flock(fd, LOCK_EX)) != 0) { rexec_err("Rexec flock file:%s failed, errno:%d rexec exit.", path, err); - return -1; + return REXEC_EVENT_EXIT; } if ((err = ftruncate(fd, 0)) != 0) { rexec_err("Rexec pidmap file:%s clear failed errno:%d rexec exit.", path, err); - return -1; + return REXEC_EVENT_EXIT; } if ((err = lseek(fd, 0, SEEK_SET)) < 0) { rexec_err("Rexec pidmap file:%s lseek 0 failed errno:%d rexec exit", path, err); - return -1; + return REXEC_EVENT_EXIT; } sprintf(buf, "%d", peerpid); if ((err = write(fd, buf, strlen(buf))) <= 0) { rexec_err("Rexec pidmap file:%s write pid:%d failed errno:%d rexec exit.", path, peerpid, err); - return -1; + return REXEC_EVENT_EXIT; + } + if (g_rexec.rexec_hs_fd[PIPE_WRITE] != -1 && g_rexec.rexec_hs_fd[PIPE_READ] != 
-1) { + err = write(g_rexec.rexec_hs_fd[PIPE_WRITE], "1", 1); + if (err <= 0) { + rexec_err("rexec handshake write 1 failed, hs write:%d.", g_rexec.rexec_hs_fd[PIPE_WRITE]); + return REXEC_EVENT_ERR; + } + } else { + char msg[sizeof(struct rexec_msg) + 1]; + struct rexec_msg *hs = msg; + char *ok = hs->msg; + hs->msgtype = REXEC_HANDSHAKE; + hs->msglen = 1; + *ok = '1'; + if (write(evt->fd, hs, sizeof(struct rexec_msg) + 1) <= 0) { + rexec_err("send handshake failed, remote process will die"); + return REXEC_EVENT_EXIT; + } } break; } @@ -159,6 +194,35 @@ static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd) rexec_log("Rexec conn recv msgtype:%d argc:%d stdno:%d msglen:%d", head.msgtype, head.argc, head.stdno, head.msglen); + return REXEC_EVENT_OK; +} + +static struct rexec_client_event *rexec_add_event(int efd, int fd, int outfd, int (*handler)(struct rexec_client_event *)) +{ + struct rexec_client_event *event = (struct rexec_client_event *)malloc(sizeof(struct rexec_client_event)); + if (event == NULL) { + rexec_err("malloc failed."); + return NULL; + } + event->fd = fd; + event->outfd = outfd; + event->handler = handler; + struct epoll_event evt; + evt.data.ptr = (void *)event; + evt.events = EPOLLIN; + if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, event->fd, &evt)) { + rexec_err("epoll ctl add fd:%d event failed.", event->fd); + free(event); + return NULL; + } + return event; +} + +static int rexec_del_event(struct rexec_client_event *event) +{ + // close will del fd in epoll list + close(event->fd); + free(event); return 0; } @@ -166,48 +230,48 @@ enum { REPOL_IN_INDEX = 0, REPOL_OUT_INDEX, REPOL_ERR_INDEX, - REPOL_CONN_INDEX, REPOL_INV_INDEX, }; -static int rexec_run(int rstdin, int rstdout, int rstderr, int connfd, char *argv[]) +static int rexec_std_event(int efd, int rstdin, int rstdout, int rstderr) { - int exit_status = EXIT_FAILURE; -#define REXEC_MAX_EVENTS 4 - int infds[4] = {STDIN_FILENO, rstdout, rstderr, connfd}; - int outfds[4] = {rstdin, 
STDOUT_FILENO, STDERR_FILENO, connfd}; + #define REXEC_MAX_EVENTS 4 + int infds[REPOL_INV_INDEX] = {STDIN_FILENO, rstdout, rstderr}; + int outfds[REPOL_INV_INDEX] = {rstdin, STDOUT_FILENO, STDERR_FILENO}; - int efd = epoll_create1(0); - if (efd == -1) { - rexec_err("epoll create1 failed, errno:%d.", errno); - return exit_status; - } - struct epoll_event evt; for (int i = 0; i < REPOL_INV_INDEX; i++) { - evt.data.u32 = i; - evt.events = EPOLLIN; - if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, infds[i], &evt)) { + if (NULL == rexec_add_event(efd, infds[i], outfds[i], rexec_io)) { rexec_err("epoll ctl add fd:%d event failed and ignore this mistake.", infds[i]); continue; } else { if (rexec_set_nonblock(infds[i], 1) != 0) { rexec_err("rexec set fd:%d i:%d non block failed.", infds[i], i); - return exit_status; + return -1; } } } + return 0; +} + +static int rexec_run(int efd, int connfd, char *argv[]) +{ + int pidfd = -1; + int exit_status = EXIT_FAILURE; + + struct rexec_client_event *connevt = rexec_add_event(efd, connfd, -1, rexec_conn_msg); + if (NULL == connevt || rexec_set_nonblock(connfd, 1) != 0) { + // the process is about to exit; the kernel will free the fd and memory resources shortly + rexec_err("rexec add connfd event failed"); + return exit_status; + } + // these two pointers point into this function's stack frame, so they may only be used within this function's context + connevt->exit_status = &exit_status; + connevt->pidfd = &pidfd; struct epoll_event *evts = calloc(REXEC_MAX_EVENTS, sizeof(struct epoll_event)); if (evts == NULL) { rexec_err("init calloc evts failed."); goto end; } - int buflen = REXEC_MSG_LEN; - char *buf = (char *)malloc(buflen); - int pidfd = -1; - if (buf == NULL) { - rexec_err("Rexec malloc failed."); - goto free_end; - } rexec_log("Rexec process start run, as proxy of remote %s", argv[1]); while (1) { int n = epoll_wait(efd, evts, REXEC_MAX_EVENTS, 1000); @@ -219,23 +283,16 @@ static int rexec_run(int rstdin, int rstdout, int rstderr, int connfd, char *arg continue; } for (int i = 0; i < n; i++) { - int infd = -1; - int outfd = -1; - if 
(evts[i].data.u32 >= REPOL_INV_INDEX) { - rexec_err("invalid epoll events index data:%d", evts[i].data.u32); - continue; + struct rexec_client_event *evt = (struct rexec_client_event *)evts[i].data.ptr; + int ret = evt->handler(evt); + if (evts[i].events & EPOLLHUP || ret == REXEC_EVENT_EXIT) { + process_exit = 1; } - infd = infds[evts[i].data.u32]; - outfd = outfds[evts[i].data.u32]; - if (infd == connfd) { - if (evts[i].events & EPOLLHUP || rexec_conn_msg(connfd, &exit_status, &pidfd) == -1) - process_exit = 1; - } else { - if (rexec_io(infd, outfd, buf, buflen) == -1) { - close(infd); - } + if (ret == REXEC_EVENT_DEL) { + rexec_del_event(evt); } } + // process will exit, and free all resource and exit if (process_exit) { rexec_log("Rexec process %s exit.", argv[1]); break; @@ -250,8 +307,6 @@ static int rexec_run(int rstdin, int rstdout, int rstderr, int connfd, char *arg remove(path); } - free(buf); - free_end: free(evts); @@ -319,7 +374,7 @@ struct rexec_fdinfo { int offset; }; -static inline int rexec_is_reg_file(int fd) +static inline unsigned int rexec_fd_mode(int fd) { struct stat st; char path[32] = {0}; @@ -327,9 +382,13 @@ static inline int rexec_is_reg_file(int fd) rexec_err("get fd:%d fstat failed, errno:%d", fd, errno); return 0; } - if (S_ISREG(st.st_mode)) { + return st.st_mode; +} + +static inline int rexec_is_reg_file(int fd) +{ + if (S_ISREG(rexec_fd_mode(fd))) return 1; - } return 0; } @@ -429,16 +488,85 @@ err_end: return NULL; } +static int rexec_handshake_proc(struct rexec_client_event *evt) +{ + char msg[sizeof(struct rexec_msg) + 1]; + struct rexec_msg *hs = msg; + int ret = read(evt->fd, hs->msg, 1); + if (ret <= 0) { + rexec_err("read from handshake pipe failed, ret:%d err:%d", ret, errno); + return REXEC_EVENT_DEL; + } + hs->msgtype = REXEC_HANDSHAKE; + hs->msglen = 1; + ret = write(evt->outfd, hs, sizeof(struct rexec_msg) + 1); + if (ret < 0) { + rexec_err("send handshake failed, connfd:%d.", evt->outfd); + } + return REXEC_EVENT_OK; 
+} + +static int rexec_handshake_init(int efd, int connfd) +{ + char *hs_read = getenv("REXEC_HANDSHAKE_RD"); + char *hs_write = getenv("REXEC_HANDSHAKE_WR"); + + if (hs_read == NULL || hs_write == NULL) { + rexec_log("handshake not in effect, read:%lx write%lx", hs_read, hs_write); + return 0; + } + g_rexec.rexec_hs_fd[PIPE_READ] = atoi(hs_read); + g_rexec.rexec_hs_fd[PIPE_WRITE] = atoi(hs_write); + if (g_rexec.rexec_hs_fd[PIPE_READ] <= STDERR_FILENO || g_rexec.rexec_hs_fd[PIPE_WRITE] <= STDERR_FILENO) { + rexec_log("handshake invalid fd read:%d write:%d", g_rexec.rexec_hs_fd[PIPE_READ], g_rexec.rexec_hs_fd[PIPE_WRITE]); + goto err_end; + } + if (!S_ISFIFO(rexec_fd_mode(g_rexec.rexec_hs_fd[PIPE_READ])) || !S_ISFIFO(rexec_fd_mode(g_rexec.rexec_hs_fd[PIPE_WRITE]))) { + rexec_err("handshake fd mode not fifo:%d %d", g_rexec.rexec_hs_fd[PIPE_READ], g_rexec.rexec_hs_fd[PIPE_WRITE]); + goto err_end; + } + if (rexec_add_event(efd, g_rexec.rexec_hs_fd[PIPE_READ], connfd, rexec_handshake_proc) == NULL) { + rexec_err("add handshake pipe read fd:%d to epoll failed", g_rexec.rexec_hs_fd[PIPE_READ]); + goto err_end; + } + rexec_log("handshake effect read:%d write:%d", g_rexec.rexec_hs_fd[PIPE_READ], g_rexec.rexec_hs_fd[PIPE_WRITE]); + return 0; +err_end: + g_rexec.rexec_hs_fd[PIPE_READ] = -1; + g_rexec.rexec_hs_fd[PIPE_WRITE] = -1; + return -1; +} + +static void rexec_global_var_init() +{ + memset(&g_rexec, 0, sizeof(g_rexec)); + g_rexec.rexec_hs_fd[PIPE_READ] = -1; + g_rexec.rexec_hs_fd[PIPE_WRITE] = -1; + return; +} + int main(int argc, char *argv[]) { rexec_log_init(); rexec_clear_pids(); + int efd = epoll_create1(0); + if (efd == -1) { + rexec_err("epoll create1 failed, errno:%d.", errno); + return -1; + } + rexec_global_var_init(); + int connfd = rexec_conn_to_server(); if (connfd < 0) { rexec_err("Rexec connect to server failed, errno:%d", errno); return -1; } + + if (rexec_handshake_init(efd, connfd) != 0) { + rexec_err("Rexec handshake environment set but get error."); 
+ return -1; + } rexec_log("Remote exec binary:%s", argv[1]); int arglen = rexec_calc_argv_len(argc - 1, &argv[1]); @@ -513,7 +641,11 @@ int main(int argc, char *argv[]) close(rstdin[0]); close(rstdout[1]); close(rstderr[1]); - exit_status = rexec_run(rstdin[1], rstdout[0], rstderr[0], connfd, argv); + if (rexec_std_event(efd, rstdin[1], rstdout[0], rstderr[0]) != 0) { + rexec_err("add std event failed"); + goto err_end; + } + exit_status = rexec_run(efd, connfd, argv); close(rstdin[1]); close(rstdout[0]); close(rstderr[0]); diff --git a/qtfs/rexec/rexec.h b/qtfs/rexec/rexec.h index ba7c2be..ce1280a 100644 --- a/qtfs/rexec/rexec.h +++ b/qtfs/rexec/rexec.h @@ -24,6 +24,13 @@ enum { PIPE_WRITE, }; +enum { + REXEC_EVENT_OK, + REXEC_EVENT_DEL, // del this event + REXEC_EVENT_EXIT, // exit process + REXEC_EVENT_ERR, +}; + enum { REXEC_STDIN = 0x5a, REXEC_STDOUT, @@ -45,6 +52,7 @@ enum rexec_msgtype { REXEC_KILL, // kill process REXEC_PIPE, // client send a pipefd as stdin/out/err to server REXEC_PIDMAP, // server send remote process's pid to client + REXEC_HANDSHAKE, }; struct rexec_msg { diff --git a/qtfs/rexec/rexec_server.c b/qtfs/rexec/rexec_server.c index 686c051..2aa3275 100644 --- a/qtfs/rexec/rexec_server.c +++ b/qtfs/rexec/rexec_server.c @@ -65,12 +65,6 @@ struct rexec_event { int (*handler)(struct rexec_event *); }; -enum { - REXEC_EVENT_OK, - REXEC_EVENT_ERR, - REXEC_EVENT_DEL, -}; - static int rexec_add_event(int efd, int fd, int pid, int (*handler)(struct rexec_event *)) { struct rexec_event *event = (struct rexec_event *)malloc(sizeof(struct rexec_event)); @@ -86,6 +80,7 @@ static int rexec_add_event(int efd, int fd, int pid, int (*handler)(struct rexec evt.events = EPOLLIN; if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, event->fd, &evt)) { rexec_err("epoll ctl add fd:%d event failed.", event->fd); + free(event); return -1; } return 0; @@ -136,15 +131,6 @@ static int rexec_event_handshake(struct rexec_event *event) rexec_log("Rexec recv son pid:%d, connfd:%d", 
sonpid, connfd); rexec_hash_insert_direct(child_hash, sonpid, connfd); - - struct rexec_msg head; - head.msgtype = REXEC_PIDMAP; - head.msglen = 0; - head.pid = sonpid; - ret = write(connfd, &head, sizeof(struct rexec_msg)); - if (ret <= 0) { - rexec_err("Rexec send son pid:%d to client failed, ret:%d errno:%d", sonpid, ret, errno); - } rexec_add_event(main_epoll_fd, connfd, sonpid, rexec_event_process_manage); // 成功后同样要删除这个pipe监听事件,删除时会close掉fd @@ -326,7 +312,7 @@ static int rexec_start_new_process(int newconnfd) int scmfd = -1; int len = sizeof(struct rexec_msg); memset(&head, 0, sizeof(struct rexec_msg)); - int ret = rexec_recvmsg(newconnfd, (char *)&head, len, &scmfd, MSG_WAITALL); + ret = rexec_recvmsg(newconnfd, (char *)&head, len, &scmfd, MSG_WAITALL); if (ret <= 0) { rexec_log("recvmsg ret:%d, errno:%d", ret, errno); goto err_to_parent; @@ -375,14 +361,45 @@ static int rexec_start_new_process(int newconnfd) goto err_free; } + char *ack; int mypid = getpid(); + char msg[sizeof(struct rexec_msg) + 1]; + struct rexec_msg *pm = msg; + pm->msgtype = REXEC_PIDMAP; + pm->msglen = 0; + pm->pid = mypid; + ret = write(newconnfd, pm, sizeof(struct rexec_msg)); + if (ret <= 0) { + rexec_err("Rexec send son pid:%d to client failed, ret:%d errno:%d", mypid, ret, errno); + } else { +retry: + rexec_log("Waiting for rexec client handshake..."); + ret = read(newconnfd, pm, sizeof(struct rexec_msg) + 1); + if (ret <= 0) { + rexec_err("Recv handshake failed, ret:%d err:%d", ret, errno); + goto err_to_parent; + } + if (pm->msgtype != REXEC_HANDSHAKE) { + rexec_err("Recv unexpected msg:%d", pm->msgtype); + goto retry; + } + ack = pm->msg; + if (*ack != '1') { + rexec_err("recv error handshake ack from client:%c, exit now", *ack); + goto err_to_parent; + } + } // 写会PID必须放在基于newconnfd接收完所有消息之后, // 后面newconnfd的控制权交回父进程rexec server服务进程 - write(pipefd[PIPE_WRITE], &mypid, sizeof(int)); + if (write(pipefd[PIPE_WRITE], &mypid, sizeof(int)) <= 0) { + rexec_err("write pid to parent 
failed, pipefd:%d.", pipefd[PIPE_WRITE]); + } // 子进程不再使用pipe write和connfd close(pipefd[PIPE_WRITE]); close(newconnfd); + rexec_log("handshake over normaly, continue to exec new process:%s.", binary); + // rexec_shim_entry argv like: // argv[0]: binary // argv[1]: -f -- 2.33.0