Logo Search packages:      
Sourcecode: sofia-sip version File versions  Download package

tport_threadpool.c

/*
 * This file is part of the Sofia-SIP package
 *
 * Copyright (C) 2006 Nokia Corporation.
 *
 * Contact: Pekka Pessi <pekka.pessi@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 */

/**@CFILE tport_threadpool.c Multithreading transport
 *
 * See tport.docs for more detailed description of tport interface.
 *
 * @author Pekka Pessi <Pekka.Pessi@nokia.com>
 * @author Martti Mela <Martti.Mela@nokia.com>
 *
 * @date Created: Fri Mar 24 08:45:49 EET 2006 ppessi
 */

#include "config.h"

#undef HAVE_SIGCOMP

#define SU_ROOT_MAGIC_T         struct tport_threadpool
#define SU_WAKEUP_ARG_T         struct tport_s
#define SU_MSG_ARG_T            union tport_su_msg_arg

#include "tport_internal.h"

#include <stdlib.h>
#include <time.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>

#if HAVE_FUNC
#elif HAVE_FUNCTION
#define __func__ __FUNCTION__
#else
static char const __func__[] = "tport_threadpool";
#endif

/* ==== Thread pools =================================================== */

typedef struct threadpool threadpool_t;

00062 typedef struct {
  tport_primary_t tptp_primary;
00064   threadpool_t   *tptp_pool;   /**< Worker threads */
  unsigned        tptp_poolsize;
} tport_threadpool_t;

00068 struct threadpool
{
  /* Shared */
  su_clone_r thrp_clone;
  tport_threadpool_t *thrp_tport;

  int        thrp_killing; /* Threadpool is being killed */

  /* Private variables */
  su_root_t    *thrp_root;
  int           thrp_reg;
  struct sigcomp_compartment *thrp_compartment;
  su_msg_r   thrp_rmsg;

  /* Slave thread counters */
  int        thrp_r_sent;
  int        thrp_s_recv;

  unsigned   thrp_rcvd_msgs;
  unsigned   thrp_rcvd_bytes;

  /* Master thread counters */
  int        thrp_s_sent;
  int        thrp_r_recv;

  int        thrp_yield;
};

00096 typedef struct
{
  threadpool_t *tpd_thrp;
  int  tpd_errorcode;
  msg_t *tpd_msg;
  su_time_t tpd_when;
  unsigned tpd_mtu;
#if HAVE_SIGCOMP
  struct sigcomp_compartment *tpd_cc;
#endif
  struct sigcomp_udvm *tpd_udvm;
  socklen_t tpd_namelen;
  su_sockaddr_t tpd_name[1];
} thrp_udp_deliver_t;

00111 union tport_su_msg_arg
{
  threadpool_t   *thrp;
  thrp_udp_deliver_t thrp_udp_deliver[1];
};

int tport_threadpool_init_primary(tport_primary_t *,
                          tp_name_t tpn[1],
                          su_addrinfo_t *,
                          tagi_t const *,
                          char const **return_culprit);
static void tport_threadpool_deinit_primary(tport_primary_t *pri);

static int tport_thread_send(tport_t *tp,
                       msg_t *msg,
                       tp_name_t const *tpn,
                       struct sigcomp_compartment *cc,
                       unsigned mtu);

tport_vtable_t const tport_threadpool_vtable =
{
  "udp", tport_type_local,
  sizeof (tport_threadpool_t),
  tport_threadpool_init_primary,
  tport_threadpool_deinit_primary,
  NULL,
  NULL,
  0,                    /* No secondary transports! */
  NULL,
  NULL,
  NULL,
  NULL,
  NULL,
  tport_recv_dgram,
  tport_send_dgram,
  NULL,
  tport_thread_send
};

static int thrp_udp_init(su_root_t *, threadpool_t *);
static void thrp_udp_deinit(su_root_t *, threadpool_t *);
static int thrp_udp_event(threadpool_t *thrp,
                      su_wait_t *w,
                      tport_t *_tp);
static int thrp_udp_recv_deliver(threadpool_t *thrp,
                         tport_t const *tp,
                         thrp_udp_deliver_t *tpd,
                         int events);
static int thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd);
#if HAVE_SIGCOMP
static int thrp_udvm_decompress(threadpool_t *thrp,
                        thrp_udp_deliver_t *tpd);
