OSDN Git Service

Re-add epoll repository
[ultramonkey-l7/ultramonkey-l7-v3.git] / l7vsd / src / tcp_session.cpp
index 99ff3af..5092aa5 100644 (file)
@@ -72,8 +72,8 @@ namespace l7vs{
         protocol_module(NULL),
         session_pause_flag(false),
         client_socket(session_io,set_option),
-        upstream_buffer_size(8192),
-        downstream_buffer_size(8192),
+        upstream_buffer_size(-1),
+        downstream_buffer_size(-1),
         virtualservice_endpoint(listen_endpoint),
         access_log_flag(false),
         access_logger(set_access_logger),
@@ -280,23 +280,11 @@ namespace l7vs{
         add_down_thread_vs_message_func.second = boost::bind(&tcp_session::down_thread_exit,this,_1);
         virtual_service_message_down_thread_function_map.insert(add_down_thread_vs_message_func);
 
-        if( ( up_cl_epollfd = epoll_create( MAXEVENTS ) ) < 0 ){
-            Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_cl_epoll_create", __FILE__, __LINE__ );
-            return;
-        }
-        if( ( up_rs_epollfd = epoll_create( MAXEVENTS ) ) < 0 ){
-            Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_rs_epoll_create", __FILE__, __LINE__ );
-            return;
-        }
-        if( ( down_cl_epollfd = epoll_create( MAXEVENTS ) ) < 0 ){
-            Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_cl_epoll_create", __FILE__, __LINE__ );
-            return;
-        }
-        if( ( down_rs_epollfd = epoll_create( MAXEVENTS ) ) < 0 ){
-            Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_rs_epoll_create", __FILE__, __LINE__ );
-            return;
-        }
-
+        // epoll impliment
+        up_client_epollfd = epoll_create( EVENT_NUM );
+        up_realserver_epollfd = epoll_create( EVENT_NUM );
+        down_client_epollfd = epoll_create( EVENT_NUM );
+        down_realserver_epollfd = epoll_create( EVENT_NUM );
     }
     //! destructor
     tcp_session::~tcp_session(){
@@ -329,6 +317,10 @@ namespace l7vs{
             else
                 break;
         }
+        close( up_client_epollfd );
+        close( up_realserver_epollfd );
+        close( down_client_epollfd );
+        close( down_realserver_epollfd );
     }
     //! initialize
     session_result_message tcp_session::initialize(){
@@ -367,15 +359,11 @@ namespace l7vs{
         int_val    = param.get_int( PARAM_COMP_SESSION, PARAM_UP_BUFFER_SIZE, vs_err );
         if((likely( !vs_err )) && (0 < int_val)){
             upstream_buffer_size    = int_val;
-        }else{
-            Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 1, "up buffer param error set default 8192" , __FILE__, __LINE__ );    
         }
 
         int_val    = param.get_int( PARAM_COMP_SESSION, PARAM_DOWN_BUFFER_SIZE, vs_err );
         if((likely( !vs_err )) && (0 < int_val)){
             downstream_buffer_size    = int_val;
-        }else{
-            Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 2, "down buffer param error set default 8192" , __FILE__, __LINE__ );    
         }
 
         protocol_module = parent_service.get_protocol_module();
@@ -393,51 +381,56 @@ namespace l7vs{
 
         // Reset SSL structure to allow another connection.
         if ( ssl_flag ) {
-            if ( ssl_cache_flag ) {
-                if (unlikely(ssl_clear_keep_cache(client_ssl_socket.get_socket().impl()->ssl) == false)) {
-                    //Error ssl_clear_keep_cache
-                    std::stringstream buf;
-                    buf << "Thread ID[";
-                    buf << boost::this_thread::get_id();
-                    buf << "] ssl_clear_keep_cache failed";
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 110, buf.str(), __FILE__, __LINE__ );
-                    msg.flag = true;
-                    msg.message = "ssl_clear_keep_cache failed";
-                } else {
-                    //----Debug log----------------------------------------------------------------------
-                    if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+            if(client_ssl_socket.is_handshake_error()){
+                // remake socket
+                client_ssl_socket.clear_socket();
+            }else{
+                if ( ssl_cache_flag ) {
+                    if (unlikely(ssl_clear_keep_cache(client_ssl_socket.get_socket().impl()->ssl) == false)) {
+                        //Error ssl_clear_keep_cache
                         std::stringstream buf;
                         buf << "Thread ID[";
                         buf << boost::this_thread::get_id();
-                        buf << "] ssl_clear_keep_cache ok";
-                        Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 81,
-                                    buf.str(),
-                                    __FILE__, __LINE__ );
+                        buf << "] ssl_clear_keep_cache failed";
+                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 110, buf.str(), __FILE__, __LINE__ );
+                        msg.flag = true;
+                        msg.message = "ssl_clear_keep_cache failed";
+                    } else {
+                        //----Debug log----------------------------------------------------------------------
+                        if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                            std::stringstream buf;
+                            buf << "Thread ID[";
+                            buf << boost::this_thread::get_id();
+                            buf << "] ssl_clear_keep_cache ok";
+                            Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 81,
+                                        buf.str(),
+                                        __FILE__, __LINE__ );
+                        }
+                        //----Debug log----------------------------------------------------------------------
                     }
-                    //----Debug log----------------------------------------------------------------------
-                }
-            } else {
-                if (unlikely(SSL_clear(client_ssl_socket.get_socket().impl()->ssl) == 0)) {
-                    //Error SSL_clear
-                    std::stringstream buf;
-                    buf << "Thread ID[";
-                    buf << boost::this_thread::get_id();
-                    buf << "] SSL_clear failed";
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 111, buf.str(), __FILE__, __LINE__ );
-                    msg.flag = true;
-                    msg.message = "SSL_clear failed";
-                }else{
-                    //----Debug log----------------------------------------------------------------------
-                    if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                } else {
+                    if (unlikely(SSL_clear(client_ssl_socket.get_socket().impl()->ssl) == 0)) {
+                        //Error SSL_clear
                         std::stringstream buf;
                         buf << "Thread ID[";
                         buf << boost::this_thread::get_id();
-                        buf << "] SSL_clear ok";
-                        Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 82,
-                                    buf.str(),
-                                    __FILE__, __LINE__ );
+                        buf << "] SSL_clear failed";
+                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 111, buf.str(), __FILE__, __LINE__ );
+                        msg.flag = true;
+                        msg.message = "SSL_clear failed";
+                    }else{
+                        //----Debug log----------------------------------------------------------------------
+                        if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                            std::stringstream buf;
+                            buf << "Thread ID[";
+                            buf << boost::this_thread::get_id();
+                            buf << "] SSL_clear ok";
+                            Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 82,
+                                        buf.str(),
+                                        __FILE__, __LINE__ );
+                        }
+                        //----Debug log----------------------------------------------------------------------
                     }
