Logo Search packages:      
Sourcecode: sofia-sip version File versions

su_source.c

Go to the documentation of this file.
/*
 * This file is part of the Sofia-SIP package
 *
 * Copyright (C) 2005 Nokia Corporation.
 *
 * Contact: Pekka Pessi <pekka.pessi@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 */

/**
 * @file su_source.c
 * @brief Wrapper for glib GSource.
 *
 * Refs:
 *  - http://sofia-sip.sourceforge.net/refdocs/su/group__su__wait.html
 *  - http://developer.gnome.org/doc/API/glib/glib-the-main-event-loop.html
 *
 * @author Pekka Pessi <Pekka.Pessi@nokia.com>.
 *
 * @date Created: Thu Mar  4 15:15:15 2004 ppessi
 *
 */

#include "config.h"

#include <glib.h>

#if HAVE_OPEN_C
#include <glib/gthread.h>
#include <glib_global.h>
#endif

#define SU_PORT_IMPLEMENTATION 1

#define SU_MSG_ARG_T union { char anoymous[4]; }

#define su_port_s su_source_s

#include "sofia-sip/su_source.h"
#include "sofia-sip/su_glib.h"

#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"

#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>

#if 1
#define PORT_LOCK_DEBUG(x)  ((void)0)
#else
#define PORT_LOCK_DEBUG(x)  printf x
#endif

00073 static su_port_t *su_source_port_create(void) __attribute__((__malloc__));
static gboolean su_source_prepare(GSource *gs, gint *return_tout);
static gboolean su_source_check(GSource *gs);
static gboolean su_source_dispatch(GSource *gs,
                           GSourceFunc callback,
                           gpointer user_data);
static void su_source_finalize(GSource *source);

static
GSourceFuncs su_source_funcs[1] = {{
    su_source_prepare,
    su_source_check,
    su_source_dispatch,
    su_source_finalize,
    NULL,
    NULL
  }};

static int su_source_port_init(su_port_t *self, su_port_vtable_t const *vtable);
static void su_source_port_deinit(su_port_t *self);

static void su_source_lock(su_port_t *self, char const *who);
static void su_source_unlock(su_port_t *self, char const *who);
static void su_source_incref(su_port_t *self, char const *who);
static void su_source_decref(su_port_t *self, int blocking, char const *who);
static struct _GSource *su_source_gsource(su_port_t *port);

static int su_source_send(su_port_t *self, su_msg_r rmsg);

static int su_source_register(su_port_t *self,
                      su_root_t *root,
                      su_wait_t *wait,
                      su_wakeup_f callback,
                      su_wakeup_arg_t *arg,
                      int priority);
static int su_source_unregister(su_port_t *port,
                        su_root_t *root,
                        su_wait_t *wait,
                        su_wakeup_f callback,
                        su_wakeup_arg_t *arg);
static int su_source_deregister(su_port_t *self, int i);
static int su_source_unregister_all(su_port_t *self,
                          su_root_t *root);
static int su_source_eventmask(su_port_t *self,
                       int index, int socket, int events);
static void su_source_run(su_port_t *self);
static void su_source_break(su_port_t *self);
static su_duration_t su_source_step(su_port_t *self, su_duration_t tout);
static int su_source_thread(su_port_t *self, enum su_port_thread_op op);
static int su_source_add_prepoll(su_port_t *port,
                         su_root_t *root,
                         su_prepoll_f *,
                         su_prepoll_magic_t *);
static int su_source_remove_prepoll(su_port_t *port,
                          su_root_t *root);
static int su_source_multishot(su_port_t *self, int multishot);

static char const *su_source_name(su_port_t const *self);

static
su_port_vtable_t const su_source_port_vtable[1] =
  {{
      /* su_vtable_size: */ sizeof su_source_port_vtable,
      su_source_lock,
      su_source_unlock,

      su_source_incref,
      su_source_decref,

      su_source_gsource,

      su_source_send,
      su_source_register,
      su_source_unregister,
      su_source_deregister,
      su_source_unregister_all,
      su_source_eventmask,
      su_source_run,
      su_source_break,
      su_source_step,
      su_source_thread,
      su_source_add_prepoll,
      su_source_remove_prepoll,
      su_base_port_timers,
      su_source_multishot,
      /*su_source_wait_events*/ NULL,
      su_base_port_getmsgs,
      su_base_port_getmsgs_from,
      su_source_name,
      su_base_port_start_shared,
      su_base_port_wait,
      NULL,
    }};

