OSDN Git Service

Revert virtualservice_tcp, tcp_session
author0809216 <0809216@1ed66053-1c2d-0410-8867-f7571e6e31d3>
Mon, 8 Nov 2010 12:17:40 +0000 (12:17 +0000)
committer0809216 <0809216@1ed66053-1c2d-0410-8867-f7571e6e31d3>
Mon, 8 Nov 2010 12:17:40 +0000 (12:17 +0000)
git-svn-id: http://10.144.169.20/repos/um/branches/l7vsd-3.x-ramiel-epoll-cond@10425 1ed66053-1c2d-0410-8867-f7571e6e31d3

l7vsd/src/tcp_session.cpp
l7vsd/src/virtualservice_tcp.cpp

index 5584cc4..85024db 100644 (file)
@@ -1378,15 +1378,6 @@ void tcp_session::up_thread_client_receive(const TCP_PROCESS_TYPE_TAG process_ty
                 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
         }
 
-        boost::system::error_code ec;
-        std::size_t recv_size;
-        UP_THREAD_FUNC_TYPE_TAG func_tag = UP_FUNC_CLIENT_RECEIVE;
-        struct epoll_event event;
-        int ret_fds;
-
-        up_thread_data_client_side.initialize();
-        boost::array<char, MAX_BUFFER_SIZE>& data_buff = up_thread_data_client_side.get_data();
-
         if (unlikely(0 < parent_service.get_wait_upstream())) {
                 //----Debug log----------------------------------------------------------------------
                 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
@@ -1396,7 +1387,84 @@ void tcp_session::up_thread_client_receive(const TCP_PROCESS_TYPE_TAG process_ty
                         Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 35, buf.str(), __FILE__, __LINE__);
                 }
                 //----Debug log----------------------------------------------------------------------
