Filter frontend_net may log statistics
authorAdam Dickmeiss <adam@indexdata.dk>
Tue, 6 Mar 2012 11:44:50 +0000 (12:44 +0100)
committerAdam Dickmeiss <adam@indexdata.dk>
Tue, 6 Mar 2012 11:49:26 +0000 (12:49 +0100)
If a custom message element in frontend_net configuration it makes
the frontend_net filter produce a log entry whenever an operation
is completed. The log is always written using yaz_log with the
custom message given, the package ID (session ID) , the time
the operation has been active (end-to-end) , the threads currently
active/total and the size of the request input/output queues.

etc/config-zoom.xml
etc/config1.xml
src/filter_frontend_net.cpp
src/test_thread_pool_observer.cpp
src/thread_pool_observer.cpp
src/thread_pool_observer.hpp
xml/schema/filter_frontend_net.rnc

index fc208b6..b6be3fd 100644 (file)
@@ -5,6 +5,7 @@
     <filter id="frontend" type="frontend_net">
       <threads>10</threads>
       <port>@:9000</port>
+      <message>FN</message>
     </filter>
     <filter id="backend" type="zoom">
       <torus
index 4e98e47..3860287 100644 (file)
@@ -8,6 +8,7 @@
     <filter id="frontend" type="frontend_net">
       <threads>10</threads>
       <port>@:9000</port>
+      <message>FN</message>
     </filter>
     <filter id="backend" type="z3950_client">
      <timeout>30</timeout>
index becc16c..a3ab5f7 100644 (file)
@@ -18,6 +18,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
 #include "config.hpp"
 
+#include <sstream>
+#include <iomanip>
 #include <metaproxy/util.hpp>
 #include "pipe.hpp"
 #include <metaproxy/filter.hpp>
@@ -28,6 +30,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #include <yazpp/pdu-assoc.h>
 #include <yazpp/socket-manager.h>
 #include <yazpp/limit-connect.h>
+#include <yaz/timing.h>
 #include <yaz/log.h>
 
 #include <iostream>
@@ -51,6 +54,7 @@ namespace metaproxy_1 {
             int m_listen_duration;
             int m_session_timeout;
             int m_connect_max;
+            std::string m_msg_config;
             yazpp_1::SocketManager mySocketManager;
             ZAssocServer **az;
         };
@@ -69,9 +73,10 @@ namespace metaproxy_1 {
     public:
         ~ZAssocChild();
         ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
-                          mp::ThreadPoolSocketObserver *m_thread_pool_observer,
+                    mp::ThreadPoolSocketObserver *m_thread_pool_observer,
                     const mp::Package *package,
-                    std::string route);
+                    std::string route,
+                    const char *msg_config);
         int m_no_requests;
         std::string m_route;
     private:
@@ -89,25 +94,28 @@ namespace metaproxy_1 {
         mp::Origin m_origin;
         bool m_delete_flag;
         const mp::Package *m_package;
+        const char *m_msg_config;
     };
     class ThreadPoolPackage : public mp::IThreadPoolMsg {
     public:
-        ThreadPoolPackage(mp::Package *package, mp::ZAssocChild *ses) :
-            m_assoc_child(ses), m_package(package) { };
+        ThreadPoolPackage(mp::Package *package, mp::ZAssocChild *ses,
+            const char *msg_config);
         ~ThreadPoolPackage();
         IThreadPoolMsg *handle();
-        void result();
+        void result(const char *t_info);
         
     private:
+        yaz_timing_t timer;
         mp::ZAssocChild *m_assoc_child;
         mp::Package *m_package;
-        
+        const char *m_msg_config;        
     };
     class ZAssocServer : public yazpp_1::Z_Assoc {
     public:
         ~ZAssocServer();
         ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable, int timeout,
-                     int connect_max, std::string route);
+                     int connect_max, std::string route,
+                     const char *msg_config);
         void set_package(const mp::Package *package);
         void set_thread_pool(ThreadPoolSocketObserver *m_thread_pool_observer);
     private:
@@ -126,15 +134,28 @@ namespace metaproxy_1 {
         int m_connect_max;
         yazpp_1::LimitConnect limit_connect;
         std::string m_route;
+        const char *m_msg_config;
     };
 }
 
+mp::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
+                                         mp::ZAssocChild *ses,
+                                         const char *msg_config) :
+    m_assoc_child(ses), m_package(package), m_msg_config(msg_config)
+{
+    if (msg_config)
+        timer = yaz_timing_create();
+    else
+        timer = 0;
+}
+
 mp::ThreadPoolPackage::~ThreadPoolPackage()
 {
+    yaz_timing_destroy(&timer); // timer may be NULL
     delete m_package;
 }
 
-void mp::ThreadPoolPackage::result()
+void mp::ThreadPoolPackage::result(const char *t_info)
 {
     m_assoc_child->m_no_requests--;
 
@@ -179,6 +200,20 @@ void mp::ThreadPoolPackage::result()
     {
         m_assoc_child->close();
     }
+
+    if (m_msg_config)
+    {
+        yaz_timing_stop(timer);
+        double duration = yaz_timing_get_real(timer);
+
+        std::ostringstream os;
+        os  << m_msg_config << " "
+            << *m_package << " "
+            << std::fixed << std::setprecision (6) << duration;
+        
+        yaz_log(YLOG_LOG, "%s %s", os.str().c_str(), t_info);
+    }
+
     delete this;
 }
 
@@ -190,10 +225,11 @@ mp::IThreadPoolMsg *mp::ThreadPoolPackage::handle()
 
 
 mp::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
-                                    mp::ThreadPoolSocketObserver *my_thread_pool,
+                             mp::ThreadPoolSocketObserver *my_thread_pool,
                              const mp::Package *package,
