oeAware-manager/0006-add-dynamic-dependencncy-adjustment.patch
fly_1997 dcf1666776 add dynamic dependencncy adjustment
(cherry picked from commit b3d9c6c317ab9c19dfc3c31464d7115f8e733596)
2024-06-05 11:52:57 +08:00

683 lines
27 KiB
Diff

From 40bb095943d23c0bf271c633fc7ec6379925762a Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Wed, 5 Jun 2024 11:08:32 +0800
Subject: [PATCH] add dynamic dependencncy adjustment
---
src/plugin_mgr/dep_handler.cpp | 108 ++++++++--------
src/plugin_mgr/dep_handler.h | 37 ++++--
src/plugin_mgr/instance_run_handler.cpp | 156 ++++++++++++++++++++----
src/plugin_mgr/instance_run_handler.h | 48 +++++---
src/plugin_mgr/interface.h | 4 +-
src/plugin_mgr/memory_store.h | 10 +-
src/plugin_mgr/plugin_manager.cpp | 41 ++-----
7 files changed, 270 insertions(+), 134 deletions(-)
diff --git a/src/plugin_mgr/dep_handler.cpp b/src/plugin_mgr/dep_handler.cpp
index 1006175..227ee2f 100644
--- a/src/plugin_mgr/dep_handler.cpp
+++ b/src/plugin_mgr/dep_handler.cpp
@@ -29,25 +29,47 @@ void DepHandler::add_arc_node(std::shared_ptr<Node> node, const std::vector<std:
node->cnt = dep_nodes.size();
int real_cnt = 0;
for (auto name : dep_nodes) {
- std::shared_ptr<ArcNode> tmp = std::make_shared<ArcNode>();
- tmp->arc_name = name;
- tmp->node_name = node->instance->get_name();
+ std::string from = node->instance->get_name();
+ std::shared_ptr<ArcNode> tmp = std::make_shared<ArcNode>(from, name);
tmp->next = arc_head->next;
arc_head->next = tmp;
if (nodes.count(name)) {
tmp->is_exist = true;
- arc_nodes[name].insert(tmp);
+ tmp->init = true;
real_cnt++;
- } else {
- tmp->is_exist = false;
- arc_nodes[name].insert(tmp);
}
+ in_edges[name].insert(from);
+ arc_nodes[std::make_pair(from, name)] = tmp;
}
if (real_cnt == node->cnt) {
node->instance->set_state(true);
}
node->real_cnt = real_cnt;
+}
+
+void DepHandler::add_edge(const std::string &from, const std::string &to) {
+ if (in_edges[to].find(from) != in_edges[to].end()) {
+ auto arc_node = arc_nodes[std::make_pair(from, to)];
+ arc_node->is_exist = true;
+ return;
+ }
+
+ auto arc_node = std::make_shared<ArcNode>(from, to);
+ auto node = nodes[from];
+ arc_node->next = node->head->next;
+ node->head->next = arc_node;
+ arc_node->is_exist = true;
+ arc_nodes[std::make_pair(from, to)] = arc_node;
+ in_edges[to].insert(from);
+}
+
+void DepHandler::delete_edge(const std::string &from, const std::string &to) {
+ if (in_edges[to].find(from) == in_edges[to].end()) {
+ return;
+ }
+ auto arc_node = arc_nodes[std::make_pair(from, to)];
+ arc_node->is_exist = false;
}
void DepHandler::add_node(std::shared_ptr<Instance> instance) {
@@ -81,10 +103,11 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr<Node> node) {
while(arc) {
std::shared_ptr<ArcNode> tmp = arc->next;
if (arc != node->head){
- std::string name = arc->arc_name;
- arc_nodes[name].erase(arc);
- if (arc_nodes[name].empty()) {
- arc_nodes.erase(name);
+ std::string name = arc->to;
+ in_edges[name].erase(arc->from);
+ arc_nodes.erase(std::make_pair(arc->from, arc->to));
+ if (in_edges[name].empty()) {
+ in_edges.erase(name);
}
}
arc = tmp;
@@ -92,15 +115,18 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr<Node> node) {
}
void DepHandler::update_instance_state(const std::string &name) {
- if (!nodes[name]->instance->get_state() || !arc_nodes.count(name)) return;
- std::unordered_set<std::shared_ptr<ArcNode>> &arcs = arc_nodes[name];
- for (auto &arc_node : arcs) {
- if (nodes.count(arc_node->node_name)) {
- auto tmp = nodes[arc_node->node_name];
+ if (!nodes[name]->instance->get_state() || !in_edges.count(name)) return;
+ std::unordered_set<std::string> &arcs = in_edges[name];
+ for (auto &from : arcs) {
+ auto arc_node = arc_nodes[std::make_pair(from, name)];
+ if (nodes.count(from)) {
+ auto tmp = nodes[from];
tmp->real_cnt++;
if (tmp->real_cnt == tmp->cnt) {
tmp->instance->set_state(true);
}
+ arc_node->is_exist = true;
+ arc_node->init = true;
update_instance_state(tmp->instance->get_name());
}
}
@@ -113,54 +139,38 @@ void DepHandler::query_all_dependencies(std::vector<std::vector<std::string>> &q
}
void DepHandler::query_node_top(const std::string &name, std::vector<std::vector<std::string>> &query) {
- std::shared_ptr<ArcNode> p = nodes[name]->head;
- if (p->next == nullptr) {
+ std::shared_ptr<ArcNode> arc_node = nodes[name]->head;
+ if (arc_node->next == nullptr) {
query.emplace_back(std::vector<std::string>{name});
return;
}
- while (p->next != nullptr) {
- query.emplace_back(std::vector<std::string>{name, p->next->arc_name});
- p = p->next;
+ while (arc_node->next != nullptr) {
+ if (arc_node->next->is_exist) {
+ query.emplace_back(std::vector<std::string>{name, arc_node->next->to});
+ }
+ arc_node = arc_node->next;
}
}
void DepHandler::query_node_dependency(const std::string &name, std::vector<std::vector<std::string>> &query) {
if (!nodes.count(name)) return;
- std::queue<std::string> q;
+ std::queue<std::string> instance_queue;
std::unordered_set<std::string> vis;
vis.insert(name);
- q.push(name);
- while (!q.empty()) {
- auto node = nodes[q.front()];
- q.pop();
+ instance_queue.push(name);
+ while (!instance_queue.empty()) {
+ auto node = nodes[instance_queue.front()];
+ instance_queue.pop();
std::string node_name = node->instance->get_name();
query.emplace_back(std::vector<std::string>{node_name});
for (auto cur = node->head->next; cur != nullptr; cur = cur->next) {
- query.emplace_back(std::vector<std::string>{node_name, cur->arc_name});
- if (!vis.count(cur->arc_name) && nodes.count(cur->arc_name)) {
- vis.insert(cur->arc_name);
- q.push(cur->arc_name);
+ if (cur->is_exist) {
+ query.emplace_back(std::vector<std::string>{node_name, cur->to});
}
- }
- }
-}
-
-std::vector<std::string> DepHandler::get_pre_dependencies(const std::string &name) {
- std::vector<std::string> res;
- std::queue<std::shared_ptr<Node>> q;
- std::unordered_set<std::string> vis;
- vis.insert(name);
- q.push(nodes[name]);
- while (!q.empty()) {
- auto &node = q.front();
- q.pop();
- res.emplace_back(node->instance->get_name());
- for (auto arc_node = node->head->next; arc_node != nullptr; arc_node = arc_node->next) {
- if (!vis.count(arc_node->arc_name)) {
- vis.insert(arc_node->arc_name);
- q.push(nodes[arc_node->arc_name]);
+ if (!vis.count(cur->to) && nodes.count(cur->to)) {
+ vis.insert(cur->to);
+ instance_queue.push(cur->to);
}
}
}
- return res;
}
diff --git a/src/plugin_mgr/dep_handler.h b/src/plugin_mgr/dep_handler.h
index a7a8ef6..322e3b3 100644
--- a/src/plugin_mgr/dep_handler.h
+++ b/src/plugin_mgr/dep_handler.h
@@ -18,10 +18,12 @@
struct ArcNode {
std::shared_ptr<ArcNode> next;
- std::string arc_name;
- std::string node_name;
- bool is_exist;
- ArcNode() : next(nullptr), is_exist(false) { }
+ std::string from;
+ std::string to;
+ bool is_exist; /* Whether this edge exists. */
+ bool init; /* The initial state of the edge. */
+ ArcNode() { }
+ ArcNode(const std::string &from, const std::string &to) : next(nullptr), from(from), to(to), is_exist(false), init(false) { }
};
// a instance node
@@ -29,11 +31,19 @@ struct Node {
std::shared_ptr<Node> next;
std::shared_ptr<ArcNode> head;
std::shared_ptr<Instance> instance;
- int cnt;
- int real_cnt;
+ int cnt; /* Number of dependencies required for loading. */
+ int real_cnt; /* Actual number of dependencies during loading. */
Node(): next(nullptr), head(nullptr), cnt(0), real_cnt(0) { }
};
+struct pair_hash {
+ std::size_t operator() (const std::pair<std::string, std::string> &pair) const {
+ auto h1 = std::hash<std::string>{}(pair.first);
+ auto h2 = std::hash<std::string>{}(pair.second);
+ return h1 ^ h2;
+ }
+};
+
class DepHandler {
public:
DepHandler() {
@@ -43,19 +53,23 @@ public:
bool get_node_state(const std::string &name) {
return this->nodes[name]->instance->get_state();
}
+ void delete_edge(const std::string &from, const std::string &to);
+ void add_edge(const std::string &from, const std::string &to);
void add_instance(std::shared_ptr<Instance> instance);
void delete_instance(const std::string &name);
bool is_instance_exist(const std::string &name);
std::shared_ptr<Instance> get_instance(const std::string &name) const {
+ if (!nodes.count(name)) {
+ return nullptr;
+ }
return nodes.at(name)->instance;
}
void query_node_dependency(const std::string &name, std::vector<std::vector<std::string>> &query);
void query_all_dependencies(std::vector<std::vector<std::string>> &query);
/* check whether the instance has dependencies */
bool have_dep(const std::string &name) {
- return arc_nodes.count(name);
+ return in_edges.count(name);
}
- std::vector<std::string> get_pre_dependencies(const std::string &name);
private:
void add_node(std::shared_ptr<Instance> instance);
void del_node(const std::string &name);
@@ -64,8 +78,11 @@ private:
void update_instance_state(const std::string &name);
void del_node_and_arc_nodes(std::shared_ptr<Node> node);
std::shared_ptr<Node> add_new_node(std::shared_ptr<Instance> instance);
- /* indegree edges */
- std::unordered_map<std::string, std::unordered_set<std::shared_ptr<ArcNode>>> arc_nodes;
+ /* Indegree edges. */
+ std::unordered_map<std::string, std::unordered_set<std::string>> in_edges;
+ /* Store all edges. */
+ std::unordered_map<std::pair<std::string, std::string>, std::shared_ptr<ArcNode>, pair_hash> arc_nodes;
+ /* Store all points. */
std::unordered_map<std::string, std::shared_ptr<Node>> nodes;
std::shared_ptr<Node> head;
};
diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp
index ecd7778..496166b 100644
--- a/src/plugin_mgr/instance_run_handler.cpp
+++ b/src/plugin_mgr/instance_run_handler.cpp
@@ -20,9 +20,8 @@ static const DataRingBuf* get_ring_buf(std::shared_ptr<Instance> instance) {
return instance->get_interface()->get_ring_buf();
}
-void InstanceRunHandler::run_instance(std::shared_ptr<Instance> instance) {
+void InstanceRunHandler::run_instance(std::vector<std::string> &deps, InstanceRun run) {
std::vector<const DataRingBuf*> input_data;
- std::vector<std::string> deps = instance->get_deps();
for (size_t i = 0; i < deps.size(); ++i) {
std::shared_ptr<Instance> ins = memory_store.get_instance(deps[i]);
input_data.emplace_back(get_ring_buf(ins));
@@ -30,62 +29,167 @@ void InstanceRunHandler::run_instance(std::shared_ptr<Instance> instance) {
Param param;
param.ring_bufs = input_data.data();
param.len = input_data.size();
- instance->get_interface()->run(&param);
+ run(&param);
}
-void InstanceRunHandler::insert_instance(std::shared_ptr<Instance> instance, uint64_t time) {
- /* To check if an instance is enabled, enable() is called in the PluginManager. */
- schedule_queue.push(ScheduleInstance{instance, time});
- INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue.");
+void InstanceRunHandler::enable_instance(const std::string &name) {
+ std::queue<std::shared_ptr<Node>> instance_node_queue;
+ auto dep_handler = memory_store.get_dep_handler();
+ instance_node_queue.push(dep_handler.get_node(name));
+ std::vector<std::string> new_enabled;
+ bool enabled = true;
+ while (!instance_node_queue.empty()) {
+ auto node = instance_node_queue.front();
+ instance_node_queue.pop();
+ if (node->instance->get_enabled()) {
+ continue;
+ }
+ auto cur_name = node->instance->get_name();
+ node->instance->set_enabled(true);
+ new_enabled.emplace_back(cur_name);
+ if (!node->instance->get_interface()->enable()) {
+ enabled = false;
+ break;
+ }
+ for (auto arc = node->head->next; arc != nullptr; arc = arc->next) {
+ if (!arc->is_exist) {
+ continue;
+ }
+ auto cur_node = dep_handler.get_node(arc->to);
+ in_degree[arc->to]++;
+ instance_node_queue.push(cur_node);
+ }
+ }
+ if (!enabled) {
+ for (auto &enabled_name : new_enabled) {
+ auto instance = dep_handler.get_instance(enabled_name);
+ instance->get_interface()->disable();
+ instance->set_enabled(false);
+ if (enabled_name == name) {
+ continue;
+ }
+ in_degree[enabled_name]--;
+ }
+ } else {
+ for (auto &enabled_name : new_enabled) {
+ auto instance = dep_handler.get_instance(enabled_name);
+ schedule_queue.push(ScheduleInstance{instance, time});
+ INFO("[InstanceRunHandler] " << enabled_name << " instance insert into schedule queue at time " << time);
+ }
+ }
}
-void InstanceRunHandler::delete_instance(std::shared_ptr<Instance> instance) {
- instance->get_interface()->disable();
- instance->set_enabled(false);
- INFO("[PluginManager] " << instance->get_name() << " instance delete from running queue.");
+void InstanceRunHandler::disable_instance(const std::string &name) {
+ if (in_degree[name] != 0) {
+ return;
+ }
+ std::queue<std::shared_ptr<Node>> instance_node_queue;
+ auto dep_handler = memory_store.get_dep_handler();
+ instance_node_queue.push(dep_handler.get_node(name));
+ while (!instance_node_queue.empty()) {
+ auto node = instance_node_queue.front();
+ auto &instance = node->instance;
+ instance_node_queue.pop();
+ auto cur_name = instance->get_name();
+ instance->set_enabled(false);
+ instance->get_interface()->disable();
+ INFO("[InstanceRunHandler] " << cur_name << " instance disabled at time " << time);
+ for (auto arc = node->head->next; arc != nullptr; arc = arc->next) {
+ auto cur_node = dep_handler.get_node(arc->to);
+ arc->is_exist = arc->init;
+ /* The instance can be closed only when the indegree is 0. */
+ if (in_degree[arc->to] <= 0 || --in_degree[arc->to] != 0) {
+ continue;
+ }
+ instance_node_queue.push(cur_node);
+ }
+ }
}
-void InstanceRunHandler::handle_instance(uint64_t time) {
- InstanceRunMessage msg;
+void InstanceRunHandler::handle_instance() {
+ std::shared_ptr<InstanceRunMessage> msg;
while(this->recv_queue_try_pop(msg)){
- std::shared_ptr<Instance> instance = msg.get_instance();
- switch (msg.get_type()){
+ std::shared_ptr<Instance> instance = msg->get_instance();
+ switch (msg->get_type()){
case RunType::ENABLED: {
- insert_instance(std::move(instance), time);
+ enable_instance(instance->get_name());
break;
}
case RunType::DISABLED: {
- delete_instance(std::move(instance));
+ disable_instance(instance->get_name());
break;
}
}
+ msg->notify_one();
}
}
-void InstanceRunHandler::schedule(uint64_t time) {
+void InstanceRunHandler::change_instance_state(const std::string &name, std::vector<std::string> &deps,
+ std::vector<std::string> &after_deps) {
+ for (auto &dep : deps) {
+ if (std::find(after_deps.begin(), after_deps.end(), dep) != after_deps.end()) {
+ continue;
+ }
+ auto instance = memory_store.get_instance(dep);
+ if (instance == nullptr) {
+ ERROR("[InstanceRunHandler] ilegal dependency: " << dep);
+ continue;
+ }
+ /* Disable the instance that is not required. */
+ if (instance->get_enabled()) {
+ memory_store.delete_edge(name, instance->get_name());
+ in_degree[instance->get_name()]--;
+ auto msg = std::make_shared<InstanceRunMessage>(RunType::DISABLED, instance);
+ recv_queue_push(std::move(msg));
+ }
+ }
+ for (auto &after_dep : after_deps) {
+ if (std::find(deps.begin(), deps.end(), after_dep) != deps.end()) {
+ continue;
+ }
+ auto instance = memory_store.get_instance(after_dep);
+ if (instance == nullptr) {
+ ERROR("[InstanceRunHandler] ilegal dependency: " << after_dep);
+ continue;
+ }
+ /* Enable the instance that is required. */
+ if (!instance->get_enabled()) {
+ in_degree[instance->get_name()]++;
+ memory_store.add_edge(name, instance->get_name());
+ auto msg = std::make_shared<InstanceRunMessage>(RunType::ENABLED, instance);
+ recv_queue_push(std::move(msg));
+ }
+ }
+}
+
+void InstanceRunHandler::schedule() {
while (!schedule_queue.empty()) {
auto schedule_instance = schedule_queue.top();
+ auto &instance = schedule_instance.instance;
if (schedule_instance.time != time) {
break;
}
schedule_queue.pop();
- if (!schedule_instance.instance->get_enabled()) {
- break;
+ if (!instance->get_enabled()) {
+ continue;
}
- run_instance(schedule_instance.instance);
- schedule_instance.time += schedule_instance.instance->get_interface()->get_period();
+ std::vector<std::string> deps = instance->get_deps();
+ run_instance(deps, instance->get_interface()->run);
+ schedule_instance.time += instance->get_interface()->get_period();
schedule_queue.push(schedule_instance);
+ /* Dynamically change dependencies. */
+ std::vector<std::string> after_deps = instance->get_deps();
+ change_instance_state(instance->get_name(), deps, after_deps);
}
}
void start(InstanceRunHandler *instance_run_handler) {
- unsigned long long time = 0;
INFO("[PluginManager] instance schedule started!");
while(true) {
- instance_run_handler->handle_instance(time);
- instance_run_handler->schedule(time);
+ instance_run_handler->handle_instance();
+ instance_run_handler->schedule();
usleep(instance_run_handler->get_cycle() * 1000);
- time += instance_run_handler->get_cycle();
+ instance_run_handler->add_time(instance_run_handler->get_cycle());
}
}
diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h
index f0bf263..980fd98 100644
--- a/src/plugin_mgr/instance_run_handler.h
+++ b/src/plugin_mgr/instance_run_handler.h
@@ -27,16 +27,30 @@ enum class RunType {
class InstanceRunMessage {
public:
InstanceRunMessage() {}
- InstanceRunMessage(RunType type, std::shared_ptr<Instance> instance) : type(type), instance(instance) {}
+ InstanceRunMessage(RunType type, std::shared_ptr<Instance> instance) : type(type), instance(instance), finish(false) { }
RunType get_type() {
return type;
}
std::shared_ptr<Instance> get_instance() {
return instance;
}
+ void wait() {
+ std::unique_lock<std::mutex> lock(mutex);
+ cond.wait(lock, [this]() {
+ return finish;
+ });
+ }
+ void notify_one() {
+ std::unique_lock<std::mutex> lock(mutex);
+ finish = true;
+ cond.notify_one();
+ }
private:
RunType type;
std::shared_ptr<Instance> instance;
+ std::mutex mutex;
+ std::condition_variable cond;
+ bool finish;
};
class ScheduleInstance {
@@ -51,35 +65,39 @@ public:
/* A handler to schedule instances. */
class InstanceRunHandler {
public:
- explicit InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), cycle(DEFAULT_CYCLE_SIZE) {}
+ using InstanceRun = void (*)(const struct Param*);
+ explicit InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), time(0), cycle(DEFAULT_CYCLE_SIZE) { }
void run();
- void schedule(uint64_t time);
- void handle_instance(uint64_t time);
+ void schedule();
+ void handle_instance();
void set_cycle(int cycle) {
this->cycle = cycle;
}
int get_cycle() {
return cycle;
}
- void recv_queue_push(InstanceRunMessage &msg) {
- this->recv_queue.push(msg);
- }
- void recv_queue_push(InstanceRunMessage &&msg) {
+ void recv_queue_push(std::shared_ptr<InstanceRunMessage> msg) {
this->recv_queue.push(msg);
}
- bool recv_queue_try_pop(InstanceRunMessage &msg) {
+ bool recv_queue_try_pop(std::shared_ptr<InstanceRunMessage> &msg) {
return this->recv_queue.try_pop(msg);
}
+ void add_time(uint64_t period) {
+ time += period;
+ }
+private:
+ void run_instance(std::vector<std::string> &deps, InstanceRun run);
+ void change_instance_state(const std::string &name, std::vector<std::string> &deps, std::vector<std::string> &after_deps);
+ void enable_instance(const std::string &name);
+ void disable_instance(const std::string &name);
private:
- void run_instance(std::shared_ptr<Instance> instance);
- void delete_instance(std::shared_ptr<Instance> instance);
- void insert_instance(std::shared_ptr<Instance> instance, uint64_t time);
-
/* Instance execution queue. */
std::priority_queue<ScheduleInstance> schedule_queue;
- /*Receives messages from the PluginManager. */
- SafeQueue<InstanceRunMessage> recv_queue;
+ /* Receives messages from the PluginManager. */
+ SafeQueue<std::shared_ptr<InstanceRunMessage>> recv_queue;
+ std::unordered_map<std::string, int> in_degree;
MemoryStore &memory_store;
+ uint64_t time;
int cycle;
static const int DEFAULT_CYCLE_SIZE = 10;
};
diff --git a/src/plugin_mgr/interface.h b/src/plugin_mgr/interface.h
index 8aa6933..9dad5b8 100644
--- a/src/plugin_mgr/interface.h
+++ b/src/plugin_mgr/interface.h
@@ -21,11 +21,11 @@ struct DataBuf {
struct DataRingBuf {
/* instance name */
- const char *instance_name;
+ const char *instance_name;
/* buf write index, initial value is -1 */
int index;
/* instance run times */
- uint64_t count;
+ uint64_t count;
struct DataBuf *buf;
int buf_len;
};
diff --git a/src/plugin_mgr/memory_store.h b/src/plugin_mgr/memory_store.h
index 0e862db..a43c461 100644
--- a/src/plugin_mgr/memory_store.h
+++ b/src/plugin_mgr/memory_store.h
@@ -58,8 +58,14 @@ public:
bool have_dep(const std::string &name) {
return dep_handler.have_dep(name);
}
- std::vector<std::string> get_pre_dependencies(const std::string &name) {
- return dep_handler.get_pre_dependencies(name);
+ const DepHandler& get_dep_handler() const {
+ return dep_handler;
+ }
+ void add_edge(const std::string &from, const std::string &to) {
+ dep_handler.add_edge(from, to);
+ }
+ void delete_edge(const std::string &from, const std::string &to) {
+ dep_handler.delete_edge(from, to);
}
private:
/* instance are stored in the form of DAG.
diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp
index 4c5f5fc..9ecaa6d 100644
--- a/src/plugin_mgr/plugin_manager.cpp
+++ b/src/plugin_mgr/plugin_manager.cpp
@@ -22,7 +22,6 @@ void PluginManager::init(std::shared_ptr<Config> config, std::shared_ptr<SafeQue
this->handler_msg = handler_msg;
this->res_msg = res_msg;
instance_run_handler.reset(new InstanceRunHandler(memory_store));
- pre_load();
}
ErrorCode PluginManager::remove(const std::string &name) {
@@ -204,35 +203,14 @@ ErrorCode PluginManager::instance_enabled(const std::string &name) {
if (instance->get_enabled()) {
return ErrorCode::ENABLE_INSTANCE_ALREADY_ENABLED;
}
- std::vector<std::string> pre_dependencies = memory_store.get_pre_dependencies(name);
- std::vector<std::shared_ptr<Instance>> new_enabled;
- bool enabled = true;
- for (int i = pre_dependencies.size() - 1; i >= 0; --i) {
- instance = memory_store.get_instance(pre_dependencies[i]);
- if (instance->get_enabled()) {
- continue;
- }
- new_enabled.emplace_back(instance);
- if (!instance->get_interface()->enable()) {
- enabled = false;
- WARN("[PluginManager] " << instance->get_name() << " instance enabled failed, because instance init environment failed.");
- break;
- }
- }
- if (enabled) {
- for (int i = pre_dependencies.size() - 1; i >= 0; --i) {
- instance = memory_store.get_instance(pre_dependencies[i]);
- if (instance->get_enabled()) {
- continue;
- }
- instance->set_enabled(true);
- instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance));
- }
- return ErrorCode::OK;
+ std::shared_ptr<InstanceRunMessage> msg = std::make_shared<InstanceRunMessage>(RunType::ENABLED, instance);
+ /* Send message to InstanceRunHandler. */
+ instance_run_handler->recv_queue_push(msg);
+ /* Wait for InstanceRunHandler to finsh this task. */
+ msg->wait();
+ if (msg->get_instance()->get_enabled()) {
+ return ErrorCode::OK;
} else {
- for (auto ins : new_enabled) {
- ins->get_interface()->disable();
- }
return ErrorCode::ENABLE_INSTANCE_ENV;
}
}
@@ -248,7 +226,9 @@ ErrorCode PluginManager::instance_disabled(const std::string &name) {
if (!instance->get_enabled()) {
return ErrorCode::DISABLE_INSTANCE_ALREADY_DISABLED;
}
- instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::DISABLED, instance));
+ auto msg = std::make_shared<InstanceRunMessage>(RunType::DISABLED, instance);
+ instance_run_handler->recv_queue_push(msg);
+ msg->wait();
return ErrorCode::OK;
}
@@ -397,6 +377,7 @@ bool file_exist(const std::string &file_name) {
int PluginManager::run() {
instance_run_handler->run();
+ pre_load();
while (true) {
Message msg;
Message res;
--
2.33.0