-                goto up_thread_client_receive_out;
+                return; // try again
+        }
+        up_thread_data_client_side.initialize();
+        boost::array<char, MAX_BUFFER_SIZE>& data_buff = up_thread_data_client_side.get_data();
+        boost::system::error_code ec;
+        std::size_t recv_size;
+        UP_THREAD_FUNC_TYPE_TAG func_tag;
+
+        struct epoll_event event;
+        event.data.fd = !ssl_flag ? client_socket.get_socket().native()
+                        : client_ssl_socket.get_socket().lowest_layer().native();
+
+        // epoll wait codes
+        if (is_epoll_edge_trigger) {
+                event.events = EPOLLIN | EPOLLHUP | EPOLLET;
+        } else {
+                event.events = EPOLLIN | EPOLLHUP;
+        }
+        bool add_flag = false;
+        if (!up_client_epollfd_registered) {
+                if (epoll_ctl(up_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
+                        std::stringstream buf;
+                        buf << "up_thread_client_receive: epoll_ctl EPOLL_CTL_ADD error: ";
+                        buf << strerror(errno);
+                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                        boost::this_thread::yield();
+                        return;
+                }
+                up_client_epollfd_registered = true;
+                add_flag = true;
+        }
+
+        if (is_epoll_edge_trigger && (!add_flag)) {
+                if (epoll_ctl(up_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
+                        std::stringstream buf;
+                        buf << "up_thread_client_receive: epoll_ctl EPOLL_CTL_MOD error: ";
+                        buf << strerror(errno);
+                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                        up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
+                        return;
+                }
+        }
+        int ret_fds = epoll_wait(up_client_epollfd, up_client_events, EVENT_NUM, epoll_timeout);
+        if (ret_fds == 0) {
+                //----Debug log----------------------------------------------------------------------
+                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                        boost::format formatter("up_thread_client_receive: epoll_wait timeout %d msec");
+                        formatter % epoll_timeout;
+                        Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+                }
+                //----Debug log----------------------------------------------------------------------
+                up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_RECEIVE];
+                return;
+        } else if (ret_fds < 0) {
+                boost::format formatter("up_thread_client_receive: epoll_wait error: %s");
+                formatter % strerror(errno);
+                Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+                up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
+                return;
+        }
+
+        for (int i = 0; i < ret_fds; ++i) {
+                if (up_client_events[i].data.fd == event.data.fd) {
+                        if (up_client_events[i].events & EPOLLIN) {
+                                break;
+                        }
+                        if (up_client_events[i].events & EPOLLHUP) {
+                                up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
+                                //----Debug log----------------------------------------------------------------------
+                                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                                        boost::format formatter("Thread ID[%d] FUNC OUT up_thread_client_receive EPOLLHUP: NEXT_FUNC[%s]");
+                                        formatter % boost::this_thread::get_id() % func_tag_to_string(UP_FUNC_CLIENT_DISCONNECT);
+                                        Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+                                }
+                                //----Debug log----------------------------------------------------------------------
+                                return;
+                        }
+                }
         }
 
         recv_size = !ssl_flag ? client_socket.read_some(
@@ -1405,10 +1473,14 @@ void tcp_session::up_thread_client_receive(const TCP_PROCESS_TYPE_TAG process_ty
                     : client_ssl_socket.read_some(
                             boost::asio::buffer(data_buff, MAX_BUFFER_SIZE),
                             ec);
+
         if (!ec) {
                 if (recv_size > 0) {
                         //----Debug log----------------------------------------------------------------------
                         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                                endpoint client_endpoint; // XXX redefined???
+                                client_endpoint = !ssl_flag ? client_socket.get_socket().lowest_layer().remote_endpoint(ec)
+                                                  : client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
                                 boost::format formatter("Thread ID[%d] up_thread_client_receive receive data size[%d] from [%d]");
                                 formatter % boost::this_thread::get_id() % recv_size % client_endpoint;
                                 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 36, formatter.str(), __FILE__, __LINE__);
@@ -1424,70 +1496,20 @@ void tcp_session::up_thread_client_receive(const TCP_PROCESS_TYPE_TAG process_ty
                                 formatter % boost::this_thread::get_id() % module_event;
                                 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 20, formatter.str(), __FILE__, __LINE__);
                                 up_thread_exit(process_type);
-                                func_tag = UP_FUNC_EXIT;
-                        } else {
-                                func_tag = func_type->second;
+                                return;
                         }
-                } else if (recv_size < 0) {
+                        func_tag = func_type->second;
+                } else {
                         boost::format formatter("Thread ID[%d] client read error. recv_size: %d");
                         formatter % boost::this_thread::get_id() % recv_size;
                         Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+                        func_tag = UP_FUNC_CLIENT_RECEIVE;
                 }
         } else {
-                if (ec == boost::asio::error::try_again) {
-                        // epoll wait codes
-                        event.data.fd = !ssl_flag ? client_socket.get_socket().native()
-                                        : client_ssl_socket.get_socket().lowest_layer().native();
-
-                        if (!up_client_epollfd_registered) {
-                                if (is_epoll_edge_trigger) {
-                                        event.events = EPOLLIN | EPOLLHUP | EPOLLET;
-                                } else {
-                                        event.events = EPOLLIN | EPOLLHUP;
-                                }
-                                if ( (!ssl_flag && !client_socket.get_socket().is_open()) ||
-                                     (ssl_flag && !client_ssl_socket.get_socket().lowest_layer().is_open()) ) {
-                                        func_tag = UP_FUNC_CLIENT_DISCONNECT;
-                                        goto up_thread_client_receive_out;
-                                }
-                                if (epoll_ctl(up_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
-                                        std::stringstream buf;
-                                        buf << "up_thread_client_receive: epoll_ctl EPOLL_CTL_ADD error: ";
-                                        buf << strerror(errno);
-                                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                                        func_tag = UP_FUNC_CLIENT_DISCONNECT;
-                                        goto up_thread_client_receive_out;
-                                }
-                                up_client_epollfd_registered = true;
-                        }
-                        ret_fds = epoll_wait(up_client_epollfd, up_client_events, EVENT_NUM, epoll_timeout);
-                        if (ret_fds == 0) {
-                                goto up_thread_client_receive_out; // epoll timeout
-                        } else if (ret_fds < 0) {
-                                boost::format formatter("up_thread_client_receive: epoll_wait error: %s");
-                                formatter % strerror(errno);
-                                Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
-                                func_tag = UP_FUNC_CLIENT_DISCONNECT;
-                                goto up_thread_client_receive_out;
-                        }
-                
-                        for (int i = 0; i < ret_fds; ++i) {
-                                if (up_client_events[i].data.fd == event.data.fd) {
-                                        if (up_client_events[i].events & EPOLLIN) {
-                                                goto up_thread_client_receive_out;
-                                        }
-                                        if (up_client_events[i].events & EPOLLHUP) {
-                                                func_tag = UP_FUNC_CLIENT_DISCONNECT;
-                                                goto up_thread_client_receive_out;
-                                        }
-                                }
-                        }
-                        boost::format formatter("up_thread_client_receive: unknown file descriptor's event: %s");
-                        formatter % strerror(errno);
-                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
-                        func_tag = UP_FUNC_CLIENT_DISCONNECT;
-                } else if (ec == boost::asio::error::eof) {
+                if (ec == boost::asio::error::eof) {
                         func_tag = UP_FUNC_CLIENT_DISCONNECT;
+                } else if (ec == boost::asio::error::try_again) {
+                        func_tag = UP_FUNC_CLIENT_RECEIVE;
                 } else {
                         boost::format formatter("Thread ID[%d] client read error: %s");
                         formatter % boost::this_thread::get_id() % ec.message();
@@ -1495,8 +1517,6 @@ void tcp_session::up_thread_client_receive(const TCP_PROCESS_TYPE_TAG process_ty
                         func_tag = UP_FUNC_CLIENT_DISCONNECT;
                 }
         }
-
-up_thread_client_receive_out:
         up_thread_next_call_function  = up_thread_function_array[func_tag];
 
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
@@ -1631,10 +1651,82 @@ void tcp_session::up_thread_realserver_send(const TCP_PROCESS_TYPE_TAG process_t
         std::size_t data_size = up_thread_data_dest_side.get_size();
         std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
         std::size_t send_size;
-        UP_THREAD_FUNC_TYPE_TAG func_tag = UP_FUNC_REALSERVER_SEND;
-        int ret_fds;
+        UP_THREAD_FUNC_TYPE_TAG func_tag;
 
         struct epoll_event event;
+        event.data.fd = send_socket->second->get_socket().native();
+        if (is_epoll_edge_trigger) {
+                event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
+        } else {
+                event.events = EPOLLOUT | EPOLLHUP;
+        }
+        bool add_flag = false;
+        if (!up_realserver_epollfd_registered) {
+                if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
+                        std::stringstream buf;
+                        buf << "up_thread_realserver_send: epoll_ctl EPOLL_CTL_ADD error: ";
+                        buf << strerror(errno);
+                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                        boost::this_thread::yield();
+                        return;
+                }
+                up_realserver_epollfd_registered = true;
+                add_flag = true;
+        }
+
+        {
+                rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+                if (unlikely(exit_flag)) {
+                        up_thread_next_call_function = up_thread_function_array[UP_FUNC_EXIT];
+                        return;
+                }
+        }
+
+        if (is_epoll_edge_trigger && (!add_flag)) {
+                if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
+                        std::stringstream buf;
+                        buf << "up_thread_realserver_send: epoll_ctl EPOLL_CTL_MOD error: ";
+                        buf << strerror(errno);
+                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                        boost::this_thread::yield();
+                        return;
+                }
+        }
+
+        int ret_fds = epoll_wait(up_realserver_epollfd, up_realserver_events, EVENT_NUM, epoll_timeout);
+        if (ret_fds <= 0) {
+                if (ret_fds == 0) {
+                        //----Debug log----------------------------------------------------------------------
+                        if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                                std::stringstream buf;
+                                buf << "up_thread_realserver_send: epoll_wait timeout " << epoll_timeout << " msec.";
+                                Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                        }
+                        //----Debug log----------------------------------------------------------------------
+                } else {
+                        std::stringstream buf;
+                        buf << "up_thread_realserver_send: epoll_wait error: " << strerror(errno);
+                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                }
+                //XXX no need to retry???
+                up_thread_next_call_function = up_thread_function_array[UP_FUNC_REALSERVER_DISCONNECT];
+                return;
+        }
+
+        for (int i = 0; i < ret_fds; ++i) {
+                if (up_realserver_events[i].data.fd == event.data.fd) {
+                        if (up_realserver_events[i].events & EPOLLOUT) {
+                                break;
+                        } else if (up_realserver_events[i].events & EPOLLHUP) {
+                                std::stringstream buf;
+                                buf << "up_thread_realserver_send: epoll hung up event";
+                                Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                                up_thread_next_call_function = up_thread_function_array[UP_FUNC_REALSERVER_DISCONNECT];
+                                return;
+                        }
+                }
+        }
+
         send_size = send_socket->second->write_some(
                             boost::asio::buffer(
                                     data_buff.data() + send_data_size,
@@ -1651,70 +1743,29 @@ void tcp_session::up_thread_realserver_send(const TCP_PROCESS_TYPE_TAG process_t
                         Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 37, formatter.str(), __FILE__, __LINE__);
                 }
                 //----Debug log----------------------------------------------------------------------
-                if (data_size <= send_data_size) {
+                if (data_size > send_data_size) {
+                        func_tag = UP_FUNC_REALSERVER_SEND;
+                } else {
                         protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_send(up_thread_id);
                         std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
                         func_tag = func_type->second;
                 }
         } else {
                 if (ec == boost::asio::error::try_again) {
-                        event.data.fd = send_socket->second->get_socket().native();
-                        if (!up_realserver_epollfd_registered) {
-                                if (is_epoll_edge_trigger) {
-                                        event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
-                                } else {
-                                        event.events = EPOLLOUT | EPOLLHUP;
-                                }
-                                if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
-                                        std::stringstream buf;
-                                        buf << "up_thread_realserver_send: epoll_ctl EPOLL_CTL_ADD error: ";
-                                        buf << strerror(errno);
-                                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                                        goto up_thread_realserver_send_out;
-                                }
-                                up_realserver_epollfd_registered = true;
-                        }
-                        ret_fds = epoll_wait(up_realserver_epollfd, up_realserver_events, EVENT_NUM, epoll_timeout);
-                        if (ret_fds == 0) {
-                                goto up_thread_realserver_send_out; // epoll timeout
-                        } else if (ret_fds < 0) {
-                                std::stringstream buf;
-                                buf << "up_thread_realserver_send: epoll_wait error: " << strerror(errno);
-                                Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                                func_tag = UP_FUNC_REALSERVER_DISCONNECT;
-                                goto up_thread_realserver_send_out;
-                        }
-                        for (int i = 0; i < ret_fds; ++i) {
-                                if (up_realserver_events[i].data.fd == event.data.fd) {
-                                        if (up_realserver_events[i].events & EPOLLOUT) {
-                                                goto up_thread_realserver_send_out;
-                                        } else if (up_realserver_events[i].events & EPOLLHUP) {
-                                                func_tag = UP_FUNC_REALSERVER_DISCONNECT;
-                                                goto up_thread_realserver_send_out;
-                                        }
-                                }
-                        }
-                        boost::format formatter("up_thread_realserver_send: unknown file descriptor's event: %s");
-                        formatter % strerror(errno);
-                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
-                        func_tag = UP_FUNC_REALSERVER_DISCONNECT;
-                } else if (ec == boost::asio::error::eof) {
-                        func_tag = UP_FUNC_REALSERVER_DISCONNECT;
+                        func_tag = UP_FUNC_REALSERVER_SEND;
                 } else {
-                        boost::format formatter("Thread ID[%d] realserver write error: %s");
-                        formatter % boost::this_thread::get_id() % ec.message();
-                        Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
                         func_tag = UP_FUNC_REALSERVER_DISCONNECT;
                 }
         }
 
-up_thread_realserver_send_out:
         up_thread_next_call_function = up_thread_function_array[func_tag];
+
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
                 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_realserver_send: NEXT_FUNC[%s]");
                 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
                 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
         }