static char const *su_source_name(su_port_t const *self)
{
  return "GSource";
}

/**
 * Port is a per-thread reactor.
 *
 * Multiple root objects executed by single thread share a su_port_t object.
 */
00177 struct su_source_s {
  su_base_port_t   sup_base[1];

  GThread         *sup_tid;
  GStaticMutex     sup_obtained[1];

  GStaticMutex     sup_mutex[1];

00185   GSource         *sup_source;      /**< Backpointer to source */
00186   GMainLoop       *sup_main_loop; /**< Reference to mainloop while running */

  /* Waits */
  unsigned         sup_registers; /** Counter incremented by
                              su_port_register() or
                              su_port_unregister()
                          */
00193   unsigned         sup_n_waits;
  unsigned         sup_size_waits;
  unsigned         sup_max_index;
  unsigned        *sup_indices;
  su_wait_t       *sup_waits;
  su_wakeup_f     *sup_wait_cbs;
  su_wakeup_arg_t**sup_wait_args;
  su_root_t      **sup_wait_roots;
};

typedef struct _SuSource
{
  GSource    ss_source[1];
  su_port_t  ss_port[1];
} SuSource;

#define SU_SOURCE_OWN_THREAD(p)   ((p)->sup_tid == g_thread_self())

#if 1
#define SU_SOURCE_INCREF(p, f)    (g_source_ref(p->sup_source))
#define SU_SOURCE_DECREF(p, f)    (g_source_unref(p->sup_source))

#else

/* Debugging versions */
#define SU_SOURCE_INCREF(p, f)    (g_source_ref(p->sup_source), printf("incref(%p) by %s\n", (p), f))
#define SU_SOURCE_DECREF(p, f)    do { printf("decref(%p) by %s\n", (p), f), \
  g_source_unref(p->sup_source); } while(0)

#endif

#if HAVE_FUNC
#define enter (void)SU_DEBUG_9(("%s: entering\n", __func__))
#elif HAVE_FUNCTION
#define enter (void)SU_DEBUG_9(("%s: entering\n", __FUNCTION__))
#else
#define enter (void)0
#endif

/*=============== Public function definitions ===============*/

/** Create a root that uses GSource as reactor */
00235 su_root_t *su_glib_root_create(su_root_magic_t *magic)
{
  return su_root_create_with_port(magic, su_source_port_create());
}

/** Deprecated */
00241 su_root_t *su_root_source_create(su_root_magic_t *magic)
{
  return su_glib_root_create(magic);
}

/**
 * Returns a GSource object for the root
 *
 * Note that you need to unref the GSource with g_source_unref()
 * before destroying the root object.
 *
 * @return NULL on error (for instance if root was not created with
 *         su_glib_root_create())
 */
00255 GSource *su_glib_root_gsource(su_root_t *root)
{
  g_assert(root);
  return su_root_gsource(root);
}

/*=============== Private function definitions ===============*/

/** Initialize source port */
00264 static int su_source_port_init(su_port_t *self,
                         su_port_vtable_t const *vtable)
{
  GSource *gs = (GSource *)((char *)self - offsetof(SuSource, ss_port));

  self->sup_source = gs;

  g_static_mutex_init(self->sup_obtained);

  g_static_mutex_init(self->sup_mutex);

  return su_base_port_init(self, vtable);
}


static void su_source_port_deinit(su_port_t *self)
{
  su_base_port_deinit(self);

  g_static_mutex_free(self->sup_mutex);
  g_static_mutex_free(self->sup_obtained);

  if (self->sup_indices)
    free (self->sup_indices), self->sup_indices = NULL;
  if (self->sup_waits)
    free (self->sup_waits), self->sup_waits = NULL;
  if (self->sup_wait_cbs)
    free (self->sup_wait_cbs), self->sup_wait_cbs = NULL;
  if (self->sup_wait_args)
    free (self->sup_wait_args), self->sup_wait_args = NULL;
  if (self->sup_wait_roots)
    free (self->sup_wait_roots), self->sup_wait_roots = NULL;

  su_home_deinit(self->sup_base->sup_home);
}


/** @internal Destroy a port. */
static
00303 void su_source_finalize(GSource *gs)
{
  SuSource *ss = (SuSource *)gs;
  assert(gs);
  SU_DEBUG_9(("su_source_finalize() called\n"));
  su_source_port_deinit(ss->ss_port);
}

