OSDN Git Service

サーバーconfig.jsonのオートリロード機能を追加
[mmo/main.git] / server / Server.cpp
1 //\r
2 // Server.cpp\r
3 //\r
4 \r
5 #include "Server.hpp"\r
6 #include "version.hpp"\r
7 #include <algorithm>\r
8 #include <boost/make_shared.hpp>\r
9 #include <boost/foreach.hpp>\r
10 #include "../common/Logger.hpp"\r
11 #include "../common/network/Command.hpp"\r
12 #include "../common/network/Utils.hpp"\r
13 \r
14 namespace network {\r
15 \r
16     Server::Server(Config& config) :\r
17                         config_(config),\r
18             endpoint_(tcp::v4(), config.port()),\r
19             acceptor_(io_service_, endpoint_),\r
20             socket_udp_(io_service_, udp::endpoint(udp::v4(), config.port())),\r
21             udp_packet_count_(0)\r
22     {\r
23     }\r
24 \r
25     void Server::Start(CallbackFuncPtr callback)\r
26     {\r
27         callback_ = std::make_shared<CallbackFunc>(\r
28                 [&](network::Command c){\r
29 \r
30             // ログアウト\r
31             if (c.header() == network::header::FatalConnectionError) {\r
32                 if (callback) {\r
33                                         (*callback)(c);\r
34                                 }\r
35             } else if (auto session = c.session().lock()) {\r
36                                 auto read_average = session->GetReadByteAverage();\r
37                                 if (read_average > config_.receive_limit_2()) {\r
38                                         Logger::Info(_T("Banished a session: %d %dbyte/s"), session->id(), read_average);\r
39                                         session->Close();\r
40                                 } else if(read_average > config_.receive_limit_1()) {\r
41                                         Logger::Info(_T("Receive limit exceeded: %d: %d byte/s"), session->id(), read_average);\r
42                                 } else {\r
43                                         if (callback) {\r
44                                                 (*callback)(c);\r
45                                         }\r
46                 }\r
47             }\r
48 \r
49         });\r
50 \r
51         {\r
52         auto new_session = boost::make_shared<ServerSession>(io_service_);\r
53         acceptor_.async_accept(new_session->tcp_socket(),\r
54                               boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));\r
55         }\r
56 \r
57         {\r
58             socket_udp_.async_receive_from(\r
59                 boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,\r
60                 boost::bind(&Server::ReceiveUDP, this,\r
61                   boost::asio::placeholders::error,\r
62                   boost::asio::placeholders::bytes_transferred));\r
63         }\r
64 \r
65         boost::asio::io_service::work work(io_service_);\r
66         io_service_.run();\r
67     }\r
68 \r
69     void Server::Stop()\r
70     {\r
71         io_service_.stop();\r
72                 Logger::Info("stop server");\r
73     }\r
74     void Server::Stop(int innterrupt_type)\r
75     {\r
76         io_service_.stop();\r
77         Logger::Info(_T("stop server innterrupt_type=%d"),innterrupt_type);\r
78     }\r
79 \r
80         int Server::GetUserCount() const\r
81         {\r
82                 auto count = std::count_if(sessions_.begin(), sessions_.end(),\r
83                         [](const SessionWeakPtr& s){ return !s.expired() && s.lock()->online(); });\r
84 \r
85                 return count;\r
86         }\r
87 \r
88         std::string Server::GetStatusJSON() const\r
89         {\r
90                 auto msg = (\r
91                                         boost::format("{\"nam\":\"%s\",\"ver\":\"%d.%d.%d\",\"cnt\":%d,\"cap\":%d,\"stg\":\"%s\"}")\r
92                                                 % config_.server_name()\r
93                                                 % MMO_VERSION_MAJOR % MMO_VERSION_MINOR % MMO_VERSION_REVISION\r
94                                                 % GetUserCount()\r
95                                                 % config_.capacity()\r
96                                                 % config_.stage()\r
97                                         ).str();\r
98 \r
99                 return msg;\r
100         }\r
101 \r
102     bool Server::Empty() const\r
103     {\r
104         return GetUserCount() == 0;\r
105     }\r
106 \r
107         bool Server::IsBlockedAddress(const boost::asio::ip::address& address)\r
108         {\r
109                 BOOST_FOREACH(const auto& pattern, config_.blocking_address_patterns()) {\r
110                         if (network::Utils::MatchWithWildcard(pattern, address.to_string())) {\r
111                                 return true;\r
112                         }\r
113                 }\r
114                 return false;\r
115         }\r
116 \r
117     void Server::ReceiveSession(const SessionPtr& session, const boost::system::error_code& error)\r
118     {\r
119                 \r
120                 config_.Reload();\r
121 \r
122                 const auto address = session->tcp_socket().remote_endpoint().address();\r
123                 if(IsBlockedAddress(address)) {\r
124                         Logger::Info("Blocked IP Address: %s", address);\r
125             session->Close();\r
126 \r
127                 } else if (GetUserCount() >= config_.capacity()) {\r
128                         Logger::Info("Refused Session");\r
129             session->SyncSend(ClientReceiveServerCrowdedError());\r
130             session->Close();\r
131 \r
132         } else {\r
133             session->set_on_receive(callback_);\r
134             session->Start();\r
135             sessions_.push_back(SessionWeakPtr(session));\r
136 \r
137             // クライアント情報を要求\r
138             session->Send(ClientRequestedClientInfo());\r
139         }\r
140 \r
141         auto new_session = boost::make_shared<ServerSession>(io_service_);\r
142          acceptor_.async_accept(new_session->tcp_socket(),\r
143                  boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));\r
144 \r
145                 RefreshSession();\r
146     }\r
147 \r
148         void Server::RefreshSession()\r
149         {\r
150                 // 使用済のセッションのポインタを破棄\r
151         auto it = std::remove_if(sessions_.begin(), sessions_.end(),\r
152                 [](const SessionWeakPtr& ptr){\r
153             return ptr.expired();\r
154         });\r
155         sessions_.erase(it, sessions_.end());\r
156                 Logger::Info("Active connection: %d", GetUserCount());\r
157         }\r
158 \r
159     void Server::SendAll(const Command& command, int channel, bool limited)\r
160     {\r
161         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {\r
162             if (auto session = ptr.lock()) {\r
163                                 if (channel < 0 || (channel >= 0 && session->channel() == channel)) {\r
164                                         if (!limited || session->write_average_limit() > session->GetWriteByteAverage()) {\r
165                                                 session->Send(command);\r
166                                         }\r
167                                 }\r
168             }\r
169         }\r
170     }\r
171 \r
172     void Server::SendOthers(const Command& command, uint32_t self_id, int channel, bool limited)\r
173     {\r
174         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {\r
175             if (auto session = ptr.lock()) {\r
176                                 if (channel < 0 || (channel >= 0 && session->channel() == channel)) {\r
177                                         if (!limited || session->write_average_limit() > session->GetWriteByteAverage()) {\r
178                                                 if (session->id() != self_id) {\r
179                                                         session->Send(command);\r
180                                                 }\r
181                                         }\r
182                                 }\r
183             }\r
184         }\r
185     }\r
186         \r
187     void Server::SendTo(const Command& command, uint32_t user_id)\r
188         {\r
189                 auto it = std::find_if(sessions_.begin(), sessions_.end(),\r
190                         [user_id](SessionWeakPtr& ptr){\r
191                                 return ptr.lock()->id() == user_id;\r
192                         });\r
193                 \r
194                 if (it != sessions_.end()) {\r
195                         it->lock()->Send(command);\r
196                 }\r
197         }\r
198 \r
199     void Server::SendUDPTestPacket(const std::string& ip_address, uint16_t port)\r
200     {\r
201         using boost::asio::ip::udp;\r
202 \r
203         std::stringstream port_str;\r
204         port_str << (int)port;\r
205 \r
206         udp::resolver resolver(io_service_);\r
207         udp::resolver::query query(udp::v4(), ip_address.c_str(), port_str.str().c_str());\r
208         udp::resolver::iterator iterator = resolver.resolve(query);\r
209 \r
210         static char request[] = "MMO UDP Test Packet";\r
211         for (int i = 0; i < UDP_TEST_PACKET_TIME; i++) {\r
212 \r
213             io_service_.post(boost::bind(&Server::DoWriteUDP, this, request, *iterator));\r
214         }\r
215     }\r
216 \r
217     void Server::SendUDP(const std::string& message, const boost::asio::ip::udp::endpoint endpoint)\r
218     {\r
219                 io_service_.post(boost::bind(&Server::DoWriteUDP, this, message, endpoint));\r
220     }\r
221 \r
222     void Server::ReceiveUDP(const boost::system::error_code& error, size_t bytes_recvd)\r
223     {\r
224         if (bytes_recvd > 0) {\r
225             std::string buffer(receive_buf_udp_, bytes_recvd);\r
226             FetchUDP(buffer, sender_endpoint_);\r
227         }\r
228         if (!error) {\r
229           socket_udp_.async_receive_from(\r
230               boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,\r
231               boost::bind(&Server::ReceiveUDP, this,\r
232                 boost::asio::placeholders::error,\r
233                 boost::asio::placeholders::bytes_transferred));\r
234         } else {\r
235             Logger::Error("%s", error.message());\r
236         }\r
237     }\r
238 \r
239     void Server::DoWriteUDP(const std::string& msg, const udp::endpoint& endpoint)\r
240     {\r
241         boost::shared_ptr<std::string> s = \r
242               boost::make_shared<std::string>(msg.data(), msg.size());\r
243 \r
244         socket_udp_.async_send_to(\r
245             boost::asio::buffer(s->data(), s->size()), endpoint,\r
246             boost::bind(&Server::WriteUDP, this,\r
247               boost::asio::placeholders::error, s));\r
248     }\r
249 \r
250     void Server::WriteUDP(const boost::system::error_code& error, boost::shared_ptr<std::string> holder)\r
251     {\r
252 //        if (!error) {\r
253 //            if (!send_queue_.empty()) {\r
254 //                  send_queue_.pop();\r
255 //                  if (!send_queue_.empty())\r
256 //                  {\r
257 //                    boost::asio::async_write(socket_tcp_,\r
258 //                        boost::asio::buffer(send_queue_.front().data(),\r
259 //                          send_queue_.front().size()),\r
260 //                        boost::bind(&Session::WriteTCP, this,\r
261 //                          boost::asio::placeholders::error));\r
262 //                  }\r
263 //            }\r
264 //        } else {\r
265 //            FatalError();\r
266 //        }\r
267     }\r
268 \r
269     void Server::FetchUDP(const std::string& buffer, const boost::asio::ip::udp::endpoint endpoint)\r
270     {\r
271         uint8_t header;\r
272         std::string body;\r
273         SessionWeakPtr session;\r
274 \r
275                 // IPアドレスとポートからセッションを特定\r
276                 auto it = std::find_if(sessions_.begin(), sessions_.end(),\r
277                         [&endpoint](const SessionWeakPtr& session) -> bool {\r
278                                 if (auto session_ptr = session.lock()) {\r
279 \r
280                                         const auto session_endpoint = session_ptr->tcp_socket().remote_endpoint();\r
281                                         const auto session_port = session_ptr->udp_port();\r
282 \r
283                                         return (session_endpoint.address() == endpoint.address() &&\r
284                                                 session_port == endpoint.port());\r
285 \r
286                                 } else {\r
287                                         return false;\r
288                                 }\r
289                         });\r
290 \r
291                 if (it != sessions_.end()) {\r
292                         session = *it;\r
293                         Logger::Debug("Receive UDP Command: %d", session.lock()->id());\r
294                 } else {\r
295                         Logger::Debug("Receive anonymous UDP Command");\r
296                 }\r
297 \r
298         if (buffer.size() > network::Utils::Deserialize(buffer, &header)) {\r
299                         body = buffer.substr(sizeof(header));\r
300                 }\r
301 \r
302         // 復号\r
303         if (session.lock() && header == header::ENCRYPT_HEADER) {\r
304             body.erase(0, sizeof(header));\r
305                         body = session.lock()->encrypter().Decrypt(body);\r
306             Utils::Deserialize(body, &header);\r
307                         body = buffer.substr(sizeof(header));\r
308         }\r
309 \r
310                 if (header == network::header::ServerRequstedStatus) {\r
311                         SendUDP(GetStatusJSON(), endpoint);\r
312                 } else {\r
313                         if (callback_) {\r
314                                 (*callback_)(Command(static_cast<network::header::CommandHeader>(header), body, session));\r
315                         }\r
316                 }\r
317 \r
318     }\r
319 \r
320     void Server::ServerSession::Start()\r
321     {\r
322         online_ = true;\r
323 \r
324         // Nagleアルゴリズムを無効化\r
325         socket_tcp_.set_option(boost::asio::ip::tcp::no_delay(true));\r
326 \r
327                 // バッファサイズを変更 1MiB\r
328                 boost::asio::socket_base::receive_buffer_size option(1048576);\r
329                 socket_tcp_.set_option(option);\r
330 \r
331         // IPアドレスを取得\r
332         global_ip_ = socket_tcp_.remote_endpoint().address().to_string();\r
333 \r
334         boost::asio::async_read_until(socket_tcp_,\r
335             receive_buf_, NETWORK_UTILS_DELIMITOR,\r
336             boost::bind(\r
337               &ServerSession::ReceiveTCP, shared_from_this(),\r
338               boost::asio::placeholders::error));\r
339     }\r
340 }\r