#endif
static void thrp_udp_deliver(threadpool_t *thrp,
                       su_msg_r msg,
                       union tport_su_msg_arg *arg);
static void thrp_udp_deliver_report(threadpool_t *thrp,
                            su_msg_r m,
                            union tport_su_msg_arg *arg);
static void thrp_udp_send(threadpool_t *thrp,
                    su_msg_r msg,
                    union tport_su_msg_arg *arg);
static void thrp_udp_send_report(threadpool_t *thrp,
                         su_msg_r msg,
                         union tport_su_msg_arg *arg);


/** Launch threads in the tport pool. */
int tport_threadpool_init_primary(tport_primary_t *pri,
                          tp_name_t tpn[1],
                          su_addrinfo_t *ai,
                          tagi_t const *tags,
                          char const **return_culprit)
{
  tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
  tport_t *tp = pri->pri_primary;
  threadpool_t *thrp;
  int i, N = tp->tp_params->tpp_thrpsize;

  assert(ai->ai_socktype == SOCK_DGRAM);

  if (tport_udp_init_primary(pri, tpn, ai, tags, return_culprit) < 0)
    return -1;

  if (N == 0)
    return 0;

  thrp = su_zalloc(tp->tp_home, (sizeof *thrp) * N);
  if (!thrp)
    return -1;

  su_setblocking(tp->tp_socket, 0);

  tptp->tptp_pool = thrp;
  tptp->tptp_poolsize = N;

  for (i = 0; i < N; i++) {
#if HAVE_SIGCOMP
    if (tport_has_sigcomp(tp))
      thrp[i].thrp_compartment = tport_primary_compartment(tp->tp_master);
#endif
    thrp[i].thrp_tport = tptp;
    if (su_clone_start(pri->pri_master->mr_root,
                   thrp[i].thrp_clone,
                   thrp + i,
                   thrp_udp_init,
                   thrp_udp_deinit) < 0)
      goto error;
  }

  tp->tp_events = 0;

  return 0;

 error:
  assert(!"tport_launch_threadpool");
  return -1;
}

/** Kill threads in the tport pool.
 *
 * @note Executed by stack thread only.
 */
static
void tport_threadpool_deinit_primary(tport_primary_t *pri)
{
  tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
  threadpool_t *thrp = tptp->tptp_pool;
  int i, N = pri->tptp_poolsize;

  if (!thrp)
    return;

  /* Prevent application from using these. */
  for (i = 0; i < N; i++)
    thrp[i].thrp_killing = 1;

  /* Stop every task in the threadpool. */
  for (i = 0; i < N; i++)
    su_clone_wait(pri->pri_master->mr_root, thrp[i].thrp_clone);

  su_free(pri->pri_home, tptp), tptp->tptp_pool = NULL;
  tptp->tptp_poolsize = 0;

  SU_DEBUG_3(("%s(%p): zapped threadpool\n", __func__, pri));
}

static int thrp_udp_init(su_root_t *root, threadpool_t *thrp)
{
  tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
  su_wait_t wait[1];

  assert(tp);

  thrp->thrp_root = root;

  if (su_wait_create(wait, tp->tp_socket, SU_WAIT_IN | SU_WAIT_ERR) < 0)
    return -1;

  thrp->thrp_reg = su_root_register(root, wait, thrp_udp_event, tp, 0);

  if (thrp->thrp_reg  == -1)
    return -1;

  return 0;
}

static void thrp_udp_deinit(su_root_t *root, threadpool_t *thrp)
{
  if (thrp->thrp_reg)
    su_root_deregister(root, thrp->thrp_reg), thrp->thrp_reg = 0;
  su_msg_destroy(thrp->thrp_rmsg);
}

su_inline void
thrp_yield(threadpool_t *thrp)
{
  tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
  su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, 0);
  thrp->thrp_yield = 1;
}

su_inline void
thrp_gain(threadpool_t *thrp)
{
  tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
  int events = SU_WAIT_IN | SU_WAIT_ERR;
  su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, events);
  thrp->thrp_yield = 0;
}