void su_source_port_lock(su_port_t *self, char const *who)
{
  PORT_LOCK_DEBUG(("%p at %s locking(%p)...",
               (void *)g_thread_self(), who, self));

  g_static_mutex_lock(self->sup_mutex);

  PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...",
               (void *)g_thread_self(), who, self));
}

void su_source_port_unlock(su_port_t *self, char const *who)
{
  g_static_mutex_unlock(self->sup_mutex);

  PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n",
               (void *)g_thread_self(), who, self));
}

/** @internal Send a message to the port. */
00331 int su_source_send(su_port_t *self, su_msg_r rmsg)
{
  int wakeup = su_base_port_send(self, rmsg);
  GMainContext *gmc;

  if (wakeup < 0)
    return -1;
  if (wakeup == 0)
    return 0;

  gmc = g_source_get_context(self->sup_source);

  if (gmc)
    g_main_context_wakeup(gmc);

  return 0;
}

/** @internal
 *
 * Change or query ownership of the port object.
 *
 * @param self pointer to a port object
 * @param op operation
 *
 * @ERRORS
 * @ERROR EALREADY port already has an owner (or has no owner)
 */
00359 static int su_source_thread(su_port_t *self, enum su_port_thread_op op)
{
  GThread *me = g_thread_self();

  switch (op) {

  case su_port_thread_op_is_obtained:
    if (self->sup_tid == me)
      return 2;
    else if (self->sup_tid)
      return 1;
    else
      return 0;

  case su_port_thread_op_release:
    if (self->sup_tid != me)
      return errno = EALREADY, -1;
    self->sup_tid = NULL;
    g_static_mutex_unlock(self->sup_obtained);
    return 0;

  case su_port_thread_op_obtain:
    if (su_home_threadsafe(su_port_home(self)) == -1)
      return -1;
    g_static_mutex_lock(self->sup_obtained);
    self->sup_tid = me;
    return 0;

  default:
    return errno = ENOSYS, -1;
  }
}

/* -- Registering and unregistering ------------------------------------- */

/* Seconds from 1.1.1900 to 1.1.1970 */
#define NTP_EPOCH 2208988800UL

/** Prepare to wait - calculate time to next timer */
static
00399 gboolean su_source_prepare(GSource *gs, gint *return_tout)
{
  SuSource *ss = (SuSource *)gs;
  su_port_t *self = ss->ss_port;

  enter;

  if (self->sup_base->sup_head) {
    *return_tout = 0;
    return TRUE;
  }

  if (self->sup_base->sup_timers) {
    su_time_t now;
    GTimeVal  gtimeval;
    su_duration_t tout;

    g_source_get_current_time(gs, &gtimeval);
    now.tv_sec = gtimeval.tv_sec + 2208988800UL;
    now.tv_usec = gtimeval.tv_usec;

    tout = su_timer_next_expires(self->sup_base->sup_timers, now);

    *return_tout = (tout < 0 || tout > (su_duration_t)G_MAXINT)?
      -1 : (gint)tout;

    return (tout == 0);
  }

  return FALSE;
}

static
gboolean su_source_check(GSource *gs)
{
  SuSource *ss = (SuSource *)gs;
  su_port_t *self = ss->ss_port;
  gint tout;
  unsigned i, I;

  enter;

  I = self->sup_n_waits;

#if SU_HAVE_POLL
  for (i = 0; i < I; i++) {
    if (self->sup_waits[i].revents)
      return TRUE;
  }
#endif

  return su_source_prepare(gs, &tout);
}

static
gboolean su_source_dispatch(GSource *gs,
                      GSourceFunc callback,
                      gpointer user_data)
{
  SuSource *ss = (SuSource *)gs;
  su_port_t *self = ss->ss_port;

  enter;

  if (self->sup_base->sup_head)
    su_base_port_getmsgs(self);

  if (self->sup_base->sup_timers) {
    su_time_t now;
    GTimeVal  gtimeval;
    su_duration_t tout;
    int timers = 0;

    tout = SU_DURATION_MAX;

    g_source_get_current_time(gs, &gtimeval);

    now.tv_sec = gtimeval.tv_sec + 2208988800UL;
    now.tv_usec = gtimeval.tv_usec;

    timers = su_timer_expire(&self->sup_base->sup_timers, &tout, now);
  }

#if SU_HAVE_POLL
  {
    su_root_t *root;
    su_wait_t *waits = self->sup_waits;
    unsigned i, n = self->sup_n_waits;
    unsigned version = self->sup_registers;

    for (i = 0; i < n; i++) {
      if (waits[i].revents) {
      root = self->sup_wait_roots[i];
      self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
                        &waits[i],
                        self->sup_wait_args[i]);
      /* Callback used su_register()/su_unregister() */
      if (version != self->sup_registers)
        break;
      }
    }
  }
