update version to v1.0.2-1

(cherry picked from commit f8fc1fd7a70ee5a61c1099a17ad2d71d5ca31f99)
This commit is contained in:
fly_1997 2024-06-19 20:29:22 +08:00 committed by openeuler-sync-bot
parent b7bd72c4ef
commit 59c3b31edb
11 changed files with 6 additions and 4784 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,121 +0,0 @@
From 0758b61d2e9af546d3c55bc9cb3704cd7a4fe59a Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Tue, 28 May 2024 20:00:07 +0800
Subject: [PATCH 3/4] modify some interfaces and add interface comments
---
src/plugin_mgr/instance_run_handler.cpp | 1 +
src/plugin_mgr/instance_run_handler.h | 10 +++-------
src/plugin_mgr/interface.h | 17 ++++++++++-------
src/plugin_mgr/plugin.h | 4 ++--
4 files changed, 16 insertions(+), 16 deletions(-)
diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp
index c60207b..fc9dc04 100644
--- a/src/plugin_mgr/instance_run_handler.cpp
+++ b/src/plugin_mgr/instance_run_handler.cpp
@@ -34,6 +34,7 @@ void InstanceRunHandler::run_instance(std::shared_ptr<Instance> instance) {
}
void InstanceRunHandler::insert_instance(std::shared_ptr<Instance> instance, uint64_t time) {
+ /* To check if an instance is enabled, enable() is called in the PluginManager. */
instance->set_enabled(true);
schedule_queue.push(ScheduleInstance{instance, time});
INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue.");
diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h
index fc45874..f0bf263 100644
--- a/src/plugin_mgr/instance_run_handler.h
+++ b/src/plugin_mgr/instance_run_handler.h
@@ -42,7 +42,7 @@ private:
class ScheduleInstance {
public:
bool operator < (const ScheduleInstance &rhs) const {
- return time > rhs.time || (time == rhs.time && instance->get_type() > rhs.instance->get_type());
+ return time > rhs.time || (time == rhs.time && instance->get_priority() > rhs.instance->get_priority());
}
std::shared_ptr<Instance> instance;
uint64_t time;
@@ -61,9 +61,6 @@ public:
int get_cycle() {
return cycle;
}
- bool is_instance_exist(const std::string &name) {
- return memory_store.is_instance_exist(name);
- }
void recv_queue_push(InstanceRunMessage &msg) {
this->recv_queue.push(msg);
}
@@ -77,15 +74,14 @@ private:
void run_instance(std::shared_ptr<Instance> instance);
void delete_instance(std::shared_ptr<Instance> instance);
void insert_instance(std::shared_ptr<Instance> instance, uint64_t time);
- void adjust_collector_queue(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps, bool flag);
- void check_scenario_dependency(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps);
+ /* Instance execution queue. */
std::priority_queue<ScheduleInstance> schedule_queue;
+ /*Receives messages from the PluginManager. */
SafeQueue<InstanceRunMessage> recv_queue;
MemoryStore &memory_store;
int cycle;
static const int DEFAULT_CYCLE_SIZE = 10;
- static const int MAX_DEPENDENCIES_SIZE = 20;
};
#endif // !PLUGIN_MGR_INSTANCE_RUN_HANDLER_H
diff --git a/src/plugin_mgr/interface.h b/src/plugin_mgr/interface.h
index 6495b92..8aa6933 100644
--- a/src/plugin_mgr/interface.h
+++ b/src/plugin_mgr/interface.h
@@ -14,12 +14,6 @@
#include <stdbool.h>
#include <stdint.h>
-enum PluginType {
- COLLECTOR,
- SCENARIO,
- TUNE,
-};
-
struct DataBuf {
int len;
void *data;
@@ -43,10 +37,19 @@ struct Param {
struct Interface {
const char* (*get_version)();
+ /* The instance name is a unique identifier in the system. */
const char* (*get_name)();
const char* (*get_description)();
+ /* Specifies the instance dependencies, which is used as the input information
+ * for instance execution.
+ */
const char* (*get_dep)();
- enum PluginType (*get_type)();
+ /* Instance scheduling priority. In a uniform time period, a instance with a
+ * lower priority is scheduled first.
+ */
+ int (*get_priority)();
+ int (*get_type)();
+ /* Instance execution period. */
int (*get_period)();
bool (*enable)();
void (*disable)();
diff --git a/src/plugin_mgr/plugin.h b/src/plugin_mgr/plugin.h
index f6b5029..7415b99 100644
--- a/src/plugin_mgr/plugin.h
+++ b/src/plugin_mgr/plugin.h
@@ -25,8 +25,8 @@ public:
std::string get_name() const {
return this->name;
}
- PluginType get_type() const {
- return interface->get_type();
+ int get_priority() const {
+ return interface->get_priority();
}
Interface* get_interface() const {
return this->interface;
--
2.33.0

View File

@ -1,401 +0,0 @@
From 6b8debf93307ee68f2ecd4b0bf5ac2d01ee278b8 Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Wed, 29 May 2024 15:12:31 +0800
Subject: [PATCH 4/4] add the SIGINT signal processing and singleton classes
---
src/plugin_mgr/instance_run_handler.cpp | 1 -
src/plugin_mgr/logger.cpp | 2 +-
src/plugin_mgr/logger.h | 3 +-
src/plugin_mgr/main.cpp | 37 +++++++++++++++++++------
src/plugin_mgr/memory_store.h | 2 +-
src/plugin_mgr/message_manager.cpp | 22 ++++++++++-----
src/plugin_mgr/message_manager.h | 32 +++++++++++----------
src/plugin_mgr/plugin.h | 2 +-
src/plugin_mgr/plugin_manager.cpp | 26 ++++++++++-------
src/plugin_mgr/plugin_manager.h | 21 ++++++++++----
10 files changed, 97 insertions(+), 51 deletions(-)
diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp
index fc9dc04..ecd7778 100644
--- a/src/plugin_mgr/instance_run_handler.cpp
+++ b/src/plugin_mgr/instance_run_handler.cpp
@@ -35,7 +35,6 @@ void InstanceRunHandler::run_instance(std::shared_ptr<Instance> instance) {
void InstanceRunHandler::insert_instance(std::shared_ptr<Instance> instance, uint64_t time) {
/* To check if an instance is enabled, enable() is called in the PluginManager. */
- instance->set_enabled(true);
schedule_queue.push(ScheduleInstance{instance, time});
INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue.");
}
diff --git a/src/plugin_mgr/logger.cpp b/src/plugin_mgr/logger.cpp
index 7a924c2..af39583 100644
--- a/src/plugin_mgr/logger.cpp
+++ b/src/plugin_mgr/logger.cpp
@@ -26,7 +26,7 @@ Logger::Logger() {
logger.addAppender(appender);
}
-void Logger::init(Config *config) {
+void Logger::init(std::shared_ptr<Config> config) {
log4cplus::SharedAppenderPtr appender(new log4cplus::FileAppender(config->get_log_path() + "/server.log"));
appender->setName("file");
log4cplus::tstring pattern = LOG4CPLUS_TEXT("%D{%m/%d/%y %H:%M:%S} [%t] %-5p %c - %m"
diff --git a/src/plugin_mgr/logger.h b/src/plugin_mgr/logger.h
index e86dd72..a219c86 100644
--- a/src/plugin_mgr/logger.h
+++ b/src/plugin_mgr/logger.h
@@ -13,6 +13,7 @@
#define PLUGIN_MGR_LOGGER_H
#include "config.h"
+#include <memory>
#include <log4cplus/log4cplus.h>
#define INFO(fmt) LOG4CPLUS_INFO(logger.get(), fmt)
@@ -24,7 +25,7 @@
class Logger {
public:
Logger();
- void init(Config *config);
+ void init(std::shared_ptr<Config> config);
log4cplus::Logger get() {
return logger;
}
diff --git a/src/plugin_mgr/main.cpp b/src/plugin_mgr/main.cpp
index 698ba62..5cfb020 100644
--- a/src/plugin_mgr/main.cpp
+++ b/src/plugin_mgr/main.cpp
@@ -10,6 +10,7 @@
* See the Mulan PSL v2 for more details.
******************************************************************************/
#include "plugin_manager.h"
+#include <csignal>
Logger logger;
@@ -20,8 +21,26 @@ void print_help() {
" ./oeaware /etc/oeAware/config.yaml\n");
}
+void signal_handler(int signum) {
+ auto &plugin_manager = PluginManager::get_instance();
+ auto memory_store = plugin_manager.get_memory_store();
+ auto all_plugins = memory_store.get_all_plugins();
+ for (auto plugin : all_plugins) {
+ for (size_t i = 0; i < plugin->get_instance_len(); ++i) {
+ auto instance = plugin->get_instance(i);
+ if (!instance->get_enabled()) {
+ continue;
+ }
+ instance->get_interface()->disable();
+ INFO("[PluginManager] " << instance->get_name() << " instance disabled.");
+ }
+ }
+ exit(signum);
+}
+
int main(int argc, char **argv) {
- Config config;
+ signal(SIGINT, signal_handler);
+ std::shared_ptr<Config> config = std::make_shared<Config>();
if (argc < 2) {
ERROR("System need a argument!");
exit(EXIT_FAILURE);
@@ -39,20 +58,20 @@ int main(int argc, char **argv) {
ERROR("Insufficient permission on " << config_path);
exit(EXIT_FAILURE);
}
- if (!config.load(config_path)) {
+ if (!config->load(config_path)) {
ERROR("Config load error!");
exit(EXIT_FAILURE);
}
- logger.init(&config);
- SafeQueue<Message> handler_msg;
- SafeQueue<Message> res_msg;
+ logger.init(config);
+ std::shared_ptr<SafeQueue<Message>> handler_msg = std::make_shared<SafeQueue<Message>>();
+ std::shared_ptr<SafeQueue<Message>> res_msg = std::make_shared<SafeQueue<Message>>();
INFO("[MessageManager] Start message manager!");
- MessageManager message_manager(&handler_msg, &res_msg);
- message_manager.init();
+ MessageManager &message_manager = MessageManager::get_instance();
+ message_manager.init(handler_msg, res_msg);
message_manager.run();
INFO("[PluginManager] Start plugin manager!");
- PluginManager plugin_manager(config, handler_msg, res_msg);
- plugin_manager.init();
+ PluginManager& plugin_manager = PluginManager::get_instance();
+ plugin_manager.init(config, handler_msg, res_msg);
plugin_manager.run();
return 0;
}
\ No newline at end of file
diff --git a/src/plugin_mgr/memory_store.h b/src/plugin_mgr/memory_store.h
index 999795a..0e862db 100644
--- a/src/plugin_mgr/memory_store.h
+++ b/src/plugin_mgr/memory_store.h
@@ -42,7 +42,7 @@ public:
bool is_instance_exist(const std::string &name) {
return dep_handler.is_instance_exist(name);
}
- std::vector<std::shared_ptr<Plugin>> get_all_plugins() {
+ const std::vector<std::shared_ptr<Plugin>> get_all_plugins() {
std::vector<std::shared_ptr<Plugin>> res;
for (auto &p : plugins) {
res.emplace_back(p.second);
diff --git a/src/plugin_mgr/message_manager.cpp b/src/plugin_mgr/message_manager.cpp
index 815122c..a9f862d 100644
--- a/src/plugin_mgr/message_manager.cpp
+++ b/src/plugin_mgr/message_manager.cpp
@@ -57,7 +57,7 @@ bool TcpSocket::init() {
return true;
}
-static void send_msg(Msg &msg, SafeQueue<Message> *handler_msg) {
+static void send_msg(Msg &msg, std::shared_ptr<SafeQueue<Message>> handler_msg) {
Message message;
message.set_opt(msg.opt());
message.set_type(MessageType::EXTERNAL);
@@ -66,7 +66,7 @@ static void send_msg(Msg &msg, SafeQueue<Message> *handler_msg) {
}
handler_msg->push(message);
}
-static void recv_msg(Msg &msg, SafeQueue<Message> *res_msg) {
+static void recv_msg(Msg &msg, std::shared_ptr<SafeQueue<Message>> res_msg) {
Message res;
res_msg->wait_and_pop(res);
msg.set_opt(res.get_opt());
@@ -75,7 +75,7 @@ static void recv_msg(Msg &msg, SafeQueue<Message> *res_msg) {
}
}
-void TcpSocket::serve_accept(SafeQueue<Message> *handler_msg, SafeQueue<Message> *res_msg){
+void TcpSocket::serve_accept(std::shared_ptr<SafeQueue<Message>> handler_msg, std::shared_ptr<SafeQueue<Message>> res_msg){
struct epoll_event evs[MAX_EVENT_SIZE];
int sz = sizeof(evs) / sizeof(struct epoll_event);
while (true) {
@@ -112,12 +112,20 @@ void TcpSocket::serve_accept(SafeQueue<Message> *handler_msg, SafeQueue<Message>
}
}
-void handler(MessageManager *mgr) {
- TcpSocket* tcp_socket = mgr->tcp_socket;
- if (!tcp_socket->init()) {
+void MessageManager::tcp_start() {
+ if (!tcp_socket.init()) {
return;
}
- tcp_socket->serve_accept(mgr->handler_msg, mgr->res_msg);
+ tcp_socket.serve_accept(handler_msg, res_msg);
+}
+
+static void handler(MessageManager *mgr) {
+ mgr->tcp_start();
+}
+
+void MessageManager::init(std::shared_ptr<SafeQueue<Message>> handler_msg, std::shared_ptr<SafeQueue<Message>> res_msg) {
+ this->handler_msg = handler_msg;
+ this->res_msg = res_msg;
}
void MessageManager::run() {
diff --git a/src/plugin_mgr/message_manager.h b/src/plugin_mgr/message_manager.h
index 95bbd1a..f5240aa 100644
--- a/src/plugin_mgr/message_manager.h
+++ b/src/plugin_mgr/message_manager.h
@@ -61,34 +61,38 @@ private:
class TcpSocket {
public:
- TcpSocket() {}
+ TcpSocket() : sock(-1), epfd(-1) { }
~TcpSocket() {
close(sock);
}
bool init();
- void serve_accept(SafeQueue<Message> *handler_msg, SafeQueue<Message> *res_msg);
+ void serve_accept(std::shared_ptr<SafeQueue<Message>> handler_msg, std::shared_ptr<SafeQueue<Message>> res_msg);
private:
int domain_listen(const char *name);
-
+private:
int sock;
int epfd;
};
class MessageManager {
public:
- MessageManager(SafeQueue<Message> *handler_msg, SafeQueue<Message> *res_msg) {
- this->handler_msg = handler_msg;
- this->res_msg = res_msg;
- this->tcp_socket = nullptr;
- }
- void init(){
- this->tcp_socket = new TcpSocket();
+ MessageManager(const MessageManager&) = delete;
+ MessageManager& operator=(const MessageManager&) = delete;
+ static MessageManager& get_instance() {
+ static MessageManager message_manager;
+ return message_manager;
}
+ void init(std::shared_ptr<SafeQueue<Message>> handler_msg, std::shared_ptr<SafeQueue<Message>> res_msg);
+ void tcp_start();
void run();
-
- SafeQueue<Message> *handler_msg;
- SafeQueue<Message> *res_msg;
- TcpSocket *tcp_socket;
+private:
+ MessageManager() { }
+private:
+ /* Message queue stores messages from the client and is consumed by PluginManager. */
+ std::shared_ptr<SafeQueue<Message>> handler_msg;
+ /* Message queue stores messages from PluginManager and is consumed by TcpSocket. */
+ std::shared_ptr<SafeQueue<Message>> res_msg;
+ TcpSocket tcp_socket;
};
#endif // !PLUGIN_MGR_MESSAGE_MANAGER_H
\ No newline at end of file
diff --git a/src/plugin_mgr/plugin.h b/src/plugin_mgr/plugin.h
index 7415b99..a011261 100644
--- a/src/plugin_mgr/plugin.h
+++ b/src/plugin_mgr/plugin.h
@@ -81,7 +81,7 @@ public:
void add_instance(std::shared_ptr<Instance> ins) {
instances.emplace_back(ins);
}
- std::shared_ptr<Instance> get_instance(int i) const {
+ std::shared_ptr<Instance> get_instance(size_t i) const {
return instances[i];
}
size_t get_instance_len() const {
diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp
index 7900ecf..4c5f5fc 100644
--- a/src/plugin_mgr/plugin_manager.cpp
+++ b/src/plugin_mgr/plugin_manager.cpp
@@ -15,7 +15,12 @@
const static int ST_MODE_MASK = 0777;
-void PluginManager::init() {
+
+void PluginManager::init(std::shared_ptr<Config> config, std::shared_ptr<SafeQueue<Message>> handler_msg,
+ std::shared_ptr<SafeQueue<Message>> res_msg) {
+ this->config = config;
+ this->handler_msg = handler_msg;
+ this->res_msg = res_msg;
instance_run_handler.reset(new InstanceRunHandler(memory_store));
pre_load();
}
@@ -220,12 +225,13 @@ ErrorCode PluginManager::instance_enabled(const std::string &name) {
if (instance->get_enabled()) {
continue;
}
+ instance->set_enabled(true);
instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance));
}
return ErrorCode::OK;
} else {
- for (auto instance : new_enabled) {
- instance->get_interface()->disable();
+ for (auto ins : new_enabled) {
+ ins->get_interface()->disable();
}
return ErrorCode::ENABLE_INSTANCE_ENV;
}
@@ -275,7 +281,7 @@ std::string PluginManager::get_plugin_in_dir(const std::string &path) {
}
ErrorCode PluginManager::add_list(std::string &res) {
- auto plugin_list = config.get_plugin_list();
+ auto plugin_list = config->get_plugin_list();
res += "Supported Packages:\n";
for (auto &p : plugin_list) {
res += p.first + "\n";
@@ -286,16 +292,16 @@ ErrorCode PluginManager::add_list(std::string &res) {
}
ErrorCode PluginManager::download(const std::string &name, std::string &res) {
- if (!config.is_plugin_info_exist(name)) {
+ if (!config->is_plugin_info_exist(name)) {
return ErrorCode::DOWNLOAD_NOT_FOUND;
}
- res += config.get_plugin_info(name).get_url();
+ res += config->get_plugin_info(name).get_url();
return ErrorCode::OK;
}
void PluginManager::pre_enable() {
- for (size_t i = 0; i < config.get_enable_list_size(); ++i) {
- EnableItem item = config.get_enable_list(i);
+ for (size_t i = 0; i < config->get_enable_list_size(); ++i) {
+ EnableItem item = config->get_enable_list(i);
if (item.get_enabled()) {
std::string name = item.get_name();
if (!memory_store.is_plugin_exist(name)) {
@@ -394,7 +400,7 @@ int PluginManager::run() {
while (true) {
Message msg;
Message res;
- this->handler_msg.wait_and_pop(msg);
+ this->handler_msg->wait_and_pop(msg);
if (msg.get_opt() == Opt::SHUTDOWN) break;
switch (msg.get_opt()) {
case Opt::LOAD: {
@@ -557,7 +563,7 @@ int PluginManager::run() {
break;
}
if (msg.get_type() == MessageType::EXTERNAL)
- res_msg.push(res);
+ res_msg->push(res);
}
return 0;
}
diff --git a/src/plugin_mgr/plugin_manager.h b/src/plugin_mgr/plugin_manager.h
index 18d3f35..b7c04fe 100644
--- a/src/plugin_mgr/plugin_manager.h
+++ b/src/plugin_mgr/plugin_manager.h
@@ -20,12 +20,21 @@
class PluginManager {
public:
- PluginManager(Config &config, SafeQueue<Message> &handler_msg, SafeQueue<Message> &res_msg) :
- config(config), handler_msg(handler_msg), res_msg(res_msg) { }
+ PluginManager(const PluginManager&) = delete;
+ PluginManager& operator=(const PluginManager&) = delete;
+ static PluginManager& get_instance() {
+ static PluginManager plugin_manager;
+ return plugin_manager;
+ }
int run();
- void init();
+ void init(std::shared_ptr<Config> config, std::shared_ptr<SafeQueue<Message>> handler_msg,
+ std::shared_ptr<SafeQueue<Message>> res_msg);
+ const MemoryStore& get_memory_store() {
+ return this->memory_store;
+ }
const void* get_data_buffer(const std::string &name);
private:
+ PluginManager() { }
void pre_load();
void pre_enable();
void pre_load_plugin();
@@ -50,9 +59,9 @@ private:
std::string get_plugin_in_dir(const std::string &path);
private:
std::unique_ptr<InstanceRunHandler> instance_run_handler;
- Config &config;
- SafeQueue<Message> &handler_msg;
- SafeQueue<Message> &res_msg;
+ std::shared_ptr<Config> config;
+ std::shared_ptr<SafeQueue<Message>> handler_msg;
+ std::shared_ptr<SafeQueue<Message>> res_msg;
MemoryStore memory_store;
};
--
2.33.0

View File

@ -1,682 +0,0 @@
From 40bb095943d23c0bf271c633fc7ec6379925762a Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Wed, 5 Jun 2024 11:08:32 +0800
Subject: [PATCH] add dynamic dependencncy adjustment
---
src/plugin_mgr/dep_handler.cpp | 108 ++++++++--------
src/plugin_mgr/dep_handler.h | 37 ++++--
src/plugin_mgr/instance_run_handler.cpp | 156 ++++++++++++++++++++----
src/plugin_mgr/instance_run_handler.h | 48 +++++---
src/plugin_mgr/interface.h | 4 +-
src/plugin_mgr/memory_store.h | 10 +-
src/plugin_mgr/plugin_manager.cpp | 41 ++-----
7 files changed, 270 insertions(+), 134 deletions(-)
diff --git a/src/plugin_mgr/dep_handler.cpp b/src/plugin_mgr/dep_handler.cpp
index 1006175..227ee2f 100644
--- a/src/plugin_mgr/dep_handler.cpp
+++ b/src/plugin_mgr/dep_handler.cpp
@@ -29,25 +29,47 @@ void DepHandler::add_arc_node(std::shared_ptr<Node> node, const std::vector<std:
node->cnt = dep_nodes.size();
int real_cnt = 0;
for (auto name : dep_nodes) {
- std::shared_ptr<ArcNode> tmp = std::make_shared<ArcNode>();
- tmp->arc_name = name;
- tmp->node_name = node->instance->get_name();
+ std::string from = node->instance->get_name();
+ std::shared_ptr<ArcNode> tmp = std::make_shared<ArcNode>(from, name);
tmp->next = arc_head->next;
arc_head->next = tmp;
if (nodes.count(name)) {
tmp->is_exist = true;
- arc_nodes[name].insert(tmp);
+ tmp->init = true;
real_cnt++;
- } else {
- tmp->is_exist = false;
- arc_nodes[name].insert(tmp);
}
+ in_edges[name].insert(from);
+ arc_nodes[std::make_pair(from, name)] = tmp;
}
if (real_cnt == node->cnt) {
node->instance->set_state(true);
}
node->real_cnt = real_cnt;
+}
+
+void DepHandler::add_edge(const std::string &from, const std::string &to) {
+ if (in_edges[to].find(from) != in_edges[to].end()) {
+ auto arc_node = arc_nodes[std::make_pair(from, to)];
+ arc_node->is_exist = true;
+ return;
+ }
+
+ auto arc_node = std::make_shared<ArcNode>(from, to);
+ auto node = nodes[from];
+ arc_node->next = node->head->next;
+ node->head->next = arc_node;
+ arc_node->is_exist = true;
+ arc_nodes[std::make_pair(from, to)] = arc_node;
+ in_edges[to].insert(from);
+}
+
+void DepHandler::delete_edge(const std::string &from, const std::string &to) {
+ if (in_edges[to].find(from) == in_edges[to].end()) {
+ return;
+ }
+ auto arc_node = arc_nodes[std::make_pair(from, to)];
+ arc_node->is_exist = false;
}
void DepHandler::add_node(std::shared_ptr<Instance> instance) {
@@ -81,10 +103,11 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr<Node> node) {
while(arc) {
std::shared_ptr<ArcNode> tmp = arc->next;
if (arc != node->head){
- std::string name = arc->arc_name;
- arc_nodes[name].erase(arc);
- if (arc_nodes[name].empty()) {
- arc_nodes.erase(name);
+ std::string name = arc->to;
+ in_edges[name].erase(arc->from);
+ arc_nodes.erase(std::make_pair(arc->from, arc->to));
+ if (in_edges[name].empty()) {
+ in_edges.erase(name);
}
}
arc = tmp;
@@ -92,15 +115,18 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr<Node> node) {
}
void DepHandler::update_instance_state(const std::string &name) {
- if (!nodes[name]->instance->get_state() || !arc_nodes.count(name)) return;
- std::unordered_set<std::shared_ptr<ArcNode>> &arcs = arc_nodes[name];
- for (auto &arc_node : arcs) {
- if (nodes.count(arc_node->node_name)) {
- auto tmp = nodes[arc_node->node_name];
+ if (!nodes[name]->instance->get_state() || !in_edges.count(name)) return;
+ std::unordered_set<std::string> &arcs = in_edges[name];
+ for (auto &from : arcs) {
+ auto arc_node = arc_nodes[std::make_pair(from, name)];
+ if (nodes.count(from)) {
+ auto tmp = nodes[from];
tmp->real_cnt++;
if (tmp->real_cnt == tmp->cnt) {
tmp->instance->set_state(true);
}
+ arc_node->is_exist = true;
+ arc_node->init = true;
update_instance_state(tmp->instance->get_name());
}
}
@@ -113,54 +139,38 @@ void DepHandler::query_all_dependencies(std::vector<std::vector<std::string>> &q
}
void DepHandler::query_node_top(const std::string &name, std::vector<std::vector<std::string>> &query) {
- std::shared_ptr<ArcNode> p = nodes[name]->head;
- if (p->next == nullptr) {
+ std::shared_ptr<ArcNode> arc_node = nodes[name]->head;
+ if (arc_node->next == nullptr) {
query.emplace_back(std::vector<std::string>{name});
return;
}
- while (p->next != nullptr) {
- query.emplace_back(std::vector<std::string>{name, p->next->arc_name});
- p = p->next;
+ while (arc_node->next != nullptr) {
+ if (arc_node->next->is_exist) {
+ query.emplace_back(std::vector<std::string>{name, arc_node->next->to});
+ }
+ arc_node = arc_node->next;
}
}
void DepHandler::query_node_dependency(const std::string &name, std::vector<std::vector<std::string>> &query) {
if (!nodes.count(name)) return;
- std::queue<std::string> q;
+ std::queue<std::string> instance_queue;
std::unordered_set<std::string> vis;
vis.insert(name);
- q.push(name);
- while (!q.empty()) {
- auto node = nodes[q.front()];
- q.pop();
+ instance_queue.push(name);
+ while (!instance_queue.empty()) {
+ auto node = nodes[instance_queue.front()];
+ instance_queue.pop();
std::string node_name = node->instance->get_name();
query.emplace_back(std::vector<std::string>{node_name});
for (auto cur = node->head->next; cur != nullptr; cur = cur->next) {
- query.emplace_back(std::vector<std::string>{node_name, cur->arc_name});
- if (!vis.count(cur->arc_name) && nodes.count(cur->arc_name)) {
- vis.insert(cur->arc_name);
- q.push(cur->arc_name);
+ if (cur->is_exist) {
+ query.emplace_back(std::vector<std::string>{node_name, cur->to});
}
- }
- }
-}
-
-std::vector<std::string> DepHandler::get_pre_dependencies(const std::string &name) {
- std::vector<std::string> res;
- std::queue<std::shared_ptr<Node>> q;
- std::unordered_set<std::string> vis;
- vis.insert(name);
- q.push(nodes[name]);
- while (!q.empty()) {
- auto &node = q.front();
- q.pop();
- res.emplace_back(node->instance->get_name());
- for (auto arc_node = node->head->next; arc_node != nullptr; arc_node = arc_node->next) {
- if (!vis.count(arc_node->arc_name)) {
- vis.insert(arc_node->arc_name);
- q.push(nodes[arc_node->arc_name]);
+ if (!vis.count(cur->to) && nodes.count(cur->to)) {
+ vis.insert(cur->to);
+ instance_queue.push(cur->to);
}
}
}
- return res;
}
diff --git a/src/plugin_mgr/dep_handler.h b/src/plugin_mgr/dep_handler.h
index a7a8ef6..322e3b3 100644
--- a/src/plugin_mgr/dep_handler.h
+++ b/src/plugin_mgr/dep_handler.h
@@ -18,10 +18,12 @@
struct ArcNode {
std::shared_ptr<ArcNode> next;
- std::string arc_name;
- std::string node_name;
- bool is_exist;
- ArcNode() : next(nullptr), is_exist(false) { }
+ std::string from;
+ std::string to;
+ bool is_exist; /* Whether this edge exists. */
+ bool init; /* The initial state of the edge. */
+ ArcNode() { }
+ ArcNode(const std::string &from, const std::string &to) : next(nullptr), from(from), to(to), is_exist(false), init(false) { }
};
// a instance node
@@ -29,11 +31,19 @@ struct Node {
std::shared_ptr<Node> next;
std::shared_ptr<ArcNode> head;
std::shared_ptr<Instance> instance;
- int cnt;
- int real_cnt;
+ int cnt; /* Number of dependencies required for loading. */
+ int real_cnt; /* Actual number of dependencies during loading. */
Node(): next(nullptr), head(nullptr), cnt(0), real_cnt(0) { }
};
+struct pair_hash {
+ std::size_t operator() (const std::pair<std::string, std::string> &pair) const {
+ auto h1 = std::hash<std::string>{}(pair.first);
+ auto h2 = std::hash<std::string>{}(pair.second);
+ return h1 ^ h2;
+ }
+};
+
class DepHandler {
public:
DepHandler() {
@@ -43,19 +53,23 @@ public:
bool get_node_state(const std::string &name) {
return this->nodes[name]->instance->get_state();
}
+ void delete_edge(const std::string &from, const std::string &to);
+ void add_edge(const std::string &from, const std::string &to);
void add_instance(std::shared_ptr<Instance> instance);
void delete_instance(const std::string &name);
bool is_instance_exist(const std::string &name);
std::shared_ptr<Instance> get_instance(const std::string &name) const {
+ if (!nodes.count(name)) {
+ return nullptr;
+ }
return nodes.at(name)->instance;
}
void query_node_dependency(const std::string &name, std::vector<std::vector<std::string>> &query);
void query_all_dependencies(std::vector<std::vector<std::string>> &query);
/* check whether the instance has dependencies */
bool have_dep(const std::string &name) {
- return arc_nodes.count(name);
+ return in_edges.count(name);
}
- std::vector<std::string> get_pre_dependencies(const std::string &name);
private:
void add_node(std::shared_ptr<Instance> instance);
void del_node(const std::string &name);
@@ -64,8 +78,11 @@ private:
void update_instance_state(const std::string &name);
void del_node_and_arc_nodes(std::shared_ptr<Node> node);
std::shared_ptr<Node> add_new_node(std::shared_ptr<Instance> instance);
- /* indegree edges */
- std::unordered_map<std::string, std::unordered_set<std::shared_ptr<ArcNode>>> arc_nodes;
+ /* Indegree edges. */
+ std::unordered_map<std::string, std::unordered_set<std::string>> in_edges;
+ /* Store all edges. */
+ std::unordered_map<std::pair<std::string, std::string>, std::shared_ptr<ArcNode>, pair_hash> arc_nodes;
+ /* Store all points. */
std::unordered_map<std::string, std::shared_ptr<Node>> nodes;
std::shared_ptr<Node> head;
};
diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp
index ecd7778..496166b 100644
--- a/src/plugin_mgr/instance_run_handler.cpp
+++ b/src/plugin_mgr/instance_run_handler.cpp
@@ -20,9 +20,8 @@ static const DataRingBuf* get_ring_buf(std::shared_ptr<Instance> instance) {
return instance->get_interface()->get_ring_buf();
}
-void InstanceRunHandler::run_instance(std::shared_ptr<Instance> instance) {
+void InstanceRunHandler::run_instance(std::vector<std::string> &deps, InstanceRun run) {
std::vector<const DataRingBuf*> input_data;
- std::vector<std::string> deps = instance->get_deps();
for (size_t i = 0; i < deps.size(); ++i) {
std::shared_ptr<Instance> ins = memory_store.get_instance(deps[i]);
input_data.emplace_back(get_ring_buf(ins));
@@ -30,62 +29,167 @@ void InstanceRunHandler::run_instance(std::shared_ptr<Instance> instance) {
Param param;
param.ring_bufs = input_data.data();
param.len = input_data.size();
- instance->get_interface()->run(&param);
+ run(&param);
}
-void InstanceRunHandler::insert_instance(std::shared_ptr<Instance> instance, uint64_t time) {
- /* To check if an instance is enabled, enable() is called in the PluginManager. */
- schedule_queue.push(ScheduleInstance{instance, time});
- INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue.");
+void InstanceRunHandler::enable_instance(const std::string &name) {
+ std::queue<std::shared_ptr<Node>> instance_node_queue;
+ auto dep_handler = memory_store.get_dep_handler();
+ instance_node_queue.push(dep_handler.get_node(name));
+ std::vector<std::string> new_enabled;
+ bool enabled = true;
+ while (!instance_node_queue.empty()) {
+ auto node = instance_node_queue.front();
+ instance_node_queue.pop();
+ if (node->instance->get_enabled()) {
+ continue;
+ }
+ auto cur_name = node->instance->get_name();
+ node->instance->set_enabled(true);
+ new_enabled.emplace_back(cur_name);
+ if (!node->instance->get_interface()->enable()) {
+ enabled = false;
+ break;
+ }
+ for (auto arc = node->head->next; arc != nullptr; arc = arc->next) {
+ if (!arc->is_exist) {
+ continue;
+ }
+ auto cur_node = dep_handler.get_node(arc->to);
+ in_degree[arc->to]++;
+ instance_node_queue.push(cur_node);
+ }
+ }
+ if (!enabled) {
+ for (auto &enabled_name : new_enabled) {
+ auto instance = dep_handler.get_instance(enabled_name);
+ instance->get_interface()->disable();
+ instance->set_enabled(false);
+ if (enabled_name == name) {
+ continue;
+ }
+ in_degree[enabled_name]--;
+ }
+ } else {
+ for (auto &enabled_name : new_enabled) {
+ auto instance = dep_handler.get_instance(enabled_name);
+ schedule_queue.push(ScheduleInstance{instance, time});
+ INFO("[InstanceRunHandler] " << enabled_name << " instance insert into schedule queue at time " << time);
+ }
+ }
}
-void InstanceRunHandler::delete_instance(std::shared_ptr<Instance> instance) {
- instance->get_interface()->disable();
- instance->set_enabled(false);
- INFO("[PluginManager] " << instance->get_name() << " instance delete from running queue.");
+void InstanceRunHandler::disable_instance(const std::string &name) {
+ if (in_degree[name] != 0) {
+ return;
+ }
+ std::queue<std::shared_ptr<Node>> instance_node_queue;
+ auto dep_handler = memory_store.get_dep_handler();
+ instance_node_queue.push(dep_handler.get_node(name));
+ while (!instance_node_queue.empty()) {
+ auto node = instance_node_queue.front();
+ auto &instance = node->instance;
+ instance_node_queue.pop();
+ auto cur_name = instance->get_name();
+ instance->set_enabled(false);
+ instance->get_interface()->disable();
+ INFO("[InstanceRunHandler] " << cur_name << " instance disabled at time " << time);
+ for (auto arc = node->head->next; arc != nullptr; arc = arc->next) {
+ auto cur_node = dep_handler.get_node(arc->to);
+ arc->is_exist = arc->init;
+ /* The instance can be closed only when the indegree is 0. */
+ if (in_degree[arc->to] <= 0 || --in_degree[arc->to] != 0) {
+ continue;
+ }
+ instance_node_queue.push(cur_node);
+ }
+ }
}
-void InstanceRunHandler::handle_instance(uint64_t time) {
- InstanceRunMessage msg;
+void InstanceRunHandler::handle_instance() {
+ std::shared_ptr<InstanceRunMessage> msg;
while(this->recv_queue_try_pop(msg)){
- std::shared_ptr<Instance> instance = msg.get_instance();
- switch (msg.get_type()){
+ std::shared_ptr<Instance> instance = msg->get_instance();
+ switch (msg->get_type()){
case RunType::ENABLED: {
- insert_instance(std::move(instance), time);
+ enable_instance(instance->get_name());
break;
}
case RunType::DISABLED: {
- delete_instance(std::move(instance));
+ disable_instance(instance->get_name());
break;
}
}
+ msg->notify_one();
}
}
-void InstanceRunHandler::schedule(uint64_t time) {
+void InstanceRunHandler::change_instance_state(const std::string &name, std::vector<std::string> &deps,
+ std::vector<std::string> &after_deps) {
+ for (auto &dep : deps) {
+ if (std::find(after_deps.begin(), after_deps.end(), dep) != after_deps.end()) {
+ continue;
+ }
+ auto instance = memory_store.get_instance(dep);
+ if (instance == nullptr) {
+ ERROR("[InstanceRunHandler] ilegal dependency: " << dep);
+ continue;
+ }
+ /* Disable the instance that is not required. */
+ if (instance->get_enabled()) {
+ memory_store.delete_edge(name, instance->get_name());
+ in_degree[instance->get_name()]--;
+ auto msg = std::make_shared<InstanceRunMessage>(RunType::DISABLED, instance);
+ recv_queue_push(std::move(msg));
+ }
+ }
+ for (auto &after_dep : after_deps) {
+ if (std::find(deps.begin(), deps.end(), after_dep) != deps.end()) {
+ continue;
+ }
+ auto instance = memory_store.get_instance(after_dep);
+ if (instance == nullptr) {
+ ERROR("[InstanceRunHandler] ilegal dependency: " << after_dep);
+ continue;
+ }
+ /* Enable the instance that is required. */
+ if (!instance->get_enabled()) {
+ in_degree[instance->get_name()]++;
+ memory_store.add_edge(name, instance->get_name());
+ auto msg = std::make_shared<InstanceRunMessage>(RunType::ENABLED, instance);
+ recv_queue_push(std::move(msg));
+ }
+ }
+}
+
+void InstanceRunHandler::schedule() {
while (!schedule_queue.empty()) {
auto schedule_instance = schedule_queue.top();
+ auto &instance = schedule_instance.instance;
if (schedule_instance.time != time) {
break;
}
schedule_queue.pop();
- if (!schedule_instance.instance->get_enabled()) {
- break;
+ if (!instance->get_enabled()) {
+ continue;
}
- run_instance(schedule_instance.instance);
- schedule_instance.time += schedule_instance.instance->get_interface()->get_period();
+ std::vector<std::string> deps = instance->get_deps();
+ run_instance(deps, instance->get_interface()->run);
+ schedule_instance.time += instance->get_interface()->get_period();
schedule_queue.push(schedule_instance);
+ /* Dynamically change dependencies. */
+ std::vector<std::string> after_deps = instance->get_deps();
+ change_instance_state(instance->get_name(), deps, after_deps);
}
}
void start(InstanceRunHandler *instance_run_handler) {
- unsigned long long time = 0;
INFO("[PluginManager] instance schedule started!");
while(true) {
- instance_run_handler->handle_instance(time);
- instance_run_handler->schedule(time);
+ instance_run_handler->handle_instance();
+ instance_run_handler->schedule();
usleep(instance_run_handler->get_cycle() * 1000);
- time += instance_run_handler->get_cycle();
+ instance_run_handler->add_time(instance_run_handler->get_cycle());
}
}
diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h
index f0bf263..980fd98 100644
--- a/src/plugin_mgr/instance_run_handler.h
+++ b/src/plugin_mgr/instance_run_handler.h
@@ -27,16 +27,30 @@ enum class RunType {
class InstanceRunMessage {
public:
InstanceRunMessage() {}
- InstanceRunMessage(RunType type, std::shared_ptr<Instance> instance) : type(type), instance(instance) {}
+ InstanceRunMessage(RunType type, std::shared_ptr<Instance> instance) : type(type), instance(instance), finish(false) { }
RunType get_type() {
return type;
}
std::shared_ptr<Instance> get_instance() {
return instance;
}
+ void wait() {
+ std::unique_lock<std::mutex> lock(mutex);
+ cond.wait(lock, [this]() {
+ return finish;
+ });
+ }
+ void notify_one() {
+ std::unique_lock<std::mutex> lock(mutex);
+ finish = true;
+ cond.notify_one();
+ }
private:
RunType type;
std::shared_ptr<Instance> instance;
+ std::mutex mutex;
+ std::condition_variable cond;
+ bool finish;
};
class ScheduleInstance {
@@ -51,35 +65,39 @@ public:
/* A handler to schedule instances. */
class InstanceRunHandler {
public:
- explicit InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), cycle(DEFAULT_CYCLE_SIZE) {}
+ using InstanceRun = void (*)(const struct Param*);
+ explicit InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), time(0), cycle(DEFAULT_CYCLE_SIZE) { }
void run();
- void schedule(uint64_t time);
- void handle_instance(uint64_t time);
+ void schedule();
+ void handle_instance();
void set_cycle(int cycle) {
this->cycle = cycle;
}
int get_cycle() {
return cycle;
}
- void recv_queue_push(InstanceRunMessage &msg) {
- this->recv_queue.push(msg);
- }
- void recv_queue_push(InstanceRunMessage &&msg) {
+ void recv_queue_push(std::shared_ptr<InstanceRunMessage> msg) {
this->recv_queue.push(msg);
}
- bool recv_queue_try_pop(InstanceRunMessage &msg) {
+ bool recv_queue_try_pop(std::shared_ptr<InstanceRunMessage> &msg) {
return this->recv_queue.try_pop(msg);
}
+ void add_time(uint64_t period) {
+ time += period;
+ }
+private:
+ void run_instance(std::vector<std::string> &deps, InstanceRun run);
+ void change_instance_state(const std::string &name, std::vector<std::string> &deps, std::vector<std::string> &after_deps);
+ void enable_instance(const std::string &name);
+ void disable_instance(const std::string &name);
private:
- void run_instance(std::shared_ptr<Instance> instance);
- void delete_instance(std::shared_ptr<Instance> instance);
- void insert_instance(std::shared_ptr<Instance> instance, uint64_t time);
-
/* Instance execution queue. */
std::priority_queue<ScheduleInstance> schedule_queue;
- /*Receives messages from the PluginManager. */
- SafeQueue<InstanceRunMessage> recv_queue;
+ /* Receives messages from the PluginManager. */
+ SafeQueue<std::shared_ptr<InstanceRunMessage>> recv_queue;
+ std::unordered_map<std::string, int> in_degree;
MemoryStore &memory_store;
+ uint64_t time;
int cycle;
static const int DEFAULT_CYCLE_SIZE = 10;
};
diff --git a/src/plugin_mgr/interface.h b/src/plugin_mgr/interface.h
index 8aa6933..9dad5b8 100644
--- a/src/plugin_mgr/interface.h
+++ b/src/plugin_mgr/interface.h
@@ -21,11 +21,11 @@ struct DataBuf {
struct DataRingBuf {
/* instance name */
- const char *instance_name;
+ const char *instance_name;
/* buf write index, initial value is -1 */
int index;
/* instance run times */
- uint64_t count;
+ uint64_t count;
struct DataBuf *buf;
int buf_len;
};
diff --git a/src/plugin_mgr/memory_store.h b/src/plugin_mgr/memory_store.h
index 0e862db..a43c461 100644
--- a/src/plugin_mgr/memory_store.h
+++ b/src/plugin_mgr/memory_store.h
@@ -58,8 +58,14 @@ public:
bool have_dep(const std::string &name) {
return dep_handler.have_dep(name);
}
- std::vector<std::string> get_pre_dependencies(const std::string &name) {
- return dep_handler.get_pre_dependencies(name);
+ const DepHandler& get_dep_handler() const {
+ return dep_handler;
+ }
+ void add_edge(const std::string &from, const std::string &to) {
+ dep_handler.add_edge(from, to);
+ }
+ void delete_edge(const std::string &from, const std::string &to) {
+ dep_handler.delete_edge(from, to);
}
private:
/* instance are stored in the form of DAG.
diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp
index 4c5f5fc..9ecaa6d 100644
--- a/src/plugin_mgr/plugin_manager.cpp
+++ b/src/plugin_mgr/plugin_manager.cpp
@@ -22,7 +22,6 @@ void PluginManager::init(std::shared_ptr<Config> config, std::shared_ptr<SafeQue
this->handler_msg = handler_msg;
this->res_msg = res_msg;
instance_run_handler.reset(new InstanceRunHandler(memory_store));
- pre_load();
}
ErrorCode PluginManager::remove(const std::string &name) {
@@ -204,35 +203,14 @@ ErrorCode PluginManager::instance_enabled(const std::string &name) {
if (instance->get_enabled()) {
return ErrorCode::ENABLE_INSTANCE_ALREADY_ENABLED;
}
- std::vector<std::string> pre_dependencies = memory_store.get_pre_dependencies(name);
- std::vector<std::shared_ptr<Instance>> new_enabled;
- bool enabled = true;
- for (int i = pre_dependencies.size() - 1; i >= 0; --i) {
- instance = memory_store.get_instance(pre_dependencies[i]);
- if (instance->get_enabled()) {
- continue;
- }
- new_enabled.emplace_back(instance);
- if (!instance->get_interface()->enable()) {
- enabled = false;
- WARN("[PluginManager] " << instance->get_name() << " instance enabled failed, because instance init environment failed.");
- break;
- }
- }
- if (enabled) {
- for (int i = pre_dependencies.size() - 1; i >= 0; --i) {
- instance = memory_store.get_instance(pre_dependencies[i]);
- if (instance->get_enabled()) {
- continue;
- }
- instance->set_enabled(true);
- instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance));
- }
- return ErrorCode::OK;
+ std::shared_ptr<InstanceRunMessage> msg = std::make_shared<InstanceRunMessage>(RunType::ENABLED, instance);
+ /* Send message to InstanceRunHandler. */
+ instance_run_handler->recv_queue_push(msg);
+ /* Wait for InstanceRunHandler to finsh this task. */
+ msg->wait();
+ if (msg->get_instance()->get_enabled()) {
+ return ErrorCode::OK;
} else {
- for (auto ins : new_enabled) {
- ins->get_interface()->disable();
- }
return ErrorCode::ENABLE_INSTANCE_ENV;
}
}
@@ -248,7 +226,9 @@ ErrorCode PluginManager::instance_disabled(const std::string &name) {
if (!instance->get_enabled()) {
return ErrorCode::DISABLE_INSTANCE_ALREADY_DISABLED;
}
- instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::DISABLED, instance));
+ auto msg = std::make_shared<InstanceRunMessage>(RunType::DISABLED, instance);
+ instance_run_handler->recv_queue_push(msg);
+ msg->wait();
return ErrorCode::OK;
}
@@ -397,6 +377,7 @@ bool file_exist(const std::string &file_name) {
int PluginManager::run() {
instance_run_handler->run();
+ pre_load();
while (true) {
Message msg;
Message res;
--
2.33.0

View File

@ -1,115 +0,0 @@
From 2a6837386db74d59e35f2ff54499d79904ae1501 Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Thu, 13 Jun 2024 10:06:04 +0800
Subject: [PATCH 1/2] fix dependency disabled failed and pre-enable illegal
plugin
---
src/plugin_mgr/instance_run_handler.cpp | 17 ++++++++++-------
src/plugin_mgr/instance_run_handler.h | 4 +++-
src/plugin_mgr/plugin_manager.cpp | 12 ++++++------
3 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp
index 496166b..928b555 100644
--- a/src/plugin_mgr/instance_run_handler.cpp
+++ b/src/plugin_mgr/instance_run_handler.cpp
@@ -79,10 +79,11 @@ void InstanceRunHandler::enable_instance(const std::string &name) {
}
}
-void InstanceRunHandler::disable_instance(const std::string &name) {
- if (in_degree[name] != 0) {
+void InstanceRunHandler::disable_instance(const std::string &name, RunType type) {
+ if (type == RunType::INTERNAL_DISABLED && in_degree[name] != 0) {
return;
}
+ in_degree[name] = 0;
std::queue<std::shared_ptr<Node>> instance_node_queue;
auto dep_handler = memory_store.get_dep_handler();
instance_node_queue.push(dep_handler.get_node(name));
@@ -111,12 +112,14 @@ void InstanceRunHandler::handle_instance() {
while(this->recv_queue_try_pop(msg)){
std::shared_ptr<Instance> instance = msg->get_instance();
switch (msg->get_type()){
- case RunType::ENABLED: {
+ case RunType::ENABLED:
+ case RunType::INTERNAL_ENABLED: {
enable_instance(instance->get_name());
break;
}
- case RunType::DISABLED: {
- disable_instance(instance->get_name());
+ case RunType::DISABLED:
+ case RunType::INTERNAL_DISABLED: {
+ disable_instance(instance->get_name(), msg->get_type());
break;
}
}
@@ -139,7 +142,7 @@ void InstanceRunHandler::change_instance_state(const std::string &name, std::vec
if (instance->get_enabled()) {
memory_store.delete_edge(name, instance->get_name());
in_degree[instance->get_name()]--;
- auto msg = std::make_shared<InstanceRunMessage>(RunType::DISABLED, instance);
+ auto msg = std::make_shared<InstanceRunMessage>(RunType::INTERNAL_DISABLED, instance);
recv_queue_push(std::move(msg));
}
}
@@ -156,7 +159,7 @@ void InstanceRunHandler::change_instance_state(const std::string &name, std::vec
if (!instance->get_enabled()) {
in_degree[instance->get_name()]++;
memory_store.add_edge(name, instance->get_name());
- auto msg = std::make_shared<InstanceRunMessage>(RunType::ENABLED, instance);
+ auto msg = std::make_shared<InstanceRunMessage>(RunType::INTERNAL_ENABLED, instance);
recv_queue_push(std::move(msg));
}
}
diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h
index 980fd98..682510b 100644
--- a/src/plugin_mgr/instance_run_handler.h
+++ b/src/plugin_mgr/instance_run_handler.h
@@ -21,6 +21,8 @@
enum class RunType {
ENABLED,
DISABLED,
+ INTERNAL_ENABLED,
+ INTERNAL_DISABLED,
};
/* Message for communication between plugin manager and instance scheduling */
@@ -89,7 +91,7 @@ private:
void run_instance(std::vector<std::string> &deps, InstanceRun run);
void change_instance_state(const std::string &name, std::vector<std::string> &deps, std::vector<std::string> &after_deps);
void enable_instance(const std::string &name);
- void disable_instance(const std::string &name);
+ void disable_instance(const std::string &name, RunType type);
private:
/* Instance execution queue. */
std::priority_queue<ScheduleInstance> schedule_queue;
diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp
index 9ecaa6d..7220f48 100644
--- a/src/plugin_mgr/plugin_manager.cpp
+++ b/src/plugin_mgr/plugin_manager.cpp
@@ -282,13 +282,13 @@ ErrorCode PluginManager::download(const std::string &name, std::string &res) {
void PluginManager::pre_enable() {
for (size_t i = 0; i < config->get_enable_list_size(); ++i) {
EnableItem item = config->get_enable_list(i);
+ std::string plugin_name = item.get_name();
+ if (!memory_store.is_plugin_exist(plugin_name)) {
+ WARN("[PluginManager] plugin " << plugin_name << " cannot be enabled, because it does not exist.");
+ continue;
+ }
if (item.get_enabled()) {
- std::string name = item.get_name();
- if (!memory_store.is_plugin_exist(name)) {
- WARN("[PluginManager] plugin " << name << " cannot be enabled, because it does not exist.");
- continue;
- }
- std::shared_ptr<Plugin> plugin = memory_store.get_plugin(name);
+ std::shared_ptr<Plugin> plugin = memory_store.get_plugin(plugin_name);
for (size_t j = 0; j < plugin->get_instance_len(); ++j) {
instance_enabled(plugin->get_instance(j)->get_name());
}
--
2.33.0

View File

@ -1,138 +0,0 @@
From 918ef95a2e5f9de71efa55beee35eaa083c31b7d Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Thu, 13 Jun 2024 16:28:19 +0800
Subject: [PATCH 2/2] fix dependency error and add enable failed logs
---
src/plugin_mgr/dep_handler.cpp | 22 +++++++++++++---------
src/plugin_mgr/instance_run_handler.h | 2 ++
src/plugin_mgr/plugin_manager.cpp | 17 ++++++++++++-----
3 files changed, 27 insertions(+), 14 deletions(-)
diff --git a/src/plugin_mgr/dep_handler.cpp b/src/plugin_mgr/dep_handler.cpp
index 227ee2f..2b20e7e 100644
--- a/src/plugin_mgr/dep_handler.cpp
+++ b/src/plugin_mgr/dep_handler.cpp
@@ -28,22 +28,26 @@ void DepHandler::add_arc_node(std::shared_ptr<Node> node, const std::vector<std:
std::shared_ptr<ArcNode> arc_head = node->head;
node->cnt = dep_nodes.size();
int real_cnt = 0;
+ bool state = true;
for (auto name : dep_nodes) {
std::string from = node->instance->get_name();
std::shared_ptr<ArcNode> tmp = std::make_shared<ArcNode>(from, name);
tmp->next = arc_head->next;
arc_head->next = tmp;
-
+ tmp->init = true;
if (nodes.count(name)) {
tmp->is_exist = true;
- tmp->init = true;
real_cnt++;
+ if (!nodes[name]->instance->get_state()) {
+ state = false;
+ }
}
in_edges[name].insert(from);
arc_nodes[std::make_pair(from, name)] = tmp;
}
+ /* node->instance->state = true, only all dependencies meet the conditions. */
if (real_cnt == node->cnt) {
- node->instance->set_state(true);
+ node->instance->set_state(state);
}
node->real_cnt = real_cnt;
}
@@ -115,18 +119,17 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr<Node> node) {
}
void DepHandler::update_instance_state(const std::string &name) {
- if (!nodes[name]->instance->get_state() || !in_edges.count(name)) return;
+ if (!in_edges.count(name)) return;
std::unordered_set<std::string> &arcs = in_edges[name];
for (auto &from : arcs) {
auto arc_node = arc_nodes[std::make_pair(from, name)];
if (nodes.count(from)) {
auto tmp = nodes[from];
tmp->real_cnt++;
- if (tmp->real_cnt == tmp->cnt) {
+ if (tmp->real_cnt == tmp->cnt && nodes[name]->instance->get_state()) {
tmp->instance->set_state(true);
}
arc_node->is_exist = true;
- arc_node->init = true;
update_instance_state(tmp->instance->get_name());
}
}
@@ -144,8 +147,9 @@ void DepHandler::query_node_top(const std::string &name, std::vector<std::vector
query.emplace_back(std::vector<std::string>{name});
return;
}
- while (arc_node->next != nullptr) {
- if (arc_node->next->is_exist) {
+ while (arc_node->next != nullptr) {
+ /* Display the edges that exist and the points that do not. */
+ if (arc_node->next->is_exist || !nodes.count(arc_node->next->to)) {
query.emplace_back(std::vector<std::string>{name, arc_node->next->to});
}
arc_node = arc_node->next;
@@ -164,7 +168,7 @@ void DepHandler::query_node_dependency(const std::string &name, std::vector<std:
std::string node_name = node->instance->get_name();
query.emplace_back(std::vector<std::string>{node_name});
for (auto cur = node->head->next; cur != nullptr; cur = cur->next) {
- if (cur->is_exist) {
+ if (cur->is_exist || !nodes.count(cur->to)) {
query.emplace_back(std::vector<std::string>{node_name, cur->to});
}
if (!vis.count(cur->to) && nodes.count(cur->to)) {
diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h
index 682510b..3804a49 100644
--- a/src/plugin_mgr/instance_run_handler.h
+++ b/src/plugin_mgr/instance_run_handler.h
@@ -19,8 +19,10 @@
#include <queue>
enum class RunType {
+ /* Message from PluginManager. */
ENABLED,
DISABLED,
+ /* Message from internal. */
INTERNAL_ENABLED,
INTERNAL_DISABLED,
};
diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp
index 7220f48..c3ff8bf 100644
--- a/src/plugin_mgr/plugin_manager.cpp
+++ b/src/plugin_mgr/plugin_manager.cpp
@@ -290,16 +290,23 @@ void PluginManager::pre_enable() {
if (item.get_enabled()) {
std::shared_ptr<Plugin> plugin = memory_store.get_plugin(plugin_name);
for (size_t j = 0; j < plugin->get_instance_len(); ++j) {
- instance_enabled(plugin->get_instance(j)->get_name());
+ std::string name = plugin->get_instance(j)->get_name();
+ auto ret = instance_enabled(name);
+ if (ret != ErrorCode::OK) {
+ WARN("[PluginManager] " << name << " instance pre-enabled failed, because " << ErrorText::get_error_text(ret) << ".");
+ } else {
+ INFO("[PluginManager] " << name << " instance pre-enabled.");
+ }
}
} else {
for (size_t j = 0; j < item.get_instance_size(); ++j) {
std::string name = item.get_instance_name(j);
- if (!memory_store.is_instance_exist(name)) {
- WARN("[PluginManager] instance " << name << " cannot be enabled, because it does not exist.");
- continue;
+ auto ret = instance_enabled(name);
+ if (ret != ErrorCode::OK) {
+ WARN("[PluginManager] " << name << " instance pre-enabled failed, because " << ErrorText::get_error_text(ret) << ".");
+ } else {
+ INFO("[PluginManager] " << name << " instance pre-enabled.");
}
- instance_enabled(name);
}
}
}
--
2.33.0

Binary file not shown.

Binary file not shown.

View File

@ -1,18 +1,10 @@
Name: oeAware-manager
Version: v1.0.1
Release: 6
Version: v1.0.2
Release: 1
Summary: OeAware server and client
License: MulanPSL2
URL: https://gitee.com/openeuler/%{name}
Source0: %{name}-%{version}.tar.gz
Patch1: 0001-fix-issues.patch
Patch2: 0002-refactor-plugin-interface.patch
Patch3: 0003-the-client-adapts-to-the-new-interface-and-refactor-.patch
Patch4: 0004-modify-some-interfaces-and-add-interface-comments.patch
Patch5: 0005-add-the-SIGINT-signal-processing-and-singleton-class.patch
Patch6: 0006-add-dynamic-dependencncy-adjustment.patch
Patch7: 0007-fix-dependency-disabled-failed-and-pre-enable-illega.patch
Patch8: 0008-fix-dependency-error-and-add-enable-failed-logs.patch
BuildRequires: cmake make gcc-c++
BuildRequires: boost-devel
@ -48,7 +40,7 @@ install -D -p -m 0644 oeaware.service %{buildroot}%{_unitdir}/oeaware.service
%systemd_preun oeaware.service
%post
%systemd_post oeaware.service
systemctl start oeaware.service
%files
%attr(0750, root, root) %{_bindir}/oeaware
@ -57,6 +49,9 @@ install -D -p -m 0644 oeaware.service %{buildroot}%{_unitdir}/oeaware.service
%attr(0644, root, root) %{_unitdir}/oeaware.service
%changelog
* Thu Jun 20 2024 fly_1997 <flylove7@outlook.com> -v1.0.2-1
- update version to v1.0.2-1
* Fri Jun 14 2024 fly_1997 <flylove7@outlook.com> -v1.0.1-6
- fix dependency error and add enable failed logs
- fix dependency disabled failed and pre-enable illegal plugin