Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
}
- boost::system::error_code ec;
- std::size_t recv_size;
- UP_THREAD_FUNC_TYPE_TAG func_tag = UP_FUNC_CLIENT_RECEIVE;
- struct epoll_event event;
- int ret_fds;
-
- up_thread_data_client_side.initialize();
- boost::array<char, MAX_BUFFER_SIZE>& data_buff = up_thread_data_client_side.get_data();
-
if (unlikely(0 < parent_service.get_wait_upstream())) {
//----Debug log----------------------------------------------------------------------
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 35, buf.str(), __FILE__, __LINE__);
}
//----Debug log----------------------------------------------------------------------
- goto up_thread_client_receive_out;
+ return; // try again
+ }
+ up_thread_data_client_side.initialize();
+ boost::array<char, MAX_BUFFER_SIZE>& data_buff = up_thread_data_client_side.get_data();
+ boost::system::error_code ec;
+ std::size_t recv_size;
+ UP_THREAD_FUNC_TYPE_TAG func_tag;
+
+ struct epoll_event event;
+ event.data.fd = !ssl_flag ? client_socket.get_socket().native()
+ : client_ssl_socket.get_socket().lowest_layer().native();
+
+ // epoll wait codes
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLIN | EPOLLHUP | EPOLLET;
+ } else {
+ event.events = EPOLLIN | EPOLLHUP;
+ }
+ bool add_flag = false;
+ if (!up_client_epollfd_registered) {
+ if (epoll_ctl(up_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_client_receive: epoll_ctl EPOLL_CTL_ADD error: ";
+ buf << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ boost::this_thread::yield();
+ return;
+ }
+ up_client_epollfd_registered = true;
+ add_flag = true;
+ }
+
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl(up_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_client_receive: epoll_ctl EPOLL_CTL_MOD error: ";
+ buf << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
+ return;
+ }
+ }
+ int ret_fds = epoll_wait(up_client_epollfd, up_client_events, EVENT_NUM, epoll_timeout);
+ if (ret_fds == 0) {
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ boost::format formatter("up_thread_client_receive: epoll_wait timeout %d msec");
+ formatter % epoll_timeout;
+ Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+ }
+ //----Debug log----------------------------------------------------------------------
+ up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_RECEIVE];
+ return;
+ } else if (ret_fds < 0) {
+ boost::format formatter("up_thread_client_receive: epoll_wait error: %s");
+ formatter % strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+ up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
+ return;
+ }
+
+ for (int i = 0; i < ret_fds; ++i) {
+ if (up_client_events[i].data.fd == event.data.fd) {
+ if (up_client_events[i].events & EPOLLIN) {
+ break;
+ }
+ if (up_client_events[i].events & EPOLLHUP) {
+ up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ boost::format formatter("Thread ID[%d] FUNC OUT up_thread_client_receive EPOLLHUP: NEXT_FUNC[%s]");
+ formatter % boost::this_thread::get_id() % func_tag_to_string(UP_FUNC_CLIENT_DISCONNECT);
+ Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+ }
+ //----Debug log----------------------------------------------------------------------
+ return;
+ }
+ }
}
recv_size = !ssl_flag ? client_socket.read_some(
: client_ssl_socket.read_some(
boost::asio::buffer(data_buff, MAX_BUFFER_SIZE),
ec);
+
if (!ec) {
if (recv_size > 0) {
//----Debug log----------------------------------------------------------------------
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ endpoint client_endpoint; // XXX redefined???
+ client_endpoint = !ssl_flag ? client_socket.get_socket().lowest_layer().remote_endpoint(ec)
+ : client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
boost::format formatter("Thread ID[%d] up_thread_client_receive receive data size[%d] from [%d]");
formatter % boost::this_thread::get_id() % recv_size % client_endpoint;
Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 36, formatter.str(), __FILE__, __LINE__);
formatter % boost::this_thread::get_id() % module_event;
Logger::putLogError(LOG_CAT_L7VSD_SESSION, 20, formatter.str(), __FILE__, __LINE__);
up_thread_exit(process_type);
- func_tag = UP_FUNC_EXIT;
- } else {
- func_tag = func_type->second;
+ return;
}
- } else if (recv_size < 0) {
+ func_tag = func_type->second;
+ } else {
boost::format formatter("Thread ID[%d] client read error. recv_size: %d");
formatter % boost::this_thread::get_id() % recv_size;
Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+ func_tag = UP_FUNC_CLIENT_RECEIVE;
}
} else {
- if (ec == boost::asio::error::try_again) {
- // epoll wait codes
- event.data.fd = !ssl_flag ? client_socket.get_socket().native()
- : client_ssl_socket.get_socket().lowest_layer().native();
-
- if (!up_client_epollfd_registered) {
- if (is_epoll_edge_trigger) {
- event.events = EPOLLIN | EPOLLHUP | EPOLLET;
- } else {
- event.events = EPOLLIN | EPOLLHUP;
- }
- if ( (!ssl_flag && !client_socket.get_socket().is_open()) ||
- (ssl_flag && !client_ssl_socket.get_socket().lowest_layer().is_open()) ) {
- func_tag = UP_FUNC_CLIENT_DISCONNECT;
- goto up_thread_client_receive_out;
- }
- if (epoll_ctl(up_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
- std::stringstream buf;
- buf << "up_thread_client_receive: epoll_ctl EPOLL_CTL_ADD error: ";
- buf << strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
- func_tag = UP_FUNC_CLIENT_DISCONNECT;
- goto up_thread_client_receive_out;
- }
- up_client_epollfd_registered = true;
- }
- ret_fds = epoll_wait(up_client_epollfd, up_client_events, EVENT_NUM, epoll_timeout);
- if (ret_fds == 0) {
- goto up_thread_client_receive_out; // epoll timeout
- } else if (ret_fds < 0) {
- boost::format formatter("up_thread_client_receive: epoll_wait error: %s");
- formatter % strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
- func_tag = UP_FUNC_CLIENT_DISCONNECT;
- goto up_thread_client_receive_out;
- }
-
- for (int i = 0; i < ret_fds; ++i) {
- if (up_client_events[i].data.fd == event.data.fd) {
- if (up_client_events[i].events & EPOLLIN) {
- goto up_thread_client_receive_out;
- }
- if (up_client_events[i].events & EPOLLHUP) {
- func_tag = UP_FUNC_CLIENT_DISCONNECT;
- goto up_thread_client_receive_out;
- }
- }
- }
- boost::format formatter("up_thread_client_receive: unknown file descriptor's event: %s");
- formatter % strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
- func_tag = UP_FUNC_CLIENT_DISCONNECT;
- } else if (ec == boost::asio::error::eof) {
+ if (ec == boost::asio::error::eof) {
func_tag = UP_FUNC_CLIENT_DISCONNECT;
+ } else if (ec == boost::asio::error::try_again) {
+ func_tag = UP_FUNC_CLIENT_RECEIVE;
} else {
boost::format formatter("Thread ID[%d] client read error: %s");
formatter % boost::this_thread::get_id() % ec.message();
func_tag = UP_FUNC_CLIENT_DISCONNECT;
}
}
-
-up_thread_client_receive_out:
up_thread_next_call_function = up_thread_function_array[func_tag];
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
std::size_t data_size = up_thread_data_dest_side.get_size();
std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
std::size_t send_size;
- UP_THREAD_FUNC_TYPE_TAG func_tag = UP_FUNC_REALSERVER_SEND;
- int ret_fds;
+ UP_THREAD_FUNC_TYPE_TAG func_tag;
struct epoll_event event;
+ event.data.fd = send_socket->second->get_socket().native();
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
+ } else {
+ event.events = EPOLLOUT | EPOLLHUP;
+ }
+ bool add_flag = false;
+ if (!up_realserver_epollfd_registered) {
+ if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send: epoll_ctl EPOLL_CTL_ADD error: ";
+ buf << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ boost::this_thread::yield();
+ return;
+ }
+ up_realserver_epollfd_registered = true;
+ add_flag = true;
+ }
+
+ {
+ rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+ if (unlikely(exit_flag)) {
+ up_thread_next_call_function = up_thread_function_array[UP_FUNC_EXIT];
+ return;
+ }
+ }
+
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send: epoll_ctl EPOLL_CTL_MOD error: ";
+ buf << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ boost::this_thread::yield();
+ return;
+ }
+ }
+
+ int ret_fds = epoll_wait(up_realserver_epollfd, up_realserver_events, EVENT_NUM, epoll_timeout);
+ if (ret_fds <= 0) {
+ if (ret_fds == 0) {
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send: epoll_wait timeout " << epoll_timeout << " msec.";
+ Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ }
+ //----Debug log----------------------------------------------------------------------
+ } else {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send: epoll_wait error: " << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ }
+ //XXX no need to retry???
+ up_thread_next_call_function = up_thread_function_array[UP_FUNC_REALSERVER_DISCONNECT];
+ return;
+ }
+
+ for (int i = 0; i < ret_fds; ++i) {
+ if (up_realserver_events[i].data.fd == event.data.fd) {
+ if (up_realserver_events[i].events & EPOLLOUT) {
+ break;
+ } else if (up_realserver_events[i].events & EPOLLHUP) {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send: epoll hung up event";
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ up_thread_next_call_function = up_thread_function_array[UP_FUNC_REALSERVER_DISCONNECT];
+ return;
+ }
+ }
+ }
+
send_size = send_socket->second->write_some(
boost::asio::buffer(
data_buff.data() + send_data_size,
Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 37, formatter.str(), __FILE__, __LINE__);
}
//----Debug log----------------------------------------------------------------------
- if (data_size <= send_data_size) {
+ if (data_size > send_data_size) {
+ func_tag = UP_FUNC_REALSERVER_SEND;
+ } else {
protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_send(up_thread_id);
std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
func_tag = func_type->second;
}
} else {
if (ec == boost::asio::error::try_again) {
- event.data.fd = send_socket->second->get_socket().native();
- if (!up_realserver_epollfd_registered) {
- if (is_epoll_edge_trigger) {
- event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
- } else {
- event.events = EPOLLOUT | EPOLLHUP;
- }
- if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
- std::stringstream buf;
- buf << "up_thread_realserver_send: epoll_ctl EPOLL_CTL_ADD error: ";
- buf << strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
- goto up_thread_realserver_send_out;
- }
- up_realserver_epollfd_registered = true;
- }
- ret_fds = epoll_wait(up_realserver_epollfd, up_realserver_events, EVENT_NUM, epoll_timeout);
- if (ret_fds == 0) {
- goto up_thread_realserver_send_out; // epoll timeout
- } else if (ret_fds < 0) {
- std::stringstream buf;
- buf << "up_thread_realserver_send: epoll_wait error: " << strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
- func_tag = UP_FUNC_REALSERVER_DISCONNECT;
- goto up_thread_realserver_send_out;
- }
- for (int i = 0; i < ret_fds; ++i) {
- if (up_realserver_events[i].data.fd == event.data.fd) {
- if (up_realserver_events[i].events & EPOLLOUT) {
- goto up_thread_realserver_send_out;
- } else if (up_realserver_events[i].events & EPOLLHUP) {
- func_tag = UP_FUNC_REALSERVER_DISCONNECT;
- goto up_thread_realserver_send_out;
- }
- }
- }
- boost::format formatter("up_thread_realserver_send: unknown file descriptor's event: %s");
- formatter % strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
- func_tag = UP_FUNC_REALSERVER_DISCONNECT;
- } else if (ec == boost::asio::error::eof) {
- func_tag = UP_FUNC_REALSERVER_DISCONNECT;
+ func_tag = UP_FUNC_REALSERVER_SEND;
} else {
- boost::format formatter("Thread ID[%d] realserver write error: %s");
- formatter % boost::this_thread::get_id() % ec.message();
- Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
func_tag = UP_FUNC_REALSERVER_DISCONNECT;
}
}
-up_thread_realserver_send_out:
up_thread_next_call_function = up_thread_function_array[func_tag];
+
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
boost::format formatter("Thread ID[%d] FUNC OUT up_thread_realserver_send: NEXT_FUNC[%s]");
formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
}
+
}
//! up thread raise module event of handle_realserver_select
Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
}
- down_thread_data_dest_side.initialize();
- boost::array<char, MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
- boost::system::error_code ec;
- size_t recv_size;
- DOWN_THREAD_FUNC_TYPE_TAG func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
- struct epoll_event event;
- int ret_fds;
- endpoint server_endpoint;
- std::list<socket_element>::iterator list_end;
-
if (down_thread_receive_realserver_socket_list.empty()) {
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
boost::format formatter("Thread ID[%d] FUNC OUT down_thread_realserver_receive");
if (!realserver_connect_status) {
to_time(LOCKTIMEOUT, xt);
realserver_connect_cond.timed_wait(lock, xt);
+ //realserver_connect_cond.wait(lock);
}
- goto down_thread_realserver_receive_no_socket_out;
+ down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_RECEIVE];
+ return;
}
if (unlikely(0 < parent_service.get_wait_downstream())) {
Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 42, buf.str(), __FILE__, __LINE__);
}
//----Debug log----------------------------------------------------------------------
- goto down_thread_realserver_receive_out;
+ return;
+ }
+ down_thread_data_dest_side.initialize();
+ boost::array<char, MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
+ boost::system::error_code ec;
+ size_t recv_size;
+ DOWN_THREAD_FUNC_TYPE_TAG func_tag;
+
+ struct epoll_event event;
+ event.data.fd = down_thread_current_receive_realserver_socket->second->get_socket().native();
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLIN | EPOLLHUP | EPOLLET;
+ } else {
+ event.events = EPOLLIN | EPOLLHUP;
+ }
+ bool add_flag = false;
+ if (!down_realserver_epollfd_registered) {
+ if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
+ std::stringstream buf;
+ buf << "down_thread_realserver_receive: epoll_ctl EPOLL_CTL_ADD error: " << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ boost::this_thread::yield();
+ return;
+ }
+ down_realserver_epollfd_registered = true;
+ add_flag = true;
+ }
+
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
+ std::stringstream buf;
+ buf << "down_thread_realserver_receive: epoll_ctl EPOLL_CTL_MOD error: " << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ boost::this_thread::yield();
+ return;
+ }
+ }
+
+ int ret_fds = epoll_wait(down_realserver_epollfd, down_realserver_events, EVENT_NUM, epoll_timeout);
+ if (ret_fds <= 0) {
+ if (ret_fds == 0) {
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ boost::format formatter("down_thread_realserver_receive: epoll_wait timeout %d msec.");
+ formatter % epoll_timeout;
+ Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+ }
+ //----Debug log----------------------------------------------------------------------
+ down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_RECEIVE];
+ return;
+ } else {
+ boost::format formatter("down_thread_realserver_receive: epoll_wait error: %d");
+ formatter % strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+ down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_DISCONNECT];
+ return;
+ }
}
- server_endpoint = down_thread_current_receive_realserver_socket->first;
+ for (int i = 0; i < ret_fds; ++i) {
+ if (down_realserver_events[i].data.fd == event.data.fd) {
+ if (down_realserver_events[i].events & EPOLLIN) {
+ break;
+ } else if (down_realserver_events[i].events & EPOLLHUP) {
+ down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_DISCONNECT];
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ boost::format formatter("Thread ID[%d] FUNC OUT down_thread_realserver_receive: EPOLL_HUP");
+ formatter % boost::this_thread::get_id();
+ }
+ return;
+ }
+ }
+ }
+
+ endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
down_thread_data_dest_side.set_endpoint(server_endpoint);
recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff, MAX_BUFFER_SIZE), ec);
std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
func_tag = func_type->second;
+ } else {
+ func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+ //boost::this_thread::yield();
}
} else {
if (ec == boost::asio::error::try_again) {
- // epoll wait
- event.data.fd = down_thread_current_receive_realserver_socket->second->get_socket().native();
- if (!down_realserver_epollfd_registered) {
- if (is_epoll_edge_trigger) {
- event.events = EPOLLIN | EPOLLHUP | EPOLLET;
- } else {
- event.events = EPOLLIN | EPOLLHUP;
- }
- if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
- std::stringstream buf;
- buf << "down_thread_realserver_receive: epoll_ctl EPOLL_CTL_ADD error: " << strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
- goto down_thread_realserver_receive_out;
- }
- down_realserver_epollfd_registered = true;
- }
- ret_fds = epoll_wait(down_realserver_epollfd, down_realserver_events, EVENT_NUM, epoll_timeout);
- if (ret_fds == 0) {
- goto down_thread_realserver_receive_out; // epoll timeout
- } else if (ret_fds < 0) {
- boost::format formatter("down_thread_realserver_receive: epoll_wait error: %d");
- formatter % strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
- func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
- goto down_thread_realserver_receive_out;
- }
- for (int i = 0; i < ret_fds; ++i) {
- if (down_realserver_events[i].data.fd == event.data.fd) {
- if (down_realserver_events[i].events & EPOLLIN) {
- goto down_thread_realserver_receive_out;
- } else if (down_realserver_events[i].events & EPOLLHUP) {
- func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
- goto down_thread_realserver_receive_out;
- }
- }
- }
- boost::format formatter("down_thread_realserver_receive: unknown file descriptor's event: %s");
- formatter % strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
- func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
- } else if (ec == boost::asio::error::eof) {
- func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
+ func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+ //boost::this_thread::yield();
} else {
- boost::format formatter("Thread ID[%d] realserver read error: %s");
- formatter % boost::this_thread::get_id() % ec.message();
- Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
}
}
-down_thread_realserver_receive_out:
+ down_thread_function_pair func = down_thread_function_array[func_tag];
down_thread_current_receive_realserver_socket++;
- list_end = down_thread_receive_realserver_socket_list.end();
+ std::list<socket_element>::iterator list_end = down_thread_receive_realserver_socket_list.end();
if (down_thread_current_receive_realserver_socket == list_end) {
down_thread_current_receive_realserver_socket = down_thread_receive_realserver_socket_list.begin();
}
-
-down_thread_realserver_receive_no_socket_out:
- down_thread_function_pair func = down_thread_function_array[func_tag];
down_thread_next_call_function = func;
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
std::size_t data_size = down_thread_data_client_side.get_size();
std::size_t send_data_size = down_thread_data_client_side.get_send_size();
std::size_t send_size;
- DOWN_THREAD_FUNC_TYPE_TAG func_tag = DOWN_FUNC_CLIENT_SEND;
+ DOWN_THREAD_FUNC_TYPE_TAG func_tag;
+
struct epoll_event event;
- int ret_fds;
+ event.data.fd = !ssl_flag ? client_socket.get_socket().native()
+ : client_ssl_socket.get_socket().lowest_layer().native();
- send_size = !ssl_flag ? client_socket.write_some(
- boost::asio::buffer(
- data_buff.data() + send_data_size,
- data_size - send_data_size),
- ec)
- : client_ssl_socket.write_some(
- boost::asio::buffer(
- data_buff.data() + send_data_size,
- data_size - send_data_size),
- ec);
- if (!ec) {
- send_data_size += send_size;
- down_thread_data_client_side.set_send_size(send_data_size);
- parent_service.update_down_send_size(send_size);
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
- boost::format formatter("Thread ID[%d] down_thread_client_send send data size[%d] for [%d]");
- formatter % boost::this_thread::get_id() % send_size % client_endpoint;
- Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 45, formatter.str(), __FILE__, __LINE__);
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
+ } else {
+ event.events = EPOLLOUT | EPOLLHUP;
+ }
+ bool add_flag = false;
+ if (!down_client_epollfd_registered) {
+ if (epoll_ctl(down_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
+ std::stringstream buf;
+ buf << "down_thread_client_send: epoll_ctl EPOLL_CTL_ADD error: ";
+ buf << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ boost::this_thread::yield();
+ return;
}
- //----Debug log----------------------------------------------------------------------
- if (data_size > send_data_size) {
- goto down_thread_client_send_out;
- } else {
- protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_send(down_thread_id);
- std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
- func_tag = func_type->second;
- goto down_thread_client_send_out;
+ down_client_epollfd_registered = true;
+ add_flag = true;
+ }
+
+ while (true) {
+ {
+ rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+ if (unlikely(exit_flag)) {
+ func_tag = DOWN_FUNC_EXIT;
+ break;
+ }
}
- } else {
- if (ec == boost::asio::error::try_again) {
- event.data.fd = !ssl_flag ? client_socket.get_socket().native()
- : client_ssl_socket.get_socket().lowest_layer().native();
- if (!down_client_epollfd_registered) {
- if (is_epoll_edge_trigger) {
- event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
- } else {
- event.events = EPOLLOUT | EPOLLHUP;
- }
- if (epoll_ctl(down_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
- std::stringstream buf;
- buf << "down_thread_client_send: epoll_ctl EPOLL_CTL_ADD error: ";
- buf << strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
- func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
- goto down_thread_client_send_out;
- }
- down_client_epollfd_registered = true;
+
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl(down_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
+ std::stringstream buf;
+ buf << "down_thread_client_send: epoll_ctl EPOLL_CTL_MOD error: ";
+ buf << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ boost::this_thread::yield();
+ return;
}
- ret_fds = epoll_wait(down_client_epollfd, down_client_events, EVENT_NUM, epoll_timeout);
+ }
+
+ int ret_fds = epoll_wait(down_client_epollfd, down_client_events, EVENT_NUM, epoll_timeout);
+ if (ret_fds <= 0) {
if (ret_fds == 0) {
- goto down_thread_client_send_out; // epoll timeout
- } else if (ret_fds < 0) {
- func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
- goto down_thread_client_send_out;
+ std::stringstream buf;
+ buf << "down_thread_client_send: epoll_wait timeout ";
+ buf << epoll_timeout;
+ buf << "mS";
+ Logger::putLogInfo(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+ } else {
+ std::stringstream buf;
+ buf << "down_thread_client_send: epoll_wait error: ";
+ buf << strerror(errno);
+ Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
}
- for (int i = 0; i < ret_fds; ++i) {
- if (down_client_events[i].data.fd == event.data.fd) {
- if (down_client_events[i].events & EPOLLOUT) {
- goto down_thread_client_send_out;
- } else if (down_client_events[i].events & EPOLLHUP) {
- func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
- goto down_thread_client_send_out;
- }
+ down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT];
+ return;
+ }
+
+ for (int i = 0; i < ret_fds; ++i) {
+ if (down_client_events[i].data.fd == event.data.fd) {
+ if (down_client_events[i].events & EPOLLOUT) {
+ break;
+ } else if (down_client_events[i].events & EPOLLHUP) {
+ down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT];
+ return;
}
}
- boost::format formatter("down_thread_client_send: unknown file descriptor's event: %s");
- formatter % strerror(errno);
- Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
- func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
- } else if (ec == boost::asio::error::eof) {
- func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
+ }
+
+ send_size = !ssl_flag ? client_socket.write_some(
+ boost::asio::buffer(
+ data_buff.data() + send_data_size,
+ data_size - send_data_size),
+ ec)
+ : client_ssl_socket.write_some(
+ boost::asio::buffer(
+ data_buff.data() + send_data_size,
+ data_size - send_data_size),
+ ec);
+ if (!ec) {
+ send_data_size += send_size;
+ down_thread_data_client_side.set_send_size(send_data_size);
+ parent_service.update_down_send_size(send_size);
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ endpoint client_endpoint // XXX redefined???
+ = !ssl_flag ? client_socket.get_socket().lowest_layer().remote_endpoint(ec)
+ : client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
+ boost::format formatter("Thread ID[%d] down_thread_client_send send data size[%d] for [%d]");
+ formatter % boost::this_thread::get_id() % send_size % client_endpoint;
+ Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 45, formatter.str(), __FILE__, __LINE__);
+ }
+ //----Debug log----------------------------------------------------------------------
+ if (data_size > send_data_size) {
+ func_tag = DOWN_FUNC_CLIENT_SEND;
+ //down_send_wait.reset();
+ } else {
+ protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_send(down_thread_id);
+ std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
+ func_tag = func_type->second;
+ }
+ break;
} else {
- boost::format formatter("Thread ID[%d] client write error: %s");
- formatter % boost::this_thread::get_id() % ec.message();
- Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
- func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
+ if (ec == boost::asio::error::try_again) {
+ //func_tag = DOWN_FUNC_CLIENT_SEND;
+ //boost::this_thread::yield();
+ } else {
+ func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
+ break;
+ }
}
}
-down_thread_client_send_out:
down_thread_next_call_function = down_thread_function_array[func_tag];
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {