oeAware-manager/0002-refactor-plugin-interface.patch
fly_1997 d080eabfc6 refactor instance interfaces and add signal function
(cherry picked from commit 0509a1c1085175fc5f7061da747eed724d59563e)
2024-06-05 11:23:19 +08:00

1033 lines
38 KiB
Diff

From 3d8160661afa6dfd045dc51577c4bb040c25b591 Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Sat, 25 May 2024 15:48:33 +0800
Subject: [PATCH 1/4] refactor plugin interface
---
src/common/default_path.h | 4 +-
src/plugin_mgr/config.cpp | 20 +--
src/plugin_mgr/config.h | 8 +-
src/plugin_mgr/dep_handler.cpp | 7 +-
src/plugin_mgr/dep_handler.h | 6 +-
src/plugin_mgr/error_code.cpp | 1 +
src/plugin_mgr/error_code.h | 1 +
src/plugin_mgr/instance_run_handler.cpp | 181 ++++--------------------
src/plugin_mgr/instance_run_handler.h | 25 ++--
src/plugin_mgr/interface.h | 44 +++---
src/plugin_mgr/message_manager.h | 2 +-
src/plugin_mgr/plugin.cpp | 23 ++-
src/plugin_mgr/plugin.h | 84 ++---------
src/plugin_mgr/plugin_manager.cpp | 142 +++++++------------
src/plugin_mgr/plugin_manager.h | 12 +-
15 files changed, 166 insertions(+), 394 deletions(-)
diff --git a/src/common/default_path.h b/src/common/default_path.h
index 4ce68b4..0ef3aae 100644
--- a/src/common/default_path.h
+++ b/src/common/default_path.h
@@ -14,9 +14,7 @@
#include<string>
-const std::string DEFAULT_COLLECTOR_PATH = "/usr/lib64/oeAware-plugin/collector";
-const std::string DEFAULT_SCENARIO_PATH = "/usr/lib64/oeAware-plugin/scenario";
-const std::string DEFAULT_TUNE_PATH = "/usr/lib64/oeAware-plugin/tune";
+const std::string DEFAULT_PLUGIN_PATH = "/usr/lib64/oeAware-plugin";
const std::string DEFAULT_RUN_PATH = "/var/run/oeAware";
const std::string DEFAULT_LOG_PATH = "/var/log/oeAware";
diff --git a/src/plugin_mgr/config.cpp b/src/plugin_mgr/config.cpp
index 588eda0..6e0e231 100644
--- a/src/plugin_mgr/config.cpp
+++ b/src/plugin_mgr/config.cpp
@@ -42,7 +42,7 @@ bool check_plugin_list(YAML::Node plugin_list_item) {
return true;
}
-bool Config::load(const std::string path) {
+bool Config::load(const std::string &path) {
YAML::Node node;
struct stat buffer;
if (stat(path.c_str(), &buffer) != 0) {
@@ -106,20 +106,6 @@ bool Config::load(const std::string path) {
return true;
}
-std::string get_path(PluginType type) {
- switch (type) {
- case PluginType::COLLECTOR:{
- return DEFAULT_COLLECTOR_PATH;
- }
- case PluginType::SCENARIO: {
- return DEFAULT_SCENARIO_PATH;
- }
- case PluginType::TUNE: {
- return DEFAULT_TUNE_PATH;
- }
- default: {
- break;
- }
- }
- return "";
+std::string get_path() {
+ return DEFAULT_PLUGIN_PATH;
}
\ No newline at end of file
diff --git a/src/plugin_mgr/config.h b/src/plugin_mgr/config.h
index c2d1f37..3844f71 100644
--- a/src/plugin_mgr/config.h
+++ b/src/plugin_mgr/config.h
@@ -22,7 +22,7 @@ static int log_levels[] = {0, 10000, 20000, 30000, 40000, 50000, 60000};
class PluginInfo {
public:
- PluginInfo(std::string name, std::string description, std::string url) : name(name), description(description), url(url) { }
+ PluginInfo(const std::string &name, const std::string &description, const std::string &url) : name(name), description(description), url(url) { }
std::string get_name() const {
return this->name;
}
@@ -52,7 +52,7 @@ namespace std {
class EnableItem {
public:
- EnableItem(std::string name) : name(name), enabled(false) { }
+ EnableItem(const std::string &name) : name(name), enabled(false) { }
void set_enabled(bool enabled) {
this->enabled = enabled;
}
@@ -82,7 +82,7 @@ public:
Config() {
this->log_level = log_levels[2];
}
- bool load(const std::string path);
+ bool load(const std::string &path);
int get_log_level() const {
return this->log_level;
}
@@ -122,7 +122,7 @@ private:
std::vector<EnableItem> enable_list;
};
-std::string get_path(PluginType type);
+std::string get_path();
bool create_dir(const std::string &path);
#endif
\ No newline at end of file
diff --git a/src/plugin_mgr/dep_handler.cpp b/src/plugin_mgr/dep_handler.cpp
index 816056d..c6d0985 100644
--- a/src/plugin_mgr/dep_handler.cpp
+++ b/src/plugin_mgr/dep_handler.cpp
@@ -39,7 +39,7 @@ void DepHandler::add_arc_node(std::shared_ptr<Node> node, const std::vector<std:
}
-void DepHandler::add_node(const std::string &name, std::vector<std::string> dep_nodes) {
+void DepHandler::add_node(const std::string &name, const std::vector<std::string> &dep_nodes) {
std::shared_ptr<Node> cur_node = add_new_node(name);
this->nodes[name] = cur_node;
add_arc_node(cur_node, dep_nodes);
@@ -52,11 +52,10 @@ void DepHandler::del_node(const std::string &name) {
}
-std::shared_ptr<Node> DepHandler::get_node(std::string name) {
+std::shared_ptr<Node> DepHandler::get_node(const std::string &name) {
return this->nodes[name];
}
-
std::shared_ptr<Node> DepHandler::add_new_node(std::string name) {
std::shared_ptr<Node> cur_node = std::make_shared<Node>(name);
cur_node->head = std::make_shared<ArcNode>();
@@ -80,6 +79,7 @@ void DepHandler::del_node_and_arc_nodes(std::shared_ptr<Node> node) {
arc = tmp;
}
}
+
void DepHandler::change_arc_nodes(std::string name, bool state) {
if (!nodes[name]->state || !arc_nodes.count(name)) return;
std::unordered_map<std::shared_ptr<ArcNode>, bool> &mp = arc_nodes[name];
@@ -101,7 +101,6 @@ void DepHandler::change_arc_nodes(std::string name, bool state) {
}
}
-
void DepHandler::query_all_top(std::vector<std::vector<std::string>> &query) {
for (auto &p : nodes) {
query_node_top(p.first, query);
diff --git a/src/plugin_mgr/dep_handler.h b/src/plugin_mgr/dep_handler.h
index b81d574..40b8748 100644
--- a/src/plugin_mgr/dep_handler.h
+++ b/src/plugin_mgr/dep_handler.h
@@ -34,7 +34,7 @@ struct Node {
int real_cnt;
bool state; // dependency closed-loop
Node() : next(nullptr), head(nullptr), cnt(0), real_cnt(0), state(true) {}
- Node(std::string name): next(nullptr), head(nullptr), name(name), cnt(0), real_cnt(0), state(true) {}
+ Node(const std::string &name): next(nullptr), head(nullptr), name(name), cnt(0), real_cnt(0), state(true) {}
};
class DepHandler {
@@ -43,11 +43,11 @@ public:
this->head = std::make_shared<Node>();
this->tail = head;
}
- std::shared_ptr<Node> get_node(std::string name);
+ std::shared_ptr<Node> get_node(const std::string &name);
bool get_node_state(std::string name) {
return this->nodes[name]->state;
}
- void add_node(const std::string &name, std::vector<std::string> dep_nodes = {});
+ void add_node(const std::string &name, const std::vector<std::string> &dep_nodes = {});
void del_node(const std::string &name);
std::vector<std::string> get_pre_dependencies(const std::string &name);
// query instance dependency
diff --git a/src/plugin_mgr/error_code.cpp b/src/plugin_mgr/error_code.cpp
index 09f0eb9..8c7af00 100644
--- a/src/plugin_mgr/error_code.cpp
+++ b/src/plugin_mgr/error_code.cpp
@@ -15,6 +15,7 @@ const std::unordered_map<ErrorCode, std::string> ErrorText::error_codes = {
{ErrorCode::ENABLE_INSTANCE_NOT_LOAD, "instance is not loaded"},
{ErrorCode::ENABLE_INSTANCE_UNAVAILABLE, "instance is unavailable"},
{ErrorCode::ENABLE_INSTANCE_ALREADY_ENABLED, "instance is already enabled"},
+ {ErrorCode::ENABLE_INSTANCE_ENV, "instance init environment failed"},
{ErrorCode::DISABLE_INSTANCE_NOT_LOAD, "instance is not loaded"},
{ErrorCode::DISABLE_INSTANCE_UNAVAILABLE, "instance is unavailable"},
{ErrorCode::DISABLE_INSTANCE_ALREADY_DISABLED, "instance is already disabled"},
diff --git a/src/plugin_mgr/error_code.h b/src/plugin_mgr/error_code.h
index dd028f3..cdbf542 100644
--- a/src/plugin_mgr/error_code.h
+++ b/src/plugin_mgr/error_code.h
@@ -18,6 +18,7 @@ enum class ErrorCode {
ENABLE_INSTANCE_NOT_LOAD,
ENABLE_INSTANCE_UNAVAILABLE,
ENABLE_INSTANCE_ALREADY_ENABLED,
+ ENABLE_INSTANCE_ENV,
DISABLE_INSTANCE_NOT_LOAD,
DISABLE_INSTANCE_UNAVAILABLE,
DISABLE_INSTANCE_ALREADY_DISABLED,
diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp
index 8ba10db..9eed762 100644
--- a/src/plugin_mgr/instance_run_handler.cpp
+++ b/src/plugin_mgr/instance_run_handler.cpp
@@ -13,104 +13,45 @@
#include <thread>
#include <unistd.h>
-static void* get_ring_buf(std::shared_ptr<Instance> instance) {
+static const void* get_ring_buf(std::shared_ptr<Instance> instance) { // ring buffer exposed by the instance's plugin, or nullptr when there is none
if (instance == nullptr) {
return nullptr;
}
- switch (instance->get_type()) {
- case PluginType::COLLECTOR: {
- return (std::dynamic_pointer_cast<CollectorInstance>(instance))->get_interface()->get_ring_buf();
- }
- case PluginType::SCENARIO: {
- return (std::dynamic_pointer_cast<ScenarioInstance>(instance))->get_interface()->get_ring_buf();
- }
- case PluginType::TUNE: {
- break;
- }
- default: {
- break;
- }
- }
- return nullptr;
-}
-
-static void reflash_ring_buf(std::shared_ptr<Instance> instance) {
- (std::dynamic_pointer_cast<CollectorInstance>(instance))->get_interface()->reflash_ring_buf();
-}
-
-void InstanceRunHandler::run_aware(std::shared_ptr<Instance> instance, std::vector<std::string> &deps) {
- void *a[MAX_DEPENDENCIES_SIZE];
- for (size_t i = 0; i < deps.size(); ++i) {
- std::shared_ptr<Instance> ins = memory_store.get_instance(deps[i]);
- a[i] = get_ring_buf(ins);
- }
- (std::dynamic_pointer_cast<ScenarioInstance>(instance))->get_interface()->aware(a, (int)deps.size());
+ return instance->get_interface()->get_ring_buf == nullptr ? nullptr : instance->get_interface()->get_ring_buf(); // an unset get_ring_buf means "no buffer", like the removed TUNE/default branches
}
-void InstanceRunHandler::run_tune(std::shared_ptr<Instance> instance, std::vector<std::string> &deps) {
- void *a[MAX_DEPENDENCIES_SIZE];
+void InstanceRunHandler::run_instance(std::shared_ptr<Instance> instance) {
+ std::vector<const void*> input_data;
+ std::vector<std::string> deps = instance->get_deps();
for (size_t i = 0; i < deps.size(); ++i) {
std::shared_ptr<Instance> ins = memory_store.get_instance(deps[i]);
- a[i] = get_ring_buf(ins);
+ input_data.emplace_back(get_ring_buf(ins));
}
- (std::dynamic_pointer_cast<TuneInstance>(instance))->get_interface()->tune(a, (int)deps.size());
+ Param param;
+ param.args = input_data.data();
+ param.len = input_data.size();
+ instance->get_interface()->run(&param);
}
-void InstanceRunHandler::insert_instance(std::shared_ptr<Instance> instance) {
- switch (instance->get_type()) {
- case PluginType::COLLECTOR: {
- collector[instance->get_name()] = instance;
- (std::dynamic_pointer_cast<CollectorInstance>(instance))->get_interface()->enable();
- break;
- }
- case PluginType::SCENARIO: {
- scenario[instance->get_name()] = instance;
- (std::dynamic_pointer_cast<ScenarioInstance>(instance))->get_interface()->enable();
- break;
- }
- case PluginType::TUNE: {
- tune[instance->get_name()] = instance;
- (std::dynamic_pointer_cast<TuneInstance>(instance))->get_interface()->enable();
- break;
- }
- default: {
- break;
- }
- }
+void InstanceRunHandler::insert_instance(std::shared_ptr<Instance> instance, uint64_t time) {
+ instance->set_enabled(true);
+ schedule_queue.push(ScheduleInstance{instance, time});
INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue.");
}
void InstanceRunHandler::delete_instance(std::shared_ptr<Instance> instance) {
- switch (instance->get_type()) {
- case PluginType::COLLECTOR: {
- collector.erase(instance->get_name());
- (std::dynamic_pointer_cast<CollectorInstance>(instance))->get_interface()->disable();
- break;
- }
- case PluginType::SCENARIO: {
- scenario.erase(instance->get_name());
- (std::dynamic_pointer_cast<ScenarioInstance>(instance))->get_interface()->disable();
- break;
- }
- case PluginType::TUNE: {
- tune.erase(instance->get_name());
- (std::dynamic_pointer_cast<TuneInstance>(instance))->get_interface()->disable();
- break;
- }
- default: {
- break;
- }
- }
+ instance->get_interface()->disable();
+ instance->set_enabled(false);
INFO("[PluginManager] " << instance->get_name() << " instance delete from running queue.");
}
-void InstanceRunHandler::handle_instance() {
+void InstanceRunHandler::handle_instance(uint64_t time) {
InstanceRunMessage msg;
while(this->recv_queue_try_pop(msg)){
std::shared_ptr<Instance> instance = msg.get_instance();
switch (msg.get_type()){
case RunType::ENABLED: {
- insert_instance(std::move(instance));
+ insert_instance(std::move(instance), time);
break;
}
case RunType::DISABLED: {
@@ -121,78 +62,19 @@ void InstanceRunHandler::handle_instance() {
}
}
-template<typename T>
-static std::vector<std::string> get_deps(std::shared_ptr<Instance> instance) {
- std::shared_ptr<T> t_instance = std::dynamic_pointer_cast<T>(instance);
- std::string deps = (t_instance)->get_interface()->get_dep();
- std::string dep = "";
- std::vector<std::string> vec;
- for (size_t i = 0; i < deps.length(); ++i) {
- if (deps[i] != '-') {
- dep += deps[i];
- }else {
- vec.emplace_back(dep);
- dep = "";
- }
- }
- vec.emplace_back(dep);
- return vec;
-}
-
-void InstanceRunHandler::adjust_collector_queue(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps, bool flag) {
- for (auto &m_dep : m_deps) {
- bool ok = false;
- for (auto &dep : deps) {
- if (m_dep == dep) {
- ok = true;
- }
+void InstanceRunHandler::schedule(uint64_t time) { // runs every enabled instance whose scheduled time equals the current tick `time`
+ while (!schedule_queue.empty()) {
+ auto schedule_instance = schedule_queue.top();
+ if (schedule_instance.time != time) {
+ break; // ScheduleInstance::operator< puts the smallest time on top, so nothing further is due at this tick
}
- if (ok) continue;
- if (flag) {
- if (is_instance_exist(m_dep) && !collector.count(m_dep)) {
- this->insert_instance(memory_store.get_instance(m_dep));
- }
- } else {
- if (is_instance_exist(m_dep) && collector.count(m_dep)) {
- this->delete_instance(memory_store.get_instance(m_dep));
- }
+ schedule_queue.pop();
+ if (!schedule_instance.instance->get_enabled()) {
+ continue; // drop the disabled entry but keep going: a `break` here strands the remaining entries at a past tick and stalls the queue
}
- }
-}
-
-void InstanceRunHandler::check_scenario_dependency(const std::vector<std::string> &origin_deps, const std::vector<std::string> &cur_deps) {
- adjust_collector_queue(origin_deps, cur_deps, true);
- adjust_collector_queue(cur_deps, origin_deps, false);
-}
-
-void InstanceRunHandler::schedule_collector(uint64_t time) {
- for (auto &p : collector) {
- std::shared_ptr<Instance> instance = p.second;
- int t = (std::dynamic_pointer_cast<CollectorInstance>(instance))->get_interface()->get_cycle();
- if (time % t != 0) return;
- reflash_ring_buf(instance);
- }
-}
-
-void InstanceRunHandler::schedule_scenario(uint64_t time) {
- for (auto &p : scenario) {
- std::shared_ptr<Instance> instance = p.second;
- int t = (std::dynamic_pointer_cast<ScenarioInstance>(instance))->get_interface()->get_cycle();
- if (time % t != 0) return;
- std::vector<std::string> origin_deps = get_deps<ScenarioInstance>(instance);
- run_aware(instance, origin_deps);
- std::vector<std::string> cur_deps = get_deps<ScenarioInstance>(instance);
- check_scenario_dependency(origin_deps, cur_deps);
- }
-}
-
-void InstanceRunHandler::schedule_tune(uint64_t time) {
- for (auto &p : tune) {
- std::shared_ptr<Instance> instance = p.second;
- int t = (std::dynamic_pointer_cast<TuneInstance>(instance))->get_interface()->get_cycle();
- if (time % t != 0) return;
- std::vector<std::string> deps = get_deps<TuneInstance>(instance);
- run_tune(instance, deps);
+ run_instance(schedule_instance.instance);
+ schedule_instance.time += schedule_instance.instance->get_interface()->get_cycle(); // re-arm the entry for the instance's next cycle
+ schedule_queue.push(schedule_instance);
}
}
@@ -200,11 +82,8 @@ void start(InstanceRunHandler *instance_run_handler) {
unsigned long long time = 0;
INFO("[PluginManager] instance schedule started!");
while(true) {
- instance_run_handler->handle_instance();
- instance_run_handler->schedule_collector(time);
- instance_run_handler->schedule_scenario(time);
- instance_run_handler->schedule_tune(time);
-
+ instance_run_handler->handle_instance(time);
+ instance_run_handler->schedule(time);
usleep(instance_run_handler->get_cycle() * 1000);
time += instance_run_handler->get_cycle();
}
diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h
index 83f9f4a..4172e99 100644
--- a/src/plugin_mgr/instance_run_handler.h
+++ b/src/plugin_mgr/instance_run_handler.h
@@ -20,6 +20,7 @@
#include <string>
#include <vector>
#include <unordered_map>
+#include <queue>
#include <memory>
enum class RunType {
@@ -43,15 +44,22 @@ private:
std::shared_ptr<Instance> instance;
};
+class ScheduleInstance {
+public:
+ bool operator < (const ScheduleInstance &rhs) const {
+ return time > rhs.time || (time == rhs.time && instance->get_type() > rhs.instance->get_type());
+ }
+ std::shared_ptr<Instance> instance;
+ uint64_t time;
+};
+
// A handler to schedule plugin instance
class InstanceRunHandler {
public:
InstanceRunHandler(MemoryStore &memory_store) : memory_store(memory_store), cycle(DEFAULT_CYCLE_SIZE) {}
void run();
- void schedule_collector(uint64_t time);
- void schedule_scenario(uint64_t time);
- void schedule_tune(uint64_t time);
- void handle_instance();
+ void schedule(uint64_t time);
+ void handle_instance(uint64_t time);
void set_cycle(int cycle) {
this->cycle = cycle;
}
@@ -71,16 +79,13 @@ public:
return this->recv_queue.try_pop(msg);
}
private:
- void run_aware(std::shared_ptr<Instance> instance, std::vector<std::string> &deps);
- void run_tune(std::shared_ptr<Instance> instance, std::vector<std::string> &deps);
+ void run_instance(std::shared_ptr<Instance> instance);
void delete_instance(std::shared_ptr<Instance> instance);
- void insert_instance(std::shared_ptr<Instance> instance);
+ void insert_instance(std::shared_ptr<Instance> instance, uint64_t time);
void adjust_collector_queue(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps, bool flag);
void check_scenario_dependency(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps);
- std::unordered_map<std::string, std::shared_ptr<Instance>> collector;
- std::unordered_map<std::string, std::shared_ptr<Instance>> scenario;
- std::unordered_map<std::string, std::shared_ptr<Instance>> tune;
+ std::priority_queue<ScheduleInstance> schedule_queue;
SafeQueue<InstanceRunMessage> recv_queue;
MemoryStore &memory_store;
int cycle;
diff --git a/src/plugin_mgr/interface.h b/src/plugin_mgr/interface.h
index 075378a..2106698 100644
--- a/src/plugin_mgr/interface.h
+++ b/src/plugin_mgr/interface.h
@@ -12,40 +12,28 @@
#ifndef PLUGIN_MGR_INTERFACE_H
#define PLUGIN_MGR_INTERFACE_H
-struct CollectorInterface {
- char* (*get_version)();
- char* (*get_name)();
- char* (*get_description)();
- char* (*get_type)();
- int (*get_cycle)();
- char* (*get_dep)();
- void (*enable)();
- void (*disable)();
- void* (*get_ring_buf)();
- void (*reflash_ring_buf)();
+enum PluginType {
+ COLLECTOR,
+ SCENARIO,
+ TUNE,
};
-struct ScenarioInterface {
- char* (*get_version)();
- char* (*get_name)();
- char* (*get_description)();
- char* (*get_dep)();
- int (*get_cycle)();
- void (*enable)();
- void (*disable)();
- void (*aware)(void*[], int);
- void* (*get_ring_buf)();
+struct Param {
+ void *args;
+ int len;
};
-struct TuneInterface {
- char* (*get_version)();
- char* (*get_name)();
- char* (*get_description)();
- char* (*get_dep)();
+struct Interface {
+ const char* (*get_version)();
+ const char* (*get_name)();
+ const char* (*get_description)();
+ const char* (*get_dep)();
+ PluginType (*get_type)();
int (*get_cycle)();
- void (*enable)();
+ bool (*enable)();
void (*disable)();
- void (*tune)(void*[], int);
+ const void* (*get_ring_buf)();
+ void (*run)(const Param*);
};
#endif // !PLUGIN_MGR_INTERFACE_H
diff --git a/src/plugin_mgr/message_manager.h b/src/plugin_mgr/message_manager.h
index 416be48..00ea4c7 100644
--- a/src/plugin_mgr/message_manager.h
+++ b/src/plugin_mgr/message_manager.h
@@ -31,7 +31,7 @@ public:
Message() : type(MessageType::EXTERNAL) {}
Message(Opt opt) : opt(opt) {}
Message(Opt opt, MessageType type) : opt(opt), type(type) {}
- Message(Opt opt, std::vector<std::string> payload) : opt(opt), payload(payload) {}
+ Message(Opt opt, const std::vector<std::string> &payload) : opt(opt), payload(payload) {}
Opt get_opt() {
return this->opt;
}
diff --git a/src/plugin_mgr/plugin.cpp b/src/plugin_mgr/plugin.cpp
index 1a2db0d..bdd8226 100644
--- a/src/plugin_mgr/plugin.cpp
+++ b/src/plugin_mgr/plugin.cpp
@@ -16,7 +16,7 @@ const std::string Instance::PLUGIN_DISABLED = "close";
const std::string Instance::PLUGIN_STATE_ON = "available";
const std::string Instance::PLUGIN_STATE_OFF = "unavailable";
-int Plugin::load(const std::string dl_path) {
+int Plugin::load(const std::string &dl_path) {
void *handler = dlopen(dl_path.c_str(), RTLD_LAZY);
if (handler == nullptr) {
return -1;
@@ -30,3 +30,24 @@ std::string Instance::get_info() const {
std::string run_text = this->enabled ? PLUGIN_ENABLED : PLUGIN_DISABLED;
return name + "(" + state_text + ", " + run_text + ")";
}
+
+std::vector<std::string> Instance::get_deps() {
+ std::vector<std::string> vec;
+ if (get_interface()->get_dep == nullptr) {
+ return vec;
+ }
+ std::string deps = get_interface()->get_dep();
+ std::string dep = "";
+ for (size_t i = 0; i < deps.length(); ++i) {
+ if (deps[i] != '-') {
+ dep += deps[i];
+ } else {
+ vec.emplace_back(dep);
+ dep = "";
+ }
+ }
+ if (!dep.empty()) {
+ vec.emplace_back(dep);
+ }
+ return vec;
+}
diff --git a/src/plugin_mgr/plugin.h b/src/plugin_mgr/plugin.h
index d59ab8a..ac0e0b5 100644
--- a/src/plugin_mgr/plugin.h
+++ b/src/plugin_mgr/plugin.h
@@ -17,31 +17,21 @@
#include <memory>
#include <dlfcn.h>
-enum class PluginType {
- COLLECTOR,
- SCENARIO,
- TUNE,
- EMPTY,
-};
-
-std::string plugin_type_to_string(PluginType type);
-
class Instance {
public:
- virtual ~Instance() = default;
- void set_name(std::string name) {
+ void set_name(const std::string &name) {
this->name = name;
}
std::string get_name() const {
return this->name;
}
- void set_type(PluginType type) {
- this->type = type;
- }
PluginType get_type() const {
- return type;
+ return interface->get_type();
+ }
+ Interface* get_interface() const {
+ return this->interface;
}
- void set_plugin_name(std::string name) {
+ void set_plugin_name(const std::string &name) {
this->plugin_name = name;
}
std::string get_plugin_name() const {
@@ -59,82 +49,35 @@ public:
bool get_enabled() const {
return this->enabled;
}
+ void set_interface(Interface *interface) {
+ this->interface = interface;
+ }
std::string get_info() const;
+ std::vector<std::string> get_deps();
private:
std::string name;
std::string plugin_name;
- PluginType type;
bool state;
bool enabled;
+ Interface *interface;
const static std::string PLUGIN_ENABLED;
const static std::string PLUGIN_DISABLED;
const static std::string PLUGIN_STATE_ON;
const static std::string PLUGIN_STATE_OFF;
};
-class CollectorInstance : public Instance {
-public:
- CollectorInstance() : interface(nullptr) { }
- ~CollectorInstance() {
- interface = nullptr;
- }
- void set_interface(CollectorInterface *interface) {
- this->interface = interface;
- }
- CollectorInterface* get_interface() const {
- return this->interface;
- }
-private:
- CollectorInterface* interface;
-};
-
-class ScenarioInstance : public Instance {
-public:
- ScenarioInstance() : interface(nullptr) { }
- ~ScenarioInstance() {
- interface = nullptr;
- }
- void set_interface(ScenarioInterface *interface) {
- this->interface = interface;
- }
- ScenarioInterface* get_interface() const {
- return this->interface;
- }
-private:
- ScenarioInterface* interface;
-};
-
-class TuneInstance : public Instance {
-public:
- TuneInstance() : interface(nullptr) { }
- ~TuneInstance() {
- interface = nullptr;
- }
- void set_interface(TuneInterface *interface) {
- this->interface = interface;
- }
- TuneInterface* get_interface() const {
- return this->interface;
- }
-private:
- TuneInterface *interface;
-};
-
class Plugin {
public:
- Plugin(std::string name, PluginType type) : name(name), type(type), handler(nullptr) { }
+ Plugin(const std::string &name) : name(name), handler(nullptr) { }
~Plugin() {
if (handler != nullptr) {
dlclose(handler);
}
}
- int load(const std::string dl_path);
+ int load(const std::string &dl_path);
std::string get_name() const {
return this->name;
}
- PluginType get_type() const {
- return this->type;
- }
void add_instance(std::shared_ptr<Instance> ins) {
instances.emplace_back(ins);
}
@@ -150,7 +93,6 @@ public:
private:
std::vector<std::shared_ptr<Instance>> instances;
std::string name;
- PluginType type;
void *handler;
};
diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp
index 5324512..1503fc1 100644
--- a/src/plugin_mgr/plugin_manager.cpp
+++ b/src/plugin_mgr/plugin_manager.cpp
@@ -78,9 +78,8 @@ ErrorCode PluginManager::query_plugin(const std::string &name, std::string &res)
return ErrorCode::OK;
}
-template <typename T>
-int PluginManager::load_dl_instance(std::shared_ptr<Plugin> plugin, T **interface_list) {
- int (*get_instance)(T**) = (int(*)(T**))dlsym(plugin->get_handler(), "get_instance");
+int PluginManager::load_dl_instance(std::shared_ptr<Plugin> plugin, Interface **interface_list) {
+ int (*get_instance)(Interface**) = (int(*)(Interface**))dlsym(plugin->get_handler(), "get_instance");
if (get_instance == nullptr) {
ERROR("[PluginManager] dlsym error!\n");
return -1;
@@ -90,78 +89,33 @@ int PluginManager::load_dl_instance(std::shared_ptr<Plugin> plugin, T **interfac
return len;
}
-template<typename T>
-std::vector<std::string> get_dep(T *interface) {
- char *deps = interface->get_dep();
- std::vector<std::string> res;
- std::string dep;
- for (int i = 0; deps[i] != 0; ++i) {
- if (deps[i] != '-') {
- dep += deps[i];
- } else {
- res.emplace_back(dep);
- dep = "";
- }
- }
- if (!dep.empty()) res.emplace_back(dep);
- return res;
-}
-
-template<typename T, typename U>
-void PluginManager::save_instance(std::shared_ptr<Plugin> plugin, T *interface_list, int len) {
+void PluginManager::save_instance(std::shared_ptr<Plugin> plugin, Interface *interface_list, int len) {
if (interface_list == nullptr) return;
for (int i = 0; i < len; ++i) {
- T *interface = interface_list + i;
- std::shared_ptr<Instance> instance = std::make_shared<U>();
+ Interface *interface = interface_list + i;
+ std::shared_ptr<Instance> instance = std::make_shared<Instance>();
std::string name = interface->get_name();
+ instance->set_interface(interface);
instance->set_name(name);
instance->set_plugin_name(plugin->get_name());
- instance->set_type(plugin->get_type());
instance->set_enabled(false);
- if (plugin->get_type() == PluginType::COLLECTOR) {
- DEBUG("[PluginManager] add node");
- dep_handler.add_node(name);
- } else {
- dep_handler.add_node(name, get_dep<T>(interface));
- }
+ dep_handler.add_node(name, instance->get_deps());
instance->set_state(dep_handler.get_node_state(name));
- (std::dynamic_pointer_cast<U>(instance))->set_interface(interface);
DEBUG("[PluginManager] Instance: " << name.c_str());
memory_store.add_instance(name, instance);
- plugin->add_instance(instance);
+ plugin->add_instance(instance);
}
}
bool PluginManager::load_instance(std::shared_ptr<Plugin> plugin) {
int len = 0;
DEBUG("plugin: " << plugin->get_name());
- switch (plugin->get_type()) {
- case PluginType::COLLECTOR: {
- CollectorInterface *interface_list = nullptr;
- len = load_dl_instance<CollectorInterface>(plugin, &interface_list);
- if (len == -1) return false;
- DEBUG("[PluginManager] save collect instance");
- save_instance<CollectorInterface, CollectorInstance>(plugin, interface_list, len);
- break;
- }
- case PluginType::SCENARIO: {
- ScenarioInterface *interface_list = nullptr;
- len = load_dl_instance<ScenarioInterface>(plugin, &interface_list);
- if (len == -1) return false;
- save_instance<ScenarioInterface, ScenarioInstance>(plugin, interface_list, len);
- break;
- }
- case PluginType::TUNE: {
- TuneInterface *interface_list = nullptr;
- len = load_dl_instance<TuneInterface>(plugin, &interface_list);
- if (len == -1) return false;
- save_instance<TuneInterface, TuneInstance>(plugin, interface_list, len);
- break;
- }
- default: {
- return false;
- }
+ Interface *interface_list = nullptr;
+ len = load_dl_instance(plugin, &interface_list);
+ if (len < 0) {
+ return false;
}
+ save_instance(plugin, interface_list, len);
update_instance_state();
return true;
}
@@ -177,8 +131,8 @@ void PluginManager::update_instance_state() {
}
}
-ErrorCode PluginManager::load_plugin(const std::string &name, PluginType type) {
- std::string plugin_path = get_path(type) + "/" + name;
+ErrorCode PluginManager::load_plugin(const std::string &name) {
+ std::string plugin_path = get_path() + "/" + name;
if (!file_exist(plugin_path)) {
return ErrorCode::LOAD_PLUGIN_FILE_NOT_EXIST;
}
@@ -191,9 +145,8 @@ ErrorCode PluginManager::load_plugin(const std::string &name, PluginType type) {
if (memory_store.is_plugin_exist(name)) {
return ErrorCode::LOAD_PLUGIN_EXIST;
}
- const std::string dl_path = get_path(type) + '/' + name;
- std::shared_ptr<Plugin> plugin = std::make_shared<Plugin>(name, type);
- int error = plugin->load(dl_path);
+ std::shared_ptr<Plugin> plugin = std::make_shared<Plugin>(name);
+ int error = plugin->load(plugin_path);
if (error) {
return ErrorCode::LOAD_PLUGIN_DLOPEN_FAILED;
}
@@ -268,16 +221,35 @@ ErrorCode PluginManager::instance_enabled(std::string name) {
return ErrorCode::ENABLE_INSTANCE_ALREADY_ENABLED;
}
std::vector<std::string> pre_dependencies = dep_handler.get_pre_dependencies(name);
+ std::vector<std::shared_ptr<Instance>> new_enabled;
+ bool enabled = true;
for (int i = pre_dependencies.size() - 1; i >= 0; --i) {
instance = memory_store.get_instance(pre_dependencies[i]);
if (instance->get_enabled()) {
continue;
}
- instance->set_enabled(true);
- instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance));
- DEBUG("[PluginManager] " << instance->get_name() << " instance enabled.");
+ new_enabled.emplace_back(instance);
+ if (!instance->get_interface()->enable()) {
+ enabled = false;
+ WARN("[PluginManager] " << instance->get_name() << " instance enabled failed, because instance init environment failed.");
+ break;
+ }
+ }
+ if (enabled) {
+ for (int i = pre_dependencies.size() - 1; i >= 0; --i) {
+ instance = memory_store.get_instance(pre_dependencies[i]);
+ if (instance->get_enabled()) {
+ continue;
+ }
+ instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::ENABLED, instance));
+ }
+ return ErrorCode::OK;
+ } else {
+ for (auto instance : new_enabled) {
+ instance->get_interface()->disable();
+ }
+ return ErrorCode::ENABLE_INSTANCE_ENV;
}
- return ErrorCode::OK;
}
ErrorCode PluginManager::instance_disabled(std::string name) {
@@ -291,7 +263,6 @@ ErrorCode PluginManager::instance_disabled(std::string name) {
if (!instance->get_enabled()) {
return ErrorCode::DISABLE_INSTANCE_ALREADY_DISABLED;
}
- instance->set_enabled(false);
instance_run_handler->recv_queue_push(InstanceRunMessage(RunType::DISABLED, instance));
return ErrorCode::OK;
}
@@ -331,9 +302,7 @@ ErrorCode PluginManager::add_list(std::string &res) {
res += p.first + "\n";
}
res += "Installed Plugins:\n";
- res += get_plugin_in_dir(DEFAULT_COLLECTOR_PATH);
- res += get_plugin_in_dir(DEFAULT_SCENARIO_PATH);
- res += get_plugin_in_dir(DEFAULT_TUNE_PATH);
+ res += get_plugin_in_dir(DEFAULT_PLUGIN_PATH);
return ErrorCode::OK;
}
@@ -371,15 +340,15 @@ void PluginManager::pre_enable() {
}
}
-void PluginManager::pre_load_plugin(PluginType type) {
- std::string path = get_path(type);
+void PluginManager::pre_load_plugin() {
+ std::string path = get_path();
DIR *dir = opendir(path.c_str());
if (dir == nullptr) return;
struct dirent *entry;
while ((entry = readdir(dir)) != nullptr) {
std::string name = entry->d_name;
if (end_with(name, ".so")) {
- auto ret = load_plugin(name, type);
+ auto ret = load_plugin(name);
if (ret != ErrorCode::OK) {
WARN("[PluginManager] " << name << " plugin preload failed, because " << ErrorText::get_error_text(ret) << ".");
} else {
@@ -391,27 +360,13 @@ void PluginManager::pre_load_plugin(PluginType type) {
}
void PluginManager::pre_load() {
- pre_load_plugin(PluginType::COLLECTOR);
- pre_load_plugin(PluginType::SCENARIO);
- pre_load_plugin(PluginType::TUNE);
+ pre_load_plugin();
pre_enable();
}
-void* PluginManager::get_data_buffer(std::string name) {
+const void* PluginManager::get_data_buffer(std::string name) { // ring buffer of the named instance, or nullptr when its plugin provides none
std::shared_ptr<Instance> instance = memory_store.get_instance(name);
- switch (instance->get_type()) {
- case PluginType::COLLECTOR: {
- CollectorInterface *collector_interface = (std::dynamic_pointer_cast<CollectorInstance>(instance))->get_interface();
- return collector_interface->get_ring_buf();
- }
- case PluginType::SCENARIO: {
- ScenarioInterface *scenario_interface = (std::dynamic_pointer_cast<ScenarioInstance>(instance))->get_interface();
- return scenario_interface->get_ring_buf();
- }
- default:
- return nullptr;
- }
- return nullptr;
+ return instance->get_interface()->get_ring_buf == nullptr ? nullptr : instance->get_interface()->get_ring_buf(); // keep the removed default branch's nullptr result when get_ring_buf is unset
}
std::string PluginManager::instance_dep_check(const std::string &name) {
@@ -465,8 +420,7 @@ int PluginManager::run() {
switch (msg.get_opt()) {
case Opt::LOAD: {
std::string plugin_name = msg.get_payload(0);
- PluginType type = plugin_types[msg.get_payload(1)];
- ErrorCode ret_code = load_plugin(plugin_name, type);
+ ErrorCode ret_code = load_plugin(plugin_name);
if(ret_code == ErrorCode::OK) {
INFO("[PluginManager] " << plugin_name << " plugin loaded.");
res.set_opt(Opt::RESPONSE_OK);
diff --git a/src/plugin_mgr/plugin_manager.h b/src/plugin_mgr/plugin_manager.h
index 709d42e..9339f0d 100644
--- a/src/plugin_mgr/plugin_manager.h
+++ b/src/plugin_mgr/plugin_manager.h
@@ -33,10 +33,10 @@ public:
void pre_load();
void pre_enable();
void init();
- void* get_data_buffer(std::string name);
+ const void* get_data_buffer(std::string name);
private:
- void pre_load_plugin(PluginType type);
- ErrorCode load_plugin(const std::string &path, PluginType type);
+ void pre_load_plugin();
+ ErrorCode load_plugin(const std::string &path);
ErrorCode remove(const std::string &name);
ErrorCode query_all_plugins(std::string &res);
ErrorCode query_plugin(const std::string &name, std::string &res);
@@ -47,10 +47,8 @@ private:
ErrorCode add_list(std::string &res);
ErrorCode download(const std::string &name, std::string &res);
std::string instance_dep_check(const std::string &name);
- template <typename T>
- int load_dl_instance(std::shared_ptr<Plugin> plugin, T **interface_list);
- template <typename T, typename U>
- void save_instance(std::shared_ptr<Plugin> plugin, T *interface_list, int len);
+ int load_dl_instance(std::shared_ptr<Plugin> plugin, Interface **interface_list);
+ void save_instance(std::shared_ptr<Plugin> plugin, Interface *interface_list, int len);
bool load_instance(std::shared_ptr<Plugin> plugin);
void batch_load();
void batch_remove();
--
2.33.0