+
 }
 
 //! up thread raise module event of handle_realserver_select
@@ -2608,16 +2659,6 @@ void tcp_session::down_thread_realserver_receive(const TCP_PROCESS_TYPE_TAG proc
                 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
         }
 
-        down_thread_data_dest_side.initialize();
-        boost::array<char, MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
-        boost::system::error_code ec;
-        size_t recv_size;
-        DOWN_THREAD_FUNC_TYPE_TAG func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
-        struct epoll_event event;
-        int ret_fds;
-        endpoint server_endpoint;
-        std::list<socket_element>::iterator list_end;
-
         if (down_thread_receive_realserver_socket_list.empty()) {
                 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
                         boost::format formatter("Thread ID[%d] FUNC OUT down_thread_realserver_receive");
@@ -2628,8 +2669,10 @@ void tcp_session::down_thread_realserver_receive(const TCP_PROCESS_TYPE_TAG proc
                 if (!realserver_connect_status) {
                         to_time(LOCKTIMEOUT, xt);
                         realserver_connect_cond.timed_wait(lock, xt);
+                        //realserver_connect_cond.wait(lock);
                 }
-                goto down_thread_realserver_receive_no_socket_out;
+                down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_RECEIVE];
+                return;
         }
 
         if (unlikely(0 < parent_service.get_wait_downstream())) {
@@ -2641,10 +2684,81 @@ void tcp_session::down_thread_realserver_receive(const TCP_PROCESS_TYPE_TAG proc
                         Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 42, buf.str(), __FILE__, __LINE__);
                 }
                 //----Debug log----------------------------------------------------------------------