static int thrp_udp_event(threadpool_t *thrp,
                    su_wait_t *w,
                    tport_t *tp)
{
#if HAVE_POLL
  assert(w->fd == tp->tp_socket);
#endif

  for (;;) {
    thrp_udp_deliver_t *tpd;
    int events;

    if (!*thrp->thrp_rmsg) {
      if (su_msg_create(thrp->thrp_rmsg,
                  su_root_parent(thrp->thrp_root),
                  su_root_task(thrp->thrp_root),
                  thrp_udp_deliver,
                  sizeof (*tpd)) == -1) {
      SU_DEBUG_1(("thrp_udp_event(%p): su_msg_create(): %s\n", thrp,
                strerror(errno)));
      return 0;
      }
    }

    tpd = su_msg_data(thrp->thrp_rmsg)->thrp_udp_deliver; assert(tpd);
    tpd->tpd_thrp = thrp;

    events = su_wait_events(w, tp->tp_socket);
    if (!events)
      return 0;

    thrp_udp_recv_deliver(thrp, tp, tpd, events);

    if (*thrp->thrp_rmsg) {
      SU_DEBUG_7(("thrp_udp_event(%p): no msg sent\n", thrp));
      tpd = su_msg_data(thrp->thrp_rmsg)->thrp_udp_deliver;
      memset(tpd, 0, sizeof *tpd);
      return 0;
    }

    if (thrp->thrp_yield || (thrp->thrp_s_sent - thrp->thrp_s_recv) > 0)
      return 0;

    su_wait(w, 1, 0);
  }
}

static int thrp_udp_recv_deliver(threadpool_t *thrp,
                         tport_t const *tp,
                         thrp_udp_deliver_t *tpd,
                         int events)
{
  unsigned qlen = thrp->thrp_r_sent - thrp->thrp_r_recv;

  SU_DEBUG_7(("thrp_udp_event(%p): events%s%s%s%s for %p\n", thrp,
            events & SU_WAIT_IN ? " IN" : "",
            events & SU_WAIT_HUP ? " HUP" : "",
            events & SU_WAIT_OUT ? " OUT" : "",
            events & SU_WAIT_ERR ? " ERR" : "",
            tpd));

  if (events & SU_WAIT_ERR) {
    tpd->tpd_errorcode = tport_udp_error(tp, tpd->tpd_name);
    if (tpd->tpd_errorcode) {
      if (thrp->thrp_yield)
      su_msg_report(thrp->thrp_rmsg, thrp_udp_deliver_report);
      tpd->tpd_when = su_now();
      su_msg_send(thrp->thrp_rmsg);
      thrp->thrp_r_sent++;
      return 0;
    }
  }

  if (events & SU_WAIT_IN) {
    if (thrp_udp_recv(thrp, tpd) < 0) {
      tpd->tpd_errorcode = su_errno();
      assert(tpd->tpd_errorcode);
      if (su_is_blocking(tpd->tpd_errorcode))
      return 0;
    }
    else if (tpd->tpd_msg) {
      int n = msg_extract(tpd->tpd_msg); (void)n;

      thrp->thrp_rcvd_msgs++;
      thrp->thrp_rcvd_bytes += msg_size(tpd->tpd_msg);
    }

#if HAVE_SIGCOMP
    if (tpd->tpd_udvm && !tpd->tpd_msg)
      sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;
#endif

    assert(!tpd->tpd_msg || !tpd->tpd_errorcode);

    if (tpd->tpd_msg || tpd->tpd_errorcode) {
      if (qlen >= tp->tp_params->tpp_thrprqsize) {
      SU_DEBUG_7(("tport recv queue %i: %u\n",
                (int)(thrp - tp->tp_pri->tptp_pool), qlen));
      thrp_yield(thrp);
      }

      if (qlen >= tp->tp_params->tpp_thrprqsize / 2)
      su_msg_report(thrp->thrp_rmsg, thrp_udp_deliver_report);
      tpd->tpd_when = su_now();
      su_msg_send(thrp->thrp_rmsg);
      thrp->thrp_r_sent++;
      return 0;
    }
  }

  return 0;
}

#include <pthread.h>

/** Mutex for reading from socket */
static pthread_mutex_t mutex[1] = { PTHREAD_MUTEX_INITIALIZER };

/** Receive a UDP packet by threadpool. */
static
int thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd)
{
  tport_t const *tp = thrp->thrp_tport->pri_primary;
  unsigned char sample[2];
  int N;
  int s = tp->tp_socket;

  pthread_mutex_lock(mutex);

  /* Simulate packet loss */
  if (tp->tp_params->tpp_drop &&
      su_randint(0, 1000) < tp->tp_params->tpp_drop) {
    recv(s, sample, 1, 0);
    pthread_mutex_unlock(mutex);
    SU_DEBUG_3(("tport(%p): simulated packet loss!\n", tp));
    return 0;
  }

  /* Peek for first two bytes in message:
     determine if this is stun, sigcomp or sip
  */
  N = recv(s, sample, sizeof sample, MSG_PEEK | MSG_TRUNC);

  if (N < 0) {
    if (su_is_blocking(su_errno()))
      N = 0;
  }
  else if (N <= 1) {
    SU_DEBUG_1(("%s(%p): runt of %u bytes\n", "thrp_udp_recv", thrp, N));
    recv(s, sample, sizeof sample, 0);
    N = 0;
  }
#if !HAVE_MSG_TRUNC
  else if ((N = su_getmsgsize(tp->tp_socket)) < 0)
    ;
#endif
  else if ((sample[0] & 0xf8) == 0xf8) {
#if HAVE_SIGCOMP
    if (thrp->thrp_compartment) {
      struct sigcomp_buffer *input;
      void *data;
      int dlen;

      tpd->tpd_udvm =
      sigcomp_udvm_create_for_compartment(thrp->thrp_compartment);
      input = sigcomp_udvm_input_buffer(tpd->tpd_udvm, N); assert(input);

      data = input->b_data + input->b_avail;
      dlen = input->b_size - input->b_avail;

      if (dlen < N)
      dlen = 0;

      tpd->tpd_namelen = sizeof(tpd->tpd_name);

      dlen = recvfrom(tp->tp_socket, data, dlen, 0,
                  &tpd->tpd_name->su_sa, &tpd->tpd_namelen);

      SU_CANONIZE_SOCKADDR(tpd->tpd_name);

      if (dlen < N) {
      su_seterrno(EMSGSIZE);        /* Protocol error */
      N = -1;
      } else if (dlen == -1)
      N = -1;
      else {
      input->b_avail += dlen;
      input->b_complete = 1;

      pthread_mutex_unlock(mutex);

      N = thrp_udvm_decompress(thrp, tpd);

      if (N == -1)
        /* Do not report decompression errors as ICMP errors */
        memset(tpd->tpd_name, 0, tpd->tpd_namelen);

      return N;
      }
      pthread_mutex_unlock(mutex);
      return N;
    }
#endif
    recv(s, sample, 1, 0);
    pthread_mutex_unlock(mutex);
    /* XXX - send NACK ? */
    su_seterrno(EBADMSG);
    N = -1;
  }
  else {
    /* receive as usual */
    N = tport_recv_dgram_r(tp, &tpd->tpd_msg, N);
  }

  pthread_mutex_unlock(mutex);

  return N;
}

#if HAVE_SIGCOMP
static
int thrp_udvm_decompress(threadpool_t *thrp, thrp_udp_deliver_t *tpd)
{
  struct sigcomp_udvm *udvm = tpd->tpd_udvm;
  struct sigcomp_buffer *output;
  msg_iovec_t iovec[msg_n_fragments] = {{ 0 }};
  su_addrinfo_t *ai;
  tport_t *tp = thrp->thrp_tport->pri_primary;
  size_t n, m, i, dlen;
  int eos;
  void *data;
  ssize_t veclen;

  output = sigcomp_udvm_output_buffer(udvm, -1);

  if (sigcomp_udvm_decompress(udvm, output, NULL) < 0) {
    int error = sigcomp_udvm_errno(udvm);
    SU_DEBUG_3(("%s: UDVM error %d: %s\n", __func__,
            error, sigcomp_udvm_strerror(udvm)));
    su_seterrno(EREMOTEIO);
    return -1;
  }

  data = output->b_data + output->b_used;
  dlen = output->b_avail - output->b_used;
  /* XXX - if a message is larger than default output size... */
  eos = output->b_complete; assert(output->b_complete);

  veclen = tport_recv_iovec(tp, &tpd->tpd_msg, iovec, dlen, eos);

  if (veclen <= 0) {
    n = -1;
  } else {
    for (i = 0, n = 0; i < veclen; i++) {
      m = iovec[i].mv_len; assert(dlen >= n + m);
      memcpy(iovec[i].mv_base, data + n, m);
      n += m;
    }
    assert(dlen == n);

    msg_recv_commit(tpd->tpd_msg, dlen, eos);    /* Mark buffer as used */

    /* Message address */
    ai = msg_addrinfo(tpd->tpd_msg);
    ai->ai_flags |= TP_AI_COMPRESSED;
    ai->ai_family = tpd->tpd_name->su_sa.sa_family;
    ai->ai_socktype = SOCK_DGRAM;
    ai->ai_protocol = IPPROTO_UDP;
    memcpy(ai->ai_addr, tpd->tpd_name, ai->ai_addrlen = tpd->tpd_namelen);

    SU_DEBUG_9(("%s(%p): sigcomp msg sz = %d\n", __func__, tp, n));
  }

  return n;
}
#endif

