2 //! @file dispaatcher.cpp
3 //! @brief snmpagent - l7vsd message dispatcher
5 // Copyright (C) 2008 NTT COMWARE Corporation.
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
11 #include "dispatcher.h"
12 #include "store_mibdata.h"
14 #define SLEEPNANOTIME (1000000)
15 #define DISPATCH_LOOP_COUNT (10)
20 l7ag_dispatcher::l7ag_dispatcher()
32 l7ag_dispatcher::l7ag_dispatcher(MessageQueue *msgque, TrapQueue *trapque)
44 l7ag_dispatcher::~l7ag_dispatcher()
46 if (NULL != msg)free(msg);
53 l7ag_dispatcher::run(void *args)
55 l7ag_dispatcher *dispatcher = static_cast<l7ag_dispatcher *>(args);
59 req.tv_nsec = SLEEPNANOTIME;
61 while (!dispatcher->stop_flag) {
62 int ret = nanosleep(&req, NULL);
64 break; // nanosleep failed
66 if (!dispatcher->message_que->empty()) {
68 if (NULL == dispatcher->msg) {
69 dispatcher->msg_size = dispatcher->message_que->front().size();
70 dispatcher->msg = static_cast<char *>(malloc((sizeof(char) * dispatcher->msg_size)));
71 memcpy(dispatcher->msg, dispatcher->message_que->front().data().get(), dispatcher->msg_size);
73 dispatcher->msg_pos = dispatcher->msg_size;
74 dispatcher->msg_size = dispatcher->msg_size + dispatcher->message_que->front().size();
75 dispatcher->msg = static_cast<char *>(realloc(dispatcher->msg, (sizeof(char) * dispatcher->msg_size)));
76 //キューから取得(前に取得しているデータに追加取得)
77 memcpy(&dispatcher->msg[dispatcher->msg_pos], dispatcher->message_que->front().data().get(), dispatcher->message_que->front().size());
80 dispatcher->message_que->pop();
82 l7ag_message_header *header;
83 header = (l7ag_message_header *)dispatcher->msg;
84 if (header->size <= dispatcher->msg_size) {
86 dispatcher->dispatch_message();
87 //ディスパッチしたら、あまりデータを前に詰める
89 size_t next_size = dispatcher->msg_size - header->size;
90 char *msg_bak = static_cast<char *>(malloc(sizeof(char) * next_size));
91 memcpy(msg_bak, &dispatcher->msg[header->size], next_size);
93 dispatcher->msg = static_cast<char *>(realloc(dispatcher->msg, sizeof(char) * next_size));
95 memcpy(dispatcher->msg, msg_bak, next_size);
96 dispatcher->msg_size = next_size;
101 // TODO 複数のキューでたまる速度が遅かった場合消える?
102 if (NULL != dispatcher->msg) {
103 free(dispatcher->msg);
104 dispatcher->msg = NULL;
109 dispatcher->stop_flag = false;
118 l7ag_dispatcher::start()
121 if (0 != pthread_create(&dispatch_thread, NULL, run, this)) {
131 l7ag_dispatcher::stop()
134 // void* thread_return = NULL;
138 // if ( 0 != pthread_join( dispatch_thread, &thread_return ) ) {
139 if (0 != pthread_join(dispatch_thread, NULL)) {
150 l7ag_dispatcher::dispatch_message()
152 if (dispatch_functions.empty()) {
153 init_dispatcher_function_map();
156 char *p_message = msg;
157 l7ag_message_header *msg_header = reinterpret_cast<l7ag_message_header *>(msg);
158 if (msg_header->magic[0] != 0x4d || msg_header->magic[1] != 0x47) return;
160 //まず最初のpayload_headerにポインタをずらず
161 p_message = p_message + sizeof(l7ag_message_header);
163 for (unsigned long long i = 0; i < msg_header->payload_count; ++i) {
164 //payload_headerとpayloadは1:1
165 //payload_headerのmessage_idでディスパッチ先を決める
166 l7ag_payload_header *payload_header = reinterpret_cast<l7ag_payload_header *>(p_message);
167 if (payload_header->magic[0] != 0x50 || payload_header->magic[1] != 0x59) return;
169 char *payload = p_message + sizeof(l7ag_payload_header);
171 std::map< int, boost::function<void(void *)> >::iterator it = dispatch_functions.find(payload_header->message_id);
172 if (dispatch_functions.end() != it) {
176 p_message = p_message + payload_header->payload_datasize;
184 l7ag_dispatcher::process_setting_command(void *p_data)
186 l7ag_settingcommand_message *command = (l7ag_settingcommand_message *)p_data;
188 if (COMMAND_LOGLEVEL_CHANGE == command->command_id) {
190 l7ag_changeloglevel_parameter *param = reinterpret_cast<l7ag_changeloglevel_parameter *>(&command->data[0]);
191 change_loglevel(param->log_category, param->log_level);
192 } else if (COMMAND_SETTINGFILE_RELOAD == command->command_id) {
202 l7ag_dispatcher::process_trap_request(void *p_data)
204 l7ag_traprequest_message *command = (l7ag_traprequest_message *)p_data;
207 char *data_oid = static_cast<char *>(calloc((OIDDATASIZE + 1), sizeof(char)));
208 memcpy(data_oid, command->oid, OIDDATASIZE);
209 trap_req.oid = data_oid;
211 char *data_msg = static_cast<char *>(calloc((TRAPREQUESTMESSAGESIZE + 1), sizeof(char)));
212 memcpy(data_msg, command->message, TRAPREQUESTMESSAGESIZE);
213 trap_req.message = data_msg;
214 trap_que->push(trap_req);
218 * MIBデータ収集結果(VirtualService)
221 l7ag_dispatcher::process_mib_collect_response_vs(void *p_data)
223 bool writeflag = false;
224 l7ag_mibdata_payload_vs *command = (l7ag_mibdata_payload_vs *)p_data;
225 if (command->magic[0] != 0x56 || command->magic[1] != 0x53) return;
226 if (command->vs_count > 0) {
227 int vscount = l7ag_store_mibdata::getInstance().getVSdatacount();
228 for (int i = 0; i < vscount; ++i) {
229 if (l7ag_store_mibdata::getInstance().getVSmibdata(i)->index == command->index) {
230 l7ag_store_mibdata::getInstance().updateVSmibdata(i, command);
235 l7ag_store_mibdata::getInstance().addVSmibdata(command);
237 if (command->index == command->vs_count) {
238 // delete other index
239 l7ag_store_mibdata::getInstance().updateVSmibdata(command->vs_count, NULL);
242 l7ag_store_mibdata::getInstance().clearVSmibdata();
247 * MIBデータ収集結果(RealServer)
250 l7ag_dispatcher::process_mib_collect_response_rs(void *p_data)
252 bool writeflag = false;
253 l7ag_mibdata_payload_rs *command = (l7ag_mibdata_payload_rs *)p_data;
254 if (command->magic[0] != 0x52 || command->magic[1] != 0x53) return;
255 if (command->rs_count > 0) {
256 int rscount = l7ag_store_mibdata::getInstance().getRSdatacount();
257 for (int i = 0; i < rscount; ++i) {
258 if (l7ag_store_mibdata::getInstance().getRSmibdata(i)->index == command->index) {
259 l7ag_store_mibdata::getInstance().updateRSmibdata(i, command);
264 l7ag_store_mibdata::getInstance().addRSmibdata(command);
266 if (command->index == command->rs_count) {
267 // delete other index
268 l7ag_store_mibdata::getInstance().updateRSmibdata(command->rs_count, NULL);
271 l7ag_store_mibdata::getInstance().clearRSmibdata();
279 l7ag_dispatcher::change_loglevel(unsigned long long category, unsigned long long loglevel)
282 logger.setLogLevel(static_cast<l7vs::LOG_CATEGORY_TAG>(category), static_cast<l7vs::LOG_LEVEL_TAG>(loglevel));
289 l7ag_dispatcher::reload_configure()
291 l7vs::Parameter parameter;
292 parameter.read_file(l7vs::PARAM_COMP_SNMPAGENT);
296 l7ag_dispatcher::init_dispatcher_function_map()
298 dispatch_functions.clear();
300 dispatch_functions[MESSAGE_ID_COMMANDREQUEST] = boost::bind(&l7ag_dispatcher::process_setting_command, this, _1);
301 dispatch_functions[MESSAGE_ID_TRAPREQUEST] = boost::bind(&l7ag_dispatcher::process_trap_request, this, _1);
302 dispatch_functions[MESSAGE_ID_MIBCOLLECTRESPONSE_VS] = boost::bind(&l7ag_dispatcher::process_mib_collect_response_vs, this, _1);
303 dispatch_functions[MESSAGE_ID_MIBCOLLECTRESPONSE_RS] = boost::bind(&l7ag_dispatcher::process_mib_collect_response_rs, this, _1);