OSDN Git Service

Sceneに標準でProcessInputメソッドを追加
[mmo/main.git] / server / Server.cpp
1 //\r
2 // Server.cpp\r
3 //\r
4 \r
5 #include "Server.hpp"\r
6 #include "version.hpp"\r
7 #include <algorithm>\r
8 #include <boost/make_shared.hpp>\r
9 #include <boost/foreach.hpp>\r
10 #include "../common/Logger.hpp"\r
11 #include "../common/network/Command.hpp"\r
12 #include "../common/network/Utils.hpp"\r
13 \r
14 namespace network {\r
15 \r
16     Server::Server(const Config& config) :\r
17                         config_(config),\r
18             endpoint_(tcp::v4(), config.port()),\r
19             acceptor_(io_service_, endpoint_),\r
20             socket_udp_(io_service_, udp::endpoint(udp::v4(), config.port())),\r
21             udp_packet_count_(0)\r
22     {\r
23     }\r
24 \r
25     void Server::Start(CallbackFuncPtr callback)\r
26     {\r
27         callback_ = std::make_shared<CallbackFunc>(\r
28                 [&](network::Command c){\r
29 \r
30             // ログアウト\r
31             if (c.header() == network::header::FatalConnectionError) {\r
32                 if (callback) {\r
33                                         (*callback)(c);\r
34                                 }\r
35             } else if (auto session = c.session().lock()) {\r
36                                 auto read_average = session->GetReadByteAverage();\r
37                                 if (read_average > config_.receive_limit_2()) {\r
38                                         Logger::Info(_T("Banished session: %d"), session->id());\r
39                                         session->Close();\r
40                                 } else if(read_average > config_.receive_limit_1()) {\r
41                                         Logger::Info(_T("Receive limit exceeded: %d: %d byte/s"), session->id(), read_average);\r
42                                 } else {\r
43                                         if (callback) {\r
44                                                 (*callback)(c);\r
45                                         }\r
46                 }\r
47             }\r
48 \r
49         });\r
50 \r
51         {\r
52         auto new_session = boost::make_shared<ServerSession>(io_service_);\r
53         acceptor_.async_accept(new_session->tcp_socket(),\r
54                               boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));\r
55         }\r
56 \r
57         {\r
58             socket_udp_.async_receive_from(\r
59                 boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,\r
60                 boost::bind(&Server::ReceiveUDP, this,\r
61                   boost::asio::placeholders::error,\r
62                   boost::asio::placeholders::bytes_transferred));\r
63         }\r
64 \r
65         boost::asio::io_service::work work(io_service_);\r
66         io_service_.run();\r
67     }\r
68 \r
69     void Server::Stop()\r
70     {\r
71         io_service_.stop();\r
72                 Logger::Info("stop server");\r
73     }\r
74     void Server::Stop(int innterrupt_type)\r
75     {\r
76         io_service_.stop();\r
77         Logger::Info(_T("stop server innterrupt_type=%d"),innterrupt_type);\r
78     }\r
79 \r
80         int Server::GetUserCount() const\r
81         {\r
82                 auto count = std::count_if(sessions_.begin(), sessions_.end(),\r
83                         [](const SessionWeakPtr& s){ return !s.expired() && s.lock()->online(); });\r
84 \r
85                 return count;\r
86         }\r
87 \r
88         std::string Server::GetStatusJSON() const\r
89         {\r
90                 auto msg = (\r
91                                         boost::format("{\"nam\":\"%s\",\"ver\":\"%d.%d.%d\",\"cnt\":%d,\"cap\":%d,\"stg\":\"%s\"}")\r
92                                                 % config_.server_name()\r
93                                                 % MMO_VERSION_MAJOR % MMO_VERSION_MINOR % MMO_VERSION_REVISION\r
94                                                 % GetUserCount()\r
95                                                 % config_.capacity()\r
96                                                 % config_.stage()\r
97                                         ).str();\r
98 \r
99                 return msg;\r
100         }\r
101 \r
102     bool Server::Empty() const\r
103     {\r
104         return GetUserCount() == 0;\r
105     }\r
106 \r
107         bool Server::IsBlockedAddress(const boost::asio::ip::address& address)\r
108         {\r
109                 BOOST_FOREACH(const auto& pattern, config_.blocking_address_patterns()) {\r
110                         if (network::Utils::MatchWithWildcard(pattern, address.to_string())) {\r
111                                 return true;\r
112                         }\r
113                 }\r
114                 return false;\r
115         }\r
116 \r
117     void Server::ReceiveSession(const SessionPtr& session, const boost::system::error_code& error)\r
118     {\r
119                 const auto address = session->tcp_socket().remote_endpoint().address();\r
120                 if(IsBlockedAddress(address)) {\r
121                         Logger::Info("Blocked IP Address: %s", address);\r
122             session->Close();\r
123 \r
124                 } else if (GetUserCount() >= config_.capacity()) {\r
125                         Logger::Info("Refused Session");\r
126             session->SyncSend(ClientReceiveServerCrowdedError());\r
127             session->Close();\r
128 \r
129         } else {\r
130             session->set_on_receive(callback_);\r
131             session->Start();\r
132             sessions_.push_back(SessionWeakPtr(session));\r
133 \r
134             // クライアント情報を要求\r
135             session->Send(ClientRequestedClientInfo());\r
136         }\r
137 \r
138         auto new_session = boost::make_shared<ServerSession>(io_service_);\r
139          acceptor_.async_accept(new_session->tcp_socket(),\r
140                  boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));\r
141 \r
142                 RefreshSession();\r
143     }\r
144 \r
145         void Server::RefreshSession()\r
146         {\r
147                 // 使用済のセッションのポインタを破棄\r
148         auto it = std::remove_if(sessions_.begin(), sessions_.end(),\r
149                 [](const SessionWeakPtr& ptr){\r
150             return ptr.expired();\r
151         });\r
152         sessions_.erase(it, sessions_.end());\r
153                 Logger::Info("Active connection: %d", GetUserCount());\r
154         }\r
155 \r
156     void Server::SendAll(const Command& command)\r
157     {\r
158         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {\r
159             if (auto session = ptr.lock()) {\r
160                 session->Send(command);\r
161             }\r
162         }\r
163     }\r
164 \r
165     void Server::SendAllLimited(const Command& command)\r
166     {\r
167         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {\r
168             if (auto session = ptr.lock()) {\r
169                                 if (session->write_average_limit() > session->GetWriteByteAverage()) {\r
170                                         session->Send(command);\r
171                                 }\r
172             }\r
173         }\r
174     }\r
175         \r
176     void Server::SendTo(const Command& command, uint32_t user_id)\r
177         {\r
178                 auto it = std::find_if(sessions_.begin(), sessions_.end(),\r
179                         [user_id](SessionWeakPtr& ptr){\r
180                                 return ptr.lock()->id() == user_id;\r
181                         });\r
182                 \r
183                 if (it != sessions_.end()) {\r
184                         it->lock()->Send(command);\r
185                 }\r
186         }\r
187 \r
188     void Server::SendOthers(const Command& command, SessionWeakPtr self_ptr)\r
189     {\r
190         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {\r
191             if (auto session = ptr.lock()) {\r
192                 if (auto self = self_ptr.lock()) {\r
193                     if (*session != *self) {\r
194                         session->Send(command);\r
195                     }\r
196                 }\r
197             }\r
198         }\r
199     }\r
200 \r
201     void Server::SendOthersLimited(const Command& command, SessionWeakPtr self_ptr)\r
202     {\r
203         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {\r
204             if (auto session = ptr.lock()) {\r
205                 if (auto self = self_ptr.lock()) {\r
206                     if (*session != *self) {\r
207                                                 if (session->write_average_limit() > session->GetWriteByteAverage()) {\r
208                                                         session->Send(command);\r
209                                                 }\r
210                     }\r
211                 }\r
212             }\r
213         }\r
214     }\r
215 \r
216     void Server::SendUDPTestPacket(const std::string& ip_address, uint16_t port)\r
217     {\r
218         using boost::asio::ip::udp;\r
219 \r
220         std::stringstream port_str;\r
221         port_str << (int)port;\r
222 \r
223         udp::resolver resolver(io_service_);\r
224         udp::resolver::query query(udp::v4(), ip_address.c_str(), port_str.str().c_str());\r
225         udp::resolver::iterator iterator = resolver.resolve(query);\r
226 \r
227         static char request[] = "MMO UDP Test Packet";\r
228         for (int i = 0; i < UDP_TEST_PACKET_TIME; i++) {\r
229 \r
230             io_service_.post(boost::bind(&Server::DoWriteUDP, this, request, *iterator));\r
231         }\r
232     }\r
233 \r
234     void Server::SendUDP(const std::string& message, const boost::asio::ip::udp::endpoint endpoint)\r
235     {\r
236                 io_service_.post(boost::bind(&Server::DoWriteUDP, this, message, endpoint));\r
237     }\r
238 \r
239     void Server::ReceiveUDP(const boost::system::error_code& error, size_t bytes_recvd)\r
240     {\r
241         if (bytes_recvd > 0) {\r
242             std::string buffer(receive_buf_udp_, bytes_recvd);\r
243             FetchUDP(buffer, sender_endpoint_);\r
244         }\r
245         if (!error) {\r
246           socket_udp_.async_receive_from(\r
247               boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,\r
248               boost::bind(&Server::ReceiveUDP, this,\r
249                 boost::asio::placeholders::error,\r
250                 boost::asio::placeholders::bytes_transferred));\r
251         } else {\r
252             Logger::Error("%s", error.message());\r
253         }\r
254     }\r
255 \r
256     void Server::DoWriteUDP(const std::string& msg, const udp::endpoint& endpoint)\r
257     {\r
258         boost::shared_ptr<std::string> s = \r
259               boost::make_shared<std::string>(msg.data(), msg.size());\r
260 \r
261         socket_udp_.async_send_to(\r
262             boost::asio::buffer(s->data(), s->size()), endpoint,\r
263             boost::bind(&Server::WriteUDP, this,\r
264               boost::asio::placeholders::error, s));\r
265     }\r
266 \r
267     void Server::WriteUDP(const boost::system::error_code& error, boost::shared_ptr<std::string> holder)\r
268     {\r
269 //        if (!error) {\r
270 //            if (!send_queue_.empty()) {\r
271 //                  send_queue_.pop();\r
272 //                  if (!send_queue_.empty())\r
273 //                  {\r
274 //                    boost::asio::async_write(socket_tcp_,\r
275 //                        boost::asio::buffer(send_queue_.front().data(),\r
276 //                          send_queue_.front().size()),\r
277 //                        boost::bind(&Session::WriteTCP, this,\r
278 //                          boost::asio::placeholders::error));\r
279 //                  }\r
280 //            }\r
281 //        } else {\r
282 //            FatalError();\r
283 //        }\r
284     }\r
285 \r
286     void Server::FetchUDP(const std::string& buffer, const boost::asio::ip::udp::endpoint endpoint)\r
287     {\r
288         uint8_t header;\r
289         std::string body;\r
290         SessionWeakPtr session;\r
291 \r
292                 // IPアドレスとポートからセッションを特定\r
293                 auto it = std::find_if(sessions_.begin(), sessions_.end(),\r
294                         [&endpoint](const SessionWeakPtr& session) -> bool {\r
295                                 if (auto session_ptr = session.lock()) {\r
296 \r
297                                         const auto session_endpoint = session_ptr->tcp_socket().remote_endpoint();\r
298                                         const auto session_port = session_ptr->udp_port();\r
299 \r
300                                         return (session_endpoint.address() == endpoint.address() &&\r
301                                                 session_port == endpoint.port());\r
302 \r
303                                 } else {\r
304                                         return false;\r
305                                 }\r
306                         });\r
307 \r
308                 if (it != sessions_.end()) {\r
309                         session = *it;\r
310                         Logger::Debug("Receive UDP Command: %d", session.lock()->id());\r
311                 } else {\r
312                         Logger::Debug("Receive anonymous UDP Command");\r
313                 }\r
314 \r
315         if (buffer.size() > network::Utils::Deserialize(buffer, &header)) {\r
316                         body = buffer.substr(sizeof(header));\r
317                 }\r
318 \r
319         // 復号\r
320         if (session.lock() && header == header::ENCRYPT_HEADER) {\r
321             body.erase(0, sizeof(header));\r
322                         body = session.lock()->encrypter().Decrypt(body);\r
323             Utils::Deserialize(body, &header);\r
324                         body = buffer.substr(sizeof(header));\r
325         }\r
326 \r
327                 if (header == network::header::ServerRequstedStatus) {\r
328                         SendUDP(GetStatusJSON(), endpoint);\r
329                 } else {\r
330                         if (callback_) {\r
331                                 (*callback_)(Command(static_cast<network::header::CommandHeader>(header), body, session));\r
332                         }\r
333                 }\r
334 \r
335     }\r
336 \r
337     void Server::ServerSession::Start()\r
338     {\r
339         online_ = true;\r
340 \r
341         // Nagleアルゴリズムを無効化\r
342         socket_tcp_.set_option(boost::asio::ip::tcp::no_delay(true));\r
343 \r
344                 // バッファサイズを変更 1MiB\r
345                 boost::asio::socket_base::receive_buffer_size option(1048576);\r
346                 socket_tcp_.set_option(option);\r
347 \r
348         // IPアドレスを取得\r
349         global_ip_ = socket_tcp_.remote_endpoint().address().to_string();\r
350 \r
351         boost::asio::async_read_until(socket_tcp_,\r
352             receive_buf_, NETWORK_UTILS_DELIMITOR,\r
353             boost::bind(\r
354               &ServerSession::ReceiveTCP, shared_from_this(),\r
355               boost::asio::placeholders::error));\r
356     }\r
357 }\r