/** Deliver message from threadpool to the stack
 *
 * @note Executed by stack thread only.
 */
static
void thrp_udp_deliver(su_root_magic_t *magic,
                  su_msg_r m,
                  union tport_su_msg_arg *arg)
{
  thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;
  threadpool_t *thrp = tpd->tpd_thrp;
  tport_t *tp = thrp->thrp_tport->pri_primary;
  su_time_t now = su_now();

  assert(magic != thrp);

  thrp->thrp_r_recv++;

  if (thrp->thrp_killing) {
#if HAVE_SIGCOMP
    sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;
#endif
    msg_destroy(tpd->tpd_msg);
    return;
  }

  SU_DEBUG_7(("thrp_udp_deliver(%p): got %p delay %f\n",
            thrp, tpd, 1000 * su_time_diff(now, tpd->tpd_when)));

  if (tpd->tpd_errorcode)
    tport_error_report(tp, tpd->tpd_errorcode, tpd->tpd_name);
  else if (tpd->tpd_msg) {
    tport_deliver(tp, tpd->tpd_msg, NULL, &tpd->tpd_udvm, tpd->tpd_when);
    tp->tp_rlogged = NULL;
  }

#if HAVE_SIGCOMP
  if (tpd->tpd_udvm) {
    sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;
  }
#endif
}

static
void thrp_udp_deliver_report(threadpool_t *thrp,
                       su_msg_r m,
                       union tport_su_msg_arg *arg)
{
  if (thrp->thrp_yield) {
    int qlen = thrp->thrp_r_sent - thrp->thrp_r_recv;
    int qsize = thrp->thrp_tport->pri_params->tpp_thrprqsize;
    if (qlen == 0 || qlen < qsize / 2)
      thrp_gain(thrp);
  }
}

/** Send a message to network using threadpool.
 *
 * @note Executed by stack thread only.
 */
static
int tport_thread_send(tport_t *tp,
                  msg_t *msg,
                  tp_name_t const *tpn,
                  struct sigcomp_compartment *cc,
                  unsigned mtu)
{

  threadpool_t *thrp = tp->tp_pri->tptp_pool;
  thrp_udp_deliver_t *tpd;
  int i, N = tp->tp_pri->tptp_poolsize;
  su_msg_r m;
  unsigned totalqlen = 0;
  unsigned qlen;

  if (!tp->tp_pri->tptp_pool)
    return tport_prepare_and_send(tp, msg, tpn, cc, mtu);

  SU_DEBUG_9(("tport_thread_send()\n"));

  if (thrp->thrp_killing)
    return (su_seterrno(ECHILD)), -1;

  qlen = totalqlen = thrp->thrp_s_sent - thrp->thrp_s_recv;

  /* Select thread with shortest queue */
  for (i = 1; i < N; i++) {
    threadpool_t *other = tp->tp_pri->tptp_pool + i;
    unsigned len = other->thrp_s_sent - other->thrp_s_recv;

    if (len < qlen ||
      (len == qlen && (other->thrp_s_sent - thrp->thrp_s_sent) < 0))
      thrp = other, qlen = len;

    totalqlen += len;
  }

  if (totalqlen >= N * tp->tp_params->tpp_qsize)
    SU_DEBUG_3(("tport send queue: %u (shortest %u)\n", totalqlen, qlen));

  if (su_msg_create(m,
                su_clone_task(thrp->thrp_clone),
                su_root_task(tp->tp_master->mr_root),
                thrp_udp_send,
                sizeof (*tpd)) != su_success) {
    SU_DEBUG_1(("thrp_udp_event(%p): su_msg_create(): %s\n", thrp,
            strerror(errno)));
    return -1;
  }

  tpd = su_msg_data(m)->thrp_udp_deliver;
  tpd->tpd_thrp = thrp;
  tpd->tpd_when = su_now();
  tpd->tpd_mtu = mtu;
  tpd->tpd_msg = msg_ref_create(msg);

#if HAVE_SIGCOMP
  tpd->tpd_cc = cc;
#endif

  su_msg_report(m, thrp_udp_send_report);

  if (su_msg_send(m) == su_success) {
    thrp->thrp_s_sent++;
    return 0;
  }

  msg_ref_destroy(msg);
  return -1;
}