-                goto down_thread_realserver_receive_out;
+                return;
+        }
+        down_thread_data_dest_side.initialize();
+        boost::array<char, MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
+        boost::system::error_code ec;
+        size_t recv_size;
+        DOWN_THREAD_FUNC_TYPE_TAG func_tag;
+
+        struct epoll_event event;
+        event.data.fd = down_thread_current_receive_realserver_socket->second->get_socket().native();
+        if (is_epoll_edge_trigger) {
+                event.events = EPOLLIN | EPOLLHUP | EPOLLET;
+        } else {
+                event.events = EPOLLIN | EPOLLHUP;
+        }
+        bool add_flag = false;
+        if (!down_realserver_epollfd_registered) {
+                if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
+                        std::stringstream buf;
+                        buf << "down_thread_realserver_receive: epoll_ctl EPOLL_CTL_ADD error: " << strerror(errno);
+                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                        boost::this_thread::yield();
+                        return;
+                }
+                down_realserver_epollfd_registered = true;
+                add_flag = true;
+        }
+
+        if (is_epoll_edge_trigger && (!add_flag)) {
+                if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
+                        std::stringstream buf;
+                        buf << "down_thread_realserver_receive: epoll_ctl EPOLL_CTL_MOD error: " << strerror(errno);
+                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                        boost::this_thread::yield();
+                        return;
+                }
+        }
+
+        int ret_fds = epoll_wait(down_realserver_epollfd, down_realserver_events, EVENT_NUM, epoll_timeout);
+        if (ret_fds <= 0) {
+                if (ret_fds == 0) {
+                        //----Debug log----------------------------------------------------------------------
+                        if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                                boost::format formatter("down_thread_realserver_receive: epoll_wait timeout %d msec.");
+                                formatter % epoll_timeout;
+                                Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+                        }
+                        //----Debug log----------------------------------------------------------------------
+                        down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_RECEIVE];
+                        return;
+                } else {
+                        boost::format formatter("down_thread_realserver_receive: epoll_wait error: %d");
+                        formatter % strerror(errno);
+                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
+                        down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_DISCONNECT];
+                        return;
+                }
         }
 
