OSDN Git Service

First Commit
[mmo/main.git] / server / Server.cpp
1 //
2 // Server.cpp
3 //
4
5 #include "Server.hpp"
6 #include <algorithm>
7 #include <boost/make_shared.hpp>
8 #include "../common/Logger.hpp"
9 #include "../common/network/Command.hpp"
10 #include "../common/network/Utils.hpp"
11
12 namespace network {
13
14     Server::Server(uint16_t port) :
15             endpoint_(tcp::v4(), port),
16             acceptor_(io_service_, endpoint_),
17             socket_udp_(io_service_, udp::endpoint(udp::v4(), port)),
18             udp_packet_count_(0),
19             max_total_read_average_(5000),
20             max_session_read_average_(600),
21             min_session_read_average_(100),
22             session_read_average_(200)
23     {
24     }
25
26     void Server::Start(CallbackFuncPtr callback)
27     {
28         callback_ = std::make_shared<CallbackFunc>(
29                 [&](network::Command c){
30
31             // ログアウト
32             if (c.header() == network::header::FatalConnectionError) {
33                 // 受信制限量を更新
34                 /*
35                 auto new_average = GetSessionReadAverageLimit();
36                 if (session_read_average_ != new_average) {
37                     session_read_average_ = new_average;
38                     SendAll(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_));
39                 }
40                 */
41             }
42
43             // 通信量制限を越えていた場合、強制的に切断
44             else if (auto session = c.session().lock()) {
45                 if (session->GetReadByteAverage() > session_read_average_) {
46                     if (auto session = c.session().lock()) {
47                         if (callback) {
48                             (*callback)(network::PlayerLogoutNotify(session->id()));
49                         }
50                         return;
51                     }
52                     session->Close();
53                 }
54             }
55
56             if (callback) {
57                 (*callback)(c);
58             }
59         });
60
61         {
62         auto new_session = boost::make_shared<ServerSession>(io_service_);
63         acceptor_.async_accept(new_session->tcp_socket(),
64                               boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));
65         }
66
67         {
68             socket_udp_.async_receive_from(
69                 boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,
70                 boost::bind(&Server::ReceiveUDP, this,
71                   boost::asio::placeholders::error,
72                   boost::asio::placeholders::bytes_transferred));
73         }
74
75         boost::asio::io_service::work work(io_service_);
76         io_service_.run();
77     }
78
79     void Server::Stop()
80     {
81         io_service_.stop();
82     }
83
84     bool Server::Empty() const
85     {
86         return sessions_.size() == 0;
87     }
88
89     void Server::ReceiveSession(const SessionPtr& session, const boost::system::error_code& error)
90     {
91         if (session_read_average_ > min_session_read_average_) {
92             session->set_on_receive(callback_);
93             session->Start();
94             sessions_.push_back(SessionWeakPtr(session));
95
96             // クライアント情報を要求
97             session->Send(ClientRequestedClientInfo());
98
99             // 受信制限量を更新
100             auto new_average = GetSessionReadAverageLimit();
101             session->Send(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_));
102
103             if (session_read_average_ != new_average) {
104                 session_read_average_ = new_average;
105                 SendOthers(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_),
106                         session);
107             }
108
109         } else {
110             Logger::Info("Refuse Session");
111             session->SyncSend(ClientReceiveServerCrowdedError());
112             session->Close();
113         }
114
115         auto new_session = boost::make_shared<ServerSession>(io_service_);
116          acceptor_.async_accept(new_session->tcp_socket(),
117                  boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));
118
119         // 使用済のセッションのポインタを破棄
120         auto it = std::remove_if(sessions_.begin(), sessions_.end(),
121                 [](const SessionWeakPtr& ptr){
122             return ptr.expired();
123         });
124         sessions_.erase(it, sessions_.end());
125
126     }
127
128     void Server::SendAll(const Command& command)
129     {
130         for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
131             SessionWeakPtr& ptr = *it;
132             if (auto session = ptr.lock()) {
133                 session->Send(command);
134             }
135         }
136     }
137
138     void Server::SendOthers(const Command& command, SessionWeakPtr self_ptr)
139     {
140         for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
141             SessionWeakPtr& ptr = *it;
142             if (auto session = ptr.lock()) {
143                 if (auto self = self_ptr.lock()) {
144                     if (*session != *self) {
145                         session->Send(command);
146                     }
147                 }
148             }
149         }
150     }
151
152     void Server::SendUDPTestPacket(const std::string& ip_address, uint16_t port)
153     {
154         using boost::asio::ip::udp;
155
156         std::stringstream port_str;
157         port_str << (int)port;
158
159         udp::resolver resolver(io_service_);
160         udp::resolver::query query(udp::v4(), ip_address.c_str(), port_str.str().c_str());
161         udp::resolver::iterator iterator = resolver.resolve(query);
162
163         static char request[] = "MMO UDP Test Packet";
164         for (int i = 0; i < UDP_TEST_PACKET_TIME; i++) {
165
166             io_service_.post(boost::bind(&Server::DoWriteUDP, this, request, *iterator));
167         }
168     }
169
170     void Server::ReceiveUDP(const boost::system::error_code& error, size_t bytes_recvd)
171     {
172         if (bytes_recvd > 0) {
173             std::string buffer(receive_buf_udp_, bytes_recvd);
174             FetchUDP(buffer);
175         }
176         if (!error) {
177           socket_udp_.async_receive_from(
178               boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,
179               boost::bind(&Server::ReceiveUDP, this,
180                 boost::asio::placeholders::error,
181                 boost::asio::placeholders::bytes_transferred));
182         } else {
183             Logger::Error("%s", error.message());
184         }
185     }
186
187     void Server::DoWriteUDP(std::string data, const udp::endpoint& endpoint)
188     {
189         socket_udp_.async_send_to(
190             boost::asio::buffer(data), endpoint,
191             boost::bind(&Server::WriteUDP, this,
192               boost::asio::placeholders::error));
193     }
194
195     void Server::WriteUDP(const boost::system::error_code& error)
196     {
197 //        if (!error) {
198 //            if (!send_queue_.empty()) {
199 //                  send_queue_.pop();
200 //                  if (!send_queue_.empty())
201 //                  {
202 //                    boost::asio::async_write(socket_tcp_,
203 //                        boost::asio::buffer(send_queue_.front().data(),
204 //                          send_queue_.front().size()),
205 //                        boost::bind(&Session::WriteTCP, this,
206 //                          boost::asio::placeholders::error));
207 //                  }
208 //            }
209 //        } else {
210 //            FatalError();
211 //        }
212     }
213
214     Command Server::FetchUDP(const std::string& buffer)
215     {
216         uint32_t user_id;
217         unsigned char count;
218         header::CommandHeader header;
219         std::string body;
220         SessionPtr session;
221
222         size_t readed = network::Utils::Deserialize(buffer, &user_id, &count, &header);
223         if (readed < buffer.size()) {
224             body = buffer.substr(readed);
225         }
226
227         return Command(header, body, session);
228     }
229
230     void Server::ServerSession::Start()
231     {
232         online_ = true;
233
234         // Nagleアルゴリズムを無効化
235         socket_tcp_.set_option(boost::asio::ip::tcp::no_delay(true));
236
237         // IPアドレスを取得
238         global_ip_ = socket_tcp_.remote_endpoint().address().to_string();
239
240         boost::asio::async_read_until(socket_tcp_,
241             receive_buf_, NETWORK_UTILS_DELIMITOR,
242             boost::bind(
243               &ServerSession::ReceiveTCP, shared_from_this(),
244               boost::asio::placeholders::error));
245     }
246
247     int Server::GetSessionReadAverageLimit()
248     {
249         int byte = max_total_read_average_ / (sessions_.size() + 1);
250         byte = std::min(byte, max_session_read_average_);
251         
252         return byte;
253     }
254
255     int Server::max_total_read_average() const
256     {
257         return max_total_read_average_;
258     }
259
260     int Server::max_session_read_average() const
261     {
262         return max_session_read_average_;
263     }
264
265     int Server::min_session_read_average() const
266     {
267         return min_session_read_average_;
268     }
269
270     void Server::set_max_total_read_average(int byte)
271     {
272         max_total_read_average_ = byte;
273     }
274
275     void Server::set_max_session_read_average(int byte)
276     {
277         max_session_read_average_ = byte;
278     }
279
280     void Server::set_min_session_read_average(int byte)
281     {
282         min_session_read_average_ = byte;
283     }
284
285 }