8 #include <boost/make_shared.hpp>
9 #include <boost/foreach.hpp>
10 #include "../common/Logger.hpp"
11 #include "../common/network/Command.hpp"
12 #include "../common/network/Utils.hpp"
// Builds the server from `config`: an IPv4 TCP endpoint and acceptor on
// config.port(), an IPv4 UDP socket on the same port number, and the initial
// read-average limits.
16 Server::Server(const Config& config) :
18 endpoint_(tcp::v4(), config.port()),
19 acceptor_(io_service_, endpoint_),
20 socket_udp_(io_service_, udp::endpoint(udp::v4(), config.port())),
// Starting values for the read-average limits. The max/min values can be
// changed afterwards through the set_* accessors of this class.
// NOTE(review): the unit looks like bytes (the setters name their argument
// `byte`) — confirm.
22 max_total_read_average_(5000),
23 max_session_read_average_(600),
24 min_session_read_average_(100),
25 session_read_average_(200)
// Starts serving: installs the command callback, queues the first asynchronous
// TCP accept and the first asynchronous UDP receive, and creates an
// io_service::work object for io_service_.
// callback: handler supplied by the caller.
29 void Server::Start(CallbackFuncPtr callback)
// callback_ is what ReceiveSession later installs on every accepted session
// via set_on_receive().
31 callback_ = std::make_shared<CallbackFunc>(
// NOTE(review): the lambda captures by reference ([&]) and is stored in
// callback_; confirm that everything it captures outlives callback_.
32 [&](network::Command c){
// Handling for commands whose header is FatalConnectionError.
35 if (c.header() == network::header::FatalConnectionError) {
// Recompute the per-session read limit and broadcast it to every session
// only when the value actually changed.
38 auto new_average = GetSessionReadAverageLimit();
39 if (session_read_average_ != new_average) {
40 session_read_average_ = new_average;
41 SendAll(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_));
46 // Forcibly disconnect a session that has exceeded the traffic limit
// (the code below is disabled).
47 // else if (auto session = c.session().lock()) {
48 // if (session->GetReadByteAverage() > session_read_average_) {
49 //Logger::Info(_T("Banished session: %d"), session->id());
// Queue the first asynchronous accept; ReceiveSession is the completion
// handler and receives the freshly created session.
60 auto new_session = boost::make_shared<ServerSession>(io_service_);
61 acceptor_.async_accept(new_session->tcp_socket(),
62 boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));
// Queue the first asynchronous UDP receive into receive_buf_udp_ (at most
// UDP_MAX_RECEIVE_LENGTH bytes); ReceiveUDP is the completion handler and
// sender_endpoint_ is filled with the datagram's source.
66 socket_udp_.async_receive_from(
67 boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,
68 boost::bind(&Server::ReceiveUDP, this,
69 boost::asio::placeholders::error,
70 boost::asio::placeholders::bytes_transferred));
// NOTE(review): presumably keeps the io_service event loop from returning
// while no handlers are pending — confirm against the run() call site.
73 boost::asio::io_service::work work(io_service_);
80 Logger::Info("stop server");
// Stop entry point; logs the request together with `innterrupt_type`.
// innterrupt_type: integer code written to the log message.
82 void Server::Stop(int innterrupt_type)
85 Logger::Info(_T("stop server innterrupt_type=%d"),innterrupt_type);
// Counts the entries of sessions_ whose session still exists and reports
// online().
88 int Server::GetUserCount() const
90 auto count = std::count_if(sessions_.begin(), sessions_.end(),
// An entry is counted only when the weak pointer has not expired and the
// locked session is online.
// NOTE(review): lock() is called after a separate expired() check; confirm
// sessions cannot be released by another thread between the two calls.
91 [](const SessionWeakPtr& s){ return !s.expired() && s.lock()->online(); });
// Builds the server status as a JSON object string with the keys
// nam, ver, cnt, cap and stg.
96 std::string Server::GetStatusJSON() const
// nam and stg are emitted as quoted strings, ver as "major.minor.revision",
// cnt and cap as bare integers.
99 boost::format("{\"nam\":\"%s\",\"ver\":\"%d.%d.%d\",\"cnt\":%d,\"cap\":%d,\"stg\":\"%s\"}")
// nam: the configured server name.
100 % config_.server_name()
// ver: the three MMO_VERSION_* constants.
101 % MMO_VERSION_MAJOR % MMO_VERSION_MINOR % MMO_VERSION_REVISION
// Returns true when GetUserCount() reports zero users.
110 bool Server::Empty() const
112 return GetUserCount() == 0;
// Completion handler for acceptor_.async_accept: admits or refuses the newly
// connected session and queues the next asynchronous accept.
// session: the session whose TCP socket was handed to async_accept.
// error:   result of the accept operation.
115 void Server::ReceiveSession(const SessionPtr& session, const boost::system::error_code& error)
// Admit the session only while the user count is below the configured capacity.
117 if (GetUserCount() < config_.capacity()) {
118 session->set_on_receive(callback_);
// The server keeps only a weak reference to the session.
120 sessions_.push_back(SessionWeakPtr(session));
123 session->Send(ClientRequestedClientInfo());
// Send the read-average limit to the new session, then recompute it and
// notify other sessions if it changed.
// NOTE(review): the new session is sent session_read_average_ before that
// member is updated to new_average; if SendOthers excludes the new session
// it keeps the old value — confirm this is intended.
126 auto new_average = GetSessionReadAverageLimit();
127 session->Send(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_));
129 if (session_read_average_ != new_average) {
130 session_read_average_ = new_average;
131 SendOthers(network::ClientReceiveWriteAverageLimitUpdate(session_read_average_),
// Refusal path: log it and send the "server crowded" error with SyncSend.
136 Logger::Info("Refuse Session");
137 session->SyncSend(ClientReceiveServerCrowdedError());
// Queue the next asynchronous accept with a fresh session object.
141 auto new_session = boost::make_shared<ServerSession>(io_service_);
142 acceptor_.async_accept(new_session->tcp_socket(),
143 boost::bind(&Server::ReceiveSession, this, new_session, boost::asio::placeholders::error));
// Removes expired weak pointers from sessions_ and logs the current user count.
148 void Server::RefreshSession()
// remove_if moves the surviving entries to the front; the erase call below
// then drops the expired tail.
151 auto it = std::remove_if(sessions_.begin(), sessions_.end(),
152 [](const SessionWeakPtr& ptr){
153 return ptr.expired();
155 sessions_.erase(it, sessions_.end());
156 Logger::Info("Active connection: %d", GetUserCount());
// Sends `command` to every session in sessions_ that can still be locked.
159 void Server::SendAll(const Command& command)
161 BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {
// Entries whose session no longer exists are skipped.
162 if (auto session = ptr.lock()) {
163 session->Send(command);
// Sends `command` to every live session in sessions_ except the one that
// `self_ptr` refers to.
// command:  the command to send.
// self_ptr: weak reference to the session that must not receive the command.
168 void Server::SendOthers(const Command& command, SessionWeakPtr self_ptr)
170 BOOST_FOREACH(SessionWeakPtr& ptr, sessions_) {
171 if (auto session = ptr.lock()) {
// NOTE(review): when self_ptr has expired this lock fails and the command
// is sent to nobody — confirm that is intended.
172 if (auto self = self_ptr.lock()) {
// Sessions are compared by value (operator!= on the session objects),
// not by pointer identity.
173 if (*session != *self) {
174 session->Send(command);
// Resolves `ip_address`:`port` as an IPv4 UDP endpoint and posts
// UDP_TEST_PACKET_TIME sends of a fixed test string to io_service_.
// ip_address: host to resolve.
// port:       destination UDP port.
181 void Server::SendUDPTestPacket(const std::string& ip_address, uint16_t port)
183 using boost::asio::ip::udp;
// The resolver query takes the service as text, so format the port number.
185 std::stringstream port_str;
186 port_str << (int)port;
188 udp::resolver resolver(io_service_);
189 udp::resolver::query query(udp::v4(), ip_address.c_str(), port_str.str().c_str());
// Synchronous resolve; this overload has no error_code parameter and reports
// failure by throwing boost::system::system_error.
190 udp::resolver::iterator iterator = resolver.resolve(query);
// NOTE(review): presumably static so the buffer is still valid when the
// posted handlers run — confirm.
192 static char request[] = "MMO UDP Test Packet";
193 for (int i = 0; i < UDP_TEST_PACKET_TIME; i++) {
// Each iteration posts one DoWriteUDP call to the first resolved endpoint.
195 io_service_.post(boost::bind(&Server::DoWriteUDP, this, request, *iterator));
// Posts a DoWriteUDP call for `message` and `endpoint` to io_service_, so the
// actual send is started from an io_service handler.
// message:  payload to send.
// endpoint: destination UDP endpoint.
199 void Server::SendUDP(const std::string& message, const boost::asio::ip::udp::endpoint endpoint)
201 io_service_.post(boost::bind(&Server::DoWriteUDP, this, message, endpoint));
// Completion handler for socket_udp_.async_receive_from: forwards the received
// bytes to FetchUDP and queues the next receive.
// error:       result of the receive operation.
// bytes_recvd: number of bytes written into receive_buf_udp_.
204 void Server::ReceiveUDP(const boost::system::error_code& error, size_t bytes_recvd)
206 if (bytes_recvd > 0) {
// Copy the datagram out of the shared receive buffer before it is reused
// by the next receive.
207 std::string buffer(receive_buf_udp_, bytes_recvd);
208 FetchUDP(buffer, sender_endpoint_);
// Queue the next asynchronous receive with the same buffer and handler.
211 socket_udp_.async_receive_from(
212 boost::asio::buffer(receive_buf_udp_, UDP_MAX_RECEIVE_LENGTH), sender_endpoint_,
213 boost::bind(&Server::ReceiveUDP, this,
214 boost::asio::placeholders::error,
215 boost::asio::placeholders::bytes_transferred));
// NOTE(review): error.message() returns std::string; confirm Logger::Error
// accepts a std::string argument for "%s".
217 Logger::Error("%s", error.message());
// Starts an asynchronous UDP send of `msg` to `endpoint`.
// msg:      payload to send.
// endpoint: destination UDP endpoint.
221 void Server::DoWriteUDP(const std::string& msg, const udp::endpoint& endpoint)
// Copy the payload into a heap-allocated string: the asio buffer below only
// refers to this storage, and the shared_ptr is bound into the completion
// handler so the storage stays alive until WriteUDP is called.
223 boost::shared_ptr<std::string> s =
224 boost::make_shared<std::string>(msg.data(), msg.size());
226 socket_udp_.async_send_to(
227 boost::asio::buffer(s->data(), s->size()), endpoint,
228 boost::bind(&Server::WriteUDP, this,
229 boost::asio::placeholders::error, s));
// Completion handler for socket_udp_.async_send_to.
// error:  result of the send operation.
// holder: shared pointer to the payload string that DoWriteUDP bound into
//         this handler to keep the send buffer alive.
232 void Server::WriteUDP(const boost::system::error_code& error, boost::shared_ptr<std::string> holder)
// Disabled send-queue handling (refers to send_queue_, socket_tcp_ and
// Session::WriteTCP).
235 // if (!send_queue_.empty()) {
236 // send_queue_.pop();
237 // if (!send_queue_.empty())
239 // boost::asio::async_write(socket_tcp_,
240 // boost::asio::buffer(send_queue_.front().data(),
241 // send_queue_.front().size()),
242 // boost::bind(&Session::WriteTCP, this,
243 // boost::asio::placeholders::error));
// Parses one received UDP datagram and passes it to callback_ as a Command.
// buffer:   raw datagram bytes.
// endpoint: sender of the datagram; forwarded into the Command.
251 void Server::FetchUDP(const std::string& buffer, const boost::asio::ip::udp::endpoint endpoint)
253 unsigned char header;
// The header is deserialized first; user_id and count are read only when
// the buffer is larger than the value Deserialize returns for the header.
260 if (buffer.size() > network::Utils::Deserialize(buffer, &header)) {
261 readed = network::Utils::Deserialize(buffer, &user_id, &count);
264 // Only one command exists at the moment, so everything else is ignored
265 if (header != network::header::ServerRequstedStatus) {
// Bytes after the first `readed` bytes become the command body.
269 if (readed < buffer.size()) {
270 body = buffer.substr(readed);
// Invoke the callback installed by Server::Start with the parsed command.
274 (*callback_)(Command(static_cast<network::header::CommandHeader>(header), body, endpoint));
// Prepares the session's TCP socket and starts the first asynchronous read.
278 void Server::ServerSession::Start()
// Set the TCP no_delay option on the socket.
283 socket_tcp_.set_option(boost::asio::ip::tcp::no_delay(true));
// Request a receive buffer size of 1048576.
286 boost::asio::socket_base::receive_buffer_size option(1048576);
287 socket_tcp_.set_option(option);
// Remember the peer's address as text.
290 global_ip_ = socket_tcp_.remote_endpoint().address().to_string();
// Read into receive_buf_ until NETWORK_UTILS_DELIMITOR is seen; ReceiveTCP is
// the completion handler. Binding shared_from_this() makes the pending
// operation hold a shared reference to this session.
292 boost::asio::async_read_until(socket_tcp_,
293 receive_buf_, NETWORK_UTILS_DELIMITOR,
295 &ServerSession::ReceiveTCP, shared_from_this(),
296 boost::asio::placeholders::error));
// Computes the per-session read-average limit from the total limit and the
// number of tracked sessions.
299 int Server::GetSessionReadAverageLimit()
// Split the total limit across the tracked sessions plus one.
// NOTE(review): the +1 presumably accounts for a session that is about to be
// added and also avoids dividing by zero — confirm. sessions_.size() includes
// entries that may already have expired.
301 int byte = max_total_read_average_ / (sessions_.size() + 1);
// Never exceed the per-session maximum.
302 byte = std::min(byte, max_session_read_average_);
// Returns the current value of max_total_read_average_.
307 int Server::max_total_read_average() const
309 return max_total_read_average_;
// Returns the current value of max_session_read_average_.
312 int Server::max_session_read_average() const
314 return max_session_read_average_;
// Returns the current value of min_session_read_average_.
317 int Server::min_session_read_average() const
319 return min_session_read_average_;
// Sets max_total_read_average_ to `byte`. The value is stored as given; no
// range check is performed here.
322 void Server::set_max_total_read_average(int byte)
324 max_total_read_average_ = byte;
// Sets max_session_read_average_ to `byte`. The value is stored as given; no
// range check is performed here.
327 void Server::set_max_session_read_average(int byte)
329 max_session_read_average_ = byte;
// Sets min_session_read_average_ to `byte`. The value is stored as given; no
// range check is performed here.
332 void Server::set_min_session_read_average(int byte)
334 min_session_read_average_ = byte;