sysmaster/backport-fix-init-does-not-enter-the-freeze-during-normal-run.patch
huyubiao 8936fa02c5 sync patches from upstream, change the path of the unit, modify permissions for some directories and files
(cherry picked from commit ce9ff469b57f60130621bc293783bd3ac1fc92f2)
2023-08-05 18:15:53 +08:00

776 lines
25 KiB
Diff

From 2c6d13f3fac35b901b39a851eea2662ae6d02bf1 Mon Sep 17 00:00:00 2001
From: huyubiao <h13958451065@163.com>
Date: Thu, 29 Jun 2023 21:40:53 +0800
Subject: [PATCH] fix: init does not enter the freeze during normal running
---
core/bin/unit/notify.rs | 4 +-
exts/init/src/main.rs | 21 ++--
exts/init/src/runtime/comm.rs | 163 ++++++++++++++++++++-----------
exts/init/src/runtime/epoll.rs | 14 ++-
exts/init/src/runtime/mod.rs | 51 ++++------
exts/init/src/runtime/signals.rs | 95 ++++++++----------
exts/init/src/runtime/timer.rs | 58 ++++++++---
libs/constants/src/lib.rs | 3 +
8 files changed, 226 insertions(+), 183 deletions(-)
diff --git a/core/bin/unit/notify.rs b/core/bin/unit/notify.rs
index f824725..d23a725 100644
--- a/core/bin/unit/notify.rs
+++ b/core/bin/unit/notify.rs
@@ -109,7 +109,7 @@ impl NotifyManager {
}
}
-const NOTIFY_INVALID_FD: i32 = -1;
+use constants::INVALID_FD;
const NOTIFY_INVALID_PID: libc::pid_t = -1;
struct Notify {
@@ -135,7 +135,7 @@ impl Notify {
rentry: Rc::clone(rentryr),
db: Rc::clone(dbr),
config: Rc::clone(configr),
- fd: RefCell::new(NOTIFY_INVALID_FD),
+ fd: RefCell::new(INVALID_FD),
}
}
diff --git a/exts/init/src/main.rs b/exts/init/src/main.rs
index 587077e..fbd77b9 100644
--- a/exts/init/src/main.rs
+++ b/exts/init/src/main.rs
@@ -23,21 +23,14 @@ fn main() {
// Run: Monitor the sysmaster's liveliness and acceptance of message.
// Unrecover: On-site problem collection or recreate new sysmaster.
match RunTime::new(cmd) {
- Ok(mut run_time) => {
- loop {
- let state = run_time.state();
- let ret = match state {
- InitState::Reexec => run_time.reexec(),
- InitState::Run => run_time.run(),
- InitState::Unrecover => run_time.unrecover(),
- };
- if let Err(err) = ret {
- eprintln!("Failed to {:?}:{:?} ", state, err);
- break;
- }
+ Ok(mut run_time) => loop {
+ let state = run_time.state();
+ match state {
+ InitState::Reexec => run_time.reexec(),
+ InitState::Run => run_time.run(),
+ InitState::Unrecover => run_time.unrecover(),
}
- run_time.clear();
- }
+ },
Err(err) => eprintln!(
"Failed to new init, it may be necessary to run it as root :{:?}",
err
diff --git a/exts/init/src/runtime/comm.rs b/exts/init/src/runtime/comm.rs
index 1a0d0ca..8f71511 100644
--- a/exts/init/src/runtime/comm.rs
+++ b/exts/init/src/runtime/comm.rs
@@ -14,18 +14,18 @@ use super::epoll::Epoll;
use super::timer::Timer;
use nix::errno::Errno;
use nix::sys::epoll::EpollEvent;
+use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify, WatchDescriptor};
use nix::sys::socket::{self, AddressFamily, SockFlag, SockType, UnixAddr};
use nix::unistd;
+use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::rc::Rc;
-use std::str;
-use std::{fs, path::PathBuf};
+use std::{fs, path::PathBuf, str};
const LISTEN_BACKLOG: usize = 10;
-const INVALID_FD: i32 = -1;
const ACCEPT_COUNT: i32 = 3;
const BUF_SIZE: usize = 16; //The communication string length is fixed to 16 characters.
-use constants::{ALIVE, INIT_SOCKET};
+use constants::{ALIVE, INIT_SOCKET, INVALID_FD};
pub struct Comm {
epoll: Rc<Epoll>,
@@ -33,6 +33,8 @@ pub struct Comm {
connect_fd: RawFd,
online_fd: RawFd, // Specsify either listen_fd or connect_fd.
timer: Timer,
+ inotify: Inotify,
+ wd: WatchDescriptor,
}
#[derive(PartialEq, Eq)]
@@ -47,28 +49,7 @@ impl Comm {
pub fn new(epoll: &Rc<Epoll>, time_wait: i64, time_cnt: i64) -> Result<Comm, Errno> {
let timer = Timer::new(epoll, time_wait, time_cnt)?;
- let sock_path = PathBuf::from(INIT_SOCKET);
- let listen_fd = socket::socket(
- AddressFamily::Unix,
- SockType::Stream,
- SockFlag::SOCK_CLOEXEC,
- None,
- )?;
-
- let parent_path = sock_path.as_path().parent();
- match parent_path {
- Some(path) => fs::create_dir_all(path)
- .map_err(|e| Errno::from_i32(e.raw_os_error().unwrap_or(Errno::EINVAL as i32)))?,
- None => return Err(Errno::EINVAL),
- }
-
- if let Err(e) = unistd::unlink(&sock_path) {
- eprintln!("Failed to unlink path:{:?}, error:{}", sock_path, e);
- }
-
- let addr = UnixAddr::new(&sock_path)?;
- socket::bind(listen_fd, &addr)?;
- socket::listen(listen_fd, LISTEN_BACKLOG)?;
+ let (listen_fd, inotify, wd) = create_listen_fd(epoll)?;
let mut comm = Comm {
epoll: epoll.clone(),
@@ -76,51 +57,50 @@ impl Comm {
connect_fd: INVALID_FD,
online_fd: INVALID_FD,
timer,
+ inotify,
+ wd,
};
comm.set_online_fd(comm.listen_fd)?;
-
Ok(comm)
}
pub fn is_fd(&self, fd: RawFd) -> bool {
- if fd == self.online_fd || fd == self.timer.fd() {
- return true;
- }
- false
+ fd == self.online_fd || fd == self.timer.fd() || fd == self.inotify.as_raw_fd()
}
- pub fn proc(&mut self, event: EpollEvent) -> Result<CommType, Errno> {
+ pub fn proc(&mut self, event: EpollEvent) -> CommType {
if self.timer.fd() as u64 == event.data() {
- if self.timer.is_time_out(event)? {
- return Ok(CommType::PipTMOUT);
+ if self.timer.is_time_out(event) {
+ return CommType::PipTMOUT;
}
- return Ok(CommType::Succeed);
+ return CommType::Succeed;
+ }
+
+ if self.inotify.as_raw_fd() as u64 == event.data() {
+ // Do not call self.inotify.read_events() here: if recover fails, the unread event stays pending so recovery can be retried.
+ return self.recover();
+ }
+
+ // When the program runs normally, listen_fd will not be closed,
+ // but connect_fd will be closed during listening.
+ if self.listen_fd as u64 == event.data() && self.epoll.is_err(event) {
+ return self.recover();
}
- // When online_fd fails, ensure that connect_fd is closed, and then execute again after timeout.
if self.online_fd as u64 == event.data() {
match self.online_fd {
- x if x == self.listen_fd => return self.listen_proc(event),
+ x if x == self.listen_fd => return self.listen_proc(),
x if x == self.connect_fd => return self.connect_proc(event),
_ => {}
}
}
- Ok(CommType::Succeed)
+ CommType::Succeed
}
pub fn finish(&mut self) {
_ = self.set_online_fd(self.listen_fd);
- }
-
- pub fn clear(&mut self) {
- self.epoll.safe_close(self.listen_fd);
- self.listen_fd = INVALID_FD;
-
- self.epoll.safe_close(self.connect_fd);
- self.connect_fd = INVALID_FD;
-
- self.timer.clear();
+ self.timer.reset();
}
fn connect(&mut self) -> Result<(), Errno> {
@@ -223,21 +203,18 @@ impl Comm {
Ok(())
}
- fn listen_proc(&mut self, event: EpollEvent) -> Result<CommType, Errno> {
- if self.epoll.is_err(event) {
- return Err(Errno::EINVAL);
- }
+ fn listen_proc(&mut self) -> CommType {
if self.connect().is_err() {
- return Ok(CommType::PipOFF);
+ return CommType::PipOFF;
}
self.timer.reset();
- Ok(CommType::PipON)
+ CommType::PipON
}
- fn connect_proc(&mut self, event: EpollEvent) -> Result<CommType, Errno> {
+ fn connect_proc(&mut self, event: EpollEvent) -> CommType {
if self.epoll.is_err(event) {
_ = self.set_online_fd(self.listen_fd);
- return Ok(CommType::PipOFF);
+ return CommType::PipOFF;
}
match self.recv_msg() {
Ok(buf) => {
@@ -252,6 +229,78 @@ impl Comm {
unistd::sleep(1);
}
}
- Ok(CommType::Succeed)
+ CommType::Succeed
}
+
+ fn recover(&mut self) -> CommType {
+ match create_listen_fd(&self.epoll) {
+ Ok((listen_fd, inotify, wd)) => {
+ self.epoll.safe_close(self.listen_fd);
+ self.epoll.safe_close(self.inotify.as_raw_fd());
+ self.listen_fd = listen_fd;
+ self.inotify = inotify;
+ self.wd = wd;
+ eprintln!("comm recover");
+ if self.online_fd == self.connect_fd {
+ // The socket file(INIT_SOCKET) cannot be used when connecting,
+ // so recreate the socket file(INIT_SOCKET) and return success.
+ return CommType::Succeed;
+ } else {
+ // If init is in the listening state,
+ // the sysmaster cannot be connected through the old socket file(INIT_SOCKET) at this time,
+ // we must recreate the socket file and then reexec the sysmaster.
+ return CommType::PipTMOUT;
+ }
+ }
+ Err(e) => {
+ eprintln!("Failed to create_listen_fd:{:?}", e);
+ }
+ }
+ CommType::Succeed
+ }
+}
+
+impl Drop for Comm {
+ fn drop(&mut self) {
+ self.epoll.safe_close(self.listen_fd);
+ self.listen_fd = INVALID_FD;
+
+ self.epoll.safe_close(self.connect_fd);
+ self.connect_fd = INVALID_FD;
+
+ let _ = self.inotify.rm_watch(self.wd);
+ self.epoll.safe_close(self.inotify.as_raw_fd());
+ }
+}
+
+fn create_listen_fd(epoll: &Rc<Epoll>) -> Result<(i32, Inotify, WatchDescriptor), Errno> {
+ let listen_fd = socket::socket(
+ AddressFamily::Unix,
+ SockType::Stream,
+ SockFlag::SOCK_CLOEXEC,
+ None,
+ )?;
+
+ let sock_path = PathBuf::from(INIT_SOCKET);
+ let parent_path = sock_path.as_path().parent();
+ match parent_path {
+ Some(path) => fs::create_dir_all(path)
+ .map_err(|e| Errno::from_i32(e.raw_os_error().unwrap_or(Errno::EINVAL as i32)))?,
+ None => return Err(Errno::EINVAL),
+ }
+
+ if let Err(e) = unistd::unlink(&sock_path) {
+ eprintln!("Failed to unlink path:{:?}, error:{}", sock_path, e);
+ }
+
+ let addr = UnixAddr::new(&sock_path)?;
+ socket::bind(listen_fd, &addr)?;
+ socket::listen(listen_fd, LISTEN_BACKLOG)?;
+
+ let inotify = Inotify::init(InitFlags::all())?;
+
+ let wd = inotify.add_watch(INIT_SOCKET, AddWatchFlags::IN_ALL_EVENTS)?;
+
+ epoll.register(inotify.as_raw_fd())?;
+ Ok((listen_fd, inotify, wd))
}
diff --git a/exts/init/src/runtime/epoll.rs b/exts/init/src/runtime/epoll.rs
index ad7475f..6c21bbc 100644
--- a/exts/init/src/runtime/epoll.rs
+++ b/exts/init/src/runtime/epoll.rs
@@ -10,6 +10,7 @@
// NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PSL v2 for more details.
+use constants::INVALID_FD;
use nix::errno::Errno;
use nix::sys::epoll::{self, EpollEvent, EpollFlags, EpollOp};
use nix::unistd;
@@ -21,7 +22,7 @@ pub struct Epoll {
impl Epoll {
pub(crate) fn new() -> Result<Epoll, Errno> {
- let epoll_fd = epoll::epoll_create1(epoll::EpollCreateFlags::empty())?;
+ let epoll_fd = epoll::epoll_create1(epoll::EpollCreateFlags::EPOLL_CLOEXEC)?;
Ok(Epoll { epoll_fd })
}
@@ -61,10 +62,6 @@ impl Epoll {
false
}
- pub(crate) fn clear(&self) {
- self.safe_close(self.epoll_fd);
- }
-
pub(crate) fn safe_close(&self, fd: RawFd) {
if fd <= 0 {
return;
@@ -75,3 +72,10 @@ impl Epoll {
};
}
}
+
+impl Drop for Epoll {
+ fn drop(&mut self) {
+ self.safe_close(self.epoll_fd);
+ self.epoll_fd = INVALID_FD;
+ }
+}
diff --git a/exts/init/src/runtime/mod.rs b/exts/init/src/runtime/mod.rs
index d11d8f2..e81a513 100644
--- a/exts/init/src/runtime/mod.rs
+++ b/exts/init/src/runtime/mod.rs
@@ -81,7 +81,7 @@ impl RunTime {
self.state
}
- pub fn reexec(&mut self) -> Result<(), Errno> {
+ pub fn reexec(&mut self) {
if self.need_reexec {
self.reexec_manager();
}
@@ -89,43 +89,33 @@ impl RunTime {
let event = self.epoll.wait_one();
let fd = event.data() as RawFd;
match fd {
- _x if self.comm.is_fd(fd) => self.reexec_comm_dispatch(event)?,
- _x if self.signals.is_fd(fd) => self.reexec_signal_dispatch(event)?,
+ _x if self.comm.is_fd(fd) => self.reexec_comm_dispatch(event),
+ _x if self.signals.is_fd(fd) => self.reexec_signal_dispatch(event),
_ => self.epoll.safe_close(fd),
}
- Ok(())
}
- pub fn run(&mut self) -> Result<(), Errno> {
+ pub fn run(&mut self) {
let event = self.epoll.wait_one();
let fd = event.data() as RawFd;
match fd {
- _x if self.comm.is_fd(fd) => self.run_comm_dispatch(event)?,
- _x if self.signals.is_fd(fd) => self.run_signal_dispatch(event)?,
+ _x if self.comm.is_fd(fd) => self.run_comm_dispatch(event),
+ _x if self.signals.is_fd(fd) => self.run_signal_dispatch(event),
_ => self.epoll.safe_close(fd),
}
- Ok(())
}
- pub fn unrecover(&mut self) -> Result<(), Errno> {
+ pub fn unrecover(&mut self) {
let event = self.epoll.wait_one();
let fd = event.data() as RawFd;
match fd {
_x if self.comm.is_fd(fd) => self.unrecover_comm_dispatch(event),
- _x if self.signals.is_fd(fd) => self.unrecover_signal_dispatch(event)?,
+ _x if self.signals.is_fd(fd) => self.unrecover_signal_dispatch(event),
_ => self.epoll.safe_close(fd),
}
- Ok(())
- }
-
- pub fn clear(&mut self) {
- self.comm.clear();
- self.signals.clear();
- self.epoll.clear();
}
pub fn reexec_self(&mut self) {
- self.clear();
let mut args = Vec::new();
let mut init_path = CString::new("/usr/bin/init").unwrap();
if let Some(str) = std::env::args().next() {
@@ -154,17 +144,16 @@ impl RunTime {
unsafe { libc::kill(self.sysmaster_pid.into(), libc::SIGABRT) };
}
- fn reexec_comm_dispatch(&mut self, event: EpollEvent) -> Result<(), Errno> {
- match self.comm.proc(event)? {
+ fn reexec_comm_dispatch(&mut self, event: EpollEvent) {
+ match self.comm.proc(event) {
CommType::PipON => self.state = InitState::Run,
CommType::PipTMOUT => self.need_reexec = true,
_ => {}
}
- Ok(())
}
- fn run_comm_dispatch(&mut self, event: EpollEvent) -> Result<(), Errno> {
- match self.comm.proc(event)? {
+ fn run_comm_dispatch(&mut self, event: EpollEvent) {
+ match self.comm.proc(event) {
CommType::PipOFF => self.state = InitState::Reexec,
CommType::PipTMOUT => {
self.state = InitState::Reexec;
@@ -172,15 +161,14 @@ impl RunTime {
}
_ => {}
}
- Ok(())
}
fn unrecover_comm_dispatch(&mut self, event: EpollEvent) {
_ = self.comm.proc(event);
}
- fn reexec_signal_dispatch(&mut self, event: EpollEvent) -> Result<(), Errno> {
- if let Some(siginfo) = self.signals.read(event)? {
+ fn reexec_signal_dispatch(&mut self, event: EpollEvent) {
+ if let Some(siginfo) = self.signals.read(event) {
match siginfo {
_x if self.signals.is_zombie(siginfo) => self.do_recycle(),
_x if self.signals.is_restart(siginfo) => self.do_reexec(),
@@ -188,11 +176,10 @@ impl RunTime {
_ => {}
}
}
- Ok(())
}
- fn run_signal_dispatch(&mut self, event: EpollEvent) -> Result<(), Errno> {
- if let Some(siginfo) = self.signals.read(event)? {
+ fn run_signal_dispatch(&mut self, event: EpollEvent) {
+ if let Some(siginfo) = self.signals.read(event) {
match siginfo {
_x if self.signals.is_zombie(siginfo) => self.do_recycle(),
_x if self.signals.is_restart(siginfo) => self.do_reexec(),
@@ -200,11 +187,10 @@ impl RunTime {
_ => {}
}
}
- Ok(())
}
- fn unrecover_signal_dispatch(&mut self, event: EpollEvent) -> Result<(), Errno> {
- if let Some(siginfo) = self.signals.read(event)? {
+ fn unrecover_signal_dispatch(&mut self, event: EpollEvent) {
+ if let Some(siginfo) = self.signals.read(event) {
match siginfo {
_x if self.signals.is_zombie(siginfo) => {
self.signals.recycle_zombie();
@@ -216,7 +202,6 @@ impl RunTime {
_ => {}
}
}
- Ok(())
}
fn change_to_unrecover(&mut self) {
diff --git a/exts/init/src/runtime/signals.rs b/exts/init/src/runtime/signals.rs
index 1c3059c..327ff3e 100644
--- a/exts/init/src/runtime/signals.rs
+++ b/exts/init/src/runtime/signals.rs
@@ -13,7 +13,8 @@
use super::epoll::Epoll;
use nix::errno::Errno;
use nix::sys::epoll::EpollEvent;
-use nix::sys::signal::{SigmaskHow, Signal};
+use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
+use nix::sys::signalfd::{self, SfdFlags};
use nix::sys::wait::{self, Id, WaitPidFlag, WaitStatus};
use nix::unistd;
use std::mem;
@@ -21,42 +22,12 @@ use std::ops::Neg;
use std::os::unix::io::RawFd;
use std::rc::Rc;
+use constants::INVALID_FD;
use constants::{SIG_RESTART_MANAGER_OFFSET, SIG_RUN_UNRECOVER_OFFSET, SIG_SWITCH_ROOT_OFFSET};
-const INVALID_FD: i32 = -1;
-
-pub(crate) struct SigSet {
- sigset: libc::sigset_t,
-}
-
-impl SigSet {
- /// Initialize to include nothing.
- pub fn empty() -> SigSet {
- let mut sigset = mem::MaybeUninit::zeroed();
- let _ = unsafe { libc::sigemptyset(sigset.as_mut_ptr()) };
-
- unsafe {
- SigSet {
- sigset: sigset.assume_init(),
- }
- }
- }
-
- /// Add the specified signal to the set.
- pub fn add(&mut self, signal: libc::c_int) {
- unsafe {
- libc::sigaddset(
- &mut self.sigset as *mut libc::sigset_t,
- signal as libc::c_int,
- )
- };
- }
-}
pub struct Signals {
epoll: Rc<Epoll>,
- signal_fd: RawFd,
- set: SigSet,
- oldset: SigSet,
+ pub signal_fd: RawFd,
signals: Vec<i32>,
zombie_signal: i32,
restart_signal: i32,
@@ -71,8 +42,6 @@ impl Signals {
Signals {
epoll: epoll.clone(),
signal_fd: INVALID_FD,
- set: SigSet::empty(),
- oldset: SigSet::empty(),
signals,
zombie_signal: libc::SIGCHLD,
unrecover_signal: libc::SIGRTMIN() + SIG_RUN_UNRECOVER_OFFSET,
@@ -101,29 +70,25 @@ impl Signals {
}
pub fn create_signals_epoll(&mut self) -> Result<(), Errno> {
- self.reset_sigset();
+ self.signal_fd = self.reset_sigset()?;
self.epoll.register(self.signal_fd)?;
Ok(())
}
- pub fn reset_sigset(&mut self) {
+ pub fn reset_sigset(&mut self) -> Result<RawFd, Errno> {
+ let mut sigset = SigSet::empty();
for sig in self.signals.clone() {
- self.set.add(sig);
- }
-
- unsafe {
- libc::pthread_sigmask(libc::SIG_BLOCK, &self.set.sigset, &mut self.oldset.sigset);
- self.signal_fd = libc::signalfd(
- -1,
- &mut self.set.sigset as *const libc::sigset_t,
- libc::SFD_NONBLOCK,
- );
+ let signum: Signal = unsafe { std::mem::transmute(sig) };
+ sigset.add(signum);
}
+ let mut oldset = SigSet::empty();
+ signal::pthread_sigmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut oldset))?;
+ signalfd::signalfd(-1, &sigset, SfdFlags::SFD_CLOEXEC | SfdFlags::SFD_NONBLOCK)
}
- pub fn read(&mut self, event: EpollEvent) -> Result<Option<libc::signalfd_siginfo>, Errno> {
+ pub fn read(&mut self, event: EpollEvent) -> Option<libc::signalfd_siginfo> {
if self.epoll.is_err(event) {
- return Err(Errno::EIO);
+ return self.recover();
}
let mut buffer = mem::MaybeUninit::<libc::signalfd_siginfo>::zeroed();
@@ -139,14 +104,14 @@ impl Signals {
match res {
x if x == size as isize => {
let info = unsafe { buffer.assume_init() };
- Ok(Some(info))
+ Some(info)
}
- x if x >= 0 => Ok(None),
+ x if x >= 0 => None,
x => {
let err = Errno::from_i32(x.neg() as i32);
eprintln!("read_signals failed err:{:?}", err);
unistd::sleep(1);
- Ok(None)
+ None
}
}
}
@@ -191,13 +156,31 @@ impl Signals {
}
pub fn is_fd(&self, fd: RawFd) -> bool {
- if fd == self.signal_fd {
- return true;
+ fd == self.signal_fd
+ }
+
+ fn recover(&mut self) -> Option<libc::signalfd_siginfo> {
+ match self.reset_sigset() {
+ Ok(signal_fd) => match self.epoll.register(signal_fd) {
+ Ok(_) => {
+ self.epoll.safe_close(self.signal_fd);
+ self.signal_fd = signal_fd;
+ eprintln!("signals recover");
+ }
+ Err(e) => {
+ eprintln!("Failed to register signal_fd:{:?}", e);
+ }
+ },
+ Err(e) => {
+ eprintln!("Failed to create_signals_epoll:{:?}", e);
+ }
}
- false
+ None
}
+}
- pub fn clear(&mut self) {
+impl Drop for Signals {
+ fn drop(&mut self) {
self.epoll.safe_close(self.signal_fd);
self.signal_fd = INVALID_FD;
if let Err(e) = nix::sys::signal::pthread_sigmask(
diff --git a/exts/init/src/runtime/timer.rs b/exts/init/src/runtime/timer.rs
index f7b5f30..54fcc62 100644
--- a/exts/init/src/runtime/timer.rs
+++ b/exts/init/src/runtime/timer.rs
@@ -24,25 +24,18 @@ pub struct Timer {
epoll: Rc<Epoll>,
timer: TimerFd,
current_cnt: i64,
+ time_wait: i64,
time_cnt: i64,
}
impl Timer {
pub fn new(epoll: &Rc<Epoll>, time_wait: i64, time_cnt: i64) -> Result<Timer, Errno> {
- let timer = TimerFd::new(
- ClockId::CLOCK_REALTIME,
- TimerFlags::TFD_NONBLOCK | TimerFlags::TFD_CLOEXEC,
- )?;
- timer.set(
- Expiration::Interval(TimeSpec::seconds(time_wait)),
- TimerSetTimeFlags::empty(),
- )?;
-
- epoll.register(timer.as_raw_fd())?;
+ let timer = create_timer(epoll, time_wait)?;
Ok(Timer {
epoll: epoll.clone(),
timer,
current_cnt: 0,
+ time_wait,
time_cnt,
})
}
@@ -56,18 +49,18 @@ impl Timer {
}
#[allow(clippy::wrong_self_convention)]
- pub fn is_time_out(&mut self, event: EpollEvent) -> Result<bool, Errno> {
+ pub fn is_time_out(&mut self, event: EpollEvent) -> bool {
if self.epoll.is_err(event) {
- return Err(Errno::EIO);
+ return self.recover();
}
self.flush();
self.current_cnt += 1;
if self.time_cnt <= self.current_cnt {
eprintln!("time out!");
self.reset();
- return Ok(true);
+ return true;
}
- Ok(false)
+ false
}
// reset timer.
@@ -79,7 +72,40 @@ impl Timer {
self.timer.as_raw_fd()
}
- pub fn clear(&mut self) {
- self.epoll.safe_close(self.timer.as_raw_fd());
+ fn recover(&mut self) -> bool {
+ match create_timer(&self.epoll, self.time_wait) {
+ Ok(timer) => {
+ // Replace the old timer only after the new one has been created, so that
+ // if create_timer fails, the event can be retrieved and create_timer retried.
+ // TimerFd implements Drop, so the old timer does not need to be closed manually.
+ self.timer = timer;
+ eprintln!("timer recover");
+ }
+ Err(e) => {
+ eprintln!("Failed to create_timer:{:?}", e);
+ }
+ }
+ // An epoll error on the timer fd is treated as a system exception, so report it as a timeout.
+ true
+ }
+}
+
+impl Drop for Timer {
+ fn drop(&mut self) {
+ // self.timer needs no explicit cleanup here because TimerFd implements Drop.
}
}
+
+fn create_timer(epoll: &Rc<Epoll>, time_wait: i64) -> Result<TimerFd, Errno> {
+ let timer = TimerFd::new(
+ ClockId::CLOCK_REALTIME,
+ TimerFlags::TFD_NONBLOCK | TimerFlags::TFD_CLOEXEC,
+ )?;
+ timer.set(
+ Expiration::Interval(TimeSpec::seconds(time_wait)),
+ TimerSetTimeFlags::empty(),
+ )?;
+
+ epoll.register(timer.as_raw_fd())?;
+ Ok(timer)
+}
diff --git a/libs/constants/src/lib.rs b/libs/constants/src/lib.rs
index 6005067..1f84b40 100644
--- a/libs/constants/src/lib.rs
+++ b/libs/constants/src/lib.rs
@@ -29,3 +29,6 @@ pub const SCTL_SOCKET: &str = "/run/sysmaster/sctl";
/// Default log file path when LogTarget is configured to "file"
pub const LOG_FILE_PATH: &str = "/var/log/sysmaster/sysmaster.log";
+
+/// Sentinel value representing an invalid file descriptor.
+pub const INVALID_FD: i32 = -1;
--
2.33.0