#include "config.hpp"
+#include <sstream>
+#include <iomanip>
#include <metaproxy/util.hpp>
#include "pipe.hpp"
#include <metaproxy/filter.hpp>
#include <yazpp/pdu-assoc.h>
#include <yazpp/socket-manager.h>
#include <yazpp/limit-connect.h>
+#include <yaz/timing.h>
#include <yaz/log.h>
#include <iostream>
int m_listen_duration;
int m_session_timeout;
int m_connect_max;
+ std::string m_msg_config;
yazpp_1::SocketManager mySocketManager;
ZAssocServer **az;
};
public:
~ZAssocChild();
ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
- mp::ThreadPoolSocketObserver *m_thread_pool_observer,
+ mp::ThreadPoolSocketObserver *m_thread_pool_observer,
const mp::Package *package,
- std::string route);
+ std::string route,
+ const char *msg_config);
int m_no_requests;
std::string m_route;
private:
mp::Origin m_origin;
bool m_delete_flag;
const mp::Package *m_package;
+ const char *m_msg_config;
};
class ThreadPoolPackage : public mp::IThreadPoolMsg {
public:
- ThreadPoolPackage(mp::Package *package, mp::ZAssocChild *ses) :
- m_assoc_child(ses), m_package(package) { };
+ ThreadPoolPackage(mp::Package *package, mp::ZAssocChild *ses,
+ const char *msg_config);
~ThreadPoolPackage();
IThreadPoolMsg *handle();
- void result();
+ void result(const char *t_info);
private:
+ yaz_timing_t timer;
mp::ZAssocChild *m_assoc_child;
mp::Package *m_package;
-
+ const char *m_msg_config;
};
class ZAssocServer : public yazpp_1::Z_Assoc {
public:
~ZAssocServer();
ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable, int timeout,
- int connect_max, std::string route);
+ int connect_max, std::string route,
+ const char *msg_config);
void set_package(const mp::Package *package);
void set_thread_pool(ThreadPoolSocketObserver *m_thread_pool_observer);
private:
int m_connect_max;
yazpp_1::LimitConnect limit_connect;
std::string m_route;
+ const char *m_msg_config;
};
}
+mp::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
+ mp::ZAssocChild *ses,
+ const char *msg_config) :
+ m_assoc_child(ses), m_package(package), m_msg_config(msg_config)
+{
+ if (msg_config)
+ timer = yaz_timing_create();
+ else
+ timer = 0;
+}
+
mp::ThreadPoolPackage::~ThreadPoolPackage()
{
+ yaz_timing_destroy(&timer); // timer may be NULL
delete m_package;
}
-void mp::ThreadPoolPackage::result()
+void mp::ThreadPoolPackage::result(const char *t_info)
{
m_assoc_child->m_no_requests--;
{
m_assoc_child->close();
}
+
+ if (m_msg_config)
+ {
+ yaz_timing_stop(timer);
+ double duration = yaz_timing_get_real(timer);
+
+ std::ostringstream os;
+ os << m_msg_config << " "
+ << *m_package << " "
+ << std::fixed << std::setprecision (6) << duration;
+
+ yaz_log(YLOG_LOG, "%s %s", os.str().c_str(), t_info);
+ }
+
delete this;
}
mp::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
- mp::ThreadPoolSocketObserver *my_thread_pool,
+ mp::ThreadPoolSocketObserver *my_thread_pool,
const mp::Package *package,
- std::string route)
- : Z_Assoc(PDU_Observable)
+ std::string route,
+ const char *msg_config)
+ : Z_Assoc(PDU_Observable), m_msg_config(msg_config)
{
m_thread_pool_observer = my_thread_pool;
m_no_requests = 0;
mp::Package *p = new mp::Package(m_session, m_origin);
- mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this);
+ mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
+ m_msg_config);
p->copy_route(*m_package);
p->request() = yazpp_1::GDU(z_pdu);
m_thread_pool_observer->put(tp);
mp::Package *p = new mp::Package(m_session, m_origin);
- mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this);
+ mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
+ m_msg_config);
p->copy_route(*m_package);
m_thread_pool_observer->put(tp);
}
mp::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
int timeout, int connect_max,
- std::string route)
+ std::string route, const char *msg_config)
: Z_Assoc(PDU_Observable), m_session_timeout(timeout),
- m_connect_max(connect_max), m_route(route)
+ m_connect_max(connect_max), m_route(route), m_msg_config(msg_config)
{
m_package = 0;
}
}
mp::ZAssocChild *my =
new mp::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
- m_package, m_route);
+ m_package, m_route, m_msg_config);
my->timeout(m_session_timeout);
return my;
}
{
m_p->m_connect_max = mp::xml::get_int(ptr, 0);
}
+ else if (!strcmp((const char *) ptr->name, "message"))
+ {
+ m_p->m_msg_config = mp::xml::get_text(ptr);
+ }
else
{
throw mp::filter::FilterException("Bad element "
m_p->az[i] = new mp::ZAssocServer(as,
m_p->m_session_timeout,
m_p->m_connect_max,
- m_p->m_ports[i].route);
+ m_p->m_ports[i].route,
+ m_p->m_msg_config.length() > 0 ?
+ m_p->m_msg_config.c_str() : 0);
if (m_p->az[i]->server(m_p->m_ports[i].port.c_str()))
{
throw mp::filter::FilterException("Unable to bind to address "
std::deque<IThreadPoolMsg *> m_output;
bool m_stop_flag;
unsigned m_no_threads;
+ unsigned m_no_threads_waiting;
};
const unsigned int queue_size_per_thread = 64;
}
m_p->m_stop_flag = false;
m_p->m_no_threads = no_threads;
+ m_p->m_no_threads_waiting = 0;
int i;
for (i = 0; i<no_threads; i++)
{
out = m_p->m_output.front();
m_p->m_output.pop_front();
}
+
+
if (out)
- out->result();
+ {
+ std::ostringstream os;
+ {
+ boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+ os << "tbusy/total " <<
+ m_p->m_no_threads - m_p->m_no_threads_waiting <<
+ "/" << m_p->m_no_threads
+ << " queue in/out " << m_p->m_input.size() << "/"
+ << m_p->m_output.size();
+ }
+ out->result(os.str().c_str());
+ }
}
}
IThreadPoolMsg *in = 0;
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+ m_p->m_no_threads_waiting++;
while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
m_p->m_cond_input_data.wait(input_lock);
+ m_p->m_no_threads_waiting--;
if (m_p->m_stop_flag)
break;