-                    //----Debug log----------------------------------------------------------------------
                 }
             }
         }
@@ -750,7 +743,6 @@ namespace l7vs{
             Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 18, buf.str(), __FILE__, __LINE__ );
         }
         //----Debug log----------------------------------------------------------------------
-        adaptive_wait down_thread_wait( 1, 128 );
         while(true){ 
             //wait down thread get id
             {
@@ -759,11 +751,8 @@ namespace l7vs{
                     break;
                 }
             }
-            //boost::this_thread::yield();
-            down_thread_wait.wait();
+            boost::this_thread::yield();
         }
-        down_thread_wait.reset();
-
         //----Debug log----------------------------------------------------------------------
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
             std::stringstream buf;
@@ -874,23 +863,25 @@ namespace l7vs{
         }
         if(likely( !is_exit )){
             //set client_socket options(recieve buffer size)
-            boost::asio::socket_base::receive_buffer_size    opt1( upstream_buffer_size );
-            if (!ssl_flag) {
-                client_socket.get_socket().lowest_layer().set_option( opt1 ,ec);
-            } else {
-                client_ssl_socket.get_socket().lowest_layer().set_option( opt1 ,ec);
-            }
-            if(unlikely( ec )){
-                //client socket Error!
-                std::stringstream buf;
-                buf << "Thread ID[";
-                buf << boost::this_thread::get_id();
-                buf << "] client socket recieve buffer size error : ";
-                buf << ec.message();
-                Logger::putLogError( LOG_CAT_L7VSD_SESSION, 12, buf.str(), __FILE__, __LINE__ );
-                {
-                    rw_scoped_lock scoped_lock(exit_flag_update_mutex);
-                    exit_flag = true;
+            if (upstream_buffer_size > 0) {
+                boost::asio::socket_base::receive_buffer_size opt1(upstream_buffer_size);
+                if (!ssl_flag) {
+                    client_socket.get_socket().lowest_layer().set_option(opt1, ec);
+                } else {
+                    client_ssl_socket.get_socket().lowest_layer().set_option(opt1, ec);
+                }
+                if(unlikely( ec )){
+                    //client socket Error!
+                    std::stringstream buf;
+                    buf << "Thread ID[";
+                    buf << boost::this_thread::get_id();
+                    buf << "] client socket recieve buffer size error : ";
+                    buf << ec.message();
+                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 12, buf.str(), __FILE__, __LINE__ );
+                    {
+                        rw_scoped_lock scoped_lock(exit_flag_update_mutex);
+                        exit_flag = true;
+                    }
                 }
             }
         }
@@ -901,38 +892,26 @@ namespace l7vs{
         }
         if(likely( !is_exit )){
             //set client_socket options(send buffer size)
-            boost::asio::socket_base::send_buffer_size        opt2( downstream_buffer_size );
-            if (!ssl_flag) {
-                client_socket.get_socket().lowest_layer().set_option( opt2 ,ec);
-            } else {
-                client_ssl_socket.get_socket().lowest_layer().set_option( opt2 ,ec);
-            }
-            if(unlikely( ec )){
-                //client socket Error!
-                std::stringstream buf;
-                buf << "Thread ID[";
-                buf << boost::this_thread::get_id();
-                buf << "] client socket send buffer size error : ";
-                buf << ec.message();
-                Logger::putLogError( LOG_CAT_L7VSD_SESSION, 13, buf.str(), __FILE__, __LINE__ );
-                {
-                    rw_scoped_lock scoped_lock(exit_flag_update_mutex);
-                    exit_flag = true;
+            if (downstream_buffer_size > 0) {
+                boost::asio::socket_base::send_buffer_size opt2(downstream_buffer_size);
+                if (!ssl_flag) {
+                    client_socket.get_socket().lowest_layer().set_option(opt2, ec);
+                } else {
+                    client_ssl_socket.get_socket().lowest_layer().set_option(opt2, ec);
+                }
+                if(unlikely( ec )){
+                    //client socket Error!
+                    std::stringstream buf;
+                    buf << "Thread ID[";
+                    buf << boost::this_thread::get_id();
+                    buf << "] client socket send buffer size error : ";
+                    buf << ec.message();
+                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 13, buf.str(), __FILE__, __LINE__ );
+                    {
+                        rw_scoped_lock scoped_lock(exit_flag_update_mutex);
+                        exit_flag = true;
+                    }
                 }
-            }
-        }
-
-        // epoll_register(up, cl)
-        int clientfd = client_socket.get_socket().lowest_layer().native();
-        epoll_event event;
-        memset( &event, 0, sizeof(epoll_event) );
-        event.events = EPOLLIN | EPOLLET;
-        event.data.fd = clientfd;
-        if ( epoll_ctl( up_cl_epollfd, EPOLL_CTL_ADD, clientfd, &event ) < 0 ){
-            Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_cl epoll_ctl add eror", __FILE__, __LINE__ );
-            {
-                rw_scoped_lock scoped_lock( exit_flag_update_mutex );
-                exit_flag = true;
             }
         }
 
@@ -1032,9 +1011,8 @@ namespace l7vs{
                 up_thread_next_call_function.second(LOCAL_PROC);
             }
 
-            //boost::this_thread::yield();
+            boost::this_thread::yield();
         }
-
         //----Debug log----------------------------------------------------------------------
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
             std::stringstream buf;
@@ -1057,7 +1035,6 @@ namespace l7vs{
             Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 22, buf.str(), __FILE__, __LINE__ );
         }
         //----Debug log----------------------------------------------------------------------
-        adaptive_wait down_thread_alive_wait( 1, 128 );
         while(true){ 
             // wait down thread alive
             {
@@ -1066,11 +1043,8 @@ namespace l7vs{
                     break;
                 }
             }
-            //boost::this_thread::yield();
-            down_thread_alive_wait.wait();
+            boost::this_thread::yield();
         }
-        down_thread_alive_wait.reset();
-
         //----Debug log----------------------------------------------------------------------
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
             std::stringstream buf;
@@ -1135,7 +1109,6 @@ namespace l7vs{
             Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 28, buf.str(), __FILE__, __LINE__ );
         }
         //----Debug log----------------------------------------------------------------------
-        adaptive_wait up_thread_active_wait( 1, 128 );
         while(true){ // UP_THREAD_ACTIVE
             // wait up thread active
             {
@@ -1148,11 +1121,8 @@ namespace l7vs{
                 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
                 if(unlikely(  exit_flag )) break;
             }
-            //boost::this_thread::yield();
-            up_thread_active_wait.wait();
+            boost::this_thread::yield();
         }
-        up_thread_active_wait.reset();
-
         //----Debug log----------------------------------------------------------------------
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
             std::stringstream buf;
@@ -1200,21 +1170,6 @@ namespace l7vs{
                 exit_flag = true;
             }
         }