#endif

  if (!callback)
    return TRUE;

  return callback(user_data);
}

static void su_source_lock(su_port_t *self, char const *who)
{
  PORT_LOCK_DEBUG(("%p at %s locking(%p)...",
               (void *)g_thread_self(), who, self));
  g_static_mutex_lock(self->sup_mutex);

  PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...",
               (void *)g_thread_self(), who, self));
}

static void su_source_unlock(su_port_t *self, char const *who)
{
  g_static_mutex_unlock(self->sup_mutex);

  PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n",
               (void *)g_thread_self(), who, self));
}

static void su_source_incref(su_port_t *self, char const *who)
{
  SU_SOURCE_INCREF(self, who);
}

static void su_source_decref(su_port_t *self, int blocking, char const *who)
{
  /* XXX - blocking? */
  SU_SOURCE_DECREF(self, who);
}

GSource *su_source_gsource(su_port_t *self)
{
  return self->sup_source;
}

/** @internal
 *
 *  Register a @c su_wait_t object. The wait object, a callback function and
 *  a argument pointer is stored in the port object.  The callback function
 *  will be called when the wait object is signaled.
 *
 *  Please note if identical wait objects are inserted, only first one is
 *  ever signalled.
 *
 * @param self         pointer to port
 * @param root         pointer to root object
 * @param waits        pointer to wait object
 * @param callback   callback function pointer
 * @param arg          argument given to callback function when it is invoked
 * @param priority   relative priority of the wait object
 *              (0 is normal, 1 important, 2 realtime)
 *
 * @return
 *   The function @su_source_register returns nonzero index of the wait object,
 *   or -1 upon an error.  */
00563 int su_source_register(su_port_t *self,
                   su_root_t *root,
                   su_wait_t *wait,
                   su_wakeup_f callback,
                   su_wakeup_arg_t *arg,
                   int priority)
{
  unsigned i, j, I;
  unsigned n;

  enter;

  assert(SU_SOURCE_OWN_THREAD(self));

  n = self->sup_n_waits;

  if (n >= self->sup_size_waits) {
    /* Reallocate size arrays */
    unsigned size;
    unsigned *indices;
    su_wait_t *waits;
    su_wakeup_f *wait_cbs;
    su_wakeup_arg_t **wait_args;
    su_root_t **wait_tasks;

    if (self->sup_size_waits == 0)
      size = SU_WAIT_MIN;
    else
      size = 2 * self->sup_size_waits;

    indices = realloc(self->sup_indices, size * sizeof(*indices));
    if (indices) {
      self->sup_indices = indices;

      for (i = self->sup_size_waits; i < size; i++)
      indices[i] = UINT_MAX;
    }

    for (i = 0; i < self->sup_n_waits; i++)
      g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[i]);

    waits = realloc(self->sup_waits, size * sizeof(*waits));
    if (waits)
      self->sup_waits = waits;

    for (i = 0; i < self->sup_n_waits; i++)
      g_source_add_poll(self->sup_source, (GPollFD*)&waits[i]);

    wait_cbs = realloc(self->sup_wait_cbs, size * sizeof(*wait_cbs));
    if (wait_cbs)
      self->sup_wait_cbs = wait_cbs;

    wait_args = realloc(self->sup_wait_args, size * sizeof(*wait_args));
    if (wait_args)
      self->sup_wait_args = wait_args;

    /* Add sup_wait_roots array, if needed */
    wait_tasks = realloc(self->sup_wait_roots, size * sizeof(*wait_tasks));
    if (wait_tasks)
      self->sup_wait_roots = wait_tasks;

    if (!(indices && waits && wait_cbs && wait_args && wait_tasks)) {
      return -1;
    }

    self->sup_size_waits = size;
  }

  self->sup_n_waits++;

  if (priority > 0) {
    /* Insert */
    for (; n > 0; n--) {
      g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n-1]);
      self->sup_waits[n] = self->sup_waits[n-1];
      g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]);
      self->sup_wait_cbs[n] = self->sup_wait_cbs[n-1];
      self->sup_wait_args[n] = self->sup_wait_args[n-1];
      self->sup_wait_roots[n] = self->sup_wait_roots[n-1];
    }
  }
  else {
    /* Append - no need to move anything */
  }

  self->sup_waits[n] = *wait;
  g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]);
  self->sup_wait_cbs[n] = callback;
  self->sup_wait_args[n] = arg;
  self->sup_wait_roots[n] = root;

  I = self->sup_max_index;

  for (i = 0; i < I; i++)
    if (self->sup_indices[i] == UINT_MAX)
      break;
    else if (self->sup_indices[i] >= n)
      self->sup_indices[i]++;

  if (i == I)
    self->sup_max_index++;

  if (n + 1 < self->sup_n_waits)
    for (j = i; j < I; j++)
      if (self->sup_indices[j] != UINT_MAX &&
        self->sup_indices[j] >= n)
      self->sup_indices[j]++;

  self->sup_indices[i] = n;

  self->sup_registers++;

  return i + 1;               /* 0 is failure */
}

/** Unregister a su_wait_t object.
 *
 *  The function su_source_unregister() unregisters a su_wait_t object. The
 *  wait object, a callback function and a argument are removed from the
 *  port object.
 *
 * @param self     - pointer to port object
 * @param root     - pointer to root object
 * @param wait     - pointer to wait object
 * @param callback - callback function pointer (may be NULL)
 * @param arg      - argument given to callback function when it is invoked
 *                   (may be NULL)
 *
 * @return Nonzero index of the wait object, or -1 upon an error.
 */
00693 int su_source_unregister(su_port_t *self,
                   su_root_t *root,
                   su_wait_t *wait,
                   su_wakeup_f callback, /* XXX - ignored */
                   su_wakeup_arg_t *arg)
{
  unsigned n, N;
  unsigned i, I, j, *indices;

  enter;

  assert(self);
  assert(SU_SOURCE_OWN_THREAD(self));

  i = (unsigned)-1;
  N = self->sup_n_waits;
  I = self->sup_max_index;
  indices = self->sup_indices;

  for (n = 0; n < N; n++) {
    if (SU_WAIT_CMP(wait[0], self->sup_waits[n]) != 0)
      continue;

    /* Found - delete it */
    if (indices[n] == n)
      i = n;
    else for (i = 0; i < I; i++)
      if (indices[i] == n)
      break;

    assert(i < I);

    indices[i] = UINT_MAX;

    g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]);

    self->sup_n_waits = N = N - 1;

    if (n < N)
      for (j = 0; j < I; j++)
      if (self->sup_indices[j] != UINT_MAX &&
          self->sup_indices[j] > n)
        self->sup_indices[j]--;

    for (; n < N; n++) {
      g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n+1]);
      self->sup_waits[n] = self->sup_waits[n+1];
      g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]);
      self->sup_wait_cbs[n] = self->sup_wait_cbs[n+1];
      self->sup_wait_args[n] = self->sup_wait_args[n+1];
      self->sup_wait_roots[n] = self->sup_wait_roots[n+1];
    }

    i += 1; /* 0 is failure */

    if (i == I)
      self->sup_max_index--;

    break;
  }

  self->sup_registers++;

  return (int)i;
}

/** Deregister a su_wait_t object.
 *
 *  The function su_source_deregister() deregisters a su_wait_t registrattion.
 *  The wait object, a callback function and a argument are removed from the
 *  port object.
 *
 * @param self     - pointer to port object
 * @param i        - registration index
 *
 * @return Index of the wait object, or -1 upon an error.
 */
00770 int su_source_deregister(su_port_t *self, int i)
{
  unsigned j, n, N;
  unsigned I, *indices;
  su_wait_t wait[1];

  enter;

  assert(self);
  assert(SU_SOURCE_OWN_THREAD(self));

  if (i <= 0)
    return -1;

  N = self->sup_n_waits;
  I = self->sup_max_index;
  indices = self->sup_indices;

  assert((unsigned)i < I + 1);

  n = indices[i - 1];

  if (n == UINT_MAX)
    return -1;

  self->sup_n_waits = N = N - 1;

  wait[0] = self->sup_waits[n];

  g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]);

  if (n < N)
    for (j = 0; j < I; j++)
      if (self->sup_indices[j] != UINT_MAX &&
        self->sup_indices[j] > n)
      self->sup_indices[j]--;

  for (; n < N; n++) {
    g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n + 1]);
    self->sup_waits[n] = self->sup_waits[n+1];
    g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]);
    self->sup_wait_cbs[n] = self->sup_wait_cbs[n+1];
    self->sup_wait_args[n] = self->sup_wait_args[n+1];
    self->sup_wait_roots[n] = self->sup_wait_roots[n+1];
  }

  indices[i - 1] = UINT_MAX;

  if ((unsigned)i == I)
    self->sup_max_index--;

  su_wait_destroy(wait);

  self->sup_registers++;

  return (int)i;
}