-        server_endpoint = down_thread_current_receive_realserver_socket->first;
+        for (int i = 0; i < ret_fds; ++i) {
+                if (down_realserver_events[i].data.fd == event.data.fd) {
+                        if (down_realserver_events[i].events & EPOLLIN) {
+                                break;
+                        } else if (down_realserver_events[i].events & EPOLLHUP) {
+                                down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_DISCONNECT];
+                                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                                        boost::format formatter("Thread ID[%d] FUNC OUT down_thread_realserver_receive: EPOLL_HUP");
+                                        formatter % boost::this_thread::get_id();
+                                }
+                                return;
+                        }
+                }
+        }
+
+        endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
         down_thread_data_dest_side.set_endpoint(server_endpoint);
 
         recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff, MAX_BUFFER_SIZE), ec);
@@ -2665,68 +2779,25 @@ void tcp_session::down_thread_realserver_receive(const TCP_PROCESS_TYPE_TAG proc
 
                         std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
                         func_tag = func_type->second;
+                } else {
+                        func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+                        //boost::this_thread::yield();
                 }
         } else {
                 if (ec == boost::asio::error::try_again) {
-                        // epoll wait
-                        event.data.fd = down_thread_current_receive_realserver_socket->second->get_socket().native();
-                        if (!down_realserver_epollfd_registered) {
-                                if (is_epoll_edge_trigger) {
-                                        event.events = EPOLLIN | EPOLLHUP | EPOLLET;
-                                } else {
-                                        event.events = EPOLLIN | EPOLLHUP;
-                                }
-                                if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
-                                        std::stringstream buf;
-                                        buf << "down_thread_realserver_receive: epoll_ctl EPOLL_CTL_ADD error: " << strerror(errno);
-                                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                                        goto down_thread_realserver_receive_out;
-                                }
-                                down_realserver_epollfd_registered = true;
-                        }
-                        ret_fds = epoll_wait(down_realserver_epollfd, down_realserver_events, EVENT_NUM, epoll_timeout);
-                        if (ret_fds == 0) {
-                                goto down_thread_realserver_receive_out; // epoll timeout
-                        } else if (ret_fds < 0) {
-                                boost::format formatter("down_thread_realserver_receive: epoll_wait error: %d");
-                                formatter % strerror(errno);
-                                Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
-                                func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
-                                goto down_thread_realserver_receive_out;
-                        }
-                        for (int i = 0; i < ret_fds; ++i) {
-                                if (down_realserver_events[i].data.fd == event.data.fd) {
-                                        if (down_realserver_events[i].events & EPOLLIN) {
-                                                goto down_thread_realserver_receive_out;
-                                        } else if (down_realserver_events[i].events & EPOLLHUP) {
-                                                func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
-                                                goto down_thread_realserver_receive_out;
-                                        }
-                                }
-                        }
-                        boost::format formatter("down_thread_realserver_receive: unknown file descriptor's event: %s");
-                        formatter % strerror(errno);
-                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
-                        func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
-                } else if (ec == boost::asio::error::eof) {
-                        func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
+                        func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+                        //boost::this_thread::yield();
                 } else {
-                        boost::format formatter("Thread ID[%d] realserver read error: %s");
-                        formatter % boost::this_thread::get_id() % ec.message();
-                        Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
                         func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
                 }
         }
 
