Logo Search packages:      
Sourcecode: sofia-sip version File versions

su_pthread_port.c

/*
 * This file is part of the Sofia-SIP package
 *
 * Copyright (C) 2005 Nokia Corporation.
 *
 * Contact: Pekka Pessi <pekka.pessi@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 */

/**@ingroup su_wait
 * @CFILE su_pthread_port.c
 *
 * OS-Independent Syncronization Interface with pthreads
 *
 * This implements #su_msg_t message passing functionality using pthreads.
 *
 * @author Pekka Pessi <Pekka.Pessi@nokia.com>
 * @author Kai Vehmanen <kai.vehmanen@nokia.com>
 *
 * @date Created: Tue Sep 14 15:51:04 1999 ppessi
 */

#include "config.h"

#include <stdlib.h>
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <errno.h>

00048 #define su_pthread_port_s su_port_s
#define SU_CLONE_T su_msg_t

#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"

#if 1
#define PORT_LOCK_DEBUG(x)  ((void)0)
#else
#define PORT_LOCK_DEBUG(x)  printf x
#endif

#define SU_TASK_COPY(d, s, by) (void)((d)[0]=(s)[0], \
  (s)->sut_port?(void)su_port_incref(s->sut_port, #by):(void)0)

/**@internal
 *
 * Initializes a message port. It creates a mailbox used to wake up the
 * thread waiting on the port if needed. Currently, the mailbox is a
 * socketpair or an UDP socket connected to itself.
 */
int su_pthread_port_init(su_port_t *self, su_port_vtable_t const *vtable)
{
  SU_DEBUG_9(("su_pthread_port_init(%p, %p) called\n",
            (void *)self, (void *)vtable));

  self->sup_tid = pthread_self();

  if (su_base_port_init(self, vtable) == 0 &&
      su_base_port_threadsafe(self) == 0) {
    return 0;
  }
  else
    su_base_port_deinit(self);

  return -1;
}


/** @internal Deinit a base implementation of port. */
void su_pthread_port_deinit(su_port_t *self)
{
  assert(self);

#if 0
  pthread_mutex_destroy(self->sup_runlock);
  pthread_cond_destroy(self->sup_resume);
#endif

  su_base_port_deinit(self);
}

void su_pthread_port_lock(su_port_t *self, char const *who)
{
  PORT_LOCK_DEBUG(("%p at %s locking(%p)...",
               (void *)pthread_self(), who, self));

  su_home_mutex_lock(self->sup_base->sup_home);

  PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...", 
               (void *)pthread_self(), who, self));
}

void su_pthread_port_unlock(su_port_t *self, char const *who)
{
  su_home_mutex_unlock(self->sup_base->sup_home);

  PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n", 
               (void *)pthread_self(), who, self));
}

/** @internal
 * Checks if the calling thread owns the port object.
 *
 * @param self pointer to a port object
 *
 * @retval true (nonzero) if the calling thread owns the port,
 * @retval false (zero) otherwise.
 */
int su_pthread_port_own_thread(su_port_t const *self)
{
  return self == NULL || 
    pthread_equal(self->sup_tid, pthread_self());
}

/* -- Clones ------------------------------------------------------------ */

struct clone_args
{
  su_port_create_f*create;
  su_root_t       *parent;
  su_root_magic_t *magic;
  su_root_init_f   init;
  su_root_deinit_f deinit;
  pthread_mutex_t  mutex[1];
  pthread_cond_t   cv[1];
  int              retval;
  su_msg_r         clone;
};

static void *su_pthread_port_clone_main(void *varg);
static void su_pthread_port_return_to_parent(struct clone_args *arg,
                                   int retval);
static su_msg_function su_pthread_port_clone_break;

/* Structure used to synchronize parent and clone in su_clone_wait() */
struct su_pthread_port_waiting_parent {
  pthread_mutex_t deinit[1];
  pthread_mutex_t mutex[1];
  pthread_cond_t cv[1];
  int waiting;
};

/** Start a clone task running under a pthread.
 *
 * @internal
 *
 * Allocates and initializes a sub-task with its own pthread. The sub-task is
 * represented by clone handle to the rest of the application. The function
 * su_clone_start() returns the clone handle in @a return_clone. The clone
 * handle is used to communicate with the newly created clone task using
 * messages.
 *
 * A new #su_root_t object is created for the sub-task with the @a magic as
 * the root context pointer. Because the sub-task may or may not have its
 * own thread, all its activity must be scheduled via this root object. In
 * other words, the sub-task can be schedule
 * -# I/O events with su_root_register()
 * -# timers with su_timer_set(), su_timer_set_at() or su_timer_run()
 * -# messages with su_msg_send().
 *
 * Messages can also be used to pass information between tasks or threads.
 *
 * After the new thread has been launched, the initialization routine is
 * executed by the newly created thread. The calling thread blocks until
 * the initialization routine completes. If the initialization routine
 * returns #su_success (0), the sub-task is considered to be created
 * successfully. After the successful initialization, the sub-task continues
 * to execeute the function su_root_run().
 *
 * If the initalization function @a init fails, the sub-task (either the
 * newly created thread or the current thread executing the su_clone_start()
 * function) calls the deinitialization function, and su_clone_start()
 * returns NULL.
 *
 * @param parent   root to be cloned (may be NULL if multi-threaded)
 * @param return_clone reference to a clone [OUT]
 * @param magic    pointer to user data
 * @param init     initialization function
 * @param deinit   deinitialization function
 *
 * @return 0 if successfull, -1 upon an error.
 *
 * @sa su_root_threading(), su_clone_task(), su_clone_stop(), su_clone_wait(),
 * su_clone_forget().
 * 
 */
int su_pthreaded_port_start(su_port_create_f *create,
                      su_root_t *parent,
                      su_clone_r return_clone,
                      su_root_magic_t *magic,
                      su_root_init_f init,
                      su_root_deinit_f deinit)
{
  struct clone_args arg = {
    /* create: */ NULL,
    /* parent: */ NULL,
    /* magic: */  NULL,
    /* init: */   NULL,
    /* deinit: */ NULL,
    /* mutex: */  { PTHREAD_MUTEX_INITIALIZER },
    /* cv: */     { PTHREAD_COND_INITIALIZER },
    /* retval: */ -1,
    /* clone: */  SU_MSG_R_INIT,
  };

  int thread_created = 0;
  pthread_t tid;

  arg.create = create;
  arg.parent = parent;
  arg.magic = magic;
  arg.init = init;
  arg.deinit = deinit;

  pthread_mutex_lock(arg.mutex);
  if (pthread_create(&tid, NULL, su_pthread_port_clone_main, &arg) == 0) {
    pthread_cond_wait(arg.cv, arg.mutex);
    thread_created = 1;
  }
  pthread_mutex_unlock(arg.mutex);

  pthread_mutex_destroy(arg.mutex);
  pthread_cond_destroy(arg.cv);

  if (arg.retval != 0) {
    if (thread_created)
      pthread_join(tid, NULL);
    return -1;
  }

  *return_clone = *arg.clone;

  return 0;
}

/** Main function for clone thread.
 *
 * @internal
 */
static void *su_pthread_port_clone_main(void *varg)
{
  struct clone_args *arg = (struct clone_args *)varg;
  su_task_r task;
  int zap = 1;

#if SU_HAVE_WINSOCK
  su_init();
#endif

  task->sut_port = arg->create();

  if (task->sut_port) {
    task->sut_port->sup_thread = 1;

    task->sut_root = su_salloc(su_port_home(task->sut_port),
                         sizeof *task->sut_root);
    if (task->sut_root) {

      task->sut_root->sur_threading = 1;  /* By default */

      SU_TASK_COPY(task->sut_root->sur_parent, su_root_task(arg->parent), 
               su_pthread_port_clone_main);
      SU_TASK_COPY(task->sut_root->sur_task, task,
               su_pthread_port_clone_main);

      if (su_msg_create(arg->clone,
                  task,
                  su_root_task(arg->parent),
                  su_pthread_port_clone_break,
                  0) == 0) {
      task->sut_root->sur_magic = arg->magic;
      task->sut_root->sur_deinit = arg->deinit;

      if (arg->init(task->sut_root, arg->magic) == 0) {
        su_pthread_port_return_to_parent(arg, 0), arg = NULL;

        su_root_run(task->sut_root); /* Do the work */

        /* Cleanup */
        if (task->sut_port->sup_waiting_parent) {
          struct su_pthread_port_waiting_parent *mom;

          mom = task->sut_port->sup_waiting_parent;
          pthread_mutex_lock(mom->mutex);
          mom->waiting = 0;
          pthread_cond_signal(mom->cv);
          pthread_mutex_unlock(mom->mutex);
    
          pthread_mutex_lock(mom->deinit);
          su_port_getmsgs(task->sut_port);
          pthread_mutex_unlock(mom->deinit);
        }
        else
          zap = 0;
      }
      else
        su_msg_destroy(arg->clone);

      su_root_destroy(task->sut_root);
      }
    }

    task->sut_port->sup_thread = 0;

    task->sut_port->sup_base->sup_vtable->
      su_port_decref(task->sut_port, zap, 
                 "su_pthread_port_clone_main");
  }

#if SU_HAVE_WINSOCK
  su_deinit();
#endif

  if (arg)
    su_pthread_port_return_to_parent(arg, -1);

  return NULL;                /* Exit from thread */
}

/* Signal that parent can resume execution */
static void su_pthread_port_return_to_parent(struct clone_args *arg,
                                   int retval)
{
  arg->retval = retval;

  pthread_mutex_lock(arg->mutex);
  pthread_cond_signal(arg->cv);
  pthread_mutex_unlock(arg->mutex);
}

/** "Stop" message function for pthread clone.
 *
 * @sa su_clone_wait()
 * @internal 
 */
static void su_pthread_port_clone_break(su_root_magic_t *m,
                              su_msg_r msg,
                              su_msg_arg_t *a)
{
  su_root_t *root = su_msg_to(msg)->sut_root;

  root->sur_deiniting = 1;

  su_root_break(root);
}

/** Wait for the pthread clone to exit.
 * @internal
 */
void su_pthread_port_wait(su_clone_r rclone)
{
  su_port_t *clone, *parent;
  struct su_pthread_port_waiting_parent mom[1];
  pthread_t tid;

  clone = su_msg_to(rclone)->sut_port;
  parent = su_msg_from(rclone)->sut_port;

  if (clone == parent) {
    su_base_port_wait(rclone);
    return;
  }
  
  assert(parent); assert(clone); 
  assert(rclone[0]->sum_func == su_pthread_port_clone_break);
#if 0
  assert(!clone->sup_paused);
#endif

  tid = clone->sup_tid;

  if (!clone->sup_thread) {   /* Already died */
    su_msg_destroy(rclone);
    pthread_join(tid, NULL);
    return;  
  }

  pthread_mutex_init(mom->deinit, NULL);
  pthread_mutex_lock(mom->deinit);

  pthread_cond_init(mom->cv, NULL);
  pthread_mutex_init(mom->mutex, NULL);
  pthread_mutex_lock(mom->mutex);

  mom->waiting = 1;

  clone->sup_waiting_parent = mom;

  su_msg_send(rclone);

  while (mom->waiting)
    pthread_cond_wait(mom->cv, mom->mutex);

  /* Run all messages from clone */
  while (su_port_getmsgs_from(parent, clone))
    ;

  /* Allow clone thread to exit */
  pthread_mutex_unlock(mom->deinit);
  pthread_join(tid, NULL);

  pthread_mutex_destroy(mom->deinit);

  pthread_mutex_unlock(mom->mutex);
  pthread_mutex_destroy(mom->mutex);
  pthread_cond_destroy(mom->cv);
}

struct su_pthread_port_execute
{
  pthread_mutex_t mutex[1];
  pthread_cond_t cond[1];
  int (*function)(void *);
  void *arg;
  int value;
};

static su_msg_function _su_pthread_port_execute;

/** Execute the @a function by a pthread @a task.
 *
 * @retval 0 if successful
 * @retval -1 upon an error
 *
 * @sa su_task_execute()
 *
 * @internal
 */
int su_pthread_port_execute(su_task_r const task,
                      int (*function)(void *), void *arg,
                      int *return_value)
{
  int success;
  su_msg_r m = SU_MSG_R_INIT;
  struct su_pthread_port_execute frame = {
    { PTHREAD_MUTEX_INITIALIZER },
    { PTHREAD_COND_INITIALIZER },
    function, arg, 0
  };

  if (su_msg_create(m, task, su_task_null,
                _su_pthread_port_execute, (sizeof &frame)) < 0)
    return -1;

  *(struct su_pthread_port_execute **)su_msg_data(m) = &frame;

  pthread_mutex_lock(frame.mutex);

  success = su_msg_send(m);

  if (success == 0)
    while (frame.function)
      pthread_cond_wait(frame.cond, frame.mutex);
  else
    su_msg_destroy(m);

  pthread_mutex_unlock(frame.mutex);
  pthread_mutex_destroy(frame.mutex);
  pthread_cond_destroy(frame.cond);

  if (return_value)
    *return_value = frame.value;

  return success;
}

static void _su_pthread_port_execute(su_root_magic_t *m,
                             su_msg_r msg,
                             su_msg_arg_t *a)
{
  struct su_pthread_port_execute *frame;
  frame = *(struct su_pthread_port_execute **)a;
  pthread_mutex_lock(frame->mutex);
  frame->value = frame->function(frame->arg);
  frame->function = NULL;     /* Mark as completed */
  pthread_cond_signal(frame->cond);
  pthread_mutex_unlock(frame->mutex);
}

#if 0                   /* pausing and resuming are not used */

/** Pause the pthread port.
 *
 * This is a message function invoked by su_pthread_port_pause() and called
 * from the message dispatcher. It releases the lock sup_runlock and waits
 * until the condition variable sup_resume is signaled and sup_paused is
 * cleared by su_pthread_port_resume().
 */
static
void su_pthread_port_paused(su_root_magic_t *magic,
                      su_msg_r msg,
                      su_msg_arg_t *arg)
{
  su_port_t *self = su_msg_to(msg)->sut_port;
  self->sup_paused = 1;
  while (self->sup_paused)
    pthread_cond_wait(self->sup_resume, self->sup_runlock);
}

/** Pause a port.
 *
 * Obtain an exclusive lock on port's private data.
 *
 * @retval 0 if successful (and clone is paused)
 * @retval -1 upon an error
 */
int su_pthread_port_pause(su_port_t *self)
{
  su_msg_r m = SU_MSG_R_INIT;
  _su_task_t task[1] = {{ self, NULL }};

  if (su_msg_create(m, task, su_task_null, su_pthread_port_paused, 0) < 0)
    return -1;

  if (su_msg_send(m) < 0)
    return -1;

  if (pthread_mutex_lock(self->sup_runlock) < 0)
    return -1;

  return 0;
}

/** Resume a port.
 *
 * Give up an exclusive lock on port's private data.
 *
 * @retval 0 if successful (and clone is resumed)
 * @retval -1 upon an error
 */
int su_pthread_port_resume(su_port_t *self)
{
  assert(self && self->sup_paused);

  self->sup_paused = 0;

  if (pthread_cond_signal(self->sup_resume) < 0 ||
      pthread_mutex_unlock(self->sup_runlock) < 0)
    return -1;

  return 0;
}

#endif

Generated by  Doxygen 1.6.0   Back to index