-
-        // epoll_register(down, cl)
-        int clientfd = client_socket.get_socket().lowest_layer().native();
-        epoll_event event;
-        memset( &event, 0, sizeof(epoll_event) );
-        event.events = EPOLLOUT | EPOLLET;
-        event.data.fd = clientfd;
-        if ( epoll_ctl( down_cl_epollfd, EPOLL_CTL_ADD, clientfd, &event ) < 0 ){
-            Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_cl epoll_ctl add eror", __FILE__, __LINE__ );
-            {
-                rw_scoped_lock scoped_lock( exit_flag_update_mutex );
-                exit_flag = true;
-            }
-        }
-
         bool is_pause;
         while(true){
             {
@@ -1263,9 +1218,8 @@ namespace l7vs{
             }else{
                 down_thread_next_call_function.second(LOCAL_PROC);
             }
-            //boost::this_thread::yield();
+            boost::this_thread::yield();
         }
-
         //----Debug log----------------------------------------------------------------------
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
             std::stringstream buf;
@@ -1336,7 +1290,7 @@ namespace l7vs{
     std::string tcp_session::endpoint_to_string(
                 const boost::asio::ip::tcp::endpoint& target_endpoint){
         std::stringstream ret;
-       if( target_endpoint.address().is_v6() ){
+        if( target_endpoint.address().is_v6() ){
             ret << "[" << target_endpoint.address().to_string() << "]:" << target_endpoint.port();
         }else{
             ret << target_endpoint.address().to_string() << ":" << target_endpoint.port();
@@ -1398,7 +1352,7 @@ namespace l7vs{
                     // handshake fail
                     if(ec == boost::asio::error::try_again){
                         func_tag = UP_FUNC_CLIENT_ACCEPT;
-                        //boost::this_thread::yield();
+                        boost::this_thread::yield();
                     }else{
                         //ERROR
                         ssl_handshake_timer->cancel();
@@ -1532,7 +1486,15 @@ namespace l7vs{
         boost::system::error_code ec;
         std::size_t recv_size;
         UP_THREAD_FUNC_TYPE_TAG func_tag;
-        adaptive_wait up_recv_wait( 1, 128 );
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+        if ( !ssl_flag ) {
+            epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
+        } else {
+            epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+        }
+
         while( true ){
             {
                 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
@@ -1541,33 +1503,13 @@ namespace l7vs{
                     break;
                 }
             }
-            // epoll_wait(up,cl)
-            epoll_event events[ MAXEVENTS ];
-            int nfd = epoll_wait( up_cl_epollfd, events, MAXEVENTS, EPTIMEOUT);
-            if( nfd < 0 ){
-                Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_cl epoll_wait err", __FILE__, __LINE__ );
-                func_tag = UP_FUNC_EXIT;
-                break;
-            }
-            else if( nfd > 0 ){
-                epoll_event event;
-                event.events = EPOLLIN | EPOLLET;
-                event.data.fd = events[0].data.fd;
-                if( epoll_ctl( up_cl_epollfd, EPOLL_CTL_MOD, events[0].data.fd, &event ) < 0 ){
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_cl epoll_ctl err", __FILE__, __LINE__ );
-                    func_tag = UP_FUNC_EXIT;
-                }
-                up_recv_wait.reset();
-                break;
-            }
-            up_recv_wait.wait();
-        }
-        if( UP_FUNC_EXIT != func_tag ){
+
             if (!ssl_flag) {
                 recv_size = client_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
             } else {
                 recv_size = client_ssl_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
             }
+
             if(!ec){
                 if(recv_size > 0){
                     //----Debug log----------------------------------------------------------------------
@@ -1606,24 +1548,36 @@ namespace l7vs{
                         return;
                     }
                     func_tag = func_type->second;
-                    //up_recv_wait.reset();
-                    //break;
+                    break;
                 }else{
-                    func_tag = UP_FUNC_CLIENT_RECEIVE;
+                    //func_tag = UP_FUNC_CLIENT_RECEIVE;
                     //boost::this_thread::yield();
-                    //up_recv_wait.wait();
                 }
             }else{
                 if(ec == boost::asio::error::try_again){
-                    func_tag = UP_FUNC_CLIENT_RECEIVE;
+                    //func_tag = UP_FUNC_CLIENT_RECEIVE;
                     //boost::this_thread::yield();
-                    //up_recv_wait.wait();
                 }else{
                     func_tag = UP_FUNC_CLIENT_DISCONNECT;
-                    //up_recv_wait.reset();
-                    //break;
+                    break;
                 }
             }
+
+            event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+            if ( !ssl_flag ) {
+                epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
+            } else {
+                epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+            }
+
+            epoll_wait( up_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
+        }
+
+        event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+        if ( !ssl_flag ) {
+            epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
+        } else {
+            epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
         }
 
         up_thread_function_pair    func    = up_thread_function_array[func_tag];
@@ -1792,7 +1746,11 @@ namespace l7vs{
         std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
         std::size_t send_size;
         UP_THREAD_FUNC_TYPE_TAG func_tag;
-        adaptive_wait up_send_wait( 1, 128 );
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, send_socket->second->get_socket().native(), &event );
+
         while( true ){
             {
                 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
@@ -1802,30 +1760,8 @@ namespace l7vs{
                 }
             }
 
-            // epoll_wait(up,rs)
-            epoll_event events[ MAXEVENTS ];
-            int nfd = epoll_wait( up_rs_epollfd, events, MAXEVENTS, EPTIMEOUT);
-            if( nfd < 0 ){
-                Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_rs epoll_wait err", __FILE__, __LINE__ );
-                func_tag = UP_FUNC_EXIT;
-                break;
-            }
-            else if( nfd > 0 ){
-                epoll_event event;
-                event.events = EPOLLOUT | EPOLLET;
-                event.data.fd = events[0].data.fd;
-                if( epoll_ctl( up_rs_epollfd, EPOLL_CTL_MOD, events[0].data.fd, &event ) < 0 ){
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_rs epoll_ctl err", __FILE__, __LINE__ );
-                    func_tag = UP_FUNC_EXIT;
-                }
-                up_send_wait.reset();
-                break;
-            }
-            up_send_wait.wait();
-        }
-
-        if( UP_FUNC_EXIT != func_tag ){
             send_size = send_socket->second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+
             if( !ec ){
                 send_data_size += send_size;
                 up_thread_data_dest_side.set_send_size(send_data_size);
@@ -1862,20 +1798,26 @@ namespace l7vs{
                     }
                     func_tag = func_type->second;
                 }
-                //up_send_wait.reset();
-                //break;
+                break;
             }else{
                 if(ec == boost::asio::error::try_again){
-                    func_tag = UP_FUNC_REALSERVER_SEND;
+                    //func_tag = UP_FUNC_REALSERVER_SEND;
                     //boost::this_thread::yield();
-                    //up_send_wait.wait();
                 }else{
                     func_tag = UP_FUNC_REALSERVER_DISCONNECT;
-                    //up_send_wait.reset();
-                    //break;
+                    break;
                 }
             }
+
+            event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+            epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, send_socket->second->get_socket().native(), &event );
+
+            epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
         }
+
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, send_socket->second->get_socket().native(), &event );
+
         up_thread_function_pair    func    = up_thread_function_array[func_tag];
         if( unlikely( !func.second ) ){
             //Error not find function map
@@ -1990,57 +1932,38 @@ namespace l7vs{
                 }
 
                 //set realserver_socket options(recieve buffer size)
-                boost::asio::socket_base::receive_buffer_size    opt1( downstream_buffer_size );
-                new_socket->get_socket().set_option(opt1 , ec);
-                if(unlikely( ec )){
-                    // socket set nonblocking mode error
-                    std::stringstream buf;
-                    buf << "Thread ID[";
-                    buf << boost::this_thread::get_id();
-                    buf << "] realserver socket recieve buffer size error : ";
-                    buf << ec.message();
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 35, buf.str(), __FILE__, __LINE__ );
-                    up_thread_exit(process_type);
-                    return;
+                if (downstream_buffer_size > 0) {
+                    boost::asio::socket_base::receive_buffer_size opt1(downstream_buffer_size);
+                    new_socket->get_socket().set_option(opt1, ec);
+                    if(unlikely( ec )){
+                        // socket set nonblocking mode error
+                        std::stringstream buf;
+                        buf << "Thread ID[";
+                        buf << boost::this_thread::get_id();
+                        buf << "] realserver socket recieve buffer size error : ";
+                        buf << ec.message();
+                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 35, buf.str(), __FILE__, __LINE__ );
+                        up_thread_exit(process_type);
+                        return;
+                    }
                 }
                 //set realserver_socket options(send buffer size)
-                boost::asio::socket_base::send_buffer_size        opt2( upstream_buffer_size );
-                new_socket->get_socket().set_option(opt2 , ec);
-                if(unlikely( ec )){
-                    // socket set nonblocking mode error
-                    std::stringstream buf;
-                    buf << "Thread ID[";
-                    buf << boost::this_thread::get_id();
-                    buf << "] realserver socket send buffer size error : ";
-                    buf << ec.message();
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 36, buf.str(), __FILE__, __LINE__ );
-                    up_thread_exit(process_type);
-                    return;
-                }
-
-                // epoll_register(up, rs)
-                int realserverfd = new_socket->get_socket().lowest_layer().native();
-                epoll_event event;
-                memset( &event, 0, sizeof(epoll_event) );
-                event.events = EPOLLOUT | EPOLLET;
-                event.data.fd = realserverfd;
-                if ( epoll_ctl( up_rs_epollfd, EPOLL_CTL_ADD, realserverfd, &event ) < 0 ){
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "up_rs epoll_ctl add eror", __FILE__, __LINE__ );
-                    up_thread_exit(process_type);
-                    return;
-                }
-
-                // epoll_register(down, rs)
-                memset( &event, 0, sizeof(epoll_event) );
-                event.events = EPOLLIN | EPOLLET;
-                event.data.fd = realserverfd;
-                if ( epoll_ctl( down_rs_epollfd, EPOLL_CTL_ADD, realserverfd, &event ) < 0 ){
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_rs epoll_ctl add eror", __FILE__, __LINE__ );
-                    up_thread_exit(process_type);
-                    return;
+                if (upstream_buffer_size > 0) {
+                    boost::asio::socket_base::send_buffer_size opt2(upstream_buffer_size);
+                    new_socket->get_socket().set_option(opt2, ec);
+                    if(unlikely( ec )){
+                        // socket set nonblocking mode error
+                        std::stringstream buf;
+                        buf << "Thread ID[";
+                        buf << boost::this_thread::get_id();
+                        buf << "] realserver socket send buffer size error : ";
+                        buf << ec.message();
+                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 36, buf.str(), __FILE__, __LINE__ );
+                        up_thread_exit(process_type);
+                        return;
+                    }
                 }
 
-
                 socket_element push_element;
                 push_element.first = server_endpoint;
                 push_element.second = new_socket;
@@ -2291,7 +2214,11 @@ namespace l7vs{
         std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
         std::size_t send_size;
         UP_THREAD_FUNC_TYPE_TAG func_tag;
-        adaptive_wait sorry_send_wait( 1, 128 );
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, sorryserver_socket.second->get_socket().native(), &event );
+
         while( true ){
             {
                 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
@@ -2300,7 +2227,9 @@ namespace l7vs{
                     break;
                 }
             }
+
             send_size = sorryserver_socket.second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+
             if(!ec){
                 send_data_size += send_size;
                 up_thread_data_dest_side.set_send_size(send_data_size);
@@ -2336,19 +2265,26 @@ namespace l7vs{
                     }
                     func_tag = func_type->second;
                 }
-                sorry_send_wait.reset();
                 break;
             }else{
                 if(ec == boost::asio::error::try_again){
                     //func_tag = UP_FUNC_SORRYSERVER_SEND;
-                    sorry_send_wait.wait();
+                    //boost::this_thread::yield();
                 }else{
                     func_tag = UP_FUNC_SORRYSERVER_DISCONNECT;
-                    sorry_send_wait.reset();
                     break;
                 }
             }
+
+            event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+            epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, sorryserver_socket.second->get_socket().native(), &event );
+
+            epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
         }
+
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, sorryserver_socket.second->get_socket().native(), &event );
+
         up_thread_function_pair    func    = up_thread_function_array[func_tag];
         if(unlikely( !func.second )){
             //Error not find function map
@@ -2763,7 +2699,11 @@ namespace l7vs{
         boost::system::error_code ec;
         size_t recv_size;
         DOWN_THREAD_FUNC_TYPE_TAG func_tag;
-        adaptive_wait down_recv_wait( 1, 128 );
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+        epoll_ctl( down_realserver_epollfd, EPOLL_CTL_ADD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+
         while( true ){
             {
                 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
@@ -2772,32 +2712,10 @@ namespace l7vs{
                     break;
                 }
             }
-            // epoll_wait(down,rs)
-            epoll_event events[ MAXEVENTS ];
-            int nfd = epoll_wait( down_rs_epollfd, events, MAXEVENTS, EPTIMEOUT);
-            if( nfd < 0 ){
-                Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_rs epoll_wait err", __FILE__, __LINE__ );
-                func_tag = DOWN_FUNC_EXIT;
-                break;
-            }
-            else if( nfd > 0 ){
-                epoll_event event;
-                event.events = EPOLLIN | EPOLLET;
-                event.data.fd = events[0].data.fd;
-                if( epoll_ctl( down_rs_epollfd, EPOLL_CTL_MOD, events[0].data.fd, &event ) < 0 ){
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_rs epoll_ctl err", __FILE__, __LINE__ );
-                    func_tag = DOWN_FUNC_EXIT;
-                }
-                down_recv_wait.reset();
-                break;
-            }
-            down_recv_wait.wait();
-        }
-        if( DOWN_FUNC_EXIT != func_tag ){
+
             recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
-    
+
             boost::asio::ip::tcp::endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
-    
             down_thread_data_dest_side.set_endpoint(server_endpoint);
             if(!ec){
                 if(recv_size > 0){
@@ -2813,7 +2731,7 @@ namespace l7vs{
                         buf << server_endpoint;
                         buf << "]";
                         Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 43, buf.str(), __FILE__, __LINE__ );
-                    }
+                   }
                     //----Debug log----------------------------------------------------------------------
                     down_thread_data_dest_side.set_size(recv_size);
                     parent_service.update_down_recv_size(recv_size);
@@ -2831,26 +2749,30 @@ namespace l7vs{
                         return;
                     }
                     func_tag = func_type->second;
-                    down_recv_wait.reset();
-                    //break;
+                    break;
                 }else{
-                    func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+                    //func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
                     //boost::this_thread::yield();
-                    //down_recv_wait.wait();
                 }
             }else{
                 if(ec == boost::asio::error::try_again){
-                    func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+                    //func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
                     //boost::this_thread::yield();
-                    //down_recv_wait.wait();
                 }else{
                     func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
-                    //down_recv_wait.reset();
-                    //break;
+                    break;
                 }
             }
+
+            event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+            epoll_ctl( down_realserver_epollfd, EPOLL_CTL_MOD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+
+            epoll_wait( down_realserver_epollfd, down_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
         }
 
+        event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+        epoll_ctl( down_realserver_epollfd, EPOLL_CTL_DEL, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+
         down_thread_function_pair    func    = down_thread_function_array[func_tag];
         if(unlikely( !func.second ) ){
             //Error not find function map
@@ -3102,14 +3024,21 @@ namespace l7vs{
     //! down thread send for client and raise module event of handle_client_send
     //! @param[in]        process_type is prosecess type
     void tcp_session::down_thread_client_send(const TCP_PROCESS_TYPE_TAG process_type){
-        
         boost::system::error_code ec;
         boost::array<char,MAX_BUFFER_SIZE>& data_buff = down_thread_data_client_side.get_data();
         std::size_t data_size = down_thread_data_client_side.get_size();
         std::size_t send_data_size = down_thread_data_client_side.get_send_size();
         std::size_t send_size;
         DOWN_THREAD_FUNC_TYPE_TAG func_tag;
-        adaptive_wait down_send_wait( 1, 128 );
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        if ( !ssl_flag ) {
+            epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
+        } else {
+            epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+        }
+
         while( true ){
             {
                 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
@@ -3118,33 +3047,13 @@ namespace l7vs{
                     break;
                 }
             }
-            // epoll_wait(down,cl)
-            epoll_event events[ MAXEVENTS ];
-            int nfd = epoll_wait( down_cl_epollfd, events, MAXEVENTS, EPTIMEOUT);
-            if( nfd < 0 ){
-                Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_cl epoll_wait err", __FILE__, __LINE__ );
-                func_tag = DOWN_FUNC_EXIT;
-                break;
-            }
-            else if( nfd > 0 ){
-                epoll_event event;
-                event.events = EPOLLOUT | EPOLLET;
-                event.data.fd = events[0].data.fd;
-                if( epoll_ctl( down_cl_epollfd, EPOLL_CTL_MOD, events[0].data.fd, &event ) < 0 ){
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, "down_cl epoll_ctl err", __FILE__, __LINE__ );
-                    func_tag = DOWN_FUNC_EXIT;
-                }
-                down_send_wait.reset();
-                break;
-            }
-            down_send_wait.wait();
-        }
-        if( DOWN_FUNC_EXIT != func_tag ){
+
             if (!ssl_flag) {
                 send_size = client_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
             } else {
                 send_size = client_ssl_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
             }
+
             if(!ec){
                 send_data_size += send_size;
                 down_thread_data_client_side.set_send_size(send_data_size);
@@ -3188,20 +3097,34 @@ namespace l7vs{
                     }
                     func_tag = func_type->second;
                 }
-                down_send_wait.reset();
-                //break;
+                break;
             }else{
                 if(ec == boost::asio::error::try_again){
-                    func_tag = DOWN_FUNC_CLIENT_SEND;
+                    //func_tag = DOWN_FUNC_CLIENT_SEND;
                     //boost::this_thread::yield();
-                    //down_send_wait.wait();
                 }else{
                     func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
-                    //down_send_wait.reset();
-                    //break;
+                    break;
                 }
             }
+
+            event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+            if ( !ssl_flag ) {
+                epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
+            } else {
+                epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+            }
+
+            epoll_wait( down_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
+        }
+
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        if ( !ssl_flag ) {
+            epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
+        } else {
+            epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
         }
+
         down_thread_function_pair    func    = down_thread_function_array[func_tag];
         if(unlikely( !func.second )){
             //Error not find function map
@@ -3298,71 +3221,53 @@ namespace l7vs{
         down_thread_data_dest_side.initialize();
         boost::array<char,MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
         boost::system::error_code ec;
-        size_t recv_size;
+        size_t recv_size = sorryserver_socket.second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
+        boost::asio::ip::tcp::endpoint sorry_endpoint = sorryserver_socket.first;
+        down_thread_data_dest_side.set_endpoint(sorry_endpoint);
         DOWN_THREAD_FUNC_TYPE_TAG func_tag;
-        adaptive_wait sorry_recv_wait( 1, 128 );
-        while( true ){
-            {
-                rd_scoped_lock scoped_lock(exit_flag_update_mutex);
-                if(unlikely(exit_flag)){
-                    func_tag = DOWN_FUNC_EXIT;
-                    break;
+        if(!ec){
+            if(recv_size > 0){
+                //----Debug log----------------------------------------------------------------------
+                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+                    std::stringstream buf;
+                    buf << "Thread ID[";
+                    buf << boost::this_thread::get_id();
+                    buf << "] down_thread_sorryserver_receive";
+                    buf << " receive data size[";
+                    buf << recv_size;
+                    buf << "] from [";
+                    buf << sorry_endpoint;
+                    buf << "]";
+                    Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 46, buf.str(), __FILE__, __LINE__ );
                 }
-            }
-            recv_size = sorryserver_socket.second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
-            boost::asio::ip::tcp::endpoint sorry_endpoint = sorryserver_socket.first;
-            down_thread_data_dest_side.set_endpoint(sorry_endpoint);
-            if(!ec){
-                if(recv_size > 0){
-                    //----Debug log----------------------------------------------------------------------
-                    if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
-                        std::stringstream buf;
-                        buf << "Thread ID[";
-                        buf << boost::this_thread::get_id();
-                        buf << "] down_thread_sorryserver_receive";
-                        buf << " receive data size[";
-                        buf << recv_size;
-                        buf << "] from [";
-                        buf << sorry_endpoint;
-                        buf << "]";
-                        Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 46, buf.str(), __FILE__, __LINE__ );
-                    }
-                    //----Debug log----------------------------------------------------------------------
-                    down_thread_data_dest_side.set_size(recv_size);
-                    protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_recv(down_thread_id,sorry_endpoint,data_buff,recv_size);
-                    std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
-                    if(unlikely( func_type == down_thread_module_event_map.end() )){
-                        //Error unknown protocol_module_base::EVENT_TAG return
-                        std::stringstream buf;
-                        buf << "Thread ID[";
-                        buf << boost::this_thread::get_id();
-                        buf << "] protocol_module returnd illegal EVENT_TAG : ";
-                        buf << module_event;    
-                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 88, buf.str(), __FILE__, __LINE__ );
-                        down_thread_exit(process_type);
-                        return;
-                    }
-                    func_tag = func_type->second;
-                    sorry_recv_wait.reset();
-                    break;
-                }else{
-                    //func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
-                    //boost::this_thread::yield();
-                    sorry_recv_wait.wait();
+                //----Debug log----------------------------------------------------------------------
+                down_thread_data_dest_side.set_size(recv_size);
+                protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_recv(down_thread_id,sorry_endpoint,data_buff,recv_size);
+                std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
+                if(unlikely( func_type == down_thread_module_event_map.end() )){
+                    //Error unknown protocol_module_base::EVENT_TAG return
+                    std::stringstream buf;
+                    buf << "Thread ID[";
+                    buf << boost::this_thread::get_id();
+                    buf << "] protocol_module returnd illegal EVENT_TAG : ";
+                    buf << module_event;    
+                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 88, buf.str(), __FILE__, __LINE__ );
+                    down_thread_exit(process_type);
+                    return;
                 }
+                func_tag = func_type->second;
             }else{
-                if(ec == boost::asio::error::try_again){
-                    //func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
-                    //boost::this_thread::yield();
-                    sorry_recv_wait.wait();
-                }else{
-                    func_tag = DOWN_FUNC_SORRYSERVER_DISCONNECT;
-                    sorry_recv_wait.reset();
-                    break;
-                }
+                func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
+                boost::this_thread::yield();
+            }
+        }else{
+            if(ec == boost::asio::error::try_again){
+                func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
+                boost::this_thread::yield();
+            }else{
+                func_tag = DOWN_FUNC_SORRYSERVER_DISCONNECT;
             }
         }
-
         down_thread_function_pair    func    = down_thread_function_array[func_tag];
         if(unlikely( !func.second )){
             //Error not find function map