protocol_module(NULL),
session_pause_flag(false),
client_socket(session_io,set_option),
- upstream_buffer_size(8192),
- downstream_buffer_size(8192),
+ upstream_buffer_size(-1),
+ downstream_buffer_size(-1),
virtualservice_endpoint(listen_endpoint),
access_log_flag(false),
access_logger(set_access_logger),
add_down_thread_vs_message_func.second = boost::bind(&tcp_session::down_thread_exit,this,_1);
virtual_service_message_down_thread_function_map.insert(add_down_thread_vs_message_func);
- if( ( up_cl_epollfd = epoll_create( MAXEVENTS ) ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_cl_epoll_create", __FILE__, __LINE__ );
- return;
- }
- if( ( up_rs_epollfd = epoll_create( MAXEVENTS ) ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_rs_epoll_create", __FILE__, __LINE__ );
- return;
- }
- if( ( down_cl_epollfd = epoll_create( MAXEVENTS ) ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_cl_epoll_create", __FILE__, __LINE__ );
- return;
- }
- if( ( down_rs_epollfd = epoll_create( MAXEVENTS ) ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_rs_epoll_create", __FILE__, __LINE__ );
- return;
- }
-
+ // epoll impliment
+ up_client_epollfd = epoll_create( EVENT_NUM );
+ up_realserver_epollfd = epoll_create( EVENT_NUM );
+ down_client_epollfd = epoll_create( EVENT_NUM );
+ down_realserver_epollfd = epoll_create( EVENT_NUM );
}
//! destructor
tcp_session::~tcp_session(){
else
break;
}
+ close( up_client_epollfd );
+ close( up_realserver_epollfd );
+ close( down_client_epollfd );
+ close( down_realserver_epollfd );
}
//! initialize
session_result_message tcp_session::initialize(){
int_val = param.get_int( PARAM_COMP_SESSION, PARAM_UP_BUFFER_SIZE, vs_err );
if((likely( !vs_err )) && (0 < int_val)){
upstream_buffer_size = int_val;
- }else{
- Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 1, "up buffer param error set default 8192" , __FILE__, __LINE__ );
}
int_val = param.get_int( PARAM_COMP_SESSION, PARAM_DOWN_BUFFER_SIZE, vs_err );
if((likely( !vs_err )) && (0 < int_val)){
downstream_buffer_size = int_val;
- }else{
- Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 2, "down buffer param error set default 8192" , __FILE__, __LINE__ );
}
protocol_module = parent_service.get_protocol_module();
// Reset SSL structure to allow another connection.
if ( ssl_flag ) {
- if ( ssl_cache_flag ) {
- if (unlikely(ssl_clear_keep_cache(client_ssl_socket.get_socket().impl()->ssl) == false)) {
- //Error ssl_clear_keep_cache
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] ssl_clear_keep_cache failed";
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 110, buf.str(), __FILE__, __LINE__ );
- msg.flag = true;
- msg.message = "ssl_clear_keep_cache failed";
- } else {
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ if(client_ssl_socket.is_handshake_error()){
+ // remake socket
+ client_ssl_socket.clear_socket();
+ }else{
+ if ( ssl_cache_flag ) {
+ if (unlikely(ssl_clear_keep_cache(client_ssl_socket.get_socket().impl()->ssl) == false)) {
+ //Error ssl_clear_keep_cache
std::stringstream buf;
buf << "Thread ID[";
buf << boost::this_thread::get_id();
- buf << "] ssl_clear_keep_cache ok";
- Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 81,
- buf.str(),
- __FILE__, __LINE__ );
+ buf << "] ssl_clear_keep_cache failed";
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 110, buf.str(), __FILE__, __LINE__ );
+ msg.flag = true;
+ msg.message = "ssl_clear_keep_cache failed";
+ } else {
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] ssl_clear_keep_cache ok";
+ Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 81,
+ buf.str(),
+ __FILE__, __LINE__ );
+ }
+ //----Debug log----------------------------------------------------------------------
}
- //----Debug log----------------------------------------------------------------------
- }
- } else {
- if (unlikely(SSL_clear(client_ssl_socket.get_socket().impl()->ssl) == 0)) {
- //Error SSL_clear
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] SSL_clear failed";
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 111, buf.str(), __FILE__, __LINE__ );
- msg.flag = true;
- msg.message = "SSL_clear failed";
- }else{
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ } else {
+ if (unlikely(SSL_clear(client_ssl_socket.get_socket().impl()->ssl) == 0)) {
+ //Error SSL_clear
std::stringstream buf;
buf << "Thread ID[";
buf << boost::this_thread::get_id();
- buf << "] SSL_clear ok";
- Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 82,
- buf.str(),
- __FILE__, __LINE__ );
+ buf << "] SSL_clear failed";
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 111, buf.str(), __FILE__, __LINE__ );
+ msg.flag = true;
+ msg.message = "SSL_clear failed";
+ }else{
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] SSL_clear ok";
+ Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 82,
+ buf.str(),
+ __FILE__, __LINE__ );
+ }
+ //----Debug log----------------------------------------------------------------------
}
- //----Debug log----------------------------------------------------------------------
}
}
}
Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 18, buf.str(), __FILE__, __LINE__ );
}
//----Debug log----------------------------------------------------------------------
- adaptive_wait down_thread_wait( 1, 128 );
while(true){
//wait down thread get id
{
break;
}
}
- //boost::this_thread::yield();
- down_thread_wait.wait();
+ boost::this_thread::yield();
}
- down_thread_wait.reset();
-
//----Debug log----------------------------------------------------------------------
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
std::stringstream buf;
}
if(likely( !is_exit )){
//set client_socket options(recieve buffer size)
- boost::asio::socket_base::receive_buffer_size opt1( upstream_buffer_size );
- if (!ssl_flag) {
- client_socket.get_socket().lowest_layer().set_option( opt1 ,ec);
- } else {
- client_ssl_socket.get_socket().lowest_layer().set_option( opt1 ,ec);
- }
- if(unlikely( ec )){
- //client socket Error!
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] client socket recieve buffer size error : ";
- buf << ec.message();
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 12, buf.str(), __FILE__, __LINE__ );
- {
- rw_scoped_lock scoped_lock(exit_flag_update_mutex);
- exit_flag = true;
+ if (upstream_buffer_size > 0) {
+ boost::asio::socket_base::receive_buffer_size opt1(upstream_buffer_size);
+ if (!ssl_flag) {
+ client_socket.get_socket().lowest_layer().set_option(opt1, ec);
+ } else {
+ client_ssl_socket.get_socket().lowest_layer().set_option(opt1, ec);
+ }
+ if(unlikely( ec )){
+ //client socket Error!
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] client socket recieve buffer size error : ";
+ buf << ec.message();
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 12, buf.str(), __FILE__, __LINE__ );
+ {
+ rw_scoped_lock scoped_lock(exit_flag_update_mutex);
+ exit_flag = true;
+ }
}
}
}
}
if(likely( !is_exit )){
//set client_socket options(send buffer size)
- boost::asio::socket_base::send_buffer_size opt2( downstream_buffer_size );
- if (!ssl_flag) {
- client_socket.get_socket().lowest_layer().set_option( opt2 ,ec);
- } else {
- client_ssl_socket.get_socket().lowest_layer().set_option( opt2 ,ec);
- }
- if(unlikely( ec )){
- //client socket Error!
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] client socket send buffer size error : ";
- buf << ec.message();
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 13, buf.str(), __FILE__, __LINE__ );
- {
- rw_scoped_lock scoped_lock(exit_flag_update_mutex);
- exit_flag = true;
+ if (downstream_buffer_size > 0) {
+ boost::asio::socket_base::send_buffer_size opt2(downstream_buffer_size);
+ if (!ssl_flag) {
+ client_socket.get_socket().lowest_layer().set_option(opt2, ec);
+ } else {
+ client_ssl_socket.get_socket().lowest_layer().set_option(opt2, ec);
+ }
+ if(unlikely( ec )){
+ //client socket Error!
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] client socket send buffer size error : ";
+ buf << ec.message();
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 13, buf.str(), __FILE__, __LINE__ );
+ {
+ rw_scoped_lock scoped_lock(exit_flag_update_mutex);
+ exit_flag = true;
+ }
}
- }
- }
-
- // epoll_register(up, cl)
- int clientfd = client_socket.get_socket().lowest_layer().native();
- epoll_event event;
- memset( &event, 0, sizeof(epoll_event) );
- event.events = EPOLLIN | EPOLLET;
- event.data.fd = clientfd;
- if ( epoll_ctl( up_cl_epollfd, EPOLL_CTL_ADD, clientfd, &event ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_cl epoll_ctl add eror", __FILE__, __LINE__ );
- {
- rw_scoped_lock scoped_lock( exit_flag_update_mutex );
- exit_flag = true;
}
}
up_thread_next_call_function.second(LOCAL_PROC);
}
- //boost::this_thread::yield();
+ boost::this_thread::yield();
}
-
//----Debug log----------------------------------------------------------------------
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
std::stringstream buf;
Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 22, buf.str(), __FILE__, __LINE__ );
}
//----Debug log----------------------------------------------------------------------
- adaptive_wait down_thread_alive_wait( 1, 128 );
while(true){
// wait down thread alive
{
break;
}
}
- //boost::this_thread::yield();
- down_thread_alive_wait.wait();
+ boost::this_thread::yield();
}
- down_thread_alive_wait.reset();
-
//----Debug log----------------------------------------------------------------------
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
std::stringstream buf;
Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 28, buf.str(), __FILE__, __LINE__ );
}
//----Debug log----------------------------------------------------------------------
- adaptive_wait up_thread_active_wait( 1, 128 );
while(true){ // UP_THREAD_ACTIVE
// wait up thread active
{
rd_scoped_lock scoped_lock(exit_flag_update_mutex);
if(unlikely( exit_flag )) break;
}
- //boost::this_thread::yield();
- up_thread_active_wait.wait();
+ boost::this_thread::yield();
}
- up_thread_active_wait.reset();
-
//----Debug log----------------------------------------------------------------------
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
std::stringstream buf;
exit_flag = true;
}
}
-
- // epoll_register(down, cl)
- int clientfd = client_socket.get_socket().lowest_layer().native();
- epoll_event event;
- memset( &event, 0, sizeof(epoll_event) );
- event.events = EPOLLOUT | EPOLLET;
- event.data.fd = clientfd;
- if ( epoll_ctl( down_cl_epollfd, EPOLL_CTL_ADD, clientfd, &event ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_cl epoll_ctl add eror", __FILE__, __LINE__ );
- {
- rw_scoped_lock scoped_lock( exit_flag_update_mutex );
- exit_flag = true;
- }
- }
-
bool is_pause;
while(true){
{
}else{
down_thread_next_call_function.second(LOCAL_PROC);
}
- //boost::this_thread::yield();
+ boost::this_thread::yield();
}
-
//----Debug log----------------------------------------------------------------------
if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
std::stringstream buf;
std::string tcp_session::endpoint_to_string(
const boost::asio::ip::tcp::endpoint& target_endpoint){
std::stringstream ret;
- if( target_endpoint.address().is_v6() ){
+ if( target_endpoint.address().is_v6() ){
ret << "[" << target_endpoint.address().to_string() << "]:" << target_endpoint.port();
}else{
ret << target_endpoint.address().to_string() << ":" << target_endpoint.port();
// handshake fail
if(ec == boost::asio::error::try_again){
func_tag = UP_FUNC_CLIENT_ACCEPT;
- //boost::this_thread::yield();
+ boost::this_thread::yield();
}else{
//ERROR
ssl_handshake_timer->cancel();
boost::system::error_code ec;
std::size_t recv_size;
UP_THREAD_FUNC_TYPE_TAG func_tag;
- adaptive_wait up_recv_wait( 1, 128 );
+
+ struct epoll_event event;
+ event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ if ( !ssl_flag ) {
+ epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
+ } else {
+ epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ }
+
while( true ){
{
rd_scoped_lock scoped_lock(exit_flag_update_mutex);
break;
}
}
- // epoll_wait(up,cl)
- epoll_event events[ MAXEVENTS ];
- int nfd = epoll_wait( up_cl_epollfd, events, MAXEVENTS, EPTIMEOUT);
- if( nfd < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_cl epoll_wait err", __FILE__, __LINE__ );
- func_tag = UP_FUNC_EXIT;
- break;
- }
- else if( nfd > 0 ){
- epoll_event event;
- event.events = EPOLLIN | EPOLLET;
- event.data.fd = events[0].data.fd;
- if( epoll_ctl( up_cl_epollfd, EPOLL_CTL_MOD, events[0].data.fd, &event ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_cl epoll_ctl err", __FILE__, __LINE__ );
- func_tag = UP_FUNC_EXIT;
- }
- up_recv_wait.reset();
- break;
- }
- up_recv_wait.wait();
- }
- if( UP_FUNC_EXIT != func_tag ){
+
if (!ssl_flag) {
recv_size = client_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
} else {
recv_size = client_ssl_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
}
+
if(!ec){
if(recv_size > 0){
//----Debug log----------------------------------------------------------------------
return;
}
func_tag = func_type->second;
- //up_recv_wait.reset();
- //break;
+ break;
}else{
- func_tag = UP_FUNC_CLIENT_RECEIVE;
+ //func_tag = UP_FUNC_CLIENT_RECEIVE;
//boost::this_thread::yield();
- //up_recv_wait.wait();
}
}else{
if(ec == boost::asio::error::try_again){
- func_tag = UP_FUNC_CLIENT_RECEIVE;
+ //func_tag = UP_FUNC_CLIENT_RECEIVE;
//boost::this_thread::yield();
- //up_recv_wait.wait();
}else{
func_tag = UP_FUNC_CLIENT_DISCONNECT;
- //up_recv_wait.reset();
- //break;
+ break;
}
}
+
+ event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ if ( !ssl_flag ) {
+ epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
+ } else {
+ epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ }
+
+ epoll_wait( up_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
+ }
+
+ event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ if ( !ssl_flag ) {
+ epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
+ } else {
+ epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
}
up_thread_function_pair func = up_thread_function_array[func_tag];
std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
std::size_t send_size;
UP_THREAD_FUNC_TYPE_TAG func_tag;
- adaptive_wait up_send_wait( 1, 128 );
+
+ struct epoll_event event;
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, send_socket->second->get_socket().native(), &event );
+
while( true ){
{
rd_scoped_lock scoped_lock(exit_flag_update_mutex);
}
}
- // epoll_wait(up,rs)
- epoll_event events[ MAXEVENTS ];
- int nfd = epoll_wait( up_rs_epollfd, events, MAXEVENTS, EPTIMEOUT);
- if( nfd < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_rs epoll_wait err", __FILE__, __LINE__ );
- func_tag = UP_FUNC_EXIT;
- break;
- }
- else if( nfd > 0 ){
- epoll_event event;
- event.events = EPOLLOUT | EPOLLET;
- event.data.fd = events[0].data.fd;
- if( epoll_ctl( up_rs_epollfd, EPOLL_CTL_MOD, events[0].data.fd, &event ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_rs epoll_ctl err", __FILE__, __LINE__ );
- func_tag = UP_FUNC_EXIT;
- }
- up_send_wait.reset();
- break;
- }
- up_send_wait.wait();
- }
-
- if( UP_FUNC_EXIT != func_tag ){
send_size = send_socket->second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+
if( !ec ){
send_data_size += send_size;
up_thread_data_dest_side.set_send_size(send_data_size);
}
func_tag = func_type->second;
}
- //up_send_wait.reset();
- //break;
+ break;
}else{
if(ec == boost::asio::error::try_again){
- func_tag = UP_FUNC_REALSERVER_SEND;
+ //func_tag = UP_FUNC_REALSERVER_SEND;
//boost::this_thread::yield();
- //up_send_wait.wait();
}else{
func_tag = UP_FUNC_REALSERVER_DISCONNECT;
- //up_send_wait.reset();
- //break;
+ break;
}
}
+
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, send_socket->second->get_socket().native(), &event );
+
+ epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
}
+
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, send_socket->second->get_socket().native(), &event );
+
up_thread_function_pair func = up_thread_function_array[func_tag];
if( unlikely( !func.second ) ){
//Error not find function map
}
//set realserver_socket options(recieve buffer size)
- boost::asio::socket_base::receive_buffer_size opt1( downstream_buffer_size );
- new_socket->get_socket().set_option(opt1 , ec);
- if(unlikely( ec )){
- // socket set nonblocking mode error
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] realserver socket recieve buffer size error : ";
- buf << ec.message();
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 35, buf.str(), __FILE__, __LINE__ );
- up_thread_exit(process_type);
- return;
+ if (downstream_buffer_size > 0) {
+ boost::asio::socket_base::receive_buffer_size opt1(downstream_buffer_size);
+ new_socket->get_socket().set_option(opt1, ec);
+ if(unlikely( ec )){
+ // socket set nonblocking mode error
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] realserver socket recieve buffer size error : ";
+ buf << ec.message();
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 35, buf.str(), __FILE__, __LINE__ );
+ up_thread_exit(process_type);
+ return;
+ }
}
//set realserver_socket options(send buffer size)
- boost::asio::socket_base::send_buffer_size opt2( upstream_buffer_size );
- new_socket->get_socket().set_option(opt2 , ec);
- if(unlikely( ec )){
- // socket set nonblocking mode error
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] realserver socket send buffer size error : ";
- buf << ec.message();
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 36, buf.str(), __FILE__, __LINE__ );
- up_thread_exit(process_type);
- return;
- }
-
- // epoll_register(up, rs)
- int realserverfd = new_socket->get_socket().lowest_layer().native();
- epoll_event event;
- memset( &event, 0, sizeof(epoll_event) );
- event.events = EPOLLOUT | EPOLLET;
- event.data.fd = realserverfd;
- if ( epoll_ctl( up_rs_epollfd, EPOLL_CTL_ADD, realserverfd, &event ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_rs epoll_ctl add eror", __FILE__, __LINE__ );
- up_thread_exit(process_type);
- return;
- }
-
- // epoll_register(down, rs)
- memset( &event, 0, sizeof(epoll_event) );
- event.events = EPOLLIN | EPOLLET;
- event.data.fd = realserverfd;
- if ( epoll_ctl( down_rs_epollfd, EPOLL_CTL_ADD, realserverfd, &event ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_rs epoll_ctl add eror", __FILE__, __LINE__ );
- up_thread_exit(process_type);
- return;
+ if (upstream_buffer_size > 0) {
+ boost::asio::socket_base::send_buffer_size opt2(upstream_buffer_size);
+ new_socket->get_socket().set_option(opt2, ec);
+ if(unlikely( ec )){
+ // socket set nonblocking mode error
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] realserver socket send buffer size error : ";
+ buf << ec.message();
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 36, buf.str(), __FILE__, __LINE__ );
+ up_thread_exit(process_type);
+ return;
+ }
}
-
socket_element push_element;
push_element.first = server_endpoint;
push_element.second = new_socket;
std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
std::size_t send_size;
UP_THREAD_FUNC_TYPE_TAG func_tag;
- adaptive_wait sorry_send_wait( 1, 128 );
+
+ struct epoll_event event;
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, sorryserver_socket.second->get_socket().native(), &event );
+
while( true ){
{
rd_scoped_lock scoped_lock(exit_flag_update_mutex);
break;
}
}
+
send_size = sorryserver_socket.second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+
if(!ec){
send_data_size += send_size;
up_thread_data_dest_side.set_send_size(send_data_size);
}
func_tag = func_type->second;
}
- sorry_send_wait.reset();
break;
}else{
if(ec == boost::asio::error::try_again){
//func_tag = UP_FUNC_SORRYSERVER_SEND;
- sorry_send_wait.wait();
+ //boost::this_thread::yield();
}else{
func_tag = UP_FUNC_SORRYSERVER_DISCONNECT;
- sorry_send_wait.reset();
break;
}
}
+
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, sorryserver_socket.second->get_socket().native(), &event );
+
+ epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
}
+
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, sorryserver_socket.second->get_socket().native(), &event );
+
up_thread_function_pair func = up_thread_function_array[func_tag];
if(unlikely( !func.second )){
//Error not find function map
boost::system::error_code ec;
size_t recv_size;
DOWN_THREAD_FUNC_TYPE_TAG func_tag;
- adaptive_wait down_recv_wait( 1, 128 );
+
+ struct epoll_event event;
+ event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ epoll_ctl( down_realserver_epollfd, EPOLL_CTL_ADD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+
while( true ){
{
rd_scoped_lock scoped_lock(exit_flag_update_mutex);
break;
}
}
- // epoll_wait(down,rs)
- epoll_event events[ MAXEVENTS ];
- int nfd = epoll_wait( down_rs_epollfd, events, MAXEVENTS, EPTIMEOUT);
- if( nfd < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_rs epoll_wait err", __FILE__, __LINE__ );
- func_tag = DOWN_FUNC_EXIT;
- break;
- }
- else if( nfd > 0 ){
- epoll_event event;
- event.events = EPOLLIN | EPOLLET;
- event.data.fd = events[0].data.fd;
- if( epoll_ctl( down_rs_epollfd, EPOLL_CTL_MOD, events[0].data.fd, &event ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_rs epoll_ctl err", __FILE__, __LINE__ );
- func_tag = DOWN_FUNC_EXIT;
- }
- down_recv_wait.reset();
- break;
- }
- down_recv_wait.wait();
- }
- if( DOWN_FUNC_EXIT != func_tag ){
+
recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
-
+
boost::asio::ip::tcp::endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
-
down_thread_data_dest_side.set_endpoint(server_endpoint);
if(!ec){
if(recv_size > 0){
buf << server_endpoint;
buf << "]";
Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 43, buf.str(), __FILE__, __LINE__ );
- }
+ }
//----Debug log----------------------------------------------------------------------
down_thread_data_dest_side.set_size(recv_size);
parent_service.update_down_recv_size(recv_size);
return;
}
func_tag = func_type->second;
- down_recv_wait.reset();
- //break;
+ break;
}else{
- func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+ //func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
//boost::this_thread::yield();
- //down_recv_wait.wait();
}
}else{
if(ec == boost::asio::error::try_again){
- func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+ //func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
//boost::this_thread::yield();
- //down_recv_wait.wait();
}else{
func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
- //down_recv_wait.reset();
- //break;
+ break;
}
}
+
+ event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ epoll_ctl( down_realserver_epollfd, EPOLL_CTL_MOD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+
+ epoll_wait( down_realserver_epollfd, down_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
}
+ event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ epoll_ctl( down_realserver_epollfd, EPOLL_CTL_DEL, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+
down_thread_function_pair func = down_thread_function_array[func_tag];
if(unlikely( !func.second ) ){
//Error not find function map
//! down thread send for client and raise module event of handle_client_send
//! @param[in] process_type is prosecess type
void tcp_session::down_thread_client_send(const TCP_PROCESS_TYPE_TAG process_type){
-
boost::system::error_code ec;
boost::array<char,MAX_BUFFER_SIZE>& data_buff = down_thread_data_client_side.get_data();
std::size_t data_size = down_thread_data_client_side.get_size();
std::size_t send_data_size = down_thread_data_client_side.get_send_size();
std::size_t send_size;
DOWN_THREAD_FUNC_TYPE_TAG func_tag;
- adaptive_wait down_send_wait( 1, 128 );
+
+ struct epoll_event event;
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ if ( !ssl_flag ) {
+ epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
+ } else {
+ epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ }
+
while( true ){
{
rd_scoped_lock scoped_lock(exit_flag_update_mutex);
break;
}
}
- // epoll_wait(down,cl)
- epoll_event events[ MAXEVENTS ];
- int nfd = epoll_wait( down_cl_epollfd, events, MAXEVENTS, EPTIMEOUT);
- if( nfd < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_cl epoll_wait err", __FILE__, __LINE__ );
- func_tag = DOWN_FUNC_EXIT;
- break;
- }
- else if( nfd > 0 ){
- epoll_event event;
- event.events = EPOLLOUT | EPOLLET;
- event.data.fd = events[0].data.fd;
- if( epoll_ctl( down_cl_epollfd, EPOLL_CTL_MOD, events[0].data.fd, &event ) < 0 ){
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_cl epoll_ctl err", __FILE__, __LINE__ );
- func_tag = DOWN_FUNC_EXIT;
- }
- down_send_wait.reset();
- break;
- }
- down_send_wait.wait();
- }
- if( DOWN_FUNC_EXIT != func_tag ){
+
if (!ssl_flag) {
send_size = client_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
} else {
send_size = client_ssl_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
}
+
if(!ec){
send_data_size += send_size;
down_thread_data_client_side.set_send_size(send_data_size);
}
func_tag = func_type->second;
}
- down_send_wait.reset();
- //break;
+ break;
}else{
if(ec == boost::asio::error::try_again){
- func_tag = DOWN_FUNC_CLIENT_SEND;
+ //func_tag = DOWN_FUNC_CLIENT_SEND;
//boost::this_thread::yield();
- //down_send_wait.wait();
}else{
func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
- //down_send_wait.reset();
- //break;
+ break;
}
}
+
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ if ( !ssl_flag ) {
+ epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
+ } else {
+ epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ }
+
+ epoll_wait( down_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
+ }
+
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ if ( !ssl_flag ) {
+ epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
+ } else {
+ epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
}
+
down_thread_function_pair func = down_thread_function_array[func_tag];
if(unlikely( !func.second )){
//Error not find function map
down_thread_data_dest_side.initialize();
boost::array<char,MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
boost::system::error_code ec;
- size_t recv_size;
+ size_t recv_size = sorryserver_socket.second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
+ boost::asio::ip::tcp::endpoint sorry_endpoint = sorryserver_socket.first;
+ down_thread_data_dest_side.set_endpoint(sorry_endpoint);
DOWN_THREAD_FUNC_TYPE_TAG func_tag;
- adaptive_wait sorry_recv_wait( 1, 128 );
- while( true ){
- {
- rd_scoped_lock scoped_lock(exit_flag_update_mutex);
- if(unlikely(exit_flag)){
- func_tag = DOWN_FUNC_EXIT;
- break;
+ if(!ec){
+ if(recv_size > 0){
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] down_thread_sorryserver_receive";
+ buf << " receive data size[";
+ buf << recv_size;
+ buf << "] from [";
+ buf << sorry_endpoint;
+ buf << "]";
+ Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 46, buf.str(), __FILE__, __LINE__ );
}
- }
- recv_size = sorryserver_socket.second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
- boost::asio::ip::tcp::endpoint sorry_endpoint = sorryserver_socket.first;
- down_thread_data_dest_side.set_endpoint(sorry_endpoint);
- if(!ec){
- if(recv_size > 0){
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] down_thread_sorryserver_receive";
- buf << " receive data size[";
- buf << recv_size;
- buf << "] from [";
- buf << sorry_endpoint;
- buf << "]";
- Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 46, buf.str(), __FILE__, __LINE__ );
- }
- //----Debug log----------------------------------------------------------------------
- down_thread_data_dest_side.set_size(recv_size);
- protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_recv(down_thread_id,sorry_endpoint,data_buff,recv_size);
- std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
- if(unlikely( func_type == down_thread_module_event_map.end() )){
- //Error unknown protocol_module_base::EVENT_TAG return
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] protocol_module returnd illegal EVENT_TAG : ";
- buf << module_event;
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 88, buf.str(), __FILE__, __LINE__ );
- down_thread_exit(process_type);
- return;
- }
- func_tag = func_type->second;
- sorry_recv_wait.reset();
- break;
- }else{
- //func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
- //boost::this_thread::yield();
- sorry_recv_wait.wait();
+ //----Debug log----------------------------------------------------------------------
+ down_thread_data_dest_side.set_size(recv_size);
+ protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_recv(down_thread_id,sorry_endpoint,data_buff,recv_size);
+ std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
+ if(unlikely( func_type == down_thread_module_event_map.end() )){
+ //Error unknown protocol_module_base::EVENT_TAG return
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] protocol_module returnd illegal EVENT_TAG : ";
+ buf << module_event;
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 88, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
}
+ func_tag = func_type->second;
}else{
- if(ec == boost::asio::error::try_again){
- //func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
- //boost::this_thread::yield();
- sorry_recv_wait.wait();
- }else{
- func_tag = DOWN_FUNC_SORRYSERVER_DISCONNECT;
- sorry_recv_wait.reset();
- break;
- }
+ func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
+ boost::this_thread::yield();
+ }
+ }else{
+ if(ec == boost::asio::error::try_again){
+ func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
+ boost::this_thread::yield();
+ }else{
+ func_tag = DOWN_FUNC_SORRYSERVER_DISCONNECT;
}
}
-
down_thread_function_pair func = down_thread_function_array[func_tag];
if(unlikely( !func.second )){
//Error not find function map