diff --git a/0006-add-dynamic-dependencncy-adjustment.patch b/0006-add-dynamic-dependencncy-adjustment.patch new file mode 100644 index 0000000..df86a90 --- /dev/null +++ b/0006-add-dynamic-dependencncy-adjustment.patch @@ -0,0 +1,682 @@ +From 40bb095943d23c0bf271c633fc7ec6379925762a Mon Sep 17 00:00:00 2001 +From: fly_1997 +Date: Wed, 5 Jun 2024 11:08:32 +0800 +Subject: [PATCH] add dynamic dependencncy adjustment + +--- + src/plugin_mgr/dep_handler.cpp | 108 ++++++++-------- + src/plugin_mgr/dep_handler.h | 37 ++++-- + src/plugin_mgr/instance_run_handler.cpp | 156 ++++++++++++++++++++---- + src/plugin_mgr/instance_run_handler.h | 48 +++++--- + src/plugin_mgr/interface.h | 4 +- + src/plugin_mgr/memory_store.h | 10 +- + src/plugin_mgr/plugin_manager.cpp | 41 ++----- + 7 files changed, 270 insertions(+), 134 deletions(-) + +diff --git a/src/plugin_mgr/dep_handler.cpp b/src/plugin_mgr/dep_handler.cpp +index 1006175..227ee2f 100644 +--- a/src/plugin_mgr/dep_handler.cpp ++++ b/src/plugin_mgr/dep_handler.cpp +@@ -29,25 +29,47 @@ void DepHandler::add_arc_node(std::shared_ptr node, const std::vectorcnt = dep_nodes.size(); + int real_cnt = 0; + for (auto name : dep_nodes) { +- std::shared_ptr tmp = std::make_shared(); +- tmp->arc_name = name; +- tmp->node_name = node->instance->get_name(); ++ std::string from = node->instance->get_name(); ++ std::shared_ptr tmp = std::make_shared(from, name); + tmp->next = arc_head->next; + arc_head->next = tmp; + + if (nodes.count(name)) { + tmp->is_exist = true; +- arc_nodes[name].insert(tmp); ++ tmp->init = true; + real_cnt++; +- } else { +- tmp->is_exist = false; +- arc_nodes[name].insert(tmp); + } ++ in_edges[name].insert(from); ++ arc_nodes[std::make_pair(from, name)] = tmp; + } + if (real_cnt == node->cnt) { + node->instance->set_state(true); + } + node->real_cnt = real_cnt; ++} ++ ++void DepHandler::add_edge(const std::string &from, const std::string &to) { ++ if (in_edges[to].find(from) != in_edges[to].end()) { ++ auto arc_node = arc_nodes[std::make_pair(from, to)]; ++ arc_node->is_exist = true; ++ return; ++ } ++ ++ auto arc_node = std::make_shared(from, to); ++ auto node = nodes[from]; ++ arc_node->next = node->head->next; ++ node->head->next = arc_node; ++ arc_node->is_exist = true; ++ arc_nodes[std::make_pair(from, to)] = arc_node; ++ in_edges[to].insert(from); ++} ++ ++void DepHandler::delete_edge(const std::string &from, const std::string &to) { ++ if (in_edges[to].find(from) == in_edges[to].end()) { ++ return; ++ } ++ auto arc_node = arc_nodes[std::make_pair(from, to)]; ++ arc_node->is_exist = false; + } + + void DepHandler::add_node(std::shared_ptr instance) { +@@ -81,10 +103,11 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr node) { + while(arc) { + std::shared_ptr tmp = arc->next; + if (arc != node->head){ +- std::string name = arc->arc_name; +- arc_nodes[name].erase(arc); +- if (arc_nodes[name].empty()) { +- arc_nodes.erase(name); ++ std::string name = arc->to; ++ in_edges[name].erase(arc->from); ++ arc_nodes.erase(std::make_pair(arc->from, arc->to)); ++ if (in_edges[name].empty()) { ++ in_edges.erase(name); + } + } + arc = tmp; +@@ -92,15 +115,18 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr node) { + } + + void DepHandler::update_instance_state(const std::string &name) { +- if (!nodes[name]->instance->get_state() || !arc_nodes.count(name)) return; +- std::unordered_set> &arcs = arc_nodes[name]; +- for (auto &arc_node : arcs) { +- if (nodes.count(arc_node->node_name)) { +- auto tmp = nodes[arc_node->node_name]; ++ if (!nodes[name]->instance->get_state() || !in_edges.count(name)) return; ++ std::unordered_set &arcs = in_edges[name]; ++ for (auto &from : arcs) { ++ auto arc_node = arc_nodes[std::make_pair(from, name)]; ++ if (nodes.count(from)) { ++ auto tmp = nodes[from]; + tmp->real_cnt++; + if (tmp->real_cnt == tmp->cnt) { + tmp->instance->set_state(true); + } ++ arc_node->is_exist = true; ++ arc_node->init = true; + update_instance_state(tmp->instance->get_name()); + } + } +@@ -113,54 +139,38 @@ void DepHandler::query_all_dependencies(std::vector> &q + } + + void DepHandler::query_node_top(const std::string &name, std::vector> &query) { +- std::shared_ptr p = nodes[name]->head; +- if (p->next == nullptr) { ++ std::shared_ptr arc_node = nodes[name]->head; ++ if (arc_node->next == nullptr) { + query.emplace_back(std::vector{name}); + return; + } +- while (p->next != nullptr) { +- query.emplace_back(std::vector{name, p->next->arc_name}); +- p = p->next; ++ while (arc_node->next != nullptr) { ++ if (arc_node->next->is_exist) { ++ query.emplace_back(std::vector{name, arc_node->next->to}); ++ } ++ arc_node = arc_node->next; + } + } + + void DepHandler::query_node_dependency(const std::string &name, std::vector> &query) { + if (!nodes.count(name)) return; +- std::queue q; ++ std::queue instance_queue; + std::unordered_set vis; + vis.insert(name); +- q.push(name); +- while (!q.empty()) { +- auto node = nodes[q.front()]; +- q.pop(); ++ instance_queue.push(name); ++ while (!instance_queue.empty()) { ++ auto node = nodes[instance_queue.front()]; ++ instance_queue.pop(); + std::string node_name = node->instance->get_name(); + query.emplace_back(std::vector{node_name}); + for (auto cur = node->head->next; cur != nullptr; cur = cur->next) { +- query.emplace_back(std::vector{node_name, cur->arc_name}); +- if (!vis.count(cur->arc_name) && nodes.count(cur->arc_name)) { +- vis.insert(cur->arc_name); +- q.push(cur->arc_name); ++ if (cur->is_exist) { ++ query.emplace_back(std::vector{node_name, cur->to}); + } +- } +- } +-} +- +-std::vector DepHandler::get_pre_dependencies(const std::string &name) { +- std::vector res; +- std::queue> q; +- std::unordered_set vis; +- vis.insert(name); +- q.push(nodes[name]); +- while (!q.empty()) { +- auto &node = q.front(); +- q.pop(); +- res.emplace_back(node->instance->get_name()); +- for (auto arc_node = node->head->next; arc_node != nullptr; arc_node = arc_node->next) { +- if (!vis.count(arc_node->arc_name)) { +- vis.insert(arc_node->arc_name); +- q.push(nodes[arc_node->arc_name]); ++ if (!vis.count(cur->to) && nodes.count(cur->to)) { ++ vis.insert(cur->to); ++ instance_queue.push(cur->to); + } + } + } +- return res; + } +diff --git a/src/plugin_mgr/dep_handler.h b/src/plugin_mgr/dep_handler.h +index a7a8ef6..322e3b3 100644 +--- a/src/plugin_mgr/dep_handler.h ++++ b/src/plugin_mgr/dep_handler.h +@@ -18,10 +18,12 @@ + + struct ArcNode { + std::shared_ptr next; +- std::string arc_name; +- std::string node_name; +- bool is_exist; +- ArcNode() : next(nullptr), is_exist(false) { } ++ std::string from; ++ std::string to; ++ bool is_exist; /* Whether this edge exists. */ ++ bool init; /* The initial state of the edge. */ ++ ArcNode() { } ++ ArcNode(const std::string &from, const std::string &to) : next(nullptr), from(from), to(to), is_exist(false), init(false) { } + }; + + // a instance node +@@ -29,11 +31,19 @@ struct Node { + std::shared_ptr next; + std::shared_ptr head; + std::shared_ptr instance; +- int cnt; +- int real_cnt; ++ int cnt; /* Number of dependencies required for loading. */ ++ int real_cnt; /* Actual number of dependencies during loading. */ + Node(): next(nullptr), head(nullptr), cnt(0), real_cnt(0) { } + }; + ++struct pair_hash { ++ std::size_t operator() (const std::pair &pair) const { ++ auto h1 = std::hash{}(pair.first); ++ auto h2 = std::hash{}(pair.second); ++ return h1 ^ h2; ++ } ++}; ++ + class DepHandler { + public: + DepHandler() { +@@ -43,19 +53,23 @@ public: + bool get_node_state(const std::string &name) { + return this->nodes[name]->instance->get_state(); + } ++ void delete_edge(const std::string &from, const std::string &to); ++ void add_edge(const std::string &from, const std::string &to); + void add_instance(std::shared_ptr instance); + void delete_instance(const std::string &name); + bool is_instance_exist(const std::string &name); + std::shared_ptr get_instance(const std::string &name) const { ++ if (!nodes.count(name)) { ++ return nullptr; ++ } + return nodes.at(name)->instance; + } + void query_node_dependency(const std::string &name, std::vector> &query); + void query_all_dependencies(std::vector> &query); + /* check whether the instance has dependencies */ + bool have_dep(const std::string &name) { +- return arc_nodes.count(name); ++ return in_edges.count(name); + } +- std::vector get_pre_dependencies(const std::string &name); + private: + void add_node(std::shared_ptr instance); + void del_node(const std::string &name); +@@ -64,8 +78,11 @@ private: + void update_instance_state(const std::string &name); + void del_node_and_arc_nodes(std::shared_ptr node); + std::shared_ptr add_new_node(std::shared_ptr instance); +- /* indegree edges */ +- std::unordered_map>> arc_nodes; ++ /* Indegree edges. */ ++ std::unordered_map> in_edges; ++ /* Store all edges. */ ++ std::unordered_map, std::shared_ptr, pair_hash> arc_nodes; ++ /* Store all points. */ + std::unordered_map> nodes; + std::shared_ptr head; + }; +diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp +index ecd7778..496166b 100644 +--- a/src/plugin_mgr/instance_run_handler.cpp ++++ b/src/plugin_mgr/instance_run_handler.cpp +@@ -20,9 +20,8 @@ static const DataRingBuf* get_ring_buf(std::shared_ptr instance) { + return instance->get_interface()->get_ring_buf(); + } + +-void InstanceRunHandler::run_instance(std::shared_ptr instance) { ++void InstanceRunHandler::run_instance(std::vector &deps, InstanceRun run) { + std::vector input_data; +- std::vector deps = instance->get_deps(); + for (size_t i = 0; i < deps.size(); ++i) { + std::shared_ptr ins = memory_store.get_instance(deps[i]); + input_data.emplace_back(get_ring_buf(ins)); +@@ -30,62 +29,167 @@ void InstanceRunHandler::run_instance(std::shared_ptr instance) { + Param param; + param.ring_bufs = input_data.data(); + param.len = input_data.size(); +- instance->get_interface()->run(¶m); ++ run(¶m); + } + +-void InstanceRunHandler::insert_instance(std::shared_ptr instance, uint64_t time) { +- /* To check if an instance is enabled, enable() is called in the PluginManager. */ +- schedule_queue.push(ScheduleInstance{instance, time}); +- INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue."); ++void InstanceRunHandler::enable_instance(const std::string &name) { ++ std::queue> instance_node_queue; ++ auto dep_handler = memory_store.get_dep_handler(); ++ instance_node_queue.push(dep_handler.get_node(name)); ++ std::vector new_enabled; ++ bool enabled = true; ++ while (!instance_node_queue.empty()) { ++ auto node = instance_node_queue.front(); ++ instance_node_queue.pop(); ++ if (node->instance->get_enabled()) { ++ continue; ++ } ++ auto cur_name = node->instance->get_name(); ++ node->instance->set_enabled(true); ++ new_enabled.emplace_back(cur_name); ++ if (!node->instance->get_interface()->enable()) { ++ enabled = false; ++ break; ++ } ++ for (auto arc = node->head->next; arc != nullptr; arc = arc->next) { ++ if (!arc->is_exist) { ++ continue; ++ } ++ auto cur_node = dep_handler.get_node(arc->to); ++ in_degree[arc->to]++; ++ instance_node_queue.push(cur_node); ++ } ++ } ++ if (!enabled) { ++ for (auto &enabled_name : new_enabled) { ++ auto instance = dep_handler.get_instance(enabled_name); ++ instance->get_interface()->disable(); ++ instance->set_enabled(false); ++ if (enabled_name == name) { ++ continue; ++ } ++ in_degree[enabled_name]--; ++ } ++ } else { ++ for (auto &enabled_name : new_enabled) { ++ auto instance = dep_handler.get_instance(enabled_name); ++ schedule_queue.push(ScheduleInstance{instance, time}); ++ INFO("[InstanceRunHandler] " << enabled_name << " instance insert into schedule queue at time " << time); ++ } ++ } + } + +-void InstanceRunHandler::delete_instance(std::shared_ptr instance) { +- instance->get_interface()->disable(); +- instance->set_enabled(false); +- INFO("[PluginManager] " << instance->get_name() << " instance delete from running queue."); ++void InstanceRunHandler::disable_instance(const std::string &name) { ++ if (in_degree[name] != 0) { ++ return; ++ } ++ std::queue> instance_node_queue; ++ auto dep_handler = memory_store.get_dep_handler(); ++ instance_node_queue.push(dep_handler.get_node(name)); ++ while (!instance_node_queue.empty()) { ++ auto node = instance_node_queue.front(); ++ auto &instance = node->instance; ++ instance_node_queue.pop(); ++ auto cur_name = instance->get_name(); ++ instance->set_enabled(false); ++ instance->get_interface()->disable(); ++ INFO("[InstanceRunHandler] " << cur_name << " instance disabled at time " << time); ++ for (auto arc = node->head->next; arc != nullptr; arc = arc->next) { ++ auto cur_node = dep_handler.get_node(arc->to); ++ arc->is_exist = arc->init; ++ /* The instance can be closed only when the indegree is 0. */ ++ if (in_degree[arc->to] <= 0 || --in_degree[arc->to] != 0) { ++ continue; ++ } ++ instance_node_queue.push(cur_node); ++ } ++ } + } + +-void InstanceRunHandler::handle_instance(uint64_t time) { +- InstanceRunMessage msg; ++void InstanceRunHandler::handle_instance() { ++ std::shared_ptr msg; + while(this->recv_queue_try_pop(msg)){ +- std::shared_ptr instance = msg.get_instance(); +- switch (msg.get_type()){ ++ std::shared_ptr instance = msg->get_instance(); ++ switch (msg->get_type()){ + case RunType::ENABLED: { +- insert_instance(std::move(instance), time); ++ enable_instance(instance->get_name()); + break; + } + case RunType::DISABLED: { +- delete_instance(std::move(instance)); ++ disable_instance(instance->get_name()); + break; + } + } ++ msg->notify_one(); + } + } + +-void InstanceRunHandler::schedule(uint64_t time) { ++void InstanceRunHandler::change_instance_state(const std::string &name, std::vector &deps, ++ std::vector &after_deps) { ++ for (auto &dep : deps) { ++ if (std::find(after_deps.begin(), after_deps.end(), dep) != after_deps.end()) { ++ continue; ++ } ++ auto instance = memory_store.get_instance(dep); ++ if (instance == nullptr) { ++ ERROR("[InstanceRunHandler] ilegal dependency: " << dep); ++ continue; ++ } ++ /* Disable the instance that is not required. */ ++ if (instance->get_enabled()) { ++ memory_store.delete_edge(name, instance->get_name()); ++ in_degree[instance->get_name()]--; ++ auto msg = std::make_shared(RunType::DISABLED, instance); ++ recv_queue_push(std::move(msg)); ++ } ++ } ++ for (auto &after_dep : after_deps) { ++ if (std::find(deps.begin(), deps.end(), after_dep) != deps.end()) { ++ continue; ++ } ++ auto instance = memory_store.get_instance(after_dep); ++ if (instance == nullptr) { ++ ERROR("[InstanceRunHandler] ilegal dependency: " << after_dep); ++ continue; ++ } ++ /* Enable the instance that is required. */ ++ if (!instance->get_enabled()) { ++ in_degree[instance->get_name()]++; ++ memory_store.add_edge(name, instance->get_name()); ++ auto msg = std::make_shared(RunType::ENABLED, instance); ++ recv_queue_push(std::move(msg)); ++ } ++ } ++} ++ ++void InstanceRunHandler::schedule() { + while (!schedule_queue.empty()) { + auto schedule_instance = schedule_queue.top(); ++ auto &instance = schedule_instance.instance; + if (schedule_instance.time != time) { + break; + } + schedule_queue.pop(); +- if (!schedule_instance.instance->get_enabled()) { +- break; ++ if (!instance->get_enabled()) { ++ continue; + } +- run_instance(schedule_instance.instance); +- schedule_instance.time += schedule_instance.instance->get_interface()->get_period(); ++ std::vector deps = instance->get_deps(); ++ run_instance(deps, instance->get_interface()->run); ++ schedule_instance.time += instance->get_interface()->get_period(); + schedule_queue.push(schedule_instance); ++ /* Dynamically change dependencies. */ ++ std::vector after_deps = instance->get_deps(); ++ change_instance_state(instance->get_name(), deps, after_deps); + } + } + + void start(InstanceRunHandler *instance_run_handler) { +- unsigned long long time = 0; + INFO("[PluginManager] instance schedule started!"); + while(true) { +- instance_run_handler->handle_instance(time); +- instance_run_handler->schedule(time); ++ instance_run_handler->handle_instance(); ++ instance_run_handler->schedule(); + usleep(instance_run_handler->get_cycle() * 1000); +- time += instance_run_handler->get_cycle(); ++ instance_run_handler->add_time(instance_run_handler->get_cycle()); + } + } + +diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h +index f0bf263..980fd98 100644 +--- a/src/plugin_mgr/instance_run_handler.h ++++ b/src/plugin_mgr/instance_run_handler.h +@@ -27,16 +27,30 @@ enum class RunType { + class InstanceRunMessage { + public: + InstanceRunMessage() {} +- InstanceRunMessage(RunType type, std::shared_ptr instance) : type(type), instance(instance) {} ++ InstanceRunMessage(RunType type, std::shared_ptr instance) : type(type), instance(instance), finish(false) { } + RunType get_type() { + return type; + } + std::shared_ptr get_instance() { + return instance; + } ++ void wait() { ++ std::unique_lock lock(mutex); ++ cond.wait(lock, [this]() { ++ return finish; ++ }); ++ } ++ void notify_one() { ++ std::unique_lock lock(mutex); ++ finish = true; ++ cond.notify_one(); ++ } + private: + RunType type; + std::shared_ptr instance; ++ std::mutex mutex; ++ std::condition_variable cond; ++ bool finish; + }; + + class ScheduleInstance { +@@ -51,35 +65,39 @@ public: + /* A handler to schedule instances. */ + class InstanceRunHandler { + public: +- explicit InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), cycle(DEFAULT_CYCLE_SIZE) {} ++ using InstanceRun = void (*)(const struct Param*); ++ explicit InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), time(0), cycle(DEFAULT_CYCLE_SIZE) { } + void run(); +- void schedule(uint64_t time); +- void handle_instance(uint64_t time); ++ void schedule(); ++ void handle_instance(); + void set_cycle(int cycle) { + this->cycle = cycle; + } + int get_cycle() { + return cycle; + } +- void recv_queue_push(InstanceRunMessage &msg) { +- this->recv_queue.push(msg); +- } +- void recv_queue_push(InstanceRunMessage &&msg) { ++ void recv_queue_push(std::shared_ptr msg) { + this->recv_queue.push(msg); + } +- bool recv_queue_try_pop(InstanceRunMessage &msg) { ++ bool recv_queue_try_pop(std::shared_ptr &msg) { + return this->recv_queue.try_pop(msg); + } ++ void add_time(uint64_t period) { ++ time += period; ++ } ++private: ++ void run_instance(std::vector &deps, InstanceRun run); ++ void change_instance_state(const std::string &name, std::vector &deps, std::vector &after_deps); ++ void enable_instance(const std::string &name); ++ void disable_instance(const std::string &name); + private: +- void run_instance(std::shared_ptr instance); +- void delete_instance(std::shared_ptr instance); +- void insert_instance(std::shared_ptr instance, uint64_t time); +- + /* Instance execution queue. */ + std::priority_queue schedule_queue; +- /*Receives messages from the PluginManager. */ +- SafeQueue recv_queue; ++ /* Receives messages from the PluginManager. */ ++ SafeQueue> recv_queue; ++ std::unordered_map in_degree; + MemoryStore &memory_store; ++ uint64_t time; + int cycle; + static const int DEFAULT_CYCLE_SIZE = 10; + }; +diff --git a/src/plugin_mgr/interface.h b/src/plugin_mgr/interface.h +index 8aa6933..9dad5b8 100644 +--- a/src/plugin_mgr/interface.h ++++ b/src/plugin_mgr/interface.h +@@ -21,11 +21,11 @@ struct DataBuf { + + struct DataRingBuf { + /* instance name */ +- const char *instance_name; ++ const char *instance_name; + /* buf write index, initial value is -1 */ + int index; + /* instance run times */ +- uint64_t count; ++ uint64_t count; + struct DataBuf *buf; + int buf_len; + }; +diff --git a/src/plugin_mgr/memory_store.h b/src/plugin_mgr/memory_store.h +index 0e862db..a43c461 100644 +--- a/src/plugin_mgr/memory_store.h ++++ b/src/plugin_mgr/memory_store.h +@@ -58,8 +58,14 @@ public: + bool have_dep(const std::string &name) { + return dep_handler.have_dep(name); + } +- std::vector get_pre_dependencies(const std::string &name) { +- return dep_handler.get_pre_dependencies(name); ++ const DepHandler& get_dep_handler() const { ++ return dep_handler; ++ } ++ void add_edge(const std::string &from, const std::string &to) { ++ dep_handler.add_edge(from, to); ++ } ++ void delete_edge(const std::string &from, const std::string &to) { ++ dep_handler.delete_edge(from, to); + } + private: + /* instance are stored in the form of DAG. +diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp +index 4c5f5fc..9ecaa6d 100644 +--- a/src/plugin_mgr/plugin_manager.cpp ++++ b/src/plugin_mgr/plugin_manager.cpp +@@ -22,7 +22,6 @@ void PluginManager::init(std::shared_ptr config, std::shared_ptrhandler_msg = handler_msg; + this->res_msg = res_msg; + instance_run_handler.reset(new InstanceRunHandler(memory_store)); +- pre_load(); + } + + ErrorCode PluginManager::remove(const std::string &name) { +@@ -204,35 +203,14 @@ ErrorCode PluginManager::instance_enabled(const std::string &name) { + if (instance->get_enabled()) { + return ErrorCode::ENABLE_INSTANCE_ALREADY_ENABLED; + } +- std::vector pre_dependencies = memory_store.get_pre_dependencies(name); +- std::vector> new_enabled; +- bool enabled = true; +- for (int i = pre_dependencies.size() - 1; i >= 0; --i) { +- instance = memory_store.get_instance(pre_dependencies[i]); +- if (instance->get_enabled()) { +- continue; +- } +- new_enabled.emplace_back(instance); +- if (!instance->get_interface()->enable()) { +- enabled = false; +- WARN("[PluginManager] " << instance->get_name() << " instance enabled failed, because instance init environment failed."); +- break; +- } +- } +- if (enabled) { +- for (int i = pre_dependencies.size() - 1; i >= 0; --i) { +- instance = memory_store.get_instance(pre_dependencies[i]); +- if (instance->get_enabled()) { +- continue; +- } +- instance->set_enabled(true); +- instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance)); +- } +- return ErrorCode::OK; ++ std::shared_ptr msg = std::make_shared(RunType::ENABLED, instance); ++ /* Send message to InstanceRunHandler. */ ++ instance_run_handler->recv_queue_push(msg); ++ /* Wait for InstanceRunHandler to finsh this task. */ ++ msg->wait(); ++ if (msg->get_instance()->get_enabled()) { ++ return ErrorCode::OK; + } else { +- for (auto ins : new_enabled) { +- ins->get_interface()->disable(); +- } + return ErrorCode::ENABLE_INSTANCE_ENV; + } + } +@@ -248,7 +226,9 @@ ErrorCode PluginManager::instance_disabled(const std::string &name) { + if (!instance->get_enabled()) { + return ErrorCode::DISABLE_INSTANCE_ALREADY_DISABLED; + } +- instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::DISABLED, instance)); ++ auto msg = std::make_shared(RunType::DISABLED, instance); ++ instance_run_handler->recv_queue_push(msg); ++ msg->wait(); + return ErrorCode::OK; + } + +@@ -397,6 +377,7 @@ bool file_exist(const std::string &file_name) { + + int PluginManager::run() { + instance_run_handler->run(); ++ pre_load(); + while (true) { + Message msg; + Message res; +-- +2.33.0 + diff --git a/oeAware-manager.spec b/oeAware-manager.spec index 85534c4..ffb2f13 100644 --- a/oeAware-manager.spec +++ b/oeAware-manager.spec @@ -1,6 +1,6 @@ Name: oeAware-manager Version: v1.0.1 -Release: 3 +Release: 4 Summary: OeAware server and client License: MulanPSL2 URL: https://gitee.com/openeuler/%{name} @@ -10,7 +10,7 @@ Patch2: 0002-refactor-plugin-interface.patch Patch3: 0003-the-client-adapts-to-the-new-interface-and-refactor-.patch Patch4: 0004-modify-some-interfaces-and-add-interface-comments.patch Patch5: 0005-add-the-SIGINT-signal-processing-and-singleton-class.patch - +Patch6: 0006-add-dynamic-dependencncy-adjustment.patch BuildRequires: cmake make gcc-c++ BuildRequires: boost-devel BuildRequires: curl-devel @@ -54,6 +54,9 @@ install -D -p -m 0644 oeaware.service %{buildroot}%{_unitdir}/oeaware.service %attr(0644, root, root) %{_unitdir}/oeaware.service %changelog +* Wed June 5 2024 fly_1997 -v1.0.1-4 +- add dynamic dependencncy adjustment + * Fri May 31 2024 fly_1997 -v1.0.1-3 - refactor instance interfaces and add signal function