OSDN Git Service

サーバーに接続できなくなる問題の応急措置
[mmo/main.git] / server / Server.cpp
1 //
2 // Server.cpp
3 //
4
5 #include "Server.hpp"
6 #include <algorithm>
7 #include <boost/make_shared.hpp>
8 #include <boost/foreach.hpp>
9 #include "../common/Logger.hpp"
10 #include "../common/network/Command.hpp"
11 #include "../common/network/Utils.hpp"
12
13 namespace network {
14
15     Server::Server(uint16_t port) :
16             endpoint_(tcp::v4(), port),
17             acceptor_(io_service_, endpoint_),
18             socket_udp_(io_service_, udp::endpoint(udp::v4(), port)),
19             udp_packet_count_(0),
20             max_total_read_average_(5000),
21             max_session_read_average_(600),
22             min_session_read_average_(100),
23             session_read_average_(200)
24     {
25     }
26
27     void Server::Start(CallbackFuncPtr callback)
28     {
29         callback_ = std::make_shared<CallbackFunc>(
30                 [&](network::Command c){
31
32             // ログアウト
33             if (c.header() == network::header::FatalConnectionError) {
34                 // 受信制限量を更新
35                 /*
36                 auto new_average = GetSessionReadAverageLimit();
37                 if (session_read_average_ != new_average) {
38                     session_read_average_ = new_average;
39                     SendAll(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_));
40                 }
41                 */
42             }
43
44             // 通信量制限を越えていた場合、強制的に切断
45             else if (auto session = c.session().lock()) {
46                 if (session->GetReadByteAverage() > session_read_average_) {
47                                         Logger::Info(_T("Banished session: %d"), session->id());
48                     session->Close();
49                 }
50             }
51
52             if (callback) {
53                 (*callback)(c);
54             }
55         });
56
57         {
58         auto new_session = boost::make_shared<ServerSession>(io_service_);
59         acceptor_.async_accept(new_session->tcp_socket(),
60                               boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));
61         }
62
63         {
64             socket_udp_.async_receive_from(
65                 boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,
66                 boost::bind(&Server::ReceiveUDP, this,
67                   boost::asio::placeholders::error,
68                   boost::asio::placeholders::bytes_transferred));
69         }
70
71         boost::asio::io_service::work work(io_service_);
72         io_service_.run();
73     }
74
75     void Server::Stop()
76     {
77         io_service_.stop();
78         Logger::Info("stop server");
79     }
80     void Server::Stop(int innterrupt_type)
81     {
82         io_service_.stop();
83         Logger::Info(_T("stop server innterrupt_type=%d"),innterrupt_type);
84     }
85
86
87     bool Server::Empty() const
88     {
89         return sessions_.size() == 0;
90     }
91
92     void Server::ReceiveSession(const SessionPtr& session, const boost::system::error_code& error)
93     {
94         if (session_read_average_ > min_session_read_average_) {
95             session->set_on_receive(callback_);
96             session->Start();
97             sessions_.push_back(SessionWeakPtr(session));
98
99             // クライアント情報を要求
100             session->Send(ClientRequestedClientInfo());
101
102             // 受信制限量を更新
103             auto new_average = GetSessionReadAverageLimit();
104             session->Send(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_));
105
106             if (session_read_average_ != new_average) {
107                 session_read_average_ = new_average;
108                 SendOthers(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_),
109                         session);
110             }
111
112         } else {
113             Logger::Info("Refuse Session");
114             session->SyncSend(ClientReceiveServerCrowdedError());
115             session->Close();
116         }
117
118         auto new_session = boost::make_shared<ServerSession>(io_service_);
119          acceptor_.async_accept(new_session->tcp_socket(),
120                  boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));
121
122         // 使用済のセッションのポインタを破棄
123         auto it = std::remove_if(sessions_.begin(), sessions_.end(),
124                 [](const SessionWeakPtr& ptr){
125             return ptr.expired();
126         });
127         sessions_.erase(it, sessions_.end());
128
129                 Logger::Info("Active sessoin: %d", sessions_.size() - 1);
130     }
131
132     void Server::SendAll(const Command& command)
133     {
134         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {
135             if (auto session = ptr.lock()) {
136                 session->Send(command);
137             }
138         }
139     }
140
141     void Server::SendOthers(const Command& command, SessionWeakPtr self_ptr)
142     {
143         BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {
144             if (auto session = ptr.lock()) {
145                 if (auto self = self_ptr.lock()) {
146                     if (*session != *self) {
147                         session->Send(command);
148                     }
149                 }
150             }
151         }
152     }
153
154     void Server::SendUDPTestPacket(const std::string& ip_address, uint16_t port)
155     {
156         using boost::asio::ip::udp;
157
158         std::stringstream port_str;
159         port_str << (int)port;
160
161         udp::resolver resolver(io_service_);
162         udp::resolver::query query(udp::v4(), ip_address.c_str(), port_str.str().c_str());
163         udp::resolver::iterator iterator = resolver.resolve(query);
164
165         static char request[] = "MMO UDP Test Packet";
166         for (int i = 0; i < UDP_TEST_PACKET_TIME; i++) {
167
168             io_service_.post(boost::bind(&Server::DoWriteUDP, this, request, *iterator));
169         }
170     }
171
172     void Server::ReceiveUDP(const boost::system::error_code& error, size_t bytes_recvd)
173     {
174         if (bytes_recvd > 0) {
175             std::string buffer(receive_buf_udp_, bytes_recvd);
176             FetchUDP(buffer);
177         }
178         if (!error) {
179           socket_udp_.async_receive_from(
180               boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,
181               boost::bind(&Server::ReceiveUDP, this,
182                 boost::asio::placeholders::error,
183                 boost::asio::placeholders::bytes_transferred));
184         } else {
185             Logger::Error("%s", error.message());
186         }
187     }
188
189     void Server::DoWriteUDP(const std::string& msg, const udp::endpoint& endpoint)
190     {
191         boost::shared_ptr<std::string> s = 
192               boost::make_shared<std::string>(msg.data(), msg.size());
193
194         socket_udp_.async_send_to(
195             boost::asio::buffer(s->data(), s->size()), endpoint,
196             boost::bind(&Server::WriteUDP, this,
197               boost::asio::placeholders::error, s));
198     }
199
200     void Server::WriteUDP(const boost::system::error_code& error, boost::shared_ptr<std::string> holder)
201     {
202 //        if (!error) {
203 //            if (!send_queue_.empty()) {
204 //                  send_queue_.pop();
205 //                  if (!send_queue_.empty())
206 //                  {
207 //                    boost::asio::async_write(socket_tcp_,
208 //                        boost::asio::buffer(send_queue_.front().data(),
209 //                          send_queue_.front().size()),
210 //                        boost::bind(&Session::WriteTCP, this,
211 //                          boost::asio::placeholders::error));
212 //                  }
213 //            }
214 //        } else {
215 //            FatalError();
216 //        }
217     }
218
219     Command Server::FetchUDP(const std::string& buffer)
220     {
221         uint32_t user_id;
222         unsigned char count;
223         header::CommandHeader header;
224         std::string body;
225         SessionPtr session;
226
227         size_t readed = network::Utils::Deserialize(buffer, &user_id, &count, &header);
228         if (readed < buffer.size()) {
229             body = buffer.substr(readed);
230         }
231
232         return Command(header, body, session);
233     }
234
235     void Server::ServerSession::Start()
236     {
237         online_ = true;
238
239         // Nagleアルゴリズムを無効化
240         socket_tcp_.set_option(boost::asio::ip::tcp::no_delay(true));
241
242         // IPアドレスを取得
243         global_ip_ = socket_tcp_.remote_endpoint().address().to_string();
244
245         boost::asio::async_read_until(socket_tcp_,
246             receive_buf_, NETWORK_UTILS_DELIMITOR,
247             boost::bind(
248               &ServerSession::ReceiveTCP, shared_from_this(),
249               boost::asio::placeholders::error));
250     }
251
252     int Server::GetSessionReadAverageLimit()
253     {
254         int byte = max_total_read_average_ / (sessions_.size() + 1);
255         byte = std::min(byte, max_session_read_average_);
256         
257         return byte;
258     }
259
260     int Server::max_total_read_average() const
261     {
262         return max_total_read_average_;
263     }
264
265     int Server::max_session_read_average() const
266     {
267         return max_session_read_average_;
268     }
269
270     int Server::min_session_read_average() const
271     {
272         return min_session_read_average_;
273     }
274
275     void Server::set_max_total_read_average(int byte)
276     {
277         max_total_read_average_ = byte;
278     }
279
280     void Server::set_max_session_read_average(int byte)
281     {
282         max_session_read_average_ = byte;
283     }
284
285     void Server::set_min_session_read_average(int byte)
286     {
287         min_session_read_average_ = byte;
288     }
289
290 }