-down_thread_realserver_receive_out:
+        down_thread_function_pair func = down_thread_function_array[func_tag];
         down_thread_current_receive_realserver_socket++;
-        list_end = down_thread_receive_realserver_socket_list.end();
+        std::list<socket_element>::iterator list_end = down_thread_receive_realserver_socket_list.end();
         if (down_thread_current_receive_realserver_socket == list_end) {
                 down_thread_current_receive_realserver_socket = down_thread_receive_realserver_socket_list.begin();
         }
-
-down_thread_realserver_receive_no_socket_out:
-        down_thread_function_pair func = down_thread_function_array[func_tag];
         down_thread_next_call_function = func;
 
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
@@ -2933,91 +3004,124 @@ void tcp_session::down_thread_client_send(const TCP_PROCESS_TYPE_TAG process_typ
         std::size_t data_size = down_thread_data_client_side.get_size();
         std::size_t send_data_size = down_thread_data_client_side.get_send_size();
         std::size_t send_size;
-        DOWN_THREAD_FUNC_TYPE_TAG func_tag = DOWN_FUNC_CLIENT_SEND;
+        DOWN_THREAD_FUNC_TYPE_TAG func_tag;
+
         struct epoll_event event;
-        int ret_fds;
+        event.data.fd = !ssl_flag ? client_socket.get_socket().native()
+                        : client_ssl_socket.get_socket().lowest_layer().native();
 
-        send_size = !ssl_flag ? client_socket.write_some(
-                            boost::asio::buffer(
-                                    data_buff.data() + send_data_size,
-                                    data_size - send_data_size),
-                            ec)
-                    : client_ssl_socket.write_some(
-                            boost::asio::buffer(
-                                    data_buff.data() + send_data_size,
-                                    data_size - send_data_size),
-                            ec);
-        if (!ec) {
-                send_data_size += send_size;
-                down_thread_data_client_side.set_send_size(send_data_size);
-                parent_service.update_down_send_size(send_size);
-                //----Debug log----------------------------------------------------------------------
-                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
-                        boost::format formatter("Thread ID[%d] down_thread_client_send send data size[%d] for [%d]");
-                        formatter % boost::this_thread::get_id() % send_size % client_endpoint;
-                        Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 45, formatter.str(), __FILE__, __LINE__);
+        if (is_epoll_edge_trigger) {
+                event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
+        } else {
+                event.events = EPOLLOUT | EPOLLHUP;
+        }
+        bool add_flag = false;
+        if (!down_client_epollfd_registered) {
+                if (epoll_ctl(down_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
+                        std::stringstream buf;
+                        buf << "down_thread_client_send: epoll_ctl EPOLL_CTL_ADD error: ";
+                        buf << strerror(errno);
+                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                        boost::this_thread::yield();
+                        return;
                 }
-                //----Debug log----------------------------------------------------------------------
-                if (data_size > send_data_size) {
-                        goto down_thread_client_send_out;
-                } else {
-                        protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_send(down_thread_id);
-                        std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
-                        func_tag = func_type->second;
-                        goto down_thread_client_send_out;
+                down_client_epollfd_registered = true;
+                add_flag = true;
+        }
+
+        while (true) {
+                {
+                        rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+                        if (unlikely(exit_flag)) {
+                                func_tag = DOWN_FUNC_EXIT;
+                                break;
+                        }
                 }
-        } else {
-                if (ec == boost::asio::error::try_again) {
-                        event.data.fd = !ssl_flag ? client_socket.get_socket().native()
-                                        : client_ssl_socket.get_socket().lowest_layer().native();
-                        if (!down_client_epollfd_registered) {
-                                if (is_epoll_edge_trigger) {
-                                        event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
-                                } else {
-                                        event.events = EPOLLOUT | EPOLLHUP;
-                                }
-                                if (epoll_ctl(down_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
-                                        std::stringstream buf;
-                                        buf << "down_thread_client_send: epoll_ctl EPOLL_CTL_ADD error: ";
-                                        buf << strerror(errno);
-                                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                                        func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
-                                        goto down_thread_client_send_out;
-                                }
-                                down_client_epollfd_registered = true;
+
+                if (is_epoll_edge_trigger && (!add_flag)) {
+                        if (epoll_ctl(down_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
+                                std::stringstream buf;
+                                buf << "down_thread_client_send: epoll_ctl EPOLL_CTL_MOD error: ";
+                                buf << strerror(errno);
+                                Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                                boost::this_thread::yield();
+                                return;
                         }
-                        ret_fds = epoll_wait(down_client_epollfd, down_client_events, EVENT_NUM, epoll_timeout);
+                }
+
+                int ret_fds = epoll_wait(down_client_epollfd, down_client_events, EVENT_NUM, epoll_timeout);
+                if (ret_fds <= 0) {
                         if (ret_fds == 0) {
-                                goto down_thread_client_send_out; // epoll timeout
-                        } else if (ret_fds < 0) {
-                                func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
-                                goto down_thread_client_send_out;
+                                std::stringstream buf;
+                                buf << "down_thread_client_send: epoll_wait timeout ";
+                                buf << epoll_timeout;
+                                buf << "mS";
+                                Logger::putLogInfo(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
+                        } else {
+                                std::stringstream buf;
+                                buf << "down_thread_client_send: epoll_wait error: ";
+                                buf << strerror(errno);
+                                Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
                         }
-                        for (int i = 0; i < ret_fds; ++i) {
-                                if (down_client_events[i].data.fd == event.data.fd) {
-                                        if (down_client_events[i].events & EPOLLOUT) {
-                                                goto down_thread_client_send_out;
-                                        } else if (down_client_events[i].events & EPOLLHUP) {
-                                                func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
-                                                goto down_thread_client_send_out;
-                                        }
+                        down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT];
+                        return;
+                }
+
+                for (int i = 0; i < ret_fds; ++i) {
+                        if (down_client_events[i].data.fd == event.data.fd) {
+                                if (down_client_events[i].events & EPOLLOUT) {
+                                        break;
+                                } else if (down_client_events[i].events & EPOLLHUP) {
+                                        down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT];
+                                        return;
                                 }
                         }
-                        boost::format formatter("down_thread_client_send: unknown file descriptor's event: %s");
-                        formatter % strerror(errno);
-                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
-                        func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
-                } else if (ec == boost::asio::error::eof) {
-                        func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
+                }
+
+                send_size = !ssl_flag ? client_socket.write_some(
+                                    boost::asio::buffer(
+                                            data_buff.data() + send_data_size,
+                                            data_size - send_data_size),
+                                    ec)
+                            : client_ssl_socket.write_some(
+                                    boost::asio::buffer(
+                                            data_buff.data() + send_data_size,
+                                            data_size - send_data_size),
+                                    ec);
+                if (!ec) {
+                        send_data_size += send_size;
+                        down_thread_data_client_side.set_send_size(send_data_size);
+                        parent_service.update_down_send_size(send_size);
+                        //----Debug log----------------------------------------------------------------------
+                        if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
+                                endpoint client_endpoint // XXX redefined???
+                                = !ssl_flag ? client_socket.get_socket().lowest_layer().remote_endpoint(ec)
+                                  : client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
+                                boost::format formatter("Thread ID[%d] down_thread_client_send send data size[%d] for [%d]");
+                                formatter % boost::this_thread::get_id() % send_size % client_endpoint;
+                                Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 45, formatter.str(), __FILE__, __LINE__);
+                        }
+                        //----Debug log----------------------------------------------------------------------
+                        if (data_size > send_data_size) {
+                                func_tag = DOWN_FUNC_CLIENT_SEND;
+                                //down_send_wait.reset();
+                        } else {
+                                protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_send(down_thread_id);
+                                std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
+                                func_tag = func_type->second;
+                        }
+                        break;
                 } else {
-                        boost::format formatter("Thread ID[%d] client write error: %s");
-                        formatter % boost::this_thread::get_id() % ec.message();
-                        Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
-                        func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
+                        if (ec == boost::asio::error::try_again) {
+                                //func_tag = DOWN_FUNC_CLIENT_SEND;
+                                //boost::this_thread::yield();
+                        } else {
+                                func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
+                                break;
+                        }
                 }
         }
 
-down_thread_client_send_out:
         down_thread_next_call_function = down_thread_function_array[func_tag];
 
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
index 234caa4..882591f 100644 (file)
@@ -1721,11 +1721,11 @@ void l7vs::virtualservice_tcp::release_session(const tcp_session *session_ptr)
 
         session_thread_control *stc_ptr = active_sessions.find(session_ptr);
         if (unlikely(NULL == stc_ptr)) {
-                boost::format fmt("session release fail: "
-                        "active_sessions.find( const tcp_session* session_ptr = %d )");
-                fmt % session_ptr;
-                Logger::putLogError(LOG_CAT_L7VSD_VIRTUALSERVICE, /*XXX*/999, fmt.str(), __FILE__, __LINE__);
                 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
+                        boost::format fmt("session release fail: "
+                                "active_sessions.find( const tcp_session* session_ptr = %d )");
+                        fmt % session_ptr;
+                        Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 87, fmt.str(), __FILE__, __LINE__);
                         Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 88, "out_function: "
                                 "void virtualservice_tcp::release_session( "
                                 "const boost::thread::id thread_id )", __FILE__, __LINE__);