z-grs.h z-mterm2.h z-opac.h z-rrf1.h z-rrf2.h z-sum.h z-sutrs.h z-uifr1.h \
z-univ.h z-oclcui.h zes-expi.h zes-exps.h zes-order.h zes-pquery.h \
zes-psched.h zes-admin.h zes-pset.h zes-update.h zes-update0.h \
- zoom.h z-charneg.h charneg.h soap.h srw.h zgdu.h matchstr.h
+ zoom.h z-charneg.h charneg.h soap.h srw.h zgdu.h matchstr.h \
+ sock_man.h srv.h
EXTRA_DIST = yaz-version.h.in
void yaz_sock_man_destroy(yaz_sock_man_t man);
YAZ_EXPORT
-yaz_sock_chan_t yaz_sock_chan_new(yaz_sock_man_t srv, int fd, void *data,
+yaz_sock_chan_t yaz_sock_man_wait(yaz_sock_man_t man);
+
+YAZ_EXPORT
+yaz_sock_chan_t yaz_sock_chan_new(yaz_sock_man_t man, int fd, void *data,
unsigned mask);
YAZ_EXPORT
-void yaz_sock_chan_destroy(yaz_sock_man_t srv, yaz_sock_chan_t p);
+void yaz_sock_chan_destroy(yaz_sock_chan_t p);
YAZ_EXPORT
void yaz_sock_chan_set_mask(yaz_sock_chan_t chan, unsigned mask);
--- /dev/null
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data.
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Index Data nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * \file
+ * \brief socket manager
+ */
+#ifndef YAZ_SRV_H
+#define YAZ_SRV_H
+
+#include <stddef.h>
+#include <yaz/poll.h>
+#include <yaz/zgdu.h>
+
+YAZ_BEGIN_CDECL
+
+typedef struct yaz_srv_s *yaz_srv_t;
+typedef struct yaz_pkg_s *yaz_pkg_t;
+
+struct cs_session;
+
+typedef void (yaz_srv_gdu_handler_t)(yaz_pkg_t pkg, void *user);
+typedef void *(yaz_srv_session_handler_t)(struct cs_session *cs);
+
+YAZ_EXPORT
+yaz_srv_t yaz_srv_create(const char **listeners_str);
+
+YAZ_EXPORT
+void yaz_srv_destroy(yaz_srv_t p);
+
+YAZ_EXPORT
+void yaz_srv_run(yaz_srv_t p, yaz_srv_session_handler_t *session_handler,
+ yaz_srv_gdu_handler_t *gdu_handler);
+
+YAZ_EXPORT
+void yaz_pkg_destroy(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+Z_GDU **yaz_pkg_get_gdu(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+ODR yaz_pkg_get_odr(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+void yaz_pkg_stop_server(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+void yaz_pkg_close(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+yaz_pkg_t yaz_pkg_create(yaz_pkg_t request_pkg);
+
+YAZ_EXPORT
+Z_GDU *zget_wrap_APDU(ODR o, Z_APDU *apdu);
+
+YAZ_EXPORT
+void yaz_pkg_send(yaz_pkg_t pkg);
+
+YAZ_END_CDECL
+
+#endif
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
--- /dev/null
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data.
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Index Data nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * \file
+ * \brief socket manager
+ */
+#ifndef YAZ_TPOOL_H
+#define YAZ_TPOOL_H
+
+#include <stddef.h>
+
+
+YAZ_BEGIN_CDECL
+
+typedef struct yaz_tpool_s *yaz_tpool_t;
+
+YAZ_EXPORT
+void yaz_tpool_add(yaz_tpool_t p, void *data);
+
+YAZ_EXPORT
+void yaz_tpool_destroy(yaz_tpool_t p);
+
+YAZ_EXPORT
+yaz_tpool_t yaz_tpool_create(void (*work_handler)(void *work_data),
+ void (*work_destroy)(void *work_data),
+ size_t no_threads);
+
+YAZ_END_CDECL
+
+#endif
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
copy_types.c match_glob.c poll.c daemon.c \
iconv_encode_marc8.c iconv_encode_iso_8859_1.c iconv_encode_wchar.c \
iconv_decode_marc8.c iconv_decode_iso5426.c iconv_decode_danmarc.c sc.c \
- sock_man.c nanohttp.c
+ sock_man.c srv.c tpool.c
libyaz_la_LDFLAGS=-version-info $(YAZ_VERSION_INFO)
+++ /dev/null
-/* This file is part of the YAZ toolkit.
- * Copyright (C) 1995-2009 Index Data
- * See the file LICENSE for details.
- */
-/**
- * \file
- * \brief Small HTTP server
- */
-
-#include <yaz/zgdu.h>
-#include <yaz/comstack.h>
-#include <yaz/nmem.h>
-#include <yaz/log.h>
-#include <yaz/sock_man.h>
-#include <assert.h>
-
-typedef struct yaz_nano_srv_s *yaz_nano_srv_t;
-typedef struct yaz_nano_pkg_s *yaz_nano_pkg_t;
-
-struct yaz_nano_pkg_s {
- void *handle;
- int listener_id;
- ODR encode_odr;
- Z_GDU *request_gdu;
- Z_GDU *response_gdu;
-};
-
-struct yaz_nano_srv_s {
- COMSTACK *cs_listeners;
- size_t num_listeners;
- NMEM nmem;
- yaz_sock_man_t sock_man;
-};
-
-void yaz_nano_srv_destroy(yaz_nano_srv_t p)
-{
- if (p)
- {
- size_t i;
- for (i = 0; i < p->num_listeners; i++)
- if (p->cs_listeners[i])
- cs_close(p->cs_listeners[i]);
- yaz_sock_man_destroy(p->sock_man);
- nmem_destroy(p->nmem);
- }
-}
-
-yaz_nano_srv_t yaz_nano_srv_create(const char **listeners_str)
-{
- NMEM nmem = nmem_create();
- yaz_nano_srv_t p = nmem_malloc(nmem, sizeof(*p));
- size_t i;
- for (i = 0; listeners_str[i]; i++)
- ;
- p->nmem = nmem;
- p->num_listeners = i;
- p->cs_listeners =
- nmem_malloc(nmem, p->num_listeners * sizeof(*p->cs_listeners));
- for (i = 0; i < p->num_listeners; i++)
- {
- void *ap;
- const char *where = listeners_str[i];
- COMSTACK l = cs_create_host(where, 2, &ap);
- p->cs_listeners[i] = 0; /* not OK (yet) */
- if (!l)
- {
- yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_create_host(%s) failed", where);
- }
- else
- {
- if (cs_bind(l, ap, CS_SERVER) < 0)
- {
- if (cs_errno(l) == CSYSERR)
- yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to bind to %s", where);
- else
- yaz_log(YLOG_FATAL, "Failed to bind to %s: %s", where,
- cs_strerror(l));
- cs_close(l);
- }
- else
- p->cs_listeners[i] = l; /* success */
- }
- }
- p->sock_man = yaz_sock_man_new();
-
- /* check if all are OK */
- for (i = 0; i < p->num_listeners; i++)
- if (!p->cs_listeners[i])
- {
- yaz_nano_srv_destroy(p);
- return 0;
- }
-
- for (i = 0; i < p->num_listeners; i++)
- {
- yaz_sock_chan_t chan;
-
- chan = yaz_sock_chan_new(p->sock_man, cs_fileno(p->cs_listeners[i]),
- p->cs_listeners + i,
- yaz_poll_read | yaz_poll_except);
- }
- return p;
-}
-
-Z_GDU *yaz_nano_pkg_req(yaz_nano_pkg_t pkg)
-{
- return pkg->request_gdu;
-}
-
-Z_GDU *yaz_nano_pkg_response(yaz_nano_pkg_t pkg)
-{
- return pkg->response_gdu;
-}
-
-ODR yaz_nano_pkg_encode(yaz_nano_pkg_t pkg)
-{
- return pkg->encode_odr;
-}
-
-int yaz_nano_pkg_listener_id(yaz_nano_pkg_t pkg)
-{
- return pkg->listener_id;
-}
-
-yaz_nano_pkg_t yaz_nano_srv_get_pkg(yaz_nano_srv_t p)
-{
- return 0;
-}
-
-void yaz_nano_srv_put_pkg(yaz_nano_srv_t p, yaz_nano_pkg_t pkg)
-{
-
-}
-
-/*
- * Local variables:
- * c-basic-offset: 4
- * c-file-style: "Stroustrup"
- * indent-tabs-mode: nil
- * End:
- * vim: shiftwidth=4 tabstop=8 expandtab
- */
-
#include <yaz/sock_man.h>
+#include <yaz/log.h>
#include <yaz/nmem.h>
#include <assert.h>
#include <sys/epoll.h>
yaz_sock_chan_t next;
yaz_sock_chan_t prev;
int fd;
- unsigned mask;
+ unsigned input_mask;
unsigned output_mask;
int max_idle;
void *data;
man->maxevents = 30;
man->event_no = 0;
man->event_ret = 0;
- man->timeout = 0;
+ man->timeout = -1;
man->rescan = 0;
man->events = nmem_malloc(nmem, man->maxevents * sizeof(*man->events));
if (man->epoll_handle == -1)
{
if (man)
{
+ while (man->chan_list)
+ {
+ yaz_log(YLOG_WARN, "yaz_sock_man_destroy: closing %p",
+ man->chan_list);
+ yaz_sock_chan_destroy(man->chan_list);
+ }
if (man->epoll_handle != -1)
close(man->epoll_handle);
- assert(man->chan_list == 0);
nmem_destroy(man->nmem);
}
}
struct epoll_event event;
event.events = 0;
- if (p->mask & yaz_poll_read)
+ if (p->input_mask & yaz_poll_read)
event.events |= EPOLLIN;
- if (p->mask & yaz_poll_write)
+ if (p->input_mask & yaz_poll_write)
event.events |= EPOLLOUT;
- if (p->mask & yaz_poll_except)
+ if (p->input_mask & yaz_poll_except)
event.events |= EPOLLERR;
event.data.ptr = p;
srv->chan_list = p;
p->fd = fd;
- p->mask = 0;
+ p->input_mask = mask;
+ p->output_mask = 0;
p->data = data;
- p->max_idle = 0;
+ p->max_idle = -1;
p->man = srv;
poll_ctl(EPOLL_CTL_ADD, p);
{
if (man->rescan)
{
- int timeout = 0;
+ int timeout = -1;
yaz_sock_chan_t p;
for (p = man->chan_list; p; p = p->next)
- if (p->max_idle && (timeout == 0 || p->max_idle < timeout))
+ if (p->max_idle != -1 && (timeout == -1 || p->max_idle < timeout))
timeout = p->max_idle;
man->timeout = timeout;
man->rescan = 0;
}
}
-void yaz_sock_chan_destroy(yaz_sock_man_t srv, yaz_sock_chan_t p)
+void yaz_sock_chan_destroy(yaz_sock_chan_t p)
{
+ yaz_sock_man_t srv = p->man;
if (p->prev)
p->prev->next = p->next;
else
}
man->timeout_list = 0; /* no more timeout events */
}
- assert(man->timeout_list = 0);
+ assert(man->timeout_list == 0);
assert(man->event_no <= man->event_ret);
if (man->event_no == man->event_ret)
{ /* must wait again */
void yaz_sock_chan_set_mask(yaz_sock_chan_t chan, unsigned mask)
{
- if (chan->mask != mask)
+ if (chan->input_mask != mask)
{
- chan->mask = mask;
+ chan->input_mask = mask;
poll_ctl(EPOLL_CTL_MOD, chan);
}
}
unsigned yaz_sock_get_mask(yaz_sock_chan_t chan)
{
- return chan->mask;
+ return chan->output_mask;
}
void *yaz_sock_chan_get_data(yaz_sock_chan_t chan)
--- /dev/null
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data
+ * See the file LICENSE for details.
+ */
+/**
+ * \file
+ * \brief Small HTTP server
+ */
+
+#include <yaz/zgdu.h>
+#include <yaz/comstack.h>
+#include <yaz/nmem.h>
+#include <yaz/log.h>
+#include <yaz/sock_man.h>
+#include <yaz/tpool.h>
+#include <assert.h>
+#include <yaz/srv.h>
+
+
+enum cs_ses_type {
+ cs_ses_type_listener,
+ cs_ses_type_accepting,
+ cs_ses_type_normal
+};
+
+struct cs_session {
+ enum cs_ses_type type;
+ COMSTACK cs;
+ yaz_sock_chan_t chan;
+ unsigned cs_put_mask;
+ unsigned cs_get_mask;
+ char *input_buffer;
+ int input_len;
+ void *user;
+};
+
+struct yaz_pkg_s {
+ Z_GDU *gdu;
+ ODR odr;
+ struct cs_session *ses;
+ yaz_srv_t srv;
+};
+
+struct yaz_srv_s {
+ struct cs_session *listeners;
+ size_t num_listeners;
+ NMEM nmem;
+ yaz_sock_man_t sock_man;
+ yaz_tpool_t tpool;
+ int stop_flag;
+ yaz_srv_session_handler_t *session_handler;
+ yaz_srv_gdu_handler_t *gdu_handler;
+};
+
+static void cs_session_init(struct cs_session *ses, enum cs_ses_type type)
+{
+ ses->type = type;
+ ses->cs = 0;
+ ses->chan = 0;
+ ses->cs_put_mask = 0;
+ ses->cs_get_mask = yaz_poll_read;
+ ses->input_buffer = 0;
+ ses->input_len = 0;
+}
+
+static void cs_session_destroy(struct cs_session *ses)
+{
+ xfree(ses->input_buffer);
+ if (ses->chan)
+ yaz_sock_chan_destroy(ses->chan);
+ if (ses->cs)
+ cs_close(ses->cs);
+}
+
+void yaz_srv_destroy(yaz_srv_t p)
+{
+ if (p)
+ {
+ size_t i;
+
+ yaz_tpool_destroy(p->tpool);
+ for (i = 0; i < p->num_listeners; i++)
+ {
+ cs_session_destroy(p->listeners + i);
+ }
+ yaz_sock_man_destroy(p->sock_man);
+ nmem_destroy(p->nmem);
+ }
+}
+
+yaz_srv_t yaz_srv_create(const char **listeners_str)
+{
+ NMEM nmem = nmem_create();
+ yaz_srv_t p = nmem_malloc(nmem, sizeof(*p));
+ size_t i;
+ for (i = 0; listeners_str[i]; i++)
+ ;
+ p->nmem = nmem;
+
+ p->stop_flag = 0;
+ p->session_handler = 0;
+ p->gdu_handler = 0;
+ p->num_listeners = i;
+ p->listeners =
+ nmem_malloc(nmem, p->num_listeners * sizeof(*p->listeners));
+ p->sock_man = yaz_sock_man_new();
+ p->tpool = 0;
+ for (i = 0; i < p->num_listeners; i++)
+ {
+ void *ap;
+ const char *where = listeners_str[i];
+ COMSTACK l = cs_create_host(where, CS_FLAGS_NUMERICHOST, &ap);
+
+ cs_session_init(p->listeners +i, cs_ses_type_listener);
+ if (!l)
+ {
+ yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_create_host(%s) failed", where);
+ }
+ else
+ {
+ if (cs_bind(l, ap, CS_SERVER) < 0)
+ {
+ if (cs_errno(l) == CSYSERR)
+ yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to bind to %s", where);
+ else
+ yaz_log(YLOG_FATAL, "Failed to bind to %s: %s", where,
+ cs_strerror(l));
+ cs_close(l);
+ }
+ else
+ {
+ p->listeners[i].cs = l; /* success */
+ p->listeners[i].chan =
+ yaz_sock_chan_new(p->sock_man,
+ cs_fileno(l),
+ p->listeners + i,
+ yaz_poll_read | yaz_poll_except);
+ }
+ }
+ }
+
+ /* check if all are OK */
+ for (i = 0; i < p->num_listeners; i++)
+ if (!p->listeners[i].cs)
+ {
+ yaz_srv_destroy(p);
+ return 0;
+ }
+ return p;
+}
+
+static void new_session(yaz_srv_t p, COMSTACK new_line)
+{
+ struct cs_session *ses = xmalloc(sizeof(*ses));
+ unsigned mask =
+ ((new_line->io_pending & CS_WANT_WRITE) ? yaz_poll_write : 0) |
+ ((new_line->io_pending & CS_WANT_READ) ? yaz_poll_read : 0);
+
+ if (mask)
+ {
+ yaz_log(YLOG_LOG, "type accepting");
+ cs_session_init(ses, cs_ses_type_accepting);
+ }
+ else
+ {
+ yaz_log(YLOG_LOG, "type normal");
+ cs_session_init(ses, cs_ses_type_normal);
+ mask = yaz_poll_read;
+ ses->user = p->session_handler(ses);
+ }
+ ses->cs = new_line;
+ ses->chan = yaz_sock_chan_new(p->sock_man, cs_fileno(new_line), ses, mask);
+}
+
+void yaz_pkg_destroy(yaz_pkg_t pkg)
+{
+ if (pkg)
+ {
+ odr_destroy(pkg->odr);
+ xfree(pkg);
+ }
+}
+
+void work_handler(void *data)
+{
+ yaz_pkg_t pkg = (yaz_pkg_t) data;
+
+ pkg->srv->gdu_handler(pkg, pkg->ses->user);
+ yaz_pkg_destroy(pkg);
+}
+
+void work_destroy(void *data)
+{
+ yaz_pkg_t pkg = (yaz_pkg_t) data;
+ yaz_pkg_destroy(pkg);
+}
+
+
+void yaz_srv_run(yaz_srv_t p, yaz_srv_session_handler_t session_handler,
+ yaz_srv_gdu_handler_t gdu_handler)
+{
+ yaz_sock_chan_t chan;
+
+ p->session_handler = session_handler;
+ p->gdu_handler = gdu_handler;
+
+ assert(!p->tpool);
+ p->tpool = yaz_tpool_create(work_handler, work_destroy, 20);
+ while ((chan = yaz_sock_man_wait(p->sock_man)))
+ {
+ unsigned output_mask = yaz_sock_get_mask(chan);
+ struct cs_session *ses = yaz_sock_chan_get_data(chan);
+
+ if (p->stop_flag)
+ break;
+ switch (ses->type)
+ {
+ case cs_ses_type_listener:
+ if (yaz_sock_get_mask(chan) & yaz_poll_read)
+ {
+ int ret = cs_listen(ses->cs, 0, 0);
+ if (ret < 0)
+ {
+ yaz_log(YLOG_WARN|YLOG_ERRNO, "listen failed");
+ }
+ else if (ret == 1)
+ {
+ yaz_log(YLOG_WARN, "cs_listen incomplete");
+ }
+ else
+ {
+ COMSTACK new_line = cs_accept(ses->cs);
+ if (new_line)
+ {
+ yaz_log(YLOG_LOG, "new session");
+ new_session(p, new_line);
+ }
+ else
+ {
+ yaz_log(YLOG_WARN|YLOG_ERRNO, "accept failed");
+ }
+ }
+ }
+ break;
+ case cs_ses_type_accepting:
+ if (!cs_accept(ses->cs))
+ {
+ yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_accept failed");
+ cs_session_destroy(ses);
+ xfree(ses);
+ }
+ else
+ {
+ unsigned mask =
+ ((ses->cs->io_pending & CS_WANT_WRITE) ? yaz_poll_write : 0) |
+ ((ses->cs->io_pending & CS_WANT_READ) ? yaz_poll_read : 0);
+ if (mask)
+ {
+ ses->type = cs_ses_type_accepting;
+ }
+ else
+ {
+ ses->type = cs_ses_type_normal;
+ mask = yaz_poll_read;
+ }
+ yaz_sock_chan_set_mask(ses->chan, mask);
+ }
+ break;
+ case cs_ses_type_normal:
+ if ((ses->cs_put_mask & yaz_poll_read) == 0 &&
+ output_mask & ses->cs_get_mask)
+ {
+ /* receiving package */
+ unsigned new_mask = yaz_poll_read;
+ yaz_log(YLOG_LOG, "Receive");
+ do
+ {
+ int res = cs_get(ses->cs, &ses->input_buffer, &ses->input_len);
+ if (res <= 0)
+ {
+ yaz_log(YLOG_WARN, "Connection closed by client");
+ cs_session_destroy(ses);
+ xfree(ses);
+ ses = 0;
+ break;
+ }
+ else if (res == 1)
+ {
+ if (ses->cs->io_pending & CS_WANT_WRITE)
+ new_mask |= yaz_poll_write;
+ break;
+ }
+ else
+ { /* complete package */
+ yaz_pkg_t pkg = xmalloc(sizeof(*pkg));
+ yaz_log(YLOG_LOG, "COMPLETE PACKAGE");
+
+ pkg->ses = ses;
+ pkg->srv = p;
+ pkg->odr = odr_createmem(ODR_DECODE);
+ odr_setbuf(pkg->odr, ses->input_buffer, res, 0);
+ if (!z_GDU(pkg->odr, &pkg->gdu, 0, 0))
+ {
+ yaz_log(YLOG_WARN, "decoding failed");
+ odr_destroy(pkg->odr);
+ xfree(pkg);
+ }
+ else
+ {
+ yaz_tpool_add(p->tpool, pkg);
+ }
+ }
+ } while (cs_more(ses->cs));
+ yaz_sock_chan_set_mask(chan, new_mask);
+ }
+ if (ses && (output_mask & ses->cs_put_mask))
+ { /* sending package */
+ yaz_log(YLOG_LOG, "Sending");
+ }
+ }
+ }
+}
+
+Z_GDU **yaz_pkg_get_gdu(yaz_pkg_t pkg)
+{
+ return &pkg->gdu;
+}
+
+ODR yaz_pkg_get_odr(yaz_pkg_t pkg)
+{
+ return pkg->odr;
+}
+
+void yaz_pkg_close(yaz_pkg_t pkg)
+{
+ struct cs_session *ses = pkg->ses;
+ if (ses)
+ {
+ cs_session_destroy(ses);
+ xfree(ses);
+ }
+ pkg->ses = 0;
+}
+
+void yaz_pkg_stop_server(yaz_pkg_t pkg)
+{
+ pkg->srv->stop_flag = 1;
+}
+
+yaz_pkg_t yaz_pkg_create(yaz_pkg_t request_pkg)
+{
+ yaz_pkg_t pkg = xmalloc(sizeof(*pkg));
+
+ pkg->gdu = 0;
+ pkg->odr = odr_createmem(ODR_ENCODE);
+ pkg->ses = request_pkg->ses;
+ pkg->srv = request_pkg->srv;
+ return pkg;
+}
+
+Z_GDU *zget_wrap_APDU(ODR o, Z_APDU *apdu)
+{
+ Z_GDU *gdu = odr_malloc(o, sizeof(*gdu));
+ gdu->which = Z_GDU_Z3950;
+ gdu->u.z3950 = apdu;
+ return gdu;
+}
+
+void yaz_pkg_send(yaz_pkg_t pkg)
+{
+ yaz_log(YLOG_WARN, "send.. UNFINISHED");
+}
+
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
--- /dev/null
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data
+ * See the file LICENSE for details.
+ */
+/**
+ * \file
+ * \brief thread pool workers
+ */
+
+#include <assert.h>
+#include <yaz/nmem.h>
+#include <yaz/tpool.h>
+#include <pthread.h>
+
+struct work_item {
+ void *data;
+ struct work_item *next;
+};
+
+struct yaz_tpool_s {
+ NMEM nmem;
+ pthread_t *thread_id;
+ pthread_mutex_t mutex;
+ pthread_cond_t input_data;
+ int stop_flag;
+ size_t no_threads;
+ struct work_item *input_queue;
+ struct work_item *output_queue;
+ struct work_item *free_queue;
+ void (*work_handler)(void *work_data);
+ void (*work_destroy)(void *work_data);
+};
+
+static struct work_item *queue_remove_last(struct work_item **q)
+{
+ struct work_item **work_p = q, *work_this = 0;
+
+ while (*work_p && (*work_p)->next)
+ work_p = &(*work_p)->next;
+ if (*work_p)
+ {
+ work_this = *work_p;
+ *work_p = 0;
+ }
+ return work_this;
+}
+
+static void queue_trav(struct work_item *q, void (*f)(void *data))
+{
+ for (; q; q = q->next)
+ f(q->data);
+}
+
+void yaz_tpool_add(yaz_tpool_t p, void *data)
+{
+ struct work_item *work_p;
+
+ pthread_mutex_lock(&p->mutex);
+
+ if (p->free_queue)
+ {
+ work_p = p->free_queue;
+ p->free_queue = p->free_queue->next;
+ }
+ else
+ work_p = nmem_malloc(p->nmem, sizeof(*work_p));
+
+ work_p->data = data;
+ work_p->next = p->input_queue;
+ p->input_queue = work_p;
+
+ pthread_cond_signal(&p->input_data);
+ pthread_mutex_unlock(&p->mutex);
+}
+
+void yaz_tpool_destroy(yaz_tpool_t p)
+{
+ if (p)
+ {
+ size_t i;
+
+ pthread_mutex_lock(&p->mutex);
+ p->stop_flag = 1;
+ pthread_cond_broadcast(&p->input_data);
+ pthread_mutex_unlock(&p->mutex);
+
+ for (i = 0; i < p->no_threads; i++)
+ pthread_join(p->thread_id[i], 0);
+
+ if (p->work_destroy)
+ {
+ queue_trav(p->input_queue, p->work_destroy);
+ queue_trav(p->output_queue, p->work_destroy);
+ }
+
+ pthread_cond_destroy(&p->input_data);
+ pthread_mutex_destroy(&p->mutex);
+ nmem_destroy(p->nmem);
+ }
+}
+
+static void *tpool_thread_handler(void *vp)
+{
+ yaz_tpool_t p = (yaz_tpool_t) vp;
+ while (1)
+ {
+ struct work_item *work_this = 0;
+ /* wait for some work */
+ pthread_mutex_lock(&p->mutex);
+ while (!p->stop_flag && !p->input_queue)
+ pthread_cond_wait(&p->input_data, &p->mutex);
+ /* see if we were waken up because we're shutting down */
+ if (p->stop_flag)
+ break;
+ /* got something. Take the last one out of input_queue */
+ assert(p->input_queue);
+ work_this = queue_remove_last(&p->input_queue);
+ assert(work_this);
+
+ pthread_mutex_unlock(&p->mutex);
+
+ /* work on this item */
+ p->work_handler(work_this->data);
+ }
+ pthread_mutex_unlock(&p->mutex);
+ return 0;
+}
+
+yaz_tpool_t yaz_tpool_create(void (*work_handler)(void *work_data),
+ void (*work_destroy)(void *work_data),
+ size_t no_threads)
+{
+ NMEM nmem = nmem_create();
+ yaz_tpool_t p = nmem_malloc(nmem, sizeof(*p));
+ size_t i;
+ p->nmem = nmem;
+ p->stop_flag = 0;
+ p->no_threads = no_threads;
+
+ p->input_queue = 0;
+ p->output_queue = 0;
+ p->free_queue = 0;
+
+ p->work_handler = work_handler;
+ p->work_destroy = work_destroy;
+
+ pthread_mutex_init(&p->mutex, 0);
+ pthread_cond_init(&p->input_data, 0);
+
+ p->thread_id = nmem_malloc(p->nmem, sizeof(*p->thread_id) * p->no_threads);
+ for (i = 0; i < p->no_threads; i++)
+ pthread_create (p->thread_id + i, 0, tpool_thread_handler, p);
+ return p;
+}
+
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
tstccl tstlog tstcomstack \
tstsoap1 tstsoap2 tstodrstack tstlogthread tstxmlquery tstpquery \
tst_comstack tst_filepath tst_record_conv tst_retrieval tst_tpath \
- tst_timing tst_query_charset tst_oid tst_icu_I18N tst_match_glob tst_rpn2cql
+ tst_timing tst_query_charset tst_oid tst_icu_I18N tst_match_glob \
+ tst_rpn2cql tst_srv
check_SCRIPTS = tstmarc.sh tstmarccol.sh tstcql2xcql.sh tstcql2pqf.sh tsticu.sh
TESTS = $(check_PROGRAMS) $(check_SCRIPTS)
cd $(srcdir); $(YAZCOMP) tstodr.asn
LDADD = ../src/libyaz.la
-tst_icu_I18N_LDADD = ../src/libyaz_icu.la $(ICU_LIBS)
+tst_icu_I18N_LDADD = ../src/libyaz.la ../src/libyaz_icu.la $(ICU_LIBS)
CONFIG_CLEAN_FILES=*.log
tst_icu_I18N_SOURCES = tst_icu_I18N.c
tst_match_glob_SOURCES = tst_match_glob.c
tst_rpn2cql_SOURCES = tst_rpn2cql.c
+tst_srv_SOURCES = tst_srv.c
+tst_srv_LDADD = ../src/libyaz.la $(PTHREAD_LIBS)
--- /dev/null
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data
+ * See the file LICENSE for details.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+
+#include <yaz/log.h>
+#include <yaz/test.h>
+#include <yaz/srv.h>
+#include <yaz/odr.h>
+#include <yaz/proto.h>
+
+struct my_info {
+ int x;
+};
+
+static void *create_session(struct cs_session *ses)
+{
+ struct my_info *my = xmalloc(sizeof(*my));
+ my->x = 42;
+ yaz_log(YLOG_LOG, "create_session");
+ return my;
+}
+
+static void gdu_handler(yaz_pkg_t pkg, void *user)
+{
+ struct my_info *my = user;
+ Z_GDU **gdu = yaz_pkg_get_gdu(pkg);
+ ODR o = odr_createmem(ODR_PRINT);
+
+ yaz_log(YLOG_LOG, "gdu_handler");
+ YAZ_CHECK_EQ(my->x, 42);
+
+ z_GDU(o, gdu, 0, 0);
+ odr_destroy(o);
+
+ if ((*gdu)->which == Z_GDU_Z3950)
+ {
+ ODR encode = odr_createmem(ODR_ENCODE);
+ Z_APDU *apdu_req = (*gdu)->u.z3950;
+ Z_APDU *apdu_res = 0;
+ int must_close = 0;
+
+ if (apdu_req->which == Z_APDU_close)
+ {
+ apdu_res = zget_APDU(encode, Z_APDU_close);
+ *apdu_res->u.close->closeReason = Z_Close_finished;
+ must_close = 1;
+ }
+ else if (apdu_req->which == Z_APDU_initRequest)
+ {
+ apdu_res = zget_APDU(encode, Z_APDU_initResponse);
+ }
+ else
+ {
+ apdu_res = zget_APDU(encode, Z_APDU_close);
+
+ *apdu_res->u.close->closeReason = Z_Close_unspecified;
+ must_close = 1;
+ }
+ if (apdu_res)
+ {
+ yaz_pkg_t pkg_res = yaz_pkg_create(pkg);
+ *yaz_pkg_get_gdu(pkg) = zget_wrap_APDU(encode, apdu_res);
+ yaz_pkg_send(pkg_res);
+ }
+ if (must_close)
+ yaz_pkg_close(pkg);
+ yaz_pkg_stop_server(pkg);
+ }
+}
+
+static void tst_srv(void)
+{
+ const char *listeners[] = {"unix:socket", 0};
+
+ yaz_srv_t srv = yaz_srv_create(listeners);
+ YAZ_CHECK(srv);
+ if (!srv)
+ return;
+
+ yaz_srv_run(srv, create_session, gdu_handler);
+ yaz_srv_destroy(srv);
+}
+
+int main (int argc, char **argv)
+{
+ YAZ_CHECK_INIT(argc, argv);
+ YAZ_CHECK_LOG();
+ /* tst_srv(); */
+ YAZ_CHECK_TERM;
+}
+
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+