/** thrp_udp_send() is run by threadpool to send the message. */
static
void thrp_udp_send(threadpool_t *thrp,
               su_msg_r m,
               union tport_su_msg_arg *arg)
{
  thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;
  tport_t *tp = thrp->thrp_tport->pri_primary;
  msg_t *msg = tpd->tpd_msg;
  msg_iovec_t *iov, auto_iov[40], *iov0 = NULL;
  int iovlen, iovused, n;

  assert(thrp == tpd->tpd_thrp);

  thrp->thrp_s_recv++;

  {
    double delay = 1000 * su_time_diff(su_now(), tpd->tpd_when);
    if (delay > 100)
      SU_DEBUG_3(("thrp_udp_deliver(%p): got %p delay %f\n", thrp, tpd, delay));
    else
      SU_DEBUG_7(("thrp_udp_deliver(%p): got %p delay %f\n", thrp, tpd, delay));
  }

  if (!msg) {
    tpd->tpd_errorcode = EINVAL;
    return;
  }

  /* Prepare message for sending - i.e., encode it */
  if (msg_prepare(msg) < 0) {
    tpd->tpd_errorcode = errno;
    return;
  }

  if (tpd->tpd_mtu != 0 && msg_size(msg) > tpd->tpd_mtu) {
    tpd->tpd_errorcode = EMSGSIZE;
    return;
  }

  /* Use initially the I/O vector from stack */
  iov = auto_iov, iovlen = sizeof(auto_iov)/sizeof(auto_iov[0]);

  /* Get a iovec for message contents */
  for (;;) {
    iovused = msg_iovec(msg, iov, iovlen);
    if (iovused <= iovlen)
      break;

    iov = iov0 = realloc(iov0, sizeof(*iov) * iovused);
    iovlen = iovused;

    if (iov0 == NULL) {
      tpd->tpd_errorcode = errno;
      return;
    }
  }

  assert(iovused > 0);

  tpd->tpd_when = su_now();

  if (0)
    ;
#if HAVE_SIGCOMP
  else if (tpd->tpd_cc) {
    tport_sigcomp_t sc[1] = {{ NULL }};

    n = tport_sigcomp_vsend(tp, msg, iov, iovused, tpd->tpd_cc, sc);
  }
#endif
  else
    n = tport_send_dgram(tp, msg, iov, iovused);

  if (n == -1)
    tpd->tpd_errorcode = su_errno();

  if (iov0)
    free(iov0);
}

static
void thrp_udp_send_report(su_root_magic_t *magic,
                    su_msg_r msg,
                    union tport_su_msg_arg *arg)
{
  thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;
  threadpool_t *thrp = tpd->tpd_thrp;
  tport_t *tp = thrp->thrp_tport->pri_primary;

  assert(magic != thrp);

  SU_DEBUG_7(("thrp_udp_send_report(%p): got %p delay %f\n",
            thrp, tpd, 1000 * su_time_diff(su_now(), tpd->tpd_when)));

  if (tp->tp_master->mr_log)
    tport_log_msg(tp, tpd->tpd_msg, "sent", "to", tpd->tpd_when);

  if (tpd->tpd_errorcode)
    tport_error_report(tp, tpd->tpd_errorcode, msg_addr(tpd->tpd_msg));

  msg_ref_destroy(tpd->tpd_msg);
}

Generated by  Doxygen 1.6.0   Back to index