-                             std::string route)
-    :  Z_Assoc(PDU_Observable)
+                             std::string route,
+                             const char *msg_config)
+    :  Z_Assoc(PDU_Observable), m_msg_config(msg_config)
 {
     m_thread_pool_observer = my_thread_pool;
     m_no_requests = 0;
@@ -223,7 +259,8 @@ void mp::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
 
     mp::Package *p = new mp::Package(m_session, m_origin);
 
-    mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this);
+    mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
+                                                          m_msg_config);
     p->copy_route(*m_package);
     p->request() = yazpp_1::GDU(z_pdu);
     m_thread_pool_observer->put(tp);  
@@ -244,7 +281,8 @@ void mp::ZAssocChild::failNotify()
 
     mp::Package *p = new mp::Package(m_session, m_origin);
 
-    mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this);
+    mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
+                                                          m_msg_config);
     p->copy_route(*m_package);
     m_thread_pool_observer->put(tp);  
 }
@@ -261,9 +299,9 @@ void mp::ZAssocChild::connectNotify()
 
 mp::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
                                int timeout, int connect_max,
-                               std::string route)
+                               std::string route, const char *msg_config)
     :  Z_Assoc(PDU_Observable), m_session_timeout(timeout),
-       m_connect_max(connect_max), m_route(route)
+       m_connect_max(connect_max), m_route(route), m_msg_config(msg_config)
 {
     m_package = 0;
 }
@@ -294,7 +332,7 @@ yazpp_1::IPDU_Observer *mp::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
     }
     mp::ZAssocChild *my =
        new mp::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
-                            m_package, m_route);
+                            m_package, m_route, m_msg_config);
     my->timeout(m_session_timeout);
     return my;
 }
@@ -432,6 +470,10 @@ void mp::filter::FrontendNet::configure(const xmlNode * ptr, bool test_only,
         {
             m_p->m_connect_max = mp::xml::get_int(ptr, 0);
         }
+        else if (!strcmp((const char *) ptr->name, "message"))
+        {
+            m_p->m_msg_config = mp::xml::get_text(ptr);
+        }
         else
         {
             throw mp::filter::FilterException("Bad element " 
@@ -478,7 +520,9 @@ void mp::filter::FrontendNet::set_ports(std::vector<Port> &ports)
         m_p->az[i] = new mp::ZAssocServer(as, 
                                           m_p->m_session_timeout,
                                           m_p->m_connect_max,
-                                          m_p->m_ports[i].route);
+                                          m_p->m_ports[i].route,
+                                          m_p->m_msg_config.length() > 0 ?
+                                          m_p->m_msg_config.c_str() : 0);
         if (m_p->az[i]->server(m_p->m_ports[i].port.c_str()))
         {
             throw mp::filter::FilterException("Unable to bind to address " 
index a894cae..9700db1 100644 (file)
@@ -39,7 +39,7 @@ class My_Timer_Thread;
 class My_Msg : public mp::IThreadPoolMsg {
 public:
     mp::IThreadPoolMsg *handle();
-    void result();
+    void result(const char *t_info);
     int m_val;
     My_Timer_Thread *m_timer;
 };
@@ -70,7 +70,7 @@ mp::IThreadPoolMsg *My_Msg::handle()
     return res;
 }
 
-void My_Msg::result()
+void My_Msg::result(const char *t_info)
 {
     m_timer->m_sum += m_val;
     m_timer->m_responses++;
index ded50d7..7c6f189 100644 (file)
@@ -68,6 +68,7 @@ namespace metaproxy_1 {
         std::deque<IThreadPoolMsg *> m_output;
         bool m_stop_flag;
         unsigned m_no_threads;
+        unsigned m_no_threads_waiting;
     };
     const unsigned int queue_size_per_thread = 64;
 }
@@ -100,6 +101,7 @@ ThreadPoolSocketObserver::ThreadPoolSocketObserver(
 
     m_p->m_stop_flag = false;
     m_p->m_no_threads = no_threads;
+    m_p->m_no_threads_waiting = 0;
     int i;
     for (i = 0; i<no_threads; i++)
     {
@@ -145,8 +147,21 @@ void ThreadPoolSocketObserver::socketNotify(int event)
             out = m_p->m_output.front();
             m_p->m_output.pop_front();
         }
+
+
         if (out)
-            out->result();
+        {
+            std::ostringstream os;
+            {
+                boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+                os  << "tbusy/total " <<
+                    m_p->m_no_threads - m_p->m_no_threads_waiting <<
+                    "/" << m_p->m_no_threads
+                    << " queue in/out " << m_p->m_input.size() << "/"
+                    << m_p->m_output.size();
+            }
+            out->result(os.str().c_str());
+        }
     }
 }
 
@@ -157,8 +172,10 @@ void ThreadPoolSocketObserver::run(void *p)
         IThreadPoolMsg *in = 0;
         {
             boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+            m_p->m_no_threads_waiting++;
             while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
                 m_p->m_cond_input_data.wait(input_lock);
+            m_p->m_no_threads_waiting--;
             if (m_p->m_stop_flag)
                 break;
             
index 52f3d79..67aee0b 100644 (file)
@@ -28,7 +28,7 @@ namespace metaproxy_1 {
     class IThreadPoolMsg {
     public:
         virtual IThreadPoolMsg *handle() = 0;
-        virtual void result() = 0;
+        virtual void result(const char *info) = 0;
         virtual ~IThreadPoolMsg();
     };
 
index f84f44e..f377f7c 100644 (file)
@@ -12,5 +12,6 @@ filter_frontend_net =
     xsd:string
   }+,
   element mp:timeout { xsd:integer }?,
-  element mp:connect-max { xsd:integer }?
+  element mp:connect-max { xsd:integer }?,
+  element mp:message { xsd:string }?