~ThreadPoolPackage();
IThreadPoolMsg *handle();
void result(const char *t_info);
-
+ bool cleanup(void *info);
private:
yaz_timing_t timer;
mp::ZAssocChild *m_assoc_child;
delete m_package;
}
+bool mp::ThreadPoolPackage::cleanup(void *info)
+{
+ mp::Session *ses = (mp::Session *) info;
+
+ return *ses == m_package->session();
+}
+
void mp::ThreadPoolPackage::result(const char *t_info)
{
m_assoc_child->m_no_requests--;
mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
m_msg_config);
p->copy_route(*m_package);
- m_thread_pool_observer->put(tp);
+ m_thread_pool_observer->cleanup(tp, &m_session);
+ m_thread_pool_observer->put(tp);
}
void mp::ZAssocChild::timeoutNotify()
public:
mp::IThreadPoolMsg *handle();
void result(const char *t_info);
+ bool cleanup(void *info);
int m_val;
My_Timer_Thread *m_timer;
};
return res;
}
+bool My_Msg::cleanup(void *info)
+{
+ return false;
+}
+
void My_Msg::result(const char *t_info)
{
m_timer->m_sum += m_val;
}
}
+void ThreadPoolSocketObserver::cleanup(IThreadPoolMsg *m, void *info)
+{
+ boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+
+ std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
+ while (it != m_p->m_input.end())
+ {
+ if ((*it)->cleanup(info))
+ it = m_p->m_input.erase(it);
+ else
+ it++;
+ }
+}
+
void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+
while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
m_p->m_cond_input_full.wait(input_lock);
m_p->m_input.push_back(m);
m_p->m_cond_input_data.notify_one();
}
+
/*
* Local variables:
* c-basic-offset: 4
virtual IThreadPoolMsg *handle() = 0;
virtual void result(const char *info) = 0;
virtual ~IThreadPoolMsg();
+ virtual bool cleanup(void *info) = 0;
};
class ThreadPoolSocketObserver : public yazpp_1::ISocketObserver {
int no_threads);
virtual ~ThreadPoolSocketObserver();
void put(IThreadPoolMsg *m);
+ void cleanup(IThreadPoolMsg *m, void *info);
IThreadPoolMsg *get();
void run(void *p);
private: