From 3d8160661afa6dfd045dc51577c4bb040c25b591 Mon Sep 17 00:00:00 2001 From: fly_1997 Date: Sat, 25 May 2024 15:48:33 +0800 Subject: [PATCH 1/4] refactor plugin interface --- src/common/default_path.h | 4 +- src/plugin_mgr/config.cpp | 20 +-- src/plugin_mgr/config.h | 8 +- src/plugin_mgr/dep_handler.cpp | 7 +- src/plugin_mgr/dep_handler.h | 6 +- src/plugin_mgr/error_code.cpp | 1 + src/plugin_mgr/error_code.h | 1 + src/plugin_mgr/instance_run_handler.cpp | 181 ++++-------------------- src/plugin_mgr/instance_run_handler.h | 25 ++-- src/plugin_mgr/interface.h | 44 +++--- src/plugin_mgr/message_manager.h | 2 +- src/plugin_mgr/plugin.cpp | 23 ++- src/plugin_mgr/plugin.h | 84 ++--------- src/plugin_mgr/plugin_manager.cpp | 142 +++++++------------ src/plugin_mgr/plugin_manager.h | 12 +- 15 files changed, 166 insertions(+), 394 deletions(-) diff --git a/src/common/default_path.h b/src/common/default_path.h index 4ce68b4..0ef3aae 100644 --- a/src/common/default_path.h +++ b/src/common/default_path.h @@ -14,9 +14,7 @@ #include -const std::string DEFAULT_COLLECTOR_PATH = "/usr/lib64/oeAware-plugin/collector"; -const std::string DEFAULT_SCENARIO_PATH = "/usr/lib64/oeAware-plugin/scenario"; -const std::string DEFAULT_TUNE_PATH = "/usr/lib64/oeAware-plugin/tune"; +const std::string DEFAULT_PLUGIN_PATH = "/usr/lib64/oeAware-plugin"; const std::string DEFAULT_RUN_PATH = "/var/run/oeAware"; const std::string DEFAULT_LOG_PATH = "/var/log/oeAware"; diff --git a/src/plugin_mgr/config.cpp b/src/plugin_mgr/config.cpp index 588eda0..6e0e231 100644 --- a/src/plugin_mgr/config.cpp +++ b/src/plugin_mgr/config.cpp @@ -42,7 +42,7 @@ bool check_plugin_list(YAML::Node plugin_list_item) { return true; } -bool Config::load(const std::string path) { +bool Config::load(const std::string &path) { YAML::Node node; struct stat buffer; if (stat(path.c_str(), &buffer) != 0) { @@ -106,20 +106,6 @@ bool Config::load(const std::string path) { return true; } -std::string get_path(PluginType type) { - switch (type) { - case PluginType::COLLECTOR:{ - return DEFAULT_COLLECTOR_PATH; - } - case PluginType::SCENARIO: { - return DEFAULT_SCENARIO_PATH; - } - case PluginType::TUNE: { - return DEFAULT_TUNE_PATH; - } - default: { - break; - } - } - return ""; +std::string get_path() { + return DEFAULT_PLUGIN_PATH; } \ No newline at end of file diff --git a/src/plugin_mgr/config.h b/src/plugin_mgr/config.h index c2d1f37..3844f71 100644 --- a/src/plugin_mgr/config.h +++ b/src/plugin_mgr/config.h @@ -22,7 +22,7 @@ static int log_levels[] = {0, 10000, 20000, 30000, 40000, 50000, 60000}; class PluginInfo { public: - PluginInfo(std::string name, std::string description, std::string url) : name(name), description(description), url(url) { } + PluginInfo(const std::string &name, const std::string &description, const std::string &url) : name(name), description(description), url(url) { } std::string get_name() const { return this->name; } @@ -52,7 +52,7 @@ namespace std { class EnableItem { public: - EnableItem(std::string name) : name(name), enabled(false) { } + EnableItem(const std::string &name) : name(name), enabled(false) { } void set_enabled(bool enabled) { this->enabled = enabled; } @@ -82,7 +82,7 @@ public: Config() { this->log_level = log_levels[2]; } - bool load(const std::string path); + bool load(const std::string &path); int get_log_level() const { return this->log_level; } @@ -122,7 +122,7 @@ private: std::vector enable_list; }; -std::string get_path(PluginType type); +std::string get_path(); bool create_dir(const std::string &path); #endif \ No newline at end of file diff --git a/src/plugin_mgr/dep_handler.cpp b/src/plugin_mgr/dep_handler.cpp index 816056d..c6d0985 100644 --- a/src/plugin_mgr/dep_handler.cpp +++ b/src/plugin_mgr/dep_handler.cpp @@ -39,7 +39,7 @@ void DepHandler::add_arc_node(std::shared_ptr node, const std::vector dep_nodes) { +void DepHandler::add_node(const std::string &name, const std::vector &dep_nodes) { std::shared_ptr cur_node = add_new_node(name); this->nodes[name] = cur_node; add_arc_node(cur_node, dep_nodes); @@ -52,11 +52,10 @@ void DepHandler::del_node(const std::string &name) { } -std::shared_ptr DepHandler::get_node(std::string name) { +std::shared_ptr DepHandler::get_node(const std::string &name) { return this->nodes[name]; } - std::shared_ptr DepHandler::add_new_node(std::string name) { std::shared_ptr cur_node = std::make_shared(name); cur_node->head = std::make_shared(); @@ -80,6 +79,7 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr node) { arc = tmp; } } + void DepHandler::change_arc_nodes(std::string name, bool state) { if (!nodes[name]->state || !arc_nodes.count(name)) return; std::unordered_map, bool> &mp = arc_nodes[name]; @@ -101,7 +101,6 @@ void DepHandler::change_arc_nodes(std::string name, bool state) { } } - void DepHandler::query_all_top(std::vector> &query) { for (auto &p : nodes) { query_node_top(p.first, query); diff --git a/src/plugin_mgr/dep_handler.h b/src/plugin_mgr/dep_handler.h index b81d574..40b8748 100644 --- a/src/plugin_mgr/dep_handler.h +++ b/src/plugin_mgr/dep_handler.h @@ -34,7 +34,7 @@ struct Node { int real_cnt; bool state; // dependency closed-loop Node() : next(nullptr), head(nullptr), cnt(0), real_cnt(0), state(true) {} - Node(std::string name): next(nullptr), head(nullptr), name(name), cnt(0), real_cnt(0), state(true) {} + Node(const std::string &name): next(nullptr), head(nullptr), name(name), cnt(0), real_cnt(0), state(true) {} }; class DepHandler { @@ -43,11 +43,11 @@ public: this->head = std::make_shared(); this->tail = head; } - std::shared_ptr get_node(std::string name); + std::shared_ptr get_node(const std::string &name); bool get_node_state(std::string name) { return this->nodes[name]->state; } - void add_node(const std::string &name, std::vector dep_nodes = {}); + void add_node(const std::string &name, const std::vector &dep_nodes = {}); void del_node(const std::string &name); std::vector get_pre_dependencies(const std::string &name); // query instance dependency diff --git a/src/plugin_mgr/error_code.cpp b/src/plugin_mgr/error_code.cpp index 09f0eb9..8c7af00 100644 --- a/src/plugin_mgr/error_code.cpp +++ b/src/plugin_mgr/error_code.cpp @@ -15,6 +15,7 @@ const std::unordered_map ErrorText::error_codes = { {ErrorCode::ENABLE_INSTANCE_NOT_LOAD, "instance is not loaded"}, {ErrorCode::ENABLE_INSTANCE_UNAVAILABLE, "instance is unavailable"}, {ErrorCode::ENABLE_INSTANCE_ALREADY_ENABLED, "instance is already enabled"}, + {ErrorCode::ENABLE_INSTANCE_ENV, "instance init environment failed"}, {ErrorCode::DISABLE_INSTANCE_NOT_LOAD, "instance is not loaded"}, {ErrorCode::DISABLE_INSTANCE_UNAVAILABLE, "instance is unavailable"}, {ErrorCode::DISABLE_INSTANCE_ALREADY_DISABLED, "instance is already disabled"}, diff --git a/src/plugin_mgr/error_code.h b/src/plugin_mgr/error_code.h index dd028f3..cdbf542 100644 --- a/src/plugin_mgr/error_code.h +++ b/src/plugin_mgr/error_code.h @@ -18,6 +18,7 @@ enum class ErrorCode { ENABLE_INSTANCE_NOT_LOAD, ENABLE_INSTANCE_UNAVAILABLE, ENABLE_INSTANCE_ALREADY_ENABLED, + ENABLE_INSTANCE_ENV, DISABLE_INSTANCE_NOT_LOAD, DISABLE_INSTANCE_UNAVAILABLE, DISABLE_INSTANCE_ALREADY_DISABLED, diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp index 8ba10db..9eed762 100644 --- a/src/plugin_mgr/instance_run_handler.cpp +++ b/src/plugin_mgr/instance_run_handler.cpp @@ -13,104 +13,45 @@ #include #include -static void* get_ring_buf(std::shared_ptr instance) { +static const void* get_ring_buf(std::shared_ptr instance) { if (instance == nullptr) { return nullptr; } - switch (instance->get_type()) { - case PluginType::COLLECTOR: { - return (std::dynamic_pointer_cast(instance))->get_interface()->get_ring_buf(); - } - case PluginType::SCENARIO: { - return (std::dynamic_pointer_cast(instance))->get_interface()->get_ring_buf(); - } - case PluginType::TUNE: { - break; - } - default: { - break; - } - } - return nullptr; -} - -static void reflash_ring_buf(std::shared_ptr instance) { - (std::dynamic_pointer_cast(instance))->get_interface()->reflash_ring_buf(); -} - -void InstanceRunHandler::run_aware(std::shared_ptr instance, std::vector &deps) { - void *a[MAX_DEPENDENCIES_SIZE]; - for (size_t i = 0; i < deps.size(); ++i) { - std::shared_ptr ins = memory_store.get_instance(deps[i]); - a[i] = get_ring_buf(ins); - } - (std::dynamic_pointer_cast(instance))->get_interface()->aware(a, (int)deps.size()); + return instance->get_interface()->get_ring_buf(); } -void InstanceRunHandler::run_tune(std::shared_ptr instance, std::vector &deps) { - void *a[MAX_DEPENDENCIES_SIZE]; +void InstanceRunHandler::run_instance(std::shared_ptr instance) { + std::vector input_data; + std::vector deps = instance->get_deps(); for (size_t i = 0; i < deps.size(); ++i) { std::shared_ptr ins = memory_store.get_instance(deps[i]); - a[i] = get_ring_buf(ins); + input_data.emplace_back(get_ring_buf(ins)); } - (std::dynamic_pointer_cast(instance))->get_interface()->tune(a, (int)deps.size()); + Param param; + param.args = input_data.data(); + param.len = input_data.size(); + instance->get_interface()->run(¶m); } -void InstanceRunHandler::insert_instance(std::shared_ptr instance) { - switch (instance->get_type()) { - case PluginType::COLLECTOR: { - collector[instance->get_name()] = instance; - (std::dynamic_pointer_cast(instance))->get_interface()->enable(); - break; - } - case PluginType::SCENARIO: { - scenario[instance->get_name()] = instance; - (std::dynamic_pointer_cast(instance))->get_interface()->enable(); - break; - } - case PluginType::TUNE: { - tune[instance->get_name()] = instance; - (std::dynamic_pointer_cast(instance))->get_interface()->enable(); - break; - } - default: { - break; - } - } +void InstanceRunHandler::insert_instance(std::shared_ptr instance, uint64_t time) { + instance->set_enabled(true); + schedule_queue.push(ScheduleInstance{instance, time}); INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue."); } void InstanceRunHandler::delete_instance(std::shared_ptr instance) { - switch (instance->get_type()) { - case PluginType::COLLECTOR: { - collector.erase(instance->get_name()); - (std::dynamic_pointer_cast(instance))->get_interface()->disable(); - break; - } - case PluginType::SCENARIO: { - scenario.erase(instance->get_name()); - (std::dynamic_pointer_cast(instance))->get_interface()->disable(); - break; - } - case PluginType::TUNE: { - tune.erase(instance->get_name()); - (std::dynamic_pointer_cast(instance))->get_interface()->disable(); - break; - } - default: { - break; - } - } + instance->get_interface()->disable(); + instance->set_enabled(false); INFO("[PluginManager] " << instance->get_name() << " instance delete from running queue."); } -void InstanceRunHandler::handle_instance() { +void InstanceRunHandler::handle_instance(uint64_t time) { InstanceRunMessage msg; while(this->recv_queue_try_pop(msg)){ std::shared_ptr instance = msg.get_instance(); switch (msg.get_type()){ case RunType::ENABLED: { - insert_instance(std::move(instance)); + insert_instance(std::move(instance), time); break; } case RunType::DISABLED: { @@ -121,78 +62,19 @@ void InstanceRunHandler::handle_instance() { } } -template -static std::vector get_deps(std::shared_ptr instance) { - std::shared_ptr t_instance = std::dynamic_pointer_cast(instance); - std::string deps = (t_instance)->get_interface()->get_dep(); - std::string dep = ""; - std::vector vec; - for (size_t i = 0; i < deps.length(); ++i) { - if (deps[i] != '-') { - dep += deps[i]; - }else { - vec.emplace_back(dep); - dep = ""; - } - } - vec.emplace_back(dep); - return vec; -} - -void InstanceRunHandler::adjust_collector_queue(const std::vector &deps, const std::vector &m_deps, bool flag) { - for (auto &m_dep : m_deps) { - bool ok = false; - for (auto &dep : deps) { - if (m_dep == dep) { - ok = true; - } +void InstanceRunHandler::schedule(uint64_t time) { + while (!schedule_queue.empty()) { + auto schedule_instance = schedule_queue.top(); + if (schedule_instance.time != time) { + break; } - if (ok) continue; - if (flag) { - if (is_instance_exist(m_dep) && !collector.count(m_dep)) { - this->insert_instance(memory_store.get_instance(m_dep)); - } - } else { - if (is_instance_exist(m_dep) && collector.count(m_dep)) { - this->delete_instance(memory_store.get_instance(m_dep)); - } + schedule_queue.pop(); + if (!schedule_instance.instance->get_enabled()) { + break; } - } -} - -void InstanceRunHandler::check_scenario_dependency(const std::vector &origin_deps, const std::vector &cur_deps) { - adjust_collector_queue(origin_deps, cur_deps, true); - adjust_collector_queue(cur_deps, origin_deps, false); -} - -void InstanceRunHandler::schedule_collector(uint64_t time) { - for (auto &p : collector) { - std::shared_ptr instance = p.second; - int t = (std::dynamic_pointer_cast(instance))->get_interface()->get_cycle(); - if (time % t != 0) return; - reflash_ring_buf(instance); - } -} - -void InstanceRunHandler::schedule_scenario(uint64_t time) { - for (auto &p : scenario) { - std::shared_ptr instance = p.second; - int t = (std::dynamic_pointer_cast(instance))->get_interface()->get_cycle(); - if (time % t != 0) return; - std::vector origin_deps = get_deps(instance); - run_aware(instance, origin_deps); - std::vector cur_deps = get_deps(instance); - check_scenario_dependency(origin_deps, cur_deps); - } -} - -void InstanceRunHandler::schedule_tune(uint64_t time) { - for (auto &p : tune) { - std::shared_ptr instance = p.second; - int t = (std::dynamic_pointer_cast(instance))->get_interface()->get_cycle(); - if (time % t != 0) return; - std::vector deps = get_deps(instance); - run_tune(instance, deps); + run_instance(schedule_instance.instance); + schedule_instance.time += schedule_instance.instance->get_interface()->get_cycle(); + schedule_queue.push(schedule_instance); } } @@ -200,11 +82,8 @@ void start(InstanceRunHandler *instance_run_handler) { unsigned long long time = 0; INFO("[PluginManager] instance schedule started!"); while(true) { - instance_run_handler->handle_instance(); - instance_run_handler->schedule_collector(time); - instance_run_handler->schedule_scenario(time); - instance_run_handler->schedule_tune(time); - + instance_run_handler->handle_instance(time); + instance_run_handler->schedule(time); usleep(instance_run_handler->get_cycle() * 1000); time += instance_run_handler->get_cycle(); } diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h index 83f9f4a..4172e99 100644 --- a/src/plugin_mgr/instance_run_handler.h +++ b/src/plugin_mgr/instance_run_handler.h @@ -20,6 +20,7 @@ #include #include #include +#include #include enum class RunType { @@ -43,15 +44,22 @@ private: std::shared_ptr instance; }; +class ScheduleInstance { +public: + bool operator < (const ScheduleInstance &rhs) const { + return time > rhs.time || (time == rhs.time && instance->get_type() > rhs.instance->get_type()); + } + std::shared_ptr instance; + uint64_t time; +}; + // A handler to schedule plugin instance class InstanceRunHandler { public: InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), cycle(DEFAULT_CYCLE_SIZE) {} void run(); - void schedule_collector(uint64_t time); - void schedule_scenario(uint64_t time); - void schedule_tune(uint64_t time); - void handle_instance(); + void schedule(uint64_t time); + void handle_instance(uint64_t time); void set_cycle(int cycle) { this->cycle = cycle; } @@ -71,16 +79,13 @@ public: return this->recv_queue.try_pop(msg); } private: - void run_aware(std::shared_ptr instance, std::vector &deps); - void run_tune(std::shared_ptr instance, std::vector &deps); + void run_instance(std::shared_ptr instance); void delete_instance(std::shared_ptr instance); - void insert_instance(std::shared_ptr instance); + void insert_instance(std::shared_ptr instance, uint64_t time); void adjust_collector_queue(const std::vector &deps, const std::vector &m_deps, bool flag); void check_scenario_dependency(const std::vector &deps, const std::vector &m_deps); - std::unordered_map> collector; - std::unordered_map> scenario; - std::unordered_map> tune; + std::priority_queue schedule_queue; SafeQueue recv_queue; MemoryStore &memory_store; int cycle; diff --git a/src/plugin_mgr/interface.h b/src/plugin_mgr/interface.h index 075378a..2106698 100644 --- a/src/plugin_mgr/interface.h +++ b/src/plugin_mgr/interface.h @@ -12,40 +12,28 @@ #ifndef PLUGIN_MGR_INTERFACE_H #define PLUGIN_MGR_INTERFACE_H -struct CollectorInterface { - char* (*get_version)(); - char* (*get_name)(); - char* (*get_description)(); - char* (*get_type)(); - int (*get_cycle)(); - char* (*get_dep)(); - void (*enable)(); - void (*disable)(); - void* (*get_ring_buf)(); - void (*reflash_ring_buf)(); +enum PluginType { + COLLECTOR, + SCENARIO, + TUNE, }; -struct ScenarioInterface { - char* (*get_version)(); - char* (*get_name)(); - char* (*get_description)(); - char* (*get_dep)(); - int (*get_cycle)(); - void (*enable)(); - void (*disable)(); - void (*aware)(void*[], int); - void* (*get_ring_buf)(); +struct Param { + void *args; + int len; }; -struct TuneInterface { - char* (*get_version)(); - char* (*get_name)(); - char* (*get_description)(); - char* (*get_dep)(); +struct Interface { + const char* (*get_version)(); + const char* (*get_name)(); + const char* (*get_description)(); + const char* (*get_dep)(); + PluginType (*get_type)(); int (*get_cycle)(); - void (*enable)(); + bool (*enable)(); void (*disable)(); - void (*tune)(void*[], int); + const void* (*get_ring_buf)(); + void (*run)(const Param*); }; #endif // !PLUGIN_MGR_INTERFACE_H diff --git a/src/plugin_mgr/message_manager.h b/src/plugin_mgr/message_manager.h index 416be48..00ea4c7 100644 --- a/src/plugin_mgr/message_manager.h +++ b/src/plugin_mgr/message_manager.h @@ -31,7 +31,7 @@ public: Message() : type(MessageType::EXTERNAL) {} Message(Opt opt) : opt(opt) {} Message(Opt opt, MessageType type) : opt(opt), type(type) {} - Message(Opt opt, std::vector payload) : opt(opt), payload(payload) {} + Message(Opt opt, const std::vector &payload) : opt(opt), payload(payload) {} Opt get_opt() { return this->opt; } diff --git a/src/plugin_mgr/plugin.cpp b/src/plugin_mgr/plugin.cpp index 1a2db0d..bdd8226 100644 --- a/src/plugin_mgr/plugin.cpp +++ b/src/plugin_mgr/plugin.cpp @@ -16,7 +16,7 @@ const std::string Instance::PLUGIN_DISABLED = "close"; const std::string Instance::PLUGIN_STATE_ON = "available"; const std::string Instance::PLUGIN_STATE_OFF = "unavailable"; -int Plugin::load(const std::string dl_path) { +int Plugin::load(const std::string &dl_path) { void *handler = dlopen(dl_path.c_str(), RTLD_LAZY); if (handler == nullptr) { return -1; @@ -30,3 +30,24 @@ std::string Instance::get_info() const { std::string run_text = this->enabled ? PLUGIN_ENABLED : PLUGIN_DISABLED; return name + "(" + state_text + ", " + run_text + ")"; } + +std::vector Instance::get_deps() { + std::vector vec; + if (get_interface()->get_dep == nullptr) { + return vec; + } + std::string deps = get_interface()->get_dep(); + std::string dep = ""; + for (size_t i = 0; i < deps.length(); ++i) { + if (deps[i] != '-') { + dep += deps[i]; + } else { + vec.emplace_back(dep); + dep = ""; + } + } + if (!dep.empty()) { + vec.emplace_back(dep); + } + return vec; +} diff --git a/src/plugin_mgr/plugin.h b/src/plugin_mgr/plugin.h index d59ab8a..ac0e0b5 100644 --- a/src/plugin_mgr/plugin.h +++ b/src/plugin_mgr/plugin.h @@ -17,31 +17,21 @@ #include #include -enum class PluginType { - COLLECTOR, - SCENARIO, - TUNE, - EMPTY, -}; - -std::string plugin_type_to_string(PluginType type); - class Instance { public: - virtual ~Instance() = default; - void set_name(std::string name) { + void set_name(const std::string &name) { this->name = name; } std::string get_name() const { return this->name; } - void set_type(PluginType type) { - this->type = type; - } PluginType get_type() const { - return type; + return interface->get_type(); + } + Interface* get_interface() const { + return this->interface; } - void set_plugin_name(std::string name) { + void set_plugin_name(const std::string &name) { this->plugin_name = name; } std::string get_plugin_name() const { @@ -59,82 +49,35 @@ public: bool get_enabled() const { return this->enabled; } + void set_interface(Interface *interface) { + this->interface = interface; + } std::string get_info() const; + std::vector get_deps(); private: std::string name; std::string plugin_name; - PluginType type; bool state; bool enabled; + Interface *interface; const static std::string PLUGIN_ENABLED; const static std::string PLUGIN_DISABLED; const static std::string PLUGIN_STATE_ON; const static std::string PLUGIN_STATE_OFF; }; -class CollectorInstance : public Instance { -public: - CollectorInstance() : interface(nullptr) { } - ~CollectorInstance() { - interface = nullptr; - } - void set_interface(CollectorInterface *interface) { - this->interface = interface; - } - CollectorInterface* get_interface() const { - return this->interface; - } -private: - CollectorInterface* interface; -}; - -class ScenarioInstance : public Instance { -public: - ScenarioInstance() : interface(nullptr) { } - ~ScenarioInstance() { - interface = nullptr; - } - void set_interface(ScenarioInterface *interface) { - this->interface = interface; - } - ScenarioInterface* get_interface() const { - return this->interface; - } -private: - ScenarioInterface* interface; -}; - -class TuneInstance : public Instance { -public: - TuneInstance() : interface(nullptr) { } - ~TuneInstance() { - interface = nullptr; - } - void set_interface(TuneInterface *interface) { - this->interface = interface; - } - TuneInterface* get_interface() const { - return this->interface; - } -private: - TuneInterface *interface; -}; - class Plugin { public: - Plugin(std::string name, PluginType type) : name(name), type(type), handler(nullptr) { } + Plugin(const std::string &name) : name(name), handler(nullptr) { } ~Plugin() { if (handler != nullptr) { dlclose(handler); } } - int load(const std::string dl_path); + int load(const std::string &dl_path); std::string get_name() const { return this->name; } - PluginType get_type() const { - return this->type; - } void add_instance(std::shared_ptr ins) { instances.emplace_back(ins); } @@ -150,7 +93,6 @@ public: private: std::vector> instances; std::string name; - PluginType type; void *handler; }; diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp index 5324512..1503fc1 100644 --- a/src/plugin_mgr/plugin_manager.cpp +++ b/src/plugin_mgr/plugin_manager.cpp @@ -78,9 +78,8 @@ ErrorCode PluginManager::query_plugin(const std::string &name, std::string &res) return ErrorCode::OK; } -template -int PluginManager::load_dl_instance(std::shared_ptr plugin, T **interface_list) { - int (*get_instance)(T**) = (int(*)(T**))dlsym(plugin->get_handler(), "get_instance"); +int PluginManager::load_dl_instance(std::shared_ptr plugin, Interface **interface_list) { + int (*get_instance)(Interface**) = (int(*)(Interface**))dlsym(plugin->get_handler(), "get_instance"); if (get_instance == nullptr) { ERROR("[PluginManager] dlsym error!\n"); return -1; @@ -90,78 +89,33 @@ int PluginManager::load_dl_instance(std::shared_ptr plugin, T **interfac return len; } -template -std::vector get_dep(T *interface) { - char *deps = interface->get_dep(); - std::vector res; - std::string dep; - for (int i = 0; deps[i] != 0; ++i) { - if (deps[i] != '-') { - dep += deps[i]; - } else { - res.emplace_back(dep); - dep = ""; - } - } - if (!dep.empty()) res.emplace_back(dep); - return res; -} - -template -void PluginManager::save_instance(std::shared_ptr plugin, T *interface_list, int len) { +void PluginManager::save_instance(std::shared_ptr plugin, Interface *interface_list, int len) { if (interface_list == nullptr) return; for (int i = 0; i < len; ++i) { - T *interface = interface_list + i; - std::shared_ptr instance = std::make_shared(); + Interface *interface = interface_list + i; + std::shared_ptr instance = std::make_shared(); std::string name = interface->get_name(); + instance->set_interface(interface); instance->set_name(name); instance->set_plugin_name(plugin->get_name()); - instance->set_type(plugin->get_type()); instance->set_enabled(false); - if (plugin->get_type() == PluginType::COLLECTOR) { - DEBUG("[PluginManager] add node"); - dep_handler.add_node(name); - } else { - dep_handler.add_node(name, get_dep(interface)); - } + dep_handler.add_node(name, instance->get_deps()); instance->set_state(dep_handler.get_node_state(name)); - (std::dynamic_pointer_cast(instance))->set_interface(interface); DEBUG("[PluginManager] Instance: " << name.c_str()); memory_store.add_instance(name, instance); - plugin->add_instance(instance); + plugin->add_instance(instance); } } bool PluginManager::load_instance(std::shared_ptr plugin) { int len = 0; DEBUG("plugin: " << plugin->get_name()); - switch (plugin->get_type()) { - case PluginType::COLLECTOR: { - CollectorInterface *interface_list = nullptr; - len = load_dl_instance(plugin, &interface_list); - if (len == -1) return false; - DEBUG("[PluginManager] save collect instance"); - save_instance(plugin, interface_list, len); - break; - } - case PluginType::SCENARIO: { - ScenarioInterface *interface_list = nullptr; - len = load_dl_instance(plugin, &interface_list); - if (len == -1) return false; - save_instance(plugin, interface_list, len); - break; - } - case PluginType::TUNE: { - TuneInterface *interface_list = nullptr; - len = load_dl_instance(plugin, &interface_list); - if (len == -1) return false; - save_instance(plugin, interface_list, len); - break; - } - default: { - return false; - } + Interface *interface_list = nullptr; + len = load_dl_instance(plugin, &interface_list); + if (len < 0) { + return false; } + save_instance(plugin, interface_list, len); update_instance_state(); return true; } @@ -177,8 +131,8 @@ void PluginManager::update_instance_state() { } } -ErrorCode PluginManager::load_plugin(const std::string &name, PluginType type) { - std::string plugin_path = get_path(type) + "/" + name; +ErrorCode PluginManager::load_plugin(const std::string &name) { + std::string plugin_path = get_path() + "/" + name; if (!file_exist(plugin_path)) { return ErrorCode::LOAD_PLUGIN_FILE_NOT_EXIST; } @@ -191,9 +145,8 @@ ErrorCode PluginManager::load_plugin(const std::string &name, PluginType type) { if (memory_store.is_plugin_exist(name)) { return ErrorCode::LOAD_PLUGIN_EXIST; } - const std::string dl_path = get_path(type) + '/' + name; - std::shared_ptr plugin = std::make_shared(name, type); - int error = plugin->load(dl_path); + std::shared_ptr plugin = std::make_shared(name); + int error = plugin->load(plugin_path); if (error) { return ErrorCode::LOAD_PLUGIN_DLOPEN_FAILED; } @@ -268,16 +221,35 @@ ErrorCode PluginManager::instance_enabled(std::string name) { return ErrorCode::ENABLE_INSTANCE_ALREADY_ENABLED; } std::vector pre_dependencies = dep_handler.get_pre_dependencies(name); + std::vector> new_enabled; + bool enabled = true; for (int i = pre_dependencies.size() - 1; i >= 0; --i) { instance = memory_store.get_instance(pre_dependencies[i]); if (instance->get_enabled()) { continue; } - instance->set_enabled(true); - instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance)); - DEBUG("[PluginManager] " << instance->get_name() << " instance enabled."); + new_enabled.emplace_back(instance); + if (!instance->get_interface()->enable()) { + enabled = false; + WARN("[PluginManager] " << instance->get_name() << " instance enabled failed, because instance init environment failed."); + break; + } + } + if (enabled) { + for (int i = pre_dependencies.size() - 1; i >= 0; --i) { + instance = memory_store.get_instance(pre_dependencies[i]); + if (instance->get_enabled()) { + continue; + } + instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance)); + } + return ErrorCode::OK; + } else { + for (auto instance : new_enabled) { + instance->get_interface()->disable(); + } + return ErrorCode::ENABLE_INSTANCE_ENV; } - return ErrorCode::OK; } ErrorCode PluginManager::instance_disabled(std::string name) { @@ -291,7 +263,6 @@ ErrorCode PluginManager::instance_disabled(std::string name) { if (!instance->get_enabled()) { return ErrorCode::DISABLE_INSTANCE_ALREADY_DISABLED; } - instance->set_enabled(false); instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::DISABLED, instance)); return ErrorCode::OK; } @@ -331,9 +302,7 @@ ErrorCode PluginManager::add_list(std::string &res) { res += p.first + "\n"; } res += "Installed Plugins:\n"; - res += get_plugin_in_dir(DEFAULT_COLLECTOR_PATH); - res += get_plugin_in_dir(DEFAULT_SCENARIO_PATH); - res += get_plugin_in_dir(DEFAULT_TUNE_PATH); + res += get_plugin_in_dir(DEFAULT_PLUGIN_PATH); return ErrorCode::OK; } @@ -371,15 +340,15 @@ void PluginManager::pre_enable() { } } -void PluginManager::pre_load_plugin(PluginType type) { - std::string path = get_path(type); +void PluginManager::pre_load_plugin() { + std::string path = get_path(); DIR *dir = opendir(path.c_str()); if (dir == nullptr) return; struct dirent *entry; while ((entry = readdir(dir)) != nullptr) { std::string name = entry->d_name; if (end_with(name, ".so")) { - auto ret = load_plugin(name, type); + auto ret = load_plugin(name); if (ret != ErrorCode::OK) { WARN("[PluginManager] " << name << " plugin preload failed, because " << ErrorText::get_error_text(ret) << "."); } else { @@ -391,27 +360,13 @@ void PluginManager::pre_load_plugin(PluginType type) { } void PluginManager::pre_load() { - pre_load_plugin(PluginType::COLLECTOR); - pre_load_plugin(PluginType::SCENARIO); - pre_load_plugin(PluginType::TUNE); + pre_load_plugin(); pre_enable(); } -void* PluginManager::get_data_buffer(std::string name) { +const void* PluginManager::get_data_buffer(std::string name) { std::shared_ptr instance = memory_store.get_instance(name); - switch (instance->get_type()) { - case PluginType::COLLECTOR: { - CollectorInterface *collector_interface = (std::dynamic_pointer_cast(instance))->get_interface(); - return collector_interface->get_ring_buf(); - } - case PluginType::SCENARIO: { - ScenarioInterface *scenario_interface = (std::dynamic_pointer_cast(instance))->get_interface(); - return scenario_interface->get_ring_buf(); - } - default: - return nullptr; - } - return nullptr; + return instance->get_interface()->get_ring_buf(); } std::string PluginManager::instance_dep_check(const std::string &name) { @@ -465,8 +420,7 @@ int PluginManager::run() { switch (msg.get_opt()) { case Opt::LOAD: { std::string plugin_name = msg.get_payload(0); - PluginType type = plugin_types[msg.get_payload(1)]; - ErrorCode ret_code = load_plugin(plugin_name, type); + ErrorCode ret_code = load_plugin(plugin_name); if(ret_code == ErrorCode::OK) { INFO("[PluginManager] " << plugin_name << " plugin loaded."); res.set_opt(Opt::RESPONSE_OK); diff --git a/src/plugin_mgr/plugin_manager.h b/src/plugin_mgr/plugin_manager.h index 709d42e..9339f0d 100644 --- a/src/plugin_mgr/plugin_manager.h +++ b/src/plugin_mgr/plugin_manager.h @@ -33,10 +33,10 @@ public: void pre_load(); void pre_enable(); void init(); - void* get_data_buffer(std::string name); + const void* get_data_buffer(std::string name); private: - void pre_load_plugin(PluginType type); - ErrorCode load_plugin(const std::string &path, PluginType type); + void pre_load_plugin(); + ErrorCode load_plugin(const std::string &path); ErrorCode remove(const std::string &name); ErrorCode query_all_plugins(std::string &res); ErrorCode query_plugin(const std::string &name, std::string &res); @@ -47,10 +47,8 @@ private: ErrorCode add_list(std::string &res); ErrorCode download(const std::string &name, std::string &res); std::string instance_dep_check(const std::string &name); - template - int load_dl_instance(std::shared_ptr plugin, T **interface_list); - template - void save_instance(std::shared_ptr plugin, T *interface_list, int len); + int load_dl_instance(std::shared_ptr plugin, Interface **interface_list); + void save_instance(std::shared_ptr plugin, Interface *interface_list, int len); bool load_instance(std::shared_ptr plugin); void batch_load(); void batch_remove(); -- 2.33.0