refactor instance interfaces and add signal function

(cherry picked from commit 0509a1c1085175fc5f7061da747eed724d59563e)
This commit is contained in:
fly_1997 2024-05-31 16:31:04 +08:00 committed by openeuler-sync-bot
parent 48c927a246
commit d080eabfc6
5 changed files with 2645 additions and 3 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,121 @@
From 0758b61d2e9af546d3c55bc9cb3704cd7a4fe59a Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Tue, 28 May 2024 20:00:07 +0800
Subject: [PATCH 3/4] modify some interfaces and add interface comments
---
src/plugin_mgr/instance_run_handler.cpp | 1 +
src/plugin_mgr/instance_run_handler.h | 10 +++-------
src/plugin_mgr/interface.h | 17 ++++++++++-------
src/plugin_mgr/plugin.h | 4 ++--
4 files changed, 16 insertions(+), 16 deletions(-)
diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp
index c60207b..fc9dc04 100644
--- a/src/plugin_mgr/instance_run_handler.cpp
+++ b/src/plugin_mgr/instance_run_handler.cpp
@@ -34,6 +34,7 @@ void InstanceRunHandler::run_instance(std::shared_ptr<Instance> instance) {
}
void InstanceRunHandler::insert_instance(std::shared_ptr<Instance> instance, uint64_t time) {
+ /* To check if an instance is enabled, enable() is called in the PluginManager. */
instance->set_enabled(true);
schedule_queue.push(ScheduleInstance{instance, time});
INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue.");
diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h
index fc45874..f0bf263 100644
--- a/src/plugin_mgr/instance_run_handler.h
+++ b/src/plugin_mgr/instance_run_handler.h
@@ -42,7 +42,7 @@ private:
class ScheduleInstance {
public:
bool operator < (const ScheduleInstance &rhs) const {
- return time > rhs.time || (time == rhs.time && instance->get_type() > rhs.instance->get_type());
+ return time > rhs.time || (time == rhs.time && instance->get_priority() > rhs.instance->get_priority());
}
std::shared_ptr<Instance> instance;
uint64_t time;
@@ -61,9 +61,6 @@ public:
int get_cycle() {
return cycle;
}
- bool is_instance_exist(const std::string &name) {
- return memory_store.is_instance_exist(name);
- }
void recv_queue_push(InstanceRunMessage &msg) {
this->recv_queue.push(msg);
}
@@ -77,15 +74,14 @@ private:
void run_instance(std::shared_ptr<Instance> instance);
void delete_instance(std::shared_ptr<Instance> instance);
void insert_instance(std::shared_ptr<Instance> instance, uint64_t time);
- void adjust_collector_queue(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps, bool flag);
- void check_scenario_dependency(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps);
+ /* Instance execution queue. */
std::priority_queue<ScheduleInstance> schedule_queue;
+ /*Receives messages from the PluginManager. */
SafeQueue<InstanceRunMessage> recv_queue;
MemoryStore &memory_store;
int cycle;
static const int DEFAULT_CYCLE_SIZE = 10;
- static const int MAX_DEPENDENCIES_SIZE = 20;
};
#endif // !PLUGIN_MGR_INSTANCE_RUN_HANDLER_H
diff --git a/src/plugin_mgr/interface.h b/src/plugin_mgr/interface.h
index 6495b92..8aa6933 100644
--- a/src/plugin_mgr/interface.h
+++ b/src/plugin_mgr/interface.h
@@ -14,12 +14,6 @@
#include <stdbool.h>
#include <stdint.h>
-enum PluginType {
- COLLECTOR,
- SCENARIO,
- TUNE,
-};
-
struct DataBuf {
int len;
void *data;
@@ -43,10 +37,19 @@ struct Param {
struct Interface {
const char* (*get_version)();
+ /* The instance name is a unique identifier in the system. */
const char* (*get_name)();
const char* (*get_description)();
+ /* Specifies the instance dependencies, which is used as the input information
+ * for instance execution.
+ */
const char* (*get_dep)();
- enum PluginType (*get_type)();
+ /* Instance scheduling priority. In a uniform time period, a instance with a
+ * lower priority is scheduled first.
+ */
+ int (*get_priority)();
+ int (*get_type)();
+ /* Instance execution period. */
int (*get_period)();
bool (*enable)();
void (*disable)();
diff --git a/src/plugin_mgr/plugin.h b/src/plugin_mgr/plugin.h
index f6b5029..7415b99 100644
--- a/src/plugin_mgr/plugin.h
+++ b/src/plugin_mgr/plugin.h
@@ -25,8 +25,8 @@ public:
std::string get_name() const {
return this->name;
}
- PluginType get_type() const {
- return interface->get_type();
+ int get_priority() const {
+ return interface->get_priority();
}
Interface* get_interface() const {
return this->interface;
--
2.33.0

View File

@ -0,0 +1,401 @@
From 6b8debf93307ee68f2ecd4b0bf5ac2d01ee278b8 Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Wed, 29 May 2024 15:12:31 +0800
Subject: [PATCH 4/4] add the SIGINT signal processing and singleton classes
---
src/plugin_mgr/instance_run_handler.cpp | 1 -
src/plugin_mgr/logger.cpp | 2 +-
src/plugin_mgr/logger.h | 3 +-
src/plugin_mgr/main.cpp | 37 +++++++++++++++++++------
src/plugin_mgr/memory_store.h | 2 +-
src/plugin_mgr/message_manager.cpp | 22 ++++++++++-----
src/plugin_mgr/message_manager.h | 32 +++++++++++----------
src/plugin_mgr/plugin.h | 2 +-
src/plugin_mgr/plugin_manager.cpp | 26 ++++++++++-------
src/plugin_mgr/plugin_manager.h | 21 ++++++++++----
10 files changed, 97 insertions(+), 51 deletions(-)
diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp
index fc9dc04..ecd7778 100644
--- a/src/plugin_mgr/instance_run_handler.cpp
+++ b/src/plugin_mgr/instance_run_handler.cpp
@@ -35,7 +35,6 @@ void InstanceRunHandler::run_instance(std::shared_ptr<Instance> instance) {
void InstanceRunHandler::insert_instance(std::shared_ptr<Instance> instance, uint64_t time) {
/* To check if an instance is enabled, enable() is called in the PluginManager. */
- instance->set_enabled(true);
schedule_queue.push(ScheduleInstance{instance, time});
INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue.");
}
diff --git a/src/plugin_mgr/logger.cpp b/src/plugin_mgr/logger.cpp
index 7a924c2..af39583 100644
--- a/src/plugin_mgr/logger.cpp
+++ b/src/plugin_mgr/logger.cpp
@@ -26,7 +26,7 @@ Logger::Logger() {
logger.addAppender(appender);
}
-void Logger::init(Config *config) {
+void Logger::init(std::shared_ptr<Config> config) {
log4cplus::SharedAppenderPtr appender(new log4cplus::FileAppender(config->get_log_path() + "/server.log"));
appender->setName("file");
log4cplus::tstring pattern = LOG4CPLUS_TEXT("%D{%m/%d/%y %H:%M:%S} [%t] %-5p %c - %m"
diff --git a/src/plugin_mgr/logger.h b/src/plugin_mgr/logger.h
index e86dd72..a219c86 100644
--- a/src/plugin_mgr/logger.h
+++ b/src/plugin_mgr/logger.h
@@ -13,6 +13,7 @@
#define PLUGIN_MGR_LOGGER_H
#include "config.h"
+#include <memory>
#include <log4cplus/log4cplus.h>
#define INFO(fmt) LOG4CPLUS_INFO(logger.get(), fmt)
@@ -24,7 +25,7 @@
class Logger {
public:
Logger();
- void init(Config *config);
+ void init(std::shared_ptr<Config> config);
log4cplus::Logger get() {
return logger;
}
diff --git a/src/plugin_mgr/main.cpp b/src/plugin_mgr/main.cpp
index 698ba62..5cfb020 100644
--- a/src/plugin_mgr/main.cpp
+++ b/src/plugin_mgr/main.cpp
@@ -10,6 +10,7 @@
* See the Mulan PSL v2 for more details.
******************************************************************************/
#include "plugin_manager.h"
+#include <csignal>
Logger logger;
@@ -20,8 +21,26 @@ void print_help() {
" ./oeaware /etc/oeAware/config.yaml\n");
}
+void signal_handler(int signum) {
+ auto &plugin_manager = PluginManager::get_instance();
+ auto memory_store = plugin_manager.get_memory_store();
+ auto all_plugins = memory_store.get_all_plugins();
+ for (auto plugin : all_plugins) {
+ for (size_t i = 0; i < plugin->get_instance_len(); ++i) {
+ auto instance = plugin->get_instance(i);
+ if (!instance->get_enabled()) {
+ continue;
+ }
+ instance->get_interface()->disable();
+ INFO("[PluginManager] " << instance->get_name() << " instance disabled.");
+ }
+ }
+ exit(signum);
+}
+
int main(int argc, char **argv) {
- Config config;
+ signal(SIGINT, signal_handler);
+ std::shared_ptr<Config> config = std::make_shared<Config>();
if (argc < 2) {
ERROR("System need a argument!");
exit(EXIT_FAILURE);
@@ -39,20 +58,20 @@ int main(int argc, char **argv) {
ERROR("Insufficient permission on " << config_path);
exit(EXIT_FAILURE);
}
- if (!config.load(config_path)) {
+ if (!config->load(config_path)) {
ERROR("Config load error!");
exit(EXIT_FAILURE);
}
- logger.init(&config);
- SafeQueue<Message> handler_msg;
- SafeQueue<Message> res_msg;
+ logger.init(config);
+ std::shared_ptr<SafeQueue<Message>> handler_msg = std::make_shared<SafeQueue<Message>>();
+ std::shared_ptr<SafeQueue<Message>> res_msg = std::make_shared<SafeQueue<Message>>();
INFO("[MessageManager] Start message manager!");
- MessageManager message_manager(&handler_msg, &res_msg);
- message_manager.init();
+ MessageManager &message_manager = MessageManager::get_instance();
+ message_manager.init(handler_msg, res_msg);
message_manager.run();
INFO("[PluginManager] Start plugin manager!");
- PluginManager plugin_manager(config, handler_msg, res_msg);
- plugin_manager.init();
+ PluginManager& plugin_manager = PluginManager::get_instance();
+ plugin_manager.init(config, handler_msg, res_msg);
plugin_manager.run();
return 0;
}
\ No newline at end of file
diff --git a/src/plugin_mgr/memory_store.h b/src/plugin_mgr/memory_store.h
index 999795a..0e862db 100644
--- a/src/plugin_mgr/memory_store.h
+++ b/src/plugin_mgr/memory_store.h
@@ -42,7 +42,7 @@ public:
bool is_instance_exist(const std::string &name) {
return dep_handler.is_instance_exist(name);
}
- std::vector<std::shared_ptr<Plugin>> get_all_plugins() {
+ const std::vector<std::shared_ptr<Plugin>> get_all_plugins() {
std::vector<std::shared_ptr<Plugin>> res;
for (auto &p : plugins) {
res.emplace_back(p.second);
diff --git a/src/plugin_mgr/message_manager.cpp b/src/plugin_mgr/message_manager.cpp
index 815122c..a9f862d 100644
--- a/src/plugin_mgr/message_manager.cpp
+++ b/src/plugin_mgr/message_manager.cpp
@@ -57,7 +57,7 @@ bool TcpSocket::init() {
return true;
}
-static void send_msg(Msg &msg, SafeQueue<Message> *handler_msg) {
+static void send_msg(Msg &msg, std::shared_ptr<SafeQueue<Message>> handler_msg) {
Message message;
message.set_opt(msg.opt());
message.set_type(MessageType::EXTERNAL);
@@ -66,7 +66,7 @@ static void send_msg(Msg &msg, SafeQueue<Message> *handler_msg) {
}
handler_msg->push(message);
}
-static void recv_msg(Msg &msg, SafeQueue<Message> *res_msg) {
+static void recv_msg(Msg &msg, std::shared_ptr<SafeQueue<Message>> res_msg) {
Message res;
res_msg->wait_and_pop(res);
msg.set_opt(res.get_opt());
@@ -75,7 +75,7 @@ static void recv_msg(Msg &msg, SafeQueue<Message> *res_msg) {
}
}
-void TcpSocket::serve_accept(SafeQueue<Message> *handler_msg, SafeQueue<Message> *res_msg){
+void TcpSocket::serve_accept(std::shared_ptr<SafeQueue<Message>> handler_msg, std::shared_ptr<SafeQueue<Message>> res_msg){
struct epoll_event evs[MAX_EVENT_SIZE];
int sz = sizeof(evs) / sizeof(struct epoll_event);
while (true) {
@@ -112,12 +112,20 @@ void TcpSocket::serve_accept(SafeQueue<Message> *handler_msg, SafeQueue<Message>
}
}
-void handler(MessageManager *mgr) {
- TcpSocket* tcp_socket = mgr->tcp_socket;
- if (!tcp_socket->init()) {
+void MessageManager::tcp_start() {
+ if (!tcp_socket.init()) {
return;
}
- tcp_socket->serve_accept(mgr->handler_msg, mgr->res_msg);
+ tcp_socket.serve_accept(handler_msg, res_msg);
+}
+
+static void handler(MessageManager *mgr) {
+ mgr->tcp_start();
+}
+
+void MessageManager::init(std::shared_ptr<SafeQueue<Message>> handler_msg, std::shared_ptr<SafeQueue<Message>> res_msg) {
+ this->handler_msg = handler_msg;
+ this->res_msg = res_msg;
}
void MessageManager::run() {
diff --git a/src/plugin_mgr/message_manager.h b/src/plugin_mgr/message_manager.h
index 95bbd1a..f5240aa 100644
--- a/src/plugin_mgr/message_manager.h
+++ b/src/plugin_mgr/message_manager.h
@@ -61,34 +61,38 @@ private:
class TcpSocket {
public:
- TcpSocket() {}
+ TcpSocket() : sock(-1), epfd(-1) { }
~TcpSocket() {
close(sock);
}
bool init();
- void serve_accept(SafeQueue<Message> *handler_msg, SafeQueue<Message> *res_msg);
+ void serve_accept(std::shared_ptr<SafeQueue<Message>> handler_msg, std::shared_ptr<SafeQueue<Message>> res_msg);
private:
int domain_listen(const char *name);
-
+private:
int sock;
int epfd;
};
class MessageManager {
public:
- MessageManager(SafeQueue<Message> *handler_msg, SafeQueue<Message> *res_msg) {
- this->handler_msg = handler_msg;
- this->res_msg = res_msg;
- this->tcp_socket = nullptr;
- }
- void init(){
- this->tcp_socket = new TcpSocket();
+ MessageManager(const MessageManager&) = delete;
+ MessageManager& operator=(const MessageManager&) = delete;
+ static MessageManager& get_instance() {
+ static MessageManager message_manager;
+ return message_manager;
}
+ void init(std::shared_ptr<SafeQueue<Message>> handler_msg, std::shared_ptr<SafeQueue<Message>> res_msg);
+ void tcp_start();
void run();
-
- SafeQueue<Message> *handler_msg;
- SafeQueue<Message> *res_msg;
- TcpSocket *tcp_socket;
+private:
+ MessageManager() { }
+private:
+ /* Message queue stores messages from the client and is consumed by PluginManager. */
+ std::shared_ptr<SafeQueue<Message>> handler_msg;
+ /* Message queue stores messages from PluginManager and is consumed by TcpSocket. */
+ std::shared_ptr<SafeQueue<Message>> res_msg;
+ TcpSocket tcp_socket;
};
#endif // !PLUGIN_MGR_MESSAGE_MANAGER_H
\ No newline at end of file
diff --git a/src/plugin_mgr/plugin.h b/src/plugin_mgr/plugin.h
index 7415b99..a011261 100644
--- a/src/plugin_mgr/plugin.h
+++ b/src/plugin_mgr/plugin.h
@@ -81,7 +81,7 @@ public:
void add_instance(std::shared_ptr<Instance> ins) {
instances.emplace_back(ins);
}
- std::shared_ptr<Instance> get_instance(int i) const {
+ std::shared_ptr<Instance> get_instance(size_t i) const {
return instances[i];
}
size_t get_instance_len() const {
diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp
index 7900ecf..4c5f5fc 100644
--- a/src/plugin_mgr/plugin_manager.cpp
+++ b/src/plugin_mgr/plugin_manager.cpp
@@ -15,7 +15,12 @@
const static int ST_MODE_MASK = 0777;
-void PluginManager::init() {
+
+void PluginManager::init(std::shared_ptr<Config> config, std::shared_ptr<SafeQueue<Message>> handler_msg,
+ std::shared_ptr<SafeQueue<Message>> res_msg) {
+ this->config = config;
+ this->handler_msg = handler_msg;
+ this->res_msg = res_msg;
instance_run_handler.reset(new InstanceRunHandler(memory_store));
pre_load();
}
@@ -220,12 +225,13 @@ ErrorCode PluginManager::instance_enabled(const std::string &name) {
if (instance->get_enabled()) {
continue;
}
+ instance->set_enabled(true);
instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance));
}
return ErrorCode::OK;
} else {
- for (auto instance : new_enabled) {
- instance->get_interface()->disable();
+ for (auto ins : new_enabled) {
+ ins->get_interface()->disable();
}
return ErrorCode::ENABLE_INSTANCE_ENV;
}
@@ -275,7 +281,7 @@ std::string PluginManager::get_plugin_in_dir(const std::string &path) {
}
ErrorCode PluginManager::add_list(std::string &res) {
- auto plugin_list = config.get_plugin_list();
+ auto plugin_list = config->get_plugin_list();
res += "Supported Packages:\n";
for (auto &p : plugin_list) {
res += p.first + "\n";
@@ -286,16 +292,16 @@ ErrorCode PluginManager::add_list(std::string &res) {
}
ErrorCode PluginManager::download(const std::string &name, std::string &res) {
- if (!config.is_plugin_info_exist(name)) {
+ if (!config->is_plugin_info_exist(name)) {
return ErrorCode::DOWNLOAD_NOT_FOUND;
}
- res += config.get_plugin_info(name).get_url();
+ res += config->get_plugin_info(name).get_url();
return ErrorCode::OK;
}
void PluginManager::pre_enable() {
- for (size_t i = 0; i < config.get_enable_list_size(); ++i) {
- EnableItem item = config.get_enable_list(i);
+ for (size_t i = 0; i < config->get_enable_list_size(); ++i) {
+ EnableItem item = config->get_enable_list(i);
if (item.get_enabled()) {
std::string name = item.get_name();
if (!memory_store.is_plugin_exist(name)) {
@@ -394,7 +400,7 @@ int PluginManager::run() {
while (true) {
Message msg;
Message res;
- this->handler_msg.wait_and_pop(msg);
+ this->handler_msg->wait_and_pop(msg);
if (msg.get_opt() == Opt::SHUTDOWN) break;
switch (msg.get_opt()) {
case Opt::LOAD: {
@@ -557,7 +563,7 @@ int PluginManager::run() {
break;
}
if (msg.get_type() == MessageType::EXTERNAL)
- res_msg.push(res);
+ res_msg->push(res);
}
return 0;
}
diff --git a/src/plugin_mgr/plugin_manager.h b/src/plugin_mgr/plugin_manager.h
index 18d3f35..b7c04fe 100644
--- a/src/plugin_mgr/plugin_manager.h
+++ b/src/plugin_mgr/plugin_manager.h
@@ -20,12 +20,21 @@
class PluginManager {
public:
- PluginManager(Config &config, SafeQueue<Message> &handler_msg, SafeQueue<Message> &res_msg) :
- config(config), handler_msg(handler_msg), res_msg(res_msg) { }
+ PluginManager(const PluginManager&) = delete;
+ PluginManager& operator=(const PluginManager&) = delete;
+ static PluginManager& get_instance() {
+ static PluginManager plugin_manager;
+ return plugin_manager;
+ }
int run();
- void init();
+ void init(std::shared_ptr<Config> config, std::shared_ptr<SafeQueue<Message>> handler_msg,
+ std::shared_ptr<SafeQueue<Message>> res_msg);
+ const MemoryStore& get_memory_store() {
+ return this->memory_store;
+ }
const void* get_data_buffer(const std::string &name);
private:
+ PluginManager() { }
void pre_load();
void pre_enable();
void pre_load_plugin();
@@ -50,9 +59,9 @@ private:
std::string get_plugin_in_dir(const std::string &path);
private:
std::unique_ptr<InstanceRunHandler> instance_run_handler;
- Config &config;
- SafeQueue<Message> &handler_msg;
- SafeQueue<Message> &res_msg;
+ std::shared_ptr<Config> config;
+ std::shared_ptr<SafeQueue<Message>> handler_msg;
+ std::shared_ptr<SafeQueue<Message>> res_msg;
MemoryStore memory_store;
};
--
2.33.0

View File

@ -1,11 +1,15 @@
Name: oeAware-manager
Version: v1.0.1
Release: 2
Release: 3
Summary: OeAware server and client
License: MulanPSL2
URL: https://gitee.com/openeuler/%{name}
Source0: %{name}-%{version}.tar.gz
Patch1: 0001-fix-issues.patch
Patch2: 0002-refactor-plugin-interface.patch
Patch3: 0003-the-client-adapts-to-the-new-interface-and-refactor-.patch
Patch4: 0004-modify-some-interfaces-and-add-interface-comments.patch
Patch5: 0005-add-the-SIGINT-signal-processing-and-singleton-class.patch
BuildRequires: cmake make gcc-c++
BuildRequires: boost-devel
@ -14,8 +18,9 @@ BuildRequires: log4cplus-devel
BuildRequires: yaml-cpp-devel
BuildRequires: gtest-devel gmock-devel
Requires: oeAware-collector >= v1.0.0
Requires: oeAware-scenario >= v1.0.0
Requires: oeAware-collector >= v1.0.2
Requires: oeAware-scenario >= v1.0.2
Requires: oeAware-tune >= v1.0.0
Requires: graphviz
%description
@ -49,6 +54,9 @@ install -D -p -m 0644 oeaware.service %{buildroot}%{_unitdir}/oeaware.service
%attr(0644, root, root) %{_unitdir}/oeaware.service
%changelog
* Fri May 31 2024 fly_1997 <flylove7@outlook.com> -v1.0.1-3
- refactor instance interfaces and add signal function
* Wed May 15 2024 fly_1997 <flylove7@outlook.com> -v1.0.1-2
- fix pre-enable failed, dependencies missing error, memory leak
- fix warning message