/** @internal
 * Unregister all su_wait_t objects.
 *
 * The function su_source_unregister_all() unregisters all su_wait_t objects
 * associated with given root object destroys all queued timers.
 *
 * @param  self     - pointer to port object
 * @param  root     - pointer to root object
 *
 * @return Number of wait objects removed.
 */
00839 int su_source_unregister_all(su_port_t *self,
                     su_root_t *root)
{
  unsigned i, j;
  unsigned         n_waits;
  su_wait_t       *waits;
  su_wakeup_f     *wait_cbs;
  su_wakeup_arg_t**wait_args;
  su_root_t      **wait_roots;

  enter;

  assert(SU_SOURCE_OWN_THREAD(self));

  n_waits    = self->sup_n_waits;
  waits      = self->sup_waits;
  wait_cbs   = self->sup_wait_cbs;
  wait_args  = self->sup_wait_args;
  wait_roots = self->sup_wait_roots;

  for (i = j = 0; (unsigned)i < n_waits; i++) {
    if (wait_roots[i] == root) {
      /* XXX - we should free all resources associated with this */
      g_source_remove_poll(self->sup_source, (GPollFD*)&waits[i]);
      continue;
    }
    if (i != j) {
      g_source_remove_poll(self->sup_source, (GPollFD*)&waits[i]);
      waits[j] = waits[i];
      wait_cbs[j] = wait_cbs[i];
      wait_args[j] = wait_args[i];
      wait_roots[j] = wait_roots[i];
      g_source_add_poll(self->sup_source, (GPollFD*)&waits[i]);
    }
    j++;
  }

  self->sup_n_waits = j;
  self->sup_registers++;

  return n_waits - j;
}

/**Set mask for a registered event. @internal
 *
 * The function su_source_eventmask() sets the mask describing events that can
 * signal the registered callback.
 *
 * @param port   pointer to port object
 * @param index  registration index
 * @param socket socket
 * @param events new event mask
 *
 * @retval 0 when successful,
 * @retval -1 upon an error.
 */
00895 int su_source_eventmask(su_port_t *self, int index, int socket, int events)
{
  unsigned n;
  int retval;

  enter;

  assert(self);
  assert(SU_SOURCE_OWN_THREAD(self));
  assert(0 < index && (unsigned)index <= self->sup_max_index);

  if (index <= 0 || (unsigned)index > self->sup_max_index)
    return -1;

  n = self->sup_indices[index - 1];

  if (n == UINT_MAX)
    return -1;

  g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]);

  retval = su_wait_mask(&self->sup_waits[n], socket, events);

  g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]);

  return retval;
}

static
int su_source_multishot(su_port_t *self, int multishot)
{
  if (multishot == -1)
    return 1;
  else if (multishot == 0 || multishot == 1)
    return 1;                 /* Always enabled */
  else
    return (errno = EINVAL), -1;
}


/** @internal Main loop.
 *
 * The function @c su_source_run() runs the main loop
 *
 * The function @c su_source_run() runs until @c su_source_break() is called
 * from a callback.
 *
 * @param self     pointer to root object
 * */
00944 void su_source_run(su_port_t *self)
{
  GMainContext *gmc;
  GMainLoop *gml;

  enter;

  gmc = g_source_get_context(self->sup_source);
  if (gmc && g_main_context_acquire(gmc)) {
    gml = g_main_loop_new(gmc, TRUE);
    self->sup_main_loop = gml;
    g_main_loop_run(gml);
    g_main_loop_unref(gml);
    self->sup_main_loop = NULL;
    g_main_context_release(gmc);
  }
}

/** @internal
 * The function @c su_source_break() is used to terminate execution of @c
 * su_source_run(). It can be called from a callback function.
 *
 * @param self     pointer to port
 *
 */
