From 40bb095943d23c0bf271c633fc7ec6379925762a Mon Sep 17 00:00:00 2001 From: fly_1997 Date: Wed, 5 Jun 2024 11:08:32 +0800 Subject: [PATCH] add dynamic dependencncy adjustment --- src/plugin_mgr/dep_handler.cpp | 108 ++++++++-------- src/plugin_mgr/dep_handler.h | 37 ++++-- src/plugin_mgr/instance_run_handler.cpp | 156 ++++++++++++++++++++---- src/plugin_mgr/instance_run_handler.h | 48 +++++--- src/plugin_mgr/interface.h | 4 +- src/plugin_mgr/memory_store.h | 10 +- src/plugin_mgr/plugin_manager.cpp | 41 ++----- 7 files changed, 270 insertions(+), 134 deletions(-) diff --git a/src/plugin_mgr/dep_handler.cpp b/src/plugin_mgr/dep_handler.cpp index 1006175..227ee2f 100644 --- a/src/plugin_mgr/dep_handler.cpp +++ b/src/plugin_mgr/dep_handler.cpp @@ -29,25 +29,47 @@ void DepHandler::add_arc_node(std::shared_ptr node, const std::vectorcnt = dep_nodes.size(); int real_cnt = 0; for (auto name : dep_nodes) { - std::shared_ptr tmp = std::make_shared(); - tmp->arc_name = name; - tmp->node_name = node->instance->get_name(); + std::string from = node->instance->get_name(); + std::shared_ptr tmp = std::make_shared(from, name); tmp->next = arc_head->next; arc_head->next = tmp; if (nodes.count(name)) { tmp->is_exist = true; - arc_nodes[name].insert(tmp); + tmp->init = true; real_cnt++; - } else { - tmp->is_exist = false; - arc_nodes[name].insert(tmp); } + in_edges[name].insert(from); + arc_nodes[std::make_pair(from, name)] = tmp; } if (real_cnt == node->cnt) { node->instance->set_state(true); } node->real_cnt = real_cnt; +} + +void DepHandler::add_edge(const std::string &from, const std::string &to) { + if (in_edges[to].find(from) != in_edges[to].end()) { + auto arc_node = arc_nodes[std::make_pair(from, to)]; + arc_node->is_exist = true; + return; + } + + auto arc_node = std::make_shared(from, to); + auto node = nodes[from]; + arc_node->next = node->head->next; + node->head->next = arc_node; + arc_node->is_exist = true; + arc_nodes[std::make_pair(from, to)] = arc_node; + in_edges[to].insert(from); +} + +void DepHandler::delete_edge(const std::string &from, const std::string &to) { + if (in_edges[to].find(from) == in_edges[to].end()) { + return; + } + auto arc_node = arc_nodes[std::make_pair(from, to)]; + arc_node->is_exist = false; } void DepHandler::add_node(std::shared_ptr instance) { @@ -81,10 +103,11 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr node) { while(arc) { std::shared_ptr tmp = arc->next; if (arc != node->head){ - std::string name = arc->arc_name; - arc_nodes[name].erase(arc); - if (arc_nodes[name].empty()) { - arc_nodes.erase(name); + std::string name = arc->to; + in_edges[name].erase(arc->from); + arc_nodes.erase(std::make_pair(arc->from, arc->to)); + if (in_edges[name].empty()) { + in_edges.erase(name); } } arc = tmp; @@ -92,15 +115,18 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr node) { } void DepHandler::update_instance_state(const std::string &name) { - if (!nodes[name]->instance->get_state() || !arc_nodes.count(name)) return; - std::unordered_set> &arcs = arc_nodes[name]; - for (auto &arc_node : arcs) { - if (nodes.count(arc_node->node_name)) { - auto tmp = nodes[arc_node->node_name]; + if (!nodes[name]->instance->get_state() || !in_edges.count(name)) return; + std::unordered_set &arcs = in_edges[name]; + for (auto &from : arcs) { + auto arc_node = arc_nodes[std::make_pair(from, name)]; + if (nodes.count(from)) { + auto tmp = nodes[from]; tmp->real_cnt++; if (tmp->real_cnt == tmp->cnt) { tmp->instance->set_state(true); } + arc_node->is_exist = true; + arc_node->init = true; update_instance_state(tmp->instance->get_name()); } } @@ -113,54 +139,38 @@ void DepHandler::query_all_dependencies(std::vector> &q } void DepHandler::query_node_top(const std::string &name, std::vector> &query) { - std::shared_ptr p = nodes[name]->head; - if (p->next == nullptr) { + std::shared_ptr arc_node = nodes[name]->head; + if (arc_node->next == nullptr) { query.emplace_back(std::vector{name}); return; } - while (p->next != nullptr) { - query.emplace_back(std::vector{name, p->next->arc_name}); - p = p->next; + while (arc_node->next != nullptr) { + if (arc_node->next->is_exist) { + query.emplace_back(std::vector{name, arc_node->next->to}); + } + arc_node = arc_node->next; } } void DepHandler::query_node_dependency(const std::string &name, std::vector> &query) { if (!nodes.count(name)) return; - std::queue q; + std::queue instance_queue; std::unordered_set vis; vis.insert(name); - q.push(name); - while (!q.empty()) { - auto node = nodes[q.front()]; - q.pop(); + instance_queue.push(name); + while (!instance_queue.empty()) { + auto node = nodes[instance_queue.front()]; + instance_queue.pop(); std::string node_name = node->instance->get_name(); query.emplace_back(std::vector{node_name}); for (auto cur = node->head->next; cur != nullptr; cur = cur->next) { - query.emplace_back(std::vector{node_name, cur->arc_name}); - if (!vis.count(cur->arc_name) && nodes.count(cur->arc_name)) { - vis.insert(cur->arc_name); - q.push(cur->arc_name); + if (cur->is_exist) { + query.emplace_back(std::vector{node_name, cur->to}); } - } - } -} - -std::vector DepHandler::get_pre_dependencies(const std::string &name) { - std::vector res; - std::queue> q; - std::unordered_set vis; - vis.insert(name); - q.push(nodes[name]); - while (!q.empty()) { - auto &node = q.front(); - q.pop(); - res.emplace_back(node->instance->get_name()); - for (auto arc_node = node->head->next; arc_node != nullptr; arc_node = arc_node->next) { - if (!vis.count(arc_node->arc_name)) { - vis.insert(arc_node->arc_name); - q.push(nodes[arc_node->arc_name]); + if (!vis.count(cur->to) && nodes.count(cur->to)) { + vis.insert(cur->to); + instance_queue.push(cur->to); } } } - return res; } diff --git a/src/plugin_mgr/dep_handler.h b/src/plugin_mgr/dep_handler.h index a7a8ef6..322e3b3 100644 --- a/src/plugin_mgr/dep_handler.h +++ b/src/plugin_mgr/dep_handler.h @@ -18,10 +18,12 @@ struct ArcNode { std::shared_ptr next; - std::string arc_name; - std::string node_name; - bool is_exist; - ArcNode() : next(nullptr), is_exist(false) { } + std::string from; + std::string to; + bool is_exist; /* Whether this edge exists. */ + bool init; /* The initial state of the edge. */ + ArcNode() { } + ArcNode(const std::string &from, const std::string &to) : next(nullptr), from(from), to(to), is_exist(false), init(false) { } }; // a instance node @@ -29,11 +31,19 @@ struct Node { std::shared_ptr next; std::shared_ptr head; std::shared_ptr instance; - int cnt; - int real_cnt; + int cnt; /* Number of dependencies required for loading. */ + int real_cnt; /* Actual number of dependencies during loading. */ Node(): next(nullptr), head(nullptr), cnt(0), real_cnt(0) { } }; +struct pair_hash { + std::size_t operator() (const std::pair &pair) const { + auto h1 = std::hash{}(pair.first); + auto h2 = std::hash{}(pair.second); + return h1 ^ h2; + } +}; + class DepHandler { public: DepHandler() { @@ -43,19 +53,23 @@ public: bool get_node_state(const std::string &name) { return this->nodes[name]->instance->get_state(); } + void delete_edge(const std::string &from, const std::string &to); + void add_edge(const std::string &from, const std::string &to); void add_instance(std::shared_ptr instance); void delete_instance(const std::string &name); bool is_instance_exist(const std::string &name); std::shared_ptr get_instance(const std::string &name) const { + if (!nodes.count(name)) { + return nullptr; + } return nodes.at(name)->instance; } void query_node_dependency(const std::string &name, std::vector> &query); void query_all_dependencies(std::vector> &query); /* check whether the instance has dependencies */ bool have_dep(const std::string &name) { - return arc_nodes.count(name); + return in_edges.count(name); } - std::vector get_pre_dependencies(const std::string &name); private: void add_node(std::shared_ptr instance); void del_node(const std::string &name); @@ -64,8 +78,11 @@ private: void update_instance_state(const std::string &name); void del_node_and_arc_nodes(std::shared_ptr node); std::shared_ptr add_new_node(std::shared_ptr instance); - /* indegree edges */ - std::unordered_map>> arc_nodes; + /* Indegree edges. */ + std::unordered_map> in_edges; + /* Store all edges. */ + std::unordered_map, std::shared_ptr, pair_hash> arc_nodes; + /* Store all points. */ std::unordered_map> nodes; std::shared_ptr head; }; diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp index ecd7778..496166b 100644 --- a/src/plugin_mgr/instance_run_handler.cpp +++ b/src/plugin_mgr/instance_run_handler.cpp @@ -20,9 +20,8 @@ static const DataRingBuf* get_ring_buf(std::shared_ptr instance) { return instance->get_interface()->get_ring_buf(); } -void InstanceRunHandler::run_instance(std::shared_ptr instance) { +void InstanceRunHandler::run_instance(std::vector &deps, InstanceRun run) { std::vector input_data; - std::vector deps = instance->get_deps(); for (size_t i = 0; i < deps.size(); ++i) { std::shared_ptr ins = memory_store.get_instance(deps[i]); input_data.emplace_back(get_ring_buf(ins)); @@ -30,62 +29,167 @@ void InstanceRunHandler::run_instance(std::shared_ptr instance) { Param param; param.ring_bufs = input_data.data(); param.len = input_data.size(); - instance->get_interface()->run(¶m); + run(¶m); } -void InstanceRunHandler::insert_instance(std::shared_ptr instance, uint64_t time) { - /* To check if an instance is enabled, enable() is called in the PluginManager. */ - schedule_queue.push(ScheduleInstance{instance, time}); - INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue."); +void InstanceRunHandler::enable_instance(const std::string &name) { + std::queue> instance_node_queue; + auto dep_handler = memory_store.get_dep_handler(); + instance_node_queue.push(dep_handler.get_node(name)); + std::vector new_enabled; + bool enabled = true; + while (!instance_node_queue.empty()) { + auto node = instance_node_queue.front(); + instance_node_queue.pop(); + if (node->instance->get_enabled()) { + continue; + } + auto cur_name = node->instance->get_name(); + node->instance->set_enabled(true); + new_enabled.emplace_back(cur_name); + if (!node->instance->get_interface()->enable()) { + enabled = false; + break; + } + for (auto arc = node->head->next; arc != nullptr; arc = arc->next) { + if (!arc->is_exist) { + continue; + } + auto cur_node = dep_handler.get_node(arc->to); + in_degree[arc->to]++; + instance_node_queue.push(cur_node); + } + } + if (!enabled) { + for (auto &enabled_name : new_enabled) { + auto instance = dep_handler.get_instance(enabled_name); + instance->get_interface()->disable(); + instance->set_enabled(false); + if (enabled_name == name) { + continue; + } + in_degree[enabled_name]--; + } + } else { + for (auto &enabled_name : new_enabled) { + auto instance = dep_handler.get_instance(enabled_name); + schedule_queue.push(ScheduleInstance{instance, time}); + INFO("[InstanceRunHandler] " << enabled_name << " instance insert into schedule queue at time " << time); + } + } } -void InstanceRunHandler::delete_instance(std::shared_ptr instance) { - instance->get_interface()->disable(); - instance->set_enabled(false); - INFO("[PluginManager] " << instance->get_name() << " instance delete from running queue."); +void InstanceRunHandler::disable_instance(const std::string &name) { + if (in_degree[name] != 0) { + return; + } + std::queue> instance_node_queue; + auto dep_handler = memory_store.get_dep_handler(); + instance_node_queue.push(dep_handler.get_node(name)); + while (!instance_node_queue.empty()) { + auto node = instance_node_queue.front(); + auto &instance = node->instance; + instance_node_queue.pop(); + auto cur_name = instance->get_name(); + instance->set_enabled(false); + instance->get_interface()->disable(); + INFO("[InstanceRunHandler] " << cur_name << " instance disabled at time " << time); + for (auto arc = node->head->next; arc != nullptr; arc = arc->next) { + auto cur_node = dep_handler.get_node(arc->to); + arc->is_exist = arc->init; + /* The instance can be closed only when the indegree is 0. */ + if (in_degree[arc->to] <= 0 || --in_degree[arc->to] != 0) { + continue; + } + instance_node_queue.push(cur_node); + } + } } -void InstanceRunHandler::handle_instance(uint64_t time) { - InstanceRunMessage msg; +void InstanceRunHandler::handle_instance() { + std::shared_ptr msg; while(this->recv_queue_try_pop(msg)){ - std::shared_ptr instance = msg.get_instance(); - switch (msg.get_type()){ + std::shared_ptr instance = msg->get_instance(); + switch (msg->get_type()){ case RunType::ENABLED: { - insert_instance(std::move(instance), time); + enable_instance(instance->get_name()); break; } case RunType::DISABLED: { - delete_instance(std::move(instance)); + disable_instance(instance->get_name()); break; } } + msg->notify_one(); } } -void InstanceRunHandler::schedule(uint64_t time) { +void InstanceRunHandler::change_instance_state(const std::string &name, std::vector &deps, + std::vector &after_deps) { + for (auto &dep : deps) { + if (std::find(after_deps.begin(), after_deps.end(), dep) != after_deps.end()) { + continue; + } + auto instance = memory_store.get_instance(dep); + if (instance == nullptr) { + ERROR("[InstanceRunHandler] ilegal dependency: " << dep); + continue; + } + /* Disable the instance that is not required. */ + if (instance->get_enabled()) { + memory_store.delete_edge(name, instance->get_name()); + in_degree[instance->get_name()]--; + auto msg = std::make_shared(RunType::DISABLED, instance); + recv_queue_push(std::move(msg)); + } + } + for (auto &after_dep : after_deps) { + if (std::find(deps.begin(), deps.end(), after_dep) != deps.end()) { + continue; + } + auto instance = memory_store.get_instance(after_dep); + if (instance == nullptr) { + ERROR("[InstanceRunHandler] ilegal dependency: " << after_dep); + continue; + } + /* Enable the instance that is required. */ + if (!instance->get_enabled()) { + in_degree[instance->get_name()]++; + memory_store.add_edge(name, instance->get_name()); + auto msg = std::make_shared(RunType::ENABLED, instance); + recv_queue_push(std::move(msg)); + } + } +} + +void InstanceRunHandler::schedule() { while (!schedule_queue.empty()) { auto schedule_instance = schedule_queue.top(); + auto &instance = schedule_instance.instance; if (schedule_instance.time != time) { break; } schedule_queue.pop(); - if (!schedule_instance.instance->get_enabled()) { - break; + if (!instance->get_enabled()) { + continue; } - run_instance(schedule_instance.instance); - schedule_instance.time += schedule_instance.instance->get_interface()->get_period(); + std::vector deps = instance->get_deps(); + run_instance(deps, instance->get_interface()->run); + schedule_instance.time += instance->get_interface()->get_period(); schedule_queue.push(schedule_instance); + /* Dynamically change dependencies. */ + std::vector after_deps = instance->get_deps(); + change_instance_state(instance->get_name(), deps, after_deps); } } void start(InstanceRunHandler *instance_run_handler) { - unsigned long long time = 0; INFO("[PluginManager] instance schedule started!"); while(true) { - instance_run_handler->handle_instance(time); - instance_run_handler->schedule(time); + instance_run_handler->handle_instance(); + instance_run_handler->schedule(); usleep(instance_run_handler->get_cycle() * 1000); - time += instance_run_handler->get_cycle(); + instance_run_handler->add_time(instance_run_handler->get_cycle()); } } diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h index f0bf263..980fd98 100644 --- a/src/plugin_mgr/instance_run_handler.h +++ b/src/plugin_mgr/instance_run_handler.h @@ -27,16 +27,30 @@ enum class RunType { class InstanceRunMessage { public: InstanceRunMessage() {} - InstanceRunMessage(RunType type, std::shared_ptr instance) : type(type), instance(instance) {} + InstanceRunMessage(RunType type, std::shared_ptr instance) : type(type), instance(instance), finish(false) { } RunType get_type() { return type; } std::shared_ptr get_instance() { return instance; } + void wait() { + std::unique_lock lock(mutex); + cond.wait(lock, [this]() { + return finish; + }); + } + void notify_one() { + std::unique_lock lock(mutex); + finish = true; + cond.notify_one(); + } private: RunType type; std::shared_ptr instance; + std::mutex mutex; + std::condition_variable cond; + bool finish; }; class ScheduleInstance { @@ -51,35 +65,39 @@ public: /* A handler to schedule instances. */ class InstanceRunHandler { public: - explicit InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), cycle(DEFAULT_CYCLE_SIZE) {} + using InstanceRun = void (*)(const struct Param*); + explicit InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), time(0), cycle(DEFAULT_CYCLE_SIZE) { } void run(); - void schedule(uint64_t time); - void handle_instance(uint64_t time); + void schedule(); + void handle_instance(); void set_cycle(int cycle) { this->cycle = cycle; } int get_cycle() { return cycle; } - void recv_queue_push(InstanceRunMessage &msg) { - this->recv_queue.push(msg); - } - void recv_queue_push(InstanceRunMessage &&msg) { + void recv_queue_push(std::shared_ptr msg) { this->recv_queue.push(msg); } - bool recv_queue_try_pop(InstanceRunMessage &msg) { + bool recv_queue_try_pop(std::shared_ptr &msg) { return this->recv_queue.try_pop(msg); } + void add_time(uint64_t period) { + time += period; + } +private: + void run_instance(std::vector &deps, InstanceRun run); + void change_instance_state(const std::string &name, std::vector &deps, std::vector &after_deps); + void enable_instance(const std::string &name); + void disable_instance(const std::string &name); private: - void run_instance(std::shared_ptr instance); - void delete_instance(std::shared_ptr instance); - void insert_instance(std::shared_ptr instance, uint64_t time); - /* Instance execution queue. */ std::priority_queue schedule_queue; - /*Receives messages from the PluginManager. */ - SafeQueue recv_queue; + /* Receives messages from the PluginManager. */ + SafeQueue> recv_queue; + std::unordered_map in_degree; MemoryStore &memory_store; + uint64_t time; int cycle; static const int DEFAULT_CYCLE_SIZE = 10; }; diff --git a/src/plugin_mgr/interface.h b/src/plugin_mgr/interface.h index 8aa6933..9dad5b8 100644 --- a/src/plugin_mgr/interface.h +++ b/src/plugin_mgr/interface.h @@ -21,11 +21,11 @@ struct DataBuf { struct DataRingBuf { /* instance name */ - const char *instance_name; + const char *instance_name; /* buf write index, initial value is -1 */ int index; /* instance run times */ - uint64_t count; + uint64_t count; struct DataBuf *buf; int buf_len; }; diff --git a/src/plugin_mgr/memory_store.h b/src/plugin_mgr/memory_store.h index 0e862db..a43c461 100644 --- a/src/plugin_mgr/memory_store.h +++ b/src/plugin_mgr/memory_store.h @@ -58,8 +58,14 @@ public: bool have_dep(const std::string &name) { return dep_handler.have_dep(name); } - std::vector get_pre_dependencies(const std::string &name) { - return dep_handler.get_pre_dependencies(name); + const DepHandler& get_dep_handler() const { + return dep_handler; + } + void add_edge(const std::string &from, const std::string &to) { + dep_handler.add_edge(from, to); + } + void delete_edge(const std::string &from, const std::string &to) { + dep_handler.delete_edge(from, to); } private: /* instance are stored in the form of DAG. diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp index 4c5f5fc..9ecaa6d 100644 --- a/src/plugin_mgr/plugin_manager.cpp +++ b/src/plugin_mgr/plugin_manager.cpp @@ -22,7 +22,6 @@ void PluginManager::init(std::shared_ptr config, std::shared_ptrhandler_msg = handler_msg; this->res_msg = res_msg; instance_run_handler.reset(new InstanceRunHandler(memory_store)); - pre_load(); } ErrorCode PluginManager::remove(const std::string &name) { @@ -204,35 +203,14 @@ ErrorCode PluginManager::instance_enabled(const std::string &name) { if (instance->get_enabled()) { return ErrorCode::ENABLE_INSTANCE_ALREADY_ENABLED; } - std::vector pre_dependencies = memory_store.get_pre_dependencies(name); - std::vector> new_enabled; - bool enabled = true; - for (int i = pre_dependencies.size() - 1; i >= 0; --i) { - instance = memory_store.get_instance(pre_dependencies[i]); - if (instance->get_enabled()) { - continue; - } - new_enabled.emplace_back(instance); - if (!instance->get_interface()->enable()) { - enabled = false; - WARN("[PluginManager] " << instance->get_name() << " instance enabled failed, because instance init environment failed."); - break; - } - } - if (enabled) { - for (int i = pre_dependencies.size() - 1; i >= 0; --i) { - instance = memory_store.get_instance(pre_dependencies[i]); - if (instance->get_enabled()) { - continue; - } - instance->set_enabled(true); - instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance)); - } - return ErrorCode::OK; + std::shared_ptr msg = std::make_shared(RunType::ENABLED, instance); + /* Send message to InstanceRunHandler. */ + instance_run_handler->recv_queue_push(msg); + /* Wait for InstanceRunHandler to finsh this task. */ + msg->wait(); + if (msg->get_instance()->get_enabled()) { + return ErrorCode::OK; } else { - for (auto ins : new_enabled) { - ins->get_interface()->disable(); - } return ErrorCode::ENABLE_INSTANCE_ENV; } } @@ -248,7 +226,9 @@ ErrorCode PluginManager::instance_disabled(const std::string &name) { if (!instance->get_enabled()) { return ErrorCode::DISABLE_INSTANCE_ALREADY_DISABLED; } - instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::DISABLED, instance)); + auto msg = std::make_shared(RunType::DISABLED, instance); + instance_run_handler->recv_queue_push(msg); + msg->wait(); return ErrorCode::OK; } @@ -397,6 +377,7 @@ bool file_exist(const std::string &file_name) { int PluginManager::run() { instance_run_handler->run(); + pre_load(); while (true) { Message msg; Message res; -- 2.33.0