OSDN Git Service

UDPコマンドの受信を制限
[mmo/main.git] / server / Server.cpp
1 //
2 // Server.cpp
3 //
4
5 #include "Server.hpp"
6 #include "version.hpp"
7 #include <algorithm>
8 #include <boost/make_shared.hpp>
9 #include <boost/foreach.hpp>
10 #include "../common/Logger.hpp"
11 #include "../common/network/Command.hpp"
12 #include "../common/network/Utils.hpp"
13
14 namespace network {
15
16     Server::Server(const Config& config) :
17                         config_(config),
18             endpoint_(tcp::v4(), config.port()),
19             acceptor_(io_service_, endpoint_),
20             socket_udp_(io_service_, udp::endpoint(udp::v4(), config.port())),
21             udp_packet_count_(0),
22             max_total_read_average_(5000),
23             max_session_read_average_(600),
24             min_session_read_average_(100),
25             session_read_average_(200)
26     {
27     }
28
29     void Server::Start(CallbackFuncPtr callback)
30     {
31         callback_ = std::make_shared<CallbackFunc>(
32                 [&](network::Command c){
33
34             // ログアウト
35             if (c.header() == network::header::FatalConnectionError) {
36                 // 受信制限量を更新
37                 /*
38                 auto new_average = GetSessionReadAverageLimit();
39                 if (session_read_average_ != new_average) {
40                     session_read_average_ = new_average;
41                     SendAll(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_));
42                 }
43                 */
44             }
45
46             // 通信量制限を越えていた場合、強制的に切断
47      //       else if (auto session = c.session().lock()) {
48      //           if (session->GetReadByteAverage() > session_read_average_) {
49                                         //Logger::Info(_T("Banished session: %d"), session->id());
50      //               session->Close();
51      //           }
52      //       }
53
54             if (callback) {
55                 (*callback)(c);
56             }
57         });
58
59         {
60         auto new_session = boost::make_shared<ServerSession>(io_service_);
61         acceptor_.async_accept(new_session->tcp_socket(),
62                               boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));
63         }
64
65         {
66             socket_udp_.async_receive_from(
67                 boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,
68                 boost::bind(&Server::ReceiveUDP, this,
69                   boost::asio::placeholders::error,
70                   boost::asio::placeholders::bytes_transferred));
71         }
72
73         boost::asio::io_service::work work(io_service_);
74         io_service_.run();
75     }
76
77     void Server::Stop()
78     {
79         io_service_.stop();
80                 Logger::Info("stop server");
81     }
82     void Server::Stop(int innterrupt_type)
83     {
84         io_service_.stop();
85         Logger::Info(_T("stop server innterrupt_type=%d"),innterrupt_type);
86     }
87
88         int Server::GetUserCount() const
89         {
90                 auto count = std::count_if(sessions_.begin(), sessions_.end(),
91                         [](const SessionWeakPtr& s){ return !s.expired() && s.lock()->online(); });
92
93                 return count;
94         }
95
96         std::string Server::GetStatusJSON() const
97         {
98                 auto msg = (
99                                         boost::format("{\"nam\":\"%s\",\"ver\":\"%d.%d.%d\",\"cnt\":%d,\"cap\":%d,\"stg\":\"%s\"}")
100                                                 % config_.server_name()
101                                                 % MMO_VERSION_MAJOR % MMO_VERSION_MINOR % MMO_VERSION_REVISION
102                                                 % GetUserCount()
103                                                 % config_.capacity()
104                                                 % config_.stage()
105                                         ).str();
106
107                 return msg;
108         }
109
110     bool Server::Empty() const
111     {
112         return GetUserCount() == 0;
113     }
114
115     void Server::ReceiveSession(const SessionPtr& session, const boost::system::error_code& error)
116     {
117                 if (GetUserCount() < config_.capacity()) {
118             session->set_on_receive(callback_);
119             session->Start();
120             sessions_.push_back(SessionWeakPtr(session));
121
122             // クライアント情報を要求
123             session->Send(ClientRequestedClientInfo());
124
125             // 受信制限量を更新
126             auto new_average = GetSessionReadAverageLimit();
127             session->Send(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_));
128
129             if (session_read_average_ != new_average) {
130                 session_read_average_ = new_average;
131                 SendOthers(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_),
132                         session);
133             }
134
135         } else {
136             Logger::Info("Refuse Session");
137             session->SyncSend(ClientReceiveServerCrowdedError());
138             session->Close();
139         }
140
141         auto new_session = boost::make_shared<ServerSession>(io_service_);
142          acceptor_.async_accept(new_session->tcp_socket(),
143                  boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));
144
145                 RefreshSession();
146     }
147
148         void Server::RefreshSession()
149         {
150                 // 使用済のセッションのポインタを破棄
151         auto it = std::remove_if(sessions_.begin(), sessions_.end(),
152                 [](const SessionWeakPtr& ptr){
153             return ptr.expired();
154         });
155         sessions_.erase(it, sessions_.end());
156                 Logger::Info("Active connection: %d", GetUserCount());
157         }
158
159     void Server::SendAll(const Command& command)
160     {
161         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {
162             if (auto session = ptr.lock()) {
163                 session->Send(command);
164             }
165         }
166     }
167
168     void Server::SendOthers(const Command& command, SessionWeakPtr self_ptr)
169     {
170         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {
171             if (auto session = ptr.lock()) {
172                 if (auto self = self_ptr.lock()) {
173                     if (*session != *self) {
174                         session->Send(command);
175                     }
176                 }
177             }
178         }
179     }
180
181     void Server::SendUDPTestPacket(const std::string& ip_address, uint16_t port)
182     {
183         using boost::asio::ip::udp;
184
185         std::stringstream port_str;
186         port_str << (int)port;
187
188         udp::resolver resolver(io_service_);
189         udp::resolver::query query(udp::v4(), ip_address.c_str(), port_str.str().c_str());
190         udp::resolver::iterator iterator = resolver.resolve(query);
191
192         static char request[] = "MMO UDP Test Packet";
193         for (int i = 0; i < UDP_TEST_PACKET_TIME; i++) {
194
195             io_service_.post(boost::bind(&Server::DoWriteUDP, this, request, *iterator));
196         }
197     }
198
199     void Server::SendUDP(const std::string& message, const boost::asio::ip::udp::endpoint endpoint)
200     {
201                 io_service_.post(boost::bind(&Server::DoWriteUDP, this, message, endpoint));
202     }
203
204     void Server::ReceiveUDP(const boost::system::error_code& error, size_t bytes_recvd)
205     {
206         if (bytes_recvd > 0) {
207             std::string buffer(receive_buf_udp_, bytes_recvd);
208             FetchUDP(buffer, sender_endpoint_);
209         }
210         if (!error) {
211           socket_udp_.async_receive_from(
212               boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,
213               boost::bind(&Server::ReceiveUDP, this,
214                 boost::asio::placeholders::error,
215                 boost::asio::placeholders::bytes_transferred));
216         } else {
217             Logger::Error("%s", error.message());
218         }
219     }
220
221     void Server::DoWriteUDP(const std::string& msg, const udp::endpoint& endpoint)
222     {
223         boost::shared_ptr<std::string> s = 
224               boost::make_shared<std::string>(msg.data(), msg.size());
225
226         socket_udp_.async_send_to(
227             boost::asio::buffer(s->data(), s->size()), endpoint,
228             boost::bind(&Server::WriteUDP, this,
229               boost::asio::placeholders::error, s));
230     }
231
232     void Server::WriteUDP(const boost::system::error_code& error, boost::shared_ptr<std::string> holder)
233     {
234 //        if (!error) {
235 //            if (!send_queue_.empty()) {
236 //                  send_queue_.pop();
237 //                  if (!send_queue_.empty())
238 //                  {
239 //                    boost::asio::async_write(socket_tcp_,
240 //                        boost::asio::buffer(send_queue_.front().data(),
241 //                          send_queue_.front().size()),
242 //                        boost::bind(&Session::WriteTCP, this,
243 //                          boost::asio::placeholders::error));
244 //                  }
245 //            }
246 //        } else {
247 //            FatalError();
248 //        }
249     }
250
251     void Server::FetchUDP(const std::string& buffer, const boost::asio::ip::udp::endpoint endpoint)
252     {
253         unsigned char header;
254                 uint32_t user_id;
255         unsigned char count;
256         std::string body;
257         SessionPtr session;
258
259                 size_t readed = 0;
260         if (buffer.size() > network::Utils::Deserialize(buffer, &header)) {
261                         readed = network::Utils::Deserialize(buffer, &user_id, &count);
262                 }
263
264                 // 現在コマンドがひとつしか無いのでそれ以外は無視
265                 if (header != network::header::ServerRequstedStatus) {
266                         return;
267                 }
268
269         if (readed < buffer.size()) {
270             body = buffer.substr(readed);
271         }
272
273         if (callback_) {
274                         (*callback_)(Command(static_cast<network::header::CommandHeader>(header), body, endpoint));
275         }
276     }
277
278     void Server::ServerSession::Start()
279     {
280         online_ = true;
281
282         // Nagleアルゴリズムを無効化
283         socket_tcp_.set_option(boost::asio::ip::tcp::no_delay(true));
284
285                 // バッファサイズを変更 1MiB
286                 boost::asio::socket_base::receive_buffer_size option(1048576);
287                 socket_tcp_.set_option(option);
288
289         // IPアドレスを取得
290         global_ip_ = socket_tcp_.remote_endpoint().address().to_string();
291
292         boost::asio::async_read_until(socket_tcp_,
293             receive_buf_, NETWORK_UTILS_DELIMITOR,
294             boost::bind(
295               &ServerSession::ReceiveTCP, shared_from_this(),
296               boost::asio::placeholders::error));
297     }
298
299     int Server::GetSessionReadAverageLimit()
300     {
301         int byte = max_total_read_average_ / (sessions_.size() + 1);
302         byte = std::min(byte, max_session_read_average_);
303         
304         return byte;
305     }
306
307     int Server::max_total_read_average() const
308     {
309         return max_total_read_average_;
310     }
311
312     int Server::max_session_read_average() const
313     {
314         return max_session_read_average_;
315     }
316
317     int Server::min_session_read_average() const
318     {
319         return min_session_read_average_;
320     }
321
322     void Server::set_max_total_read_average(int byte)
323     {
324         max_total_read_average_ = byte;
325     }
326
327     void Server::set_max_session_read_average(int byte)
328     {
329         max_session_read_average_ = byte;
330     }
331
332     void Server::set_min_session_read_average(int byte)
333     {
334         min_session_read_average_ = byte;
335     }
336
337 }