00969 void su_source_break(su_port_t *self)
{
  enter;

  if (self->sup_main_loop)
    g_main_loop_quit(self->sup_main_loop);
}

/** @internal Block until wait object is signaled or timeout.
 *
 * This function waits for wait objects and the timers associated with
 * the root object.  When any wait object is signaled or timer is
 * expired, it invokes the callbacks.
 *
 *   This function returns when a callback has been invoked or @c tout
 *   milliseconds is elapsed.
 *
 * @param self     pointer to port
 * @param tout     timeout in milliseconds
 *
 * @Return
 *   Milliseconds to the next invocation of timer, or @c SU_WAIT_FOREVER if
 *   there are no active timers.
 */
00993 su_duration_t su_source_step(su_port_t *self, su_duration_t tout)
{
  GMainContext *gmc;

  enter;

  gmc = g_source_get_context(self->sup_source);

  if (gmc && g_main_context_acquire(gmc)) {
    GPollFD *fds = NULL;
    gint fds_size = 0;
    gint fds_wait;
    gint priority = G_MAXINT;
    gint src_tout = -1;

    g_main_context_prepare(gmc, &priority);

    fds_wait = g_main_context_query(gmc, priority, &src_tout, NULL, 0);
    while (fds_wait > fds_size) {
      fds = g_alloca(fds_wait * sizeof(fds[0]));
      fds_size = fds_wait;
      fds_wait = g_main_context_query(gmc, priority, &src_tout, fds, fds_size);
    }

    if (src_tout >= 0 && tout > (su_duration_t)src_tout)
      tout = src_tout;

    su_wait((su_wait_t *)fds, fds_wait, tout);

    g_main_context_check(gmc, priority, fds, fds_wait);

    g_main_context_dispatch(gmc);

    g_main_context_release(gmc);
  }

  return 0;
}

static int su_source_add_prepoll(su_port_t *port,
                         su_root_t *root,
                         su_prepoll_f *prepoll,
                         su_prepoll_magic_t *magic)
{
  /* We could call prepoll in su_source_prepare()?? */
  return -1;
}

static int su_source_remove_prepoll(su_port_t *port,
                          su_root_t *root)
{
  return -1;
}

#if 0
/** @internal
 *  Prints out the contents of the port.
 *
 * @param self pointer to a port
 * @param f    pointer to a file (if @c NULL, uses @c stdout).
 */
void su_source_dump(su_port_t const *self, FILE *f)
{
  int i;
#define IS_WAIT_IN(x) (((x)->events & SU_WAIT_IN) ? "IN" : "")
#define IS_WAIT_OUT(x) (((x)->events & SU_WAIT_OUT) ? "OUT" : "")
#define IS_WAIT_ACCEPT(x) (((x)->events & SU_WAIT_ACCEPT) ? "ACCEPT" : "")

  if (f == NULL)
    f = stdout;

  fprintf(f, "su_port_t at %p:\n", self);
  fprintf(f, "\tport is%s running\n", self->sup_running ? "" : "not ");
  fprintf(f, "\tport tid %p\n", (void *)self->sup_tid);
  fprintf(f, "\t%d wait objects\n", self->sup_n_waits);
  for (i = 0; i < self->sup_n_waits; i++) {

  }
}

#endif

/**@internal
 *
 * Allocates and initializes a reactor and message port object.
 *
 * @return
 *   If successful a pointer to the new message port is returned, otherwise
 *   NULL is returned.
 */
static su_port_t *su_source_port_create(void)
{
  SuSource *ss;
  su_port_t *self = NULL;

  SU_DEBUG_9(("su_source_port_create() called\n"));

  ss = (SuSource *)g_source_new(su_source_funcs, (sizeof *ss));

  if (ss) {
    self = ss->ss_port;
    if (su_source_port_init(self, su_source_port_vtable) < 0)
      g_source_unref(ss->ss_source), self = NULL;
  } else {
    su_perror("su_source_port_create(): g_source_new");
  }

  SU_DEBUG_1(("su_source_port_create() returns %p\n", (void *)self));

  return self;
}

/* No su_source_port_start */

/** Use su_source implementation when su_root_create() is called.
 *
 * @NEW_1_12_5
 */
01111 void su_glib_prefer_gsource(void)
{
  su_port_prefer(su_source_port_create, NULL);
}


Generated by  Doxygen 1.6.0   Back to index