OSDN Git Service

Copyrightの表記を以下に統一
[ultramonkey-l7/ultramonkey-l7-v3.git] / snmpagent / dispatcher.cpp
1 //
2 //!    @file    dispatcher.cpp
3 //!    @brief    snmpagent - l7vsd message dispatcher
4 //
5 //    Copyright (C) 2008  NTT COMWARE Corporation.
6 //
7 //    Distributed under the Boost Software License, Version 1.0. (See accompanying
8 //    file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 #include <sys/time.h>
11 #include "dispatcher.h"
12 #include "store_mibdata.h"
13
14 #define SLEEPNANOTIME       (1000000)
15 #define DISPATCH_LOOP_COUNT (10)
16
17 /*!
18  *
19  */
20 l7ag_dispatcher::l7ag_dispatcher()
21 {
22         stop_flag   = false;
23         msg_size    = 0;
24         msg         = NULL;
25         message_que = NULL;
26         trap_que    = NULL;
27 }
28
29 /*!
30  *
31  */
32 l7ag_dispatcher::l7ag_dispatcher(MessageQueue *msgque, TrapQueue *trapque)
33 {
34         stop_flag   = false;
35         msg_size    = 0;
36         msg         = NULL;
37         message_que = msgque;
38         trap_que    = trapque;
39 }
40
41 /*!
42  *
43  */
44 l7ag_dispatcher::~l7ag_dispatcher()
45 {
46         if (NULL != msg)free(msg);
47 }
48
49 /*!
50  *
51  */
52 void*
53 l7ag_dispatcher::run(void *args)
54 {
55         l7ag_dispatcher *dispatcher = static_cast<l7ag_dispatcher *>(args);
56
57         struct timespec req;
58         req.tv_sec = 0;
59         req.tv_nsec = SLEEPNANOTIME;
60
61         while (!dispatcher->stop_flag) {
62                 int ret = nanosleep(&req, NULL);
63                 if (ret == -1) {
64                         break; // nanosleep failed
65                 }
66                 if (!dispatcher->message_que->empty()) {
67                         //まず、取得するための領域確保
68                         if (NULL == dispatcher->msg) {
69                                 dispatcher->msg_size = dispatcher->message_que->front().size();
70                                 dispatcher->msg      = static_cast<char *>(malloc((sizeof(char) * dispatcher->msg_size)));
71                                 memcpy(dispatcher->msg, dispatcher->message_que->front().data().get(), dispatcher->msg_size);
72                         } else {
73                                 dispatcher->msg_pos  = dispatcher->msg_size;
74                                 dispatcher->msg_size = dispatcher->msg_size + dispatcher->message_que->front().size();
75                                 dispatcher->msg      = static_cast<char *>(realloc(dispatcher->msg, (sizeof(char) * dispatcher->msg_size)));
76                                 //キューから取得(前に取得しているデータに追加取得)
77                                 memcpy(&dispatcher->msg[dispatcher->msg_pos], dispatcher->message_que->front().data().get(), dispatcher->message_que->front().size());
78                         }
79                         //popする
80                         dispatcher->message_que->pop();
81                         //ヘッダのサイズ参照
82                         l7ag_message_header *header;
83                         header = (l7ag_message_header *)dispatcher->msg;
84                         if (header->size <= dispatcher->msg_size) {
85                                 //データサイズが十分ならディスパッチ
86                                 dispatcher->dispatch_message();
87                                 //ディスパッチしたら、あまりデータを前に詰める
88                                 //いったんバックアップ
89                                 size_t  next_size = dispatcher->msg_size - header->size;
90                                 char   *msg_bak   = static_cast<char *>(malloc(sizeof(char) * next_size));
91                                 memcpy(msg_bak, &dispatcher->msg[header->size], next_size);
92                                 //領域のサイズ変更
93                                 dispatcher->msg = static_cast<char *>(realloc(dispatcher->msg, sizeof(char) * next_size));
94                                 //リストアする
95                                 memcpy(dispatcher->msg, msg_bak, next_size);
96                                 dispatcher->msg_size = next_size;
97                                 free(msg_bak);
98                         }
99                         //データサイズが足りなければ次回持ち越し
100                 } else {
101                         // TODO 複数のキューでたまる速度が遅かった場合消える?
102                         if (NULL != dispatcher->msg) {
103                                 free(dispatcher->msg);
104                                 dispatcher->msg = NULL;
105                         }
106                 }
107         }
108
109         dispatcher->stop_flag = false;
110
111         return NULL;
112 }
113
114 /*!
115  *
116  */
117 bool
118 l7ag_dispatcher::start()
119 {
120         bool    retbool = true;
121         if (0 != pthread_create(&dispatch_thread, NULL, run, this)) {
122                 retbool = false;
123         }
124         return  retbool;
125 }
126
127 /*!
128  *
129  */
130 bool
131 l7ag_dispatcher::stop()
132 {
133         bool    retbool       = true;
134 //    void*   thread_return = NULL;
135
136         stop_flag = true;
137         //join
138 //    if ( 0 != pthread_join( dispatch_thread, &thread_return ) ) {
139         if (0 != pthread_join(dispatch_thread, NULL)) {
140                 //join error
141                 retbool = false;
142         }
143         return  retbool;
144 }
145
146 /*!
147  * ディスパッチ入口
148  */
149 void
150 l7ag_dispatcher::dispatch_message()
151 {
152         if (dispatch_functions.empty()) {
153                 init_dispatcher_function_map();
154         }
155
156         char   *p_message = msg;
157         l7ag_message_header    *msg_header = reinterpret_cast<l7ag_message_header *>(msg);
158         if (msg_header->magic[0] != 0x4d || msg_header->magic[1] != 0x47) return;
159
160         //まず最初のpayload_headerにポインタをずらず
161         p_message = p_message + sizeof(l7ag_message_header);
162
163         for (unsigned long long i = 0; i < msg_header->payload_count; ++i) {
164                 //payload_headerとpayloadは1:1
165                 //payload_headerのmessage_idでディスパッチ先を決める
166                 l7ag_payload_header *payload_header = reinterpret_cast<l7ag_payload_header *>(p_message);
167                 if (payload_header->magic[0] != 0x50 || payload_header->magic[1] != 0x59) return;
168                 //payloadにポインタをずらす
169                 char *payload = p_message + sizeof(l7ag_payload_header);
170
171                 std::map< int, boost::function<void(void *)> >::iterator it = dispatch_functions.find(payload_header->message_id);
172                 if (dispatch_functions.end() != it) {
173                         it->second(payload);
174                 }
175                 //次のデータにポインタをずらす
176                 p_message = p_message + payload_header->payload_datasize;
177         }
178 }
179
180 /*!
181  * 設定コマンド
182  */
183 void
184 l7ag_dispatcher::process_setting_command(void *p_data)
185 {
186         l7ag_settingcommand_message *command = (l7ag_settingcommand_message *)p_data;
187         //コマンド種別をみる
188         if (COMMAND_LOGLEVEL_CHANGE == command->command_id) {
189                 //ログレベル変更
190                 l7ag_changeloglevel_parameter  *param = reinterpret_cast<l7ag_changeloglevel_parameter *>(&command->data[0]);
191                 change_loglevel(param->log_category, param->log_level);
192         } else if (COMMAND_SETTINGFILE_RELOAD == command->command_id) {
193                 //設定ファイル再読み込み
194                 reload_configure();
195         }
196 }
197
198 /*!
199  * Trap送信リクエスト
200  */
201 void
202 l7ag_dispatcher::process_trap_request(void *p_data)
203 {
204         l7ag_traprequest_message *command = (l7ag_traprequest_message *)p_data;
205         trapdata    trap_req;
206         //OIDを取得
207         char *data_oid = static_cast<char *>(calloc((OIDDATASIZE + 1), sizeof(char)));
208         memcpy(data_oid, command->oid, OIDDATASIZE);
209         trap_req.oid   = data_oid;
210         //messageを取得
211         char *data_msg = static_cast<char *>(calloc((TRAPREQUESTMESSAGESIZE + 1), sizeof(char)));
212         memcpy(data_msg, command->message, TRAPREQUESTMESSAGESIZE);
213         trap_req.message = data_msg;
214         trap_que->push(trap_req);
215 }
216
217 /*!
218  * MIBデータ収集結果(VirtualService)
219  */
220 void
221 l7ag_dispatcher::process_mib_collect_response_vs(void *p_data)
222 {
223         bool    writeflag = false;
224         l7ag_mibdata_payload_vs *command = (l7ag_mibdata_payload_vs *)p_data;
225         if (command->magic[0] != 0x56 || command->magic[1] != 0x53) return;
226         if (command->vs_count > 0) {
227                 int vscount = l7ag_store_mibdata::getInstance().getVSdatacount();
228                 for (int i = 0; i < vscount; ++i) {
229                         if (l7ag_store_mibdata::getInstance().getVSmibdata(i)->index == command->index) {
230                                 l7ag_store_mibdata::getInstance().updateVSmibdata(i, command);
231                                 writeflag = true;
232                         }
233                 }
234                 if (!writeflag) {
235                         l7ag_store_mibdata::getInstance().addVSmibdata(command);
236                 }
237                 if (command->index == command->vs_count) {
238                         // delete other index
239                         l7ag_store_mibdata::getInstance().updateVSmibdata(command->vs_count, NULL);
240                 }
241         } else {
242                 l7ag_store_mibdata::getInstance().clearVSmibdata();
243         }
244 }
245
246 /*!
247  * MIBデータ収集結果(RealServer)
248  */
249 void
250 l7ag_dispatcher::process_mib_collect_response_rs(void *p_data)
251 {
252         bool    writeflag = false;
253         l7ag_mibdata_payload_rs *command = (l7ag_mibdata_payload_rs *)p_data;
254         if (command->magic[0] != 0x52 || command->magic[1] != 0x53) return;
255         if (command->rs_count > 0) {
256                 int rscount = l7ag_store_mibdata::getInstance().getRSdatacount();
257                 for (int i = 0; i < rscount; ++i) {
258                         if (l7ag_store_mibdata::getInstance().getRSmibdata(i)->index == command->index) {
259                                 l7ag_store_mibdata::getInstance().updateRSmibdata(i, command);
260                                 writeflag = true;
261                         }
262                 }
263                 if (!writeflag) {
264                         l7ag_store_mibdata::getInstance().addRSmibdata(command);
265                 }
266                 if (command->index == command->rs_count) {
267                         // delete other index
268                         l7ag_store_mibdata::getInstance().updateRSmibdata(command->rs_count, NULL);
269                 }
270         } else {
271                 l7ag_store_mibdata::getInstance().clearRSmibdata();
272         }
273 }
274
275 /*!
276  *
277  */
278 void
279 l7ag_dispatcher::change_loglevel(unsigned long long category, unsigned long long loglevel)
280 {
281         l7vs::Logger    logger;
282         logger.setLogLevel(static_cast<l7vs::LOG_CATEGORY_TAG>(category), static_cast<l7vs::LOG_LEVEL_TAG>(loglevel));
283 }
284
285 /*!
286  *
287  */
288 void
289 l7ag_dispatcher::reload_configure()
290 {
291         l7vs::Parameter        parameter;
292         parameter.read_file(l7vs::PARAM_COMP_SNMPAGENT);
293 }
294
295 void
296 l7ag_dispatcher::init_dispatcher_function_map()
297 {
298         dispatch_functions.clear();
299
300         dispatch_functions[MESSAGE_ID_COMMANDREQUEST]        = boost::bind(&l7ag_dispatcher::process_setting_command, this, _1);
301         dispatch_functions[MESSAGE_ID_TRAPREQUEST]           = boost::bind(&l7ag_dispatcher::process_trap_request, this, _1);
302         dispatch_functions[MESSAGE_ID_MIBCOLLECTRESPONSE_VS] = boost::bind(&l7ag_dispatcher::process_mib_collect_response_vs, this, _1);
303         dispatch_functions[MESSAGE_ID_MIBCOLLECTRESPONSE_RS] = boost::bind(&l7ag_dispatcher::process_mib_collect_response_rs, this, _1);
304 }