Back to index

openldap  2.4.31
tpool.c
Go to the documentation of this file.
00001 /* $OpenLDAP$ */
00002 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
00003  *
00004  * Copyright 1998-2012 The OpenLDAP Foundation.
00005  * All rights reserved.
00006  *
00007  * Redistribution and use in source and binary forms, with or without
00008  * modification, are permitted only as authorized by the OpenLDAP
00009  * Public License.
00010  *
00011  * A copy of this license is available in file LICENSE in the
00012  * top-level directory of the distribution or, alternatively, at
00013  * <http://www.OpenLDAP.org/license.html>.
00014  */
00015 
00016 #include "portable.h"
00017 
00018 #include <stdio.h>
00019 
00020 #include <ac/signal.h>
00021 #include <ac/stdarg.h>
00022 #include <ac/stdlib.h>
00023 #include <ac/string.h>
00024 #include <ac/time.h>
00025 #include <ac/errno.h>
00026 
00027 #include "ldap-int.h"
00028 #include "ldap_pvt_thread.h" /* Get the thread interface */
00029 #include "ldap_queue.h"
00030 #define LDAP_THREAD_POOL_IMPLEMENTATION
00031 #include "ldap_thr_debug.h"  /* May rename symbols defined below */
00032 
00033 #ifndef LDAP_THREAD_HAVE_TPOOL
00034 
00035 /* Thread-specific key with data and optional free function */
00036 typedef struct ldap_int_tpool_key_s {
00037        void *ltk_key;
00038        void *ltk_data;
00039        ldap_pvt_thread_pool_keyfree_t *ltk_free;
00040 } ldap_int_tpool_key_t;
00041 
00042 /* Max number of thread-specific keys we store per thread.
00043  * We don't expect to use many...
00044  */
00045 #define       MAXKEYS       32
00046 
00047 /* Max number of threads */
00048 #define       LDAP_MAXTHR   1024   /* must be a power of 2 */
00049 
00050 /* (Theoretical) max number of pending requests */
00051 #define MAX_PENDING (INT_MAX/2)    /* INT_MAX - (room to avoid overflow) */
00052 
00053 /* pool->ltp_pause values */
00054 enum { NOT_PAUSED = 0, WANT_PAUSE = 1, PAUSED = 2 };
00055 
00056 /* Context: thread ID and thread-specific key/data pairs */
00057 typedef struct ldap_int_thread_userctx_s {
00058        ldap_pvt_thread_t ltu_id;
00059        ldap_int_tpool_key_t ltu_key[MAXKEYS];
00060 } ldap_int_thread_userctx_t;
00061 
00062 
00063 /* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
00064  * Protected by ldap_pvt_thread_pool_mutex except during pauses,
00065  * when it is read-only (used by pool_purgekey and pool_context).
00066  * Protected by tpool->ltp_mutex during pauses.
00067  */
00068 static struct {
00069        ldap_int_thread_userctx_t *ctx;
00070        /* ctx is valid when not NULL or DELETED_THREAD_CTX */
00071 #      define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
00072 } thread_keys[LDAP_MAXTHR];
00073 
00074 #define       TID_HASH(tid, hash) do { \
00075        unsigned const char *ptr_ = (unsigned const char *)&(tid); \
00076        unsigned i_; \
00077        for (i_ = 0, (hash) = ptr_[0]; ++i_ < sizeof(tid);) \
00078               (hash) += ((hash) << 5) ^ ptr_[i_]; \
00079 } while(0)
00080 
00081 
00082 /* Task for a thread to perform */
00083 typedef struct ldap_int_thread_task_s {
00084        union {
00085               LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;
00086               LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;
00087        } ltt_next;
00088        ldap_pvt_thread_start_t *ltt_start_routine;
00089        void *ltt_arg;
00090 } ldap_int_thread_task_t;
00091 
00092 typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
00093 
00094 struct ldap_int_thread_pool_s {
00095        LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
00096 
00097        /* protect members below, and protect thread_keys[] during pauses */
00098        ldap_pvt_thread_mutex_t ltp_mutex;
00099 
00100        /* not paused and something to do for pool_<wrapper/pause/destroy>() */
00101        ldap_pvt_thread_cond_t ltp_cond;
00102 
00103        /* ltp_active_count <= 1 && ltp_pause */
00104        ldap_pvt_thread_cond_t ltp_pcond;
00105 
00106        /* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list,
00107         * maintaned to reduce work for pool_wrapper()
00108         */
00109        ldap_int_tpool_plist_t *ltp_work_list;
00110 
00111        /* pending tasks, and unused task objects */
00112        ldap_int_tpool_plist_t ltp_pending_list;
00113        LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
00114 
00115        /* The pool is finishing, waiting for its threads to close.
00116         * They close when ltp_pending_list is done.  pool_submit()
00117         * rejects new tasks.  ltp_max_pending = -(its old value).
00118         */
00119        int ltp_finishing;
00120 
00121        /* Some active task needs to be the sole active task.
00122         * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
00123         * Note: Pauses adjust ltp_<open_count/vary_open_count/work_list>,
00124         * so pool_<submit/wrapper>() mostly can avoid testing ltp_pause.
00125         */
00126        volatile sig_atomic_t ltp_pause;
00127 
00128        /* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */
00129        int ltp_max_count;
00130 
00131        /* Max pending + paused + idle tasks, negated when ltp_finishing */
00132        int ltp_max_pending;
00133 
00134        int ltp_pending_count;             /* Pending + paused + idle tasks */
00135        int ltp_active_count;              /* Active, not paused/idle tasks */
00136        int ltp_open_count;                /* Number of threads, negated when ltp_pause */
00137        int ltp_starting;                  /* Currenlty starting threads */
00138 
00139        /* >0 if paused or we may open a thread, <0 if we should close a thread.
00140         * Updated when ltp_<finishing/pause/max_count/open_count> change.
00141         * Maintained to reduce the time ltp_mutex must be locked in
00142         * ldap_pvt_thread_pool_<submit/wrapper>().
00143         */
00144        int ltp_vary_open_count;
00145 #      define SET_VARY_OPEN_COUNT(pool)   \
00146               ((pool)->ltp_vary_open_count =     \
00147                (pool)->ltp_pause      ?  1 :     \
00148                (pool)->ltp_finishing  ? -1 :     \
00149                ((pool)->ltp_max_count ? (pool)->ltp_max_count : LDAP_MAXTHR) \
00150                - (pool)->ltp_open_count)
00151 };
00152 
00153 static ldap_int_tpool_plist_t empty_pending_list =
00154        LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);
00155 
00156 static int ldap_int_has_thread_pool = 0;
00157 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
00158        ldap_int_thread_pool_list =
00159        LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
00160 
00161 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
00162 
00163 static void *ldap_int_thread_pool_wrapper( void *pool );
00164 
00165 static ldap_pvt_thread_key_t       ldap_tpool_key;
00166 
00167 /* Context of the main thread */
00168 static ldap_int_thread_userctx_t ldap_int_main_thrctx;
00169 
00170 int
00171 ldap_int_thread_pool_startup ( void )
00172 {
00173        ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
00174        ldap_pvt_thread_key_create( &ldap_tpool_key );
00175        return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
00176 }
00177 
00178 int
00179 ldap_int_thread_pool_shutdown ( void )
00180 {
00181        struct ldap_int_thread_pool_s *pool;
00182 
00183        while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
00184               (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
00185        }
00186        ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
00187        ldap_pvt_thread_key_destroy( ldap_tpool_key );
00188        return(0);
00189 }
00190 
00191 
00192 /* Create a thread pool */
00193 int
00194 ldap_pvt_thread_pool_init (
00195        ldap_pvt_thread_pool_t *tpool,
00196        int max_threads,
00197        int max_pending )
00198 {
00199        ldap_pvt_thread_pool_t pool;
00200        int rc;
00201 
00202        /* multiple pools are currently not supported (ITS#4943) */
00203        assert(!ldap_int_has_thread_pool);
00204 
00205        if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
00206               max_threads = 0;
00207        if (! (1 <= max_pending && max_pending <= MAX_PENDING))
00208               max_pending = MAX_PENDING;
00209 
00210        *tpool = NULL;
00211        pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
00212               sizeof(struct ldap_int_thread_pool_s));
00213 
00214        if (pool == NULL) return(-1);
00215 
00216        rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
00217        if (rc != 0)
00218               return(rc);
00219        rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
00220        if (rc != 0)
00221               return(rc);
00222        rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
00223        if (rc != 0)
00224               return(rc);
00225 
00226        ldap_int_has_thread_pool = 1;
00227 
00228        pool->ltp_max_count = max_threads;
00229        SET_VARY_OPEN_COUNT(pool);
00230        pool->ltp_max_pending = max_pending;
00231 
00232        LDAP_STAILQ_INIT(&pool->ltp_pending_list);
00233        pool->ltp_work_list = &pool->ltp_pending_list;
00234        LDAP_SLIST_INIT(&pool->ltp_free_list);
00235 
00236        ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
00237        LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
00238        ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
00239 
00240        /* Start no threads just yet.  That can break if the process forks
00241         * later, as slapd does in order to daemonize.  On at least POSIX,
00242         * only the forking thread would survive in the child.  Yet fork()
00243         * can't unlock/clean up other threads' locks and data structures,
00244         * unless pthread_atfork() handlers have been set up to do so.
00245         */
00246 
00247        *tpool = pool;
00248        return(0);
00249 }
00250 
00251 
00252 /* Submit a task to be performed by the thread pool */
00253 int
00254 ldap_pvt_thread_pool_submit (
00255        ldap_pvt_thread_pool_t *tpool,
00256        ldap_pvt_thread_start_t *start_routine, void *arg )
00257 {
00258        struct ldap_int_thread_pool_s *pool;
00259        ldap_int_thread_task_t *task;
00260        ldap_pvt_thread_t thr;
00261 
00262        if (tpool == NULL)
00263               return(-1);
00264 
00265        pool = *tpool;
00266 
00267        if (pool == NULL)
00268               return(-1);
00269 
00270        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
00271 
00272        if (pool->ltp_pending_count >= pool->ltp_max_pending)
00273               goto failed;
00274 
00275        task = LDAP_SLIST_FIRST(&pool->ltp_free_list);
00276        if (task) {
00277               LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
00278        } else {
00279               task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
00280               if (task == NULL)
00281                      goto failed;
00282        }
00283 
00284        task->ltt_start_routine = start_routine;
00285        task->ltt_arg = arg;
00286 
00287        pool->ltp_pending_count++;
00288        LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, task, ltt_next.q);
00289 
00290        /* true if ltp_pause != 0 or we should open (create) a thread */
00291        if (pool->ltp_vary_open_count > 0 &&
00292               pool->ltp_open_count < pool->ltp_active_count+pool->ltp_pending_count)
00293        {
00294               if (pool->ltp_pause)
00295                      goto done;
00296 
00297               pool->ltp_starting++;
00298               pool->ltp_open_count++;
00299               SET_VARY_OPEN_COUNT(pool);
00300 
00301               if (0 != ldap_pvt_thread_create(
00302                      &thr, 1, ldap_int_thread_pool_wrapper, pool))
00303               {
00304                      /* couldn't create thread.  back out of
00305                       * ltp_open_count and check for even worse things.
00306                       */
00307                      pool->ltp_starting--;
00308                      pool->ltp_open_count--;
00309                      SET_VARY_OPEN_COUNT(pool);
00310 
00311                      if (pool->ltp_open_count == 0) {
00312                             /* no open threads at all?!?
00313                              */
00314                             ldap_int_thread_task_t *ptr;
00315 
00316                             /* let pool_destroy know there are no more threads */
00317                             ldap_pvt_thread_cond_signal(&pool->ltp_cond);
00318 
00319                             LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltt_next.q)
00320                                    if (ptr == task) break;
00321                             if (ptr == task) {
00322                                    /* no open threads, task not handled, so
00323                                     * back out of ltp_pending_count, free the task,
00324                                     * report the error.
00325                                     */
00326                                    pool->ltp_pending_count--;
00327                                    LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, task,
00328                                           ldap_int_thread_task_s, ltt_next.q);
00329                                    LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task,
00330                                           ltt_next.l);
00331                                    goto failed;
00332                             }
00333                      }
00334                      /* there is another open thread, so this
00335                       * task will be handled eventually.
00336                       */
00337               }
00338        }
00339        ldap_pvt_thread_cond_signal(&pool->ltp_cond);
00340 
00341  done:
00342        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
00343        return(0);
00344 
00345  failed:
00346        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
00347        return(-1);
00348 }
00349 
00350 static void *
00351 no_task( void *ctx, void *arg )
00352 {
00353        return NULL;
00354 }
00355 
00356 /* Cancel a pending task that was previously submitted.
00357  * Return 1 if the task was successfully cancelled, 0 if
00358  * not found, -1 for invalid parameters
00359  */
00360 int
00361 ldap_pvt_thread_pool_retract (
00362        ldap_pvt_thread_pool_t *tpool,
00363        ldap_pvt_thread_start_t *start_routine, void *arg )
00364 {
00365        struct ldap_int_thread_pool_s *pool;
00366        ldap_int_thread_task_t *task;
00367 
00368        if (tpool == NULL)
00369               return(-1);
00370 
00371        pool = *tpool;
00372 
00373        if (pool == NULL)
00374               return(-1);
00375 
00376        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
00377        LDAP_STAILQ_FOREACH(task, &pool->ltp_pending_list, ltt_next.q)
00378               if (task->ltt_start_routine == start_routine &&
00379                      task->ltt_arg == arg) {
00380                      /* Could LDAP_STAILQ_REMOVE the task, but that
00381                       * walks ltp_pending_list again to find it.
00382                       */
00383                      task->ltt_start_routine = no_task;
00384                      task->ltt_arg = NULL;
00385                      break;
00386               }
00387        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
00388        return task != NULL;
00389 }
00390 
00391 /* Set max #threads.  value <= 0 means max supported #threads (LDAP_MAXTHR) */
00392 int
00393 ldap_pvt_thread_pool_maxthreads(
00394        ldap_pvt_thread_pool_t *tpool,
00395        int max_threads )
00396 {
00397        struct ldap_int_thread_pool_s *pool;
00398 
00399        if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
00400               max_threads = 0;
00401 
00402        if (tpool == NULL)
00403               return(-1);
00404 
00405        pool = *tpool;
00406 
00407        if (pool == NULL)
00408               return(-1);
00409 
00410        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
00411 
00412        pool->ltp_max_count = max_threads;
00413        SET_VARY_OPEN_COUNT(pool);
00414 
00415        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
00416        return(0);
00417 }
00418 
00419 /* Inspect the pool */
00420 int
00421 ldap_pvt_thread_pool_query(
00422        ldap_pvt_thread_pool_t *tpool,
00423        ldap_pvt_thread_pool_param_t param,
00424        void *value )
00425 {
00426        struct ldap_int_thread_pool_s      *pool;
00427        int                         count = -1;
00428 
00429        if ( tpool == NULL || value == NULL ) {
00430               return -1;
00431        }
00432 
00433        pool = *tpool;
00434 
00435        if ( pool == NULL ) {
00436               return 0;
00437        }
00438 
00439        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
00440        switch ( param ) {
00441        case LDAP_PVT_THREAD_POOL_PARAM_MAX:
00442               count = pool->ltp_max_count;
00443               break;
00444 
00445        case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
00446               count = pool->ltp_max_pending;
00447               if (count < 0)
00448                      count = -count;
00449               if (count == MAX_PENDING)
00450                      count = 0;
00451               break;
00452 
00453        case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
00454               count = pool->ltp_open_count;
00455               if (count < 0)
00456                      count = -count;
00457               break;
00458 
00459        case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
00460               count = pool->ltp_starting;
00461               break;
00462 
00463        case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
00464               count = pool->ltp_active_count;
00465               break;
00466 
00467        case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
00468               count = (pool->ltp_pause != 0);
00469               break;
00470 
00471        case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
00472               count = pool->ltp_pending_count;
00473               break;
00474 
00475        case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
00476               count = pool->ltp_pending_count + pool->ltp_active_count;
00477               break;
00478 
00479        case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
00480               break;
00481 
00482        case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
00483               break;
00484 
00485        case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
00486               break;
00487 
00488        case LDAP_PVT_THREAD_POOL_PARAM_STATE:
00489               *((char **)value) =
00490                      pool->ltp_pause ? "pausing" :
00491                      !pool->ltp_finishing ? "running" :
00492                      pool->ltp_pending_count ? "finishing" : "stopping";
00493               break;
00494 
00495        case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
00496               break;
00497        }
00498        ldap_pvt_thread_mutex_unlock( &pool->ltp_mutex );
00499 
00500        if ( count > -1 ) {
00501               *((int *)value) = count;
00502        }
00503 
00504        return ( count == -1 ? -1 : 0 );
00505 }
00506 
00507 /*
00508  * true if pool is pausing; does not lock any mutex to check.
00509  * 0 if not pause, 1 if pause, -1 if error or no pool.
00510  */
00511 int
00512 ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
00513 {
00514        int rc = -1;
00515        struct ldap_int_thread_pool_s *pool;
00516 
00517        if ( tpool != NULL && (pool = *tpool) != NULL ) {
00518               rc = (pool->ltp_pause != 0);
00519        }
00520 
00521        return rc;
00522 }
00523 
00524 /*
00525  * wrapper for ldap_pvt_thread_pool_query(), left around
00526  * for backwards compatibility
00527  */
00528 int
00529 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
00530 {
00531        int    rc, count;
00532 
00533        rc = ldap_pvt_thread_pool_query( tpool,
00534               LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
00535 
00536        if ( rc == 0 ) {
00537               return count;
00538        }
00539 
00540        return rc;
00541 }
00542 
00543 /* Destroy the pool after making its threads finish */
00544 int
00545 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
00546 {
00547        struct ldap_int_thread_pool_s *pool, *pptr;
00548        ldap_int_thread_task_t *task;
00549 
00550        if (tpool == NULL)
00551               return(-1);
00552 
00553        pool = *tpool;
00554 
00555        if (pool == NULL) return(-1);
00556 
00557        ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
00558        LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
00559               if (pptr == pool) break;
00560        if (pptr == pool)
00561               LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
00562                      ldap_int_thread_pool_s, ltp_next);
00563        ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
00564 
00565        if (pool != pptr) return(-1);
00566 
00567        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
00568 
00569        pool->ltp_finishing = 1;
00570        SET_VARY_OPEN_COUNT(pool);
00571        if (pool->ltp_max_pending > 0)
00572               pool->ltp_max_pending = -pool->ltp_max_pending;
00573 
00574        if (!run_pending) {
00575               while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) {
00576                      LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
00577                      LDAP_FREE(task);
00578               }
00579               pool->ltp_pending_count = 0;
00580        }
00581 
00582        while (pool->ltp_open_count) {
00583               if (!pool->ltp_pause)
00584                      ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
00585               ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
00586        }
00587 
00588        while ((task = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
00589        {
00590               LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
00591               LDAP_FREE(task);
00592        }
00593 
00594        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
00595        ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
00596        ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
00597        ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
00598        LDAP_FREE(pool);
00599        *tpool = NULL;
00600        ldap_int_has_thread_pool = 0;
00601        return(0);
00602 }
00603 
00604 /* Thread loop.  Accept and handle submitted tasks. */
00605 static void *
00606 ldap_int_thread_pool_wrapper ( 
00607        void *xpool )
00608 {
00609        struct ldap_int_thread_pool_s *pool = xpool;
00610        ldap_int_thread_task_t *task;
00611        ldap_int_tpool_plist_t *work_list;
00612        ldap_int_thread_userctx_t ctx, *kctx;
00613        unsigned i, keyslot, hash;
00614 
00615        assert(pool != NULL);
00616 
00617        for ( i=0; i<MAXKEYS; i++ ) {
00618               ctx.ltu_key[i].ltk_key = NULL;
00619        }
00620 
00621        ctx.ltu_id = ldap_pvt_thread_self();
00622        TID_HASH(ctx.ltu_id, hash);
00623 
00624        ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
00625 
00626        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
00627 
00628        /* thread_keys[] is read-only when paused */
00629        while (pool->ltp_pause)
00630               ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
00631 
00632        /* find a key slot to give this thread ID and store a
00633         * pointer to our keys there; start at the thread ID
00634         * itself (mod LDAP_MAXTHR) and look for an empty slot.
00635         */
00636        ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
00637        for (keyslot = hash & (LDAP_MAXTHR-1);
00638               (kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
00639               keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
00640        thread_keys[keyslot].ctx = &ctx;
00641        ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
00642 
00643        pool->ltp_starting--;
00644        pool->ltp_active_count++;
00645 
00646        for (;;) {
00647               work_list = pool->ltp_work_list; /* help the compiler a bit */
00648               task = LDAP_STAILQ_FIRST(work_list);
00649               if (task == NULL) {  /* paused or no pending tasks */
00650                      if (--(pool->ltp_active_count) < 2) {
00651                             /* Notify pool_pause it is the sole active thread. */
00652                             ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
00653                      }
00654 
00655                      do {
00656                             if (pool->ltp_vary_open_count < 0) {
00657                                    /* Not paused, and either finishing or too many
00658                                     * threads running (can happen if ltp_max_count
00659                                     * was reduced).  Let this thread die.
00660                                     */
00661                                    goto done;
00662                             }
00663 
00664                             /* We could check an idle timer here, and let the
00665                              * thread die if it has been inactive for a while.
00666                              * Only die if there are other open threads (i.e.,
00667                              * always have at least one thread open).
00668                              * The check should be like this:
00669                              *   if (pool->ltp_open_count>1 && pool->ltp_starting==0)
00670                              *       check timer, wait if ltp_pause, leave thread;
00671                              *
00672                              * Just use pthread_cond_timedwait() if we want to
00673                              * check idle time.
00674                              */
00675                             ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
00676 
00677                             work_list = pool->ltp_work_list;
00678                             task = LDAP_STAILQ_FIRST(work_list);
00679                      } while (task == NULL);
00680 
00681                      pool->ltp_active_count++;
00682               }
00683 
00684               LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
00685               pool->ltp_pending_count--;
00686               ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
00687 
00688               task->ltt_start_routine(&ctx, task->ltt_arg);
00689 
00690               ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
00691               LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task, ltt_next.l);
00692        }
00693  done:
00694 
00695        assert(!pool->ltp_pause); /* thread_keys writable, ltp_open_count >= 0 */
00696 
00697        /* The ltp_mutex lock protects ctx->ltu_key from pool_purgekey()
00698         * during this call, since it prevents new pauses. */
00699        ldap_pvt_thread_pool_context_reset(&ctx);
00700 
00701        ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
00702        thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
00703        ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
00704 
00705        pool->ltp_open_count--;
00706        SET_VARY_OPEN_COUNT(pool);
00707        /* let pool_destroy know we're all done */
00708        if (pool->ltp_open_count == 0)
00709               ldap_pvt_thread_cond_signal(&pool->ltp_cond);
00710 
00711        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
00712 
00713        ldap_pvt_thread_exit(NULL);
00714        return(NULL);
00715 }
00716 
00717 /* Arguments > ltp_pause to handle_pause(,PAUSE_ARG()).  arg=PAUSE_ARG
00718  * ensures (arg-ltp_pause) sets GO_* at need and keeps DO_PAUSE/GO_*.
00719  */
00720 #define GO_IDLE             8
00721 #define GO_UNIDLE    16
00722 #define CHECK_PAUSE  32     /* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */
00723 #define DO_PAUSE     64     /* CHECK_PAUSE; pause the pool */
00724 #define PAUSE_ARG(a) \
00725               ((a) | ((a) & (GO_IDLE|GO_UNIDLE) ? GO_IDLE-1 : CHECK_PAUSE))
00726 
00727 static int
00728 handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
00729 {
00730        struct ldap_int_thread_pool_s *pool;
00731        int ret = 0, pause, max_ltp_pause;
00732 
00733        if (tpool == NULL)
00734               return(-1);
00735 
00736        pool = *tpool;
00737 
00738        if (pool == NULL)
00739               return(0);
00740 
00741        if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
00742               return(0);
00743 
00744        /* Let pool_unidle() ignore requests for new pauses */
00745        max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;
00746 
00747        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
00748 
00749        pause = pool->ltp_pause;    /* NOT_PAUSED, WANT_PAUSE or PAUSED */
00750 
00751        /* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
00752        pause_type -= pause;
00753 
00754        if (pause_type & GO_IDLE) {
00755               pool->ltp_pending_count++;
00756               pool->ltp_active_count--;
00757               if (pause && pool->ltp_active_count < 2) {
00758                      /* Tell the task waiting to DO_PAUSE it can proceed */
00759                      ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
00760               }
00761        }
00762 
00763        if (pause_type & GO_UNIDLE) {
00764               /* Wait out pause if any, then cancel GO_IDLE */
00765               if (pause > max_ltp_pause) {
00766                      ret = 1;
00767                      do {
00768                             ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
00769                      } while (pool->ltp_pause > max_ltp_pause);
00770               }
00771               pool->ltp_pending_count--;
00772               pool->ltp_active_count++;
00773        }
00774 
00775        if (pause_type & DO_PAUSE) {
00776               /* Tell everyone else to pause or finish, then await that */
00777               ret = 0;
00778               assert(!pool->ltp_pause);
00779               pool->ltp_pause = WANT_PAUSE;
00780               /* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
00781                * and do not finish threads in ldap_pvt_thread_pool_wrapper() */
00782               pool->ltp_open_count = -pool->ltp_open_count;
00783               SET_VARY_OPEN_COUNT(pool);
00784               /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
00785               pool->ltp_work_list = &empty_pending_list;
00786               /* Wait for this task to become the sole active task */
00787               while (pool->ltp_active_count > 1) {
00788                      ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
00789               }
00790               assert(pool->ltp_pause == WANT_PAUSE);
00791               pool->ltp_pause = PAUSED;
00792        }
00793 
00794        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
00795        return(ret);
00796 }
00797 
00798 /* Consider this task idle: It will not block pool_pause() in other tasks. */
00799 void
00800 ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool )
00801 {
00802        handle_pause(tpool, PAUSE_ARG(GO_IDLE));
00803 }
00804 
00805 /* Cancel pool_idle(). If the pool is paused, wait it out first. */
00806 void
00807 ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool )
00808 {
00809        handle_pause(tpool, PAUSE_ARG(GO_UNIDLE));
00810 }
00811 
00812 /*
00813  * If a pause was requested, wait for it.  If several threads
00814  * are waiting to pause, let through one or more pauses.
00815  * The calling task must be active, not idle.
00816  * Return 1 if we waited, 0 if not, -1 at parameter error.
00817  */
00818 int
00819 ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
00820 {
00821        return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
00822 }
00823 
00824 /*
00825  * Pause the pool.  The calling task must be active, not idle.
00826  * Return when all other tasks are paused or idle.
00827  */
00828 int
00829 ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool )
00830 {
00831        return handle_pause(tpool, PAUSE_ARG(DO_PAUSE));
00832 }
00833 
00834 /* End a pause */
00835 int
00836 ldap_pvt_thread_pool_resume ( 
00837        ldap_pvt_thread_pool_t *tpool )
00838 {
00839        struct ldap_int_thread_pool_s *pool;
00840 
00841        if (tpool == NULL)
00842               return(-1);
00843 
00844        pool = *tpool;
00845 
00846        if (pool == NULL)
00847               return(0);
00848 
00849        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
00850 
00851        assert(pool->ltp_pause == PAUSED);
00852        pool->ltp_pause = 0;
00853        if (pool->ltp_open_count <= 0) /* true when paused, but be paranoid */
00854               pool->ltp_open_count = -pool->ltp_open_count;
00855        SET_VARY_OPEN_COUNT(pool);
00856        pool->ltp_work_list = &pool->ltp_pending_list;
00857 
00858        ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
00859 
00860        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
00861        return(0);
00862 }
00863 
00864 /*
00865  * Get the key's data and optionally free function in the given context.
00866  */
00867 int ldap_pvt_thread_pool_getkey(
00868        void *xctx,
00869        void *key,
00870        void **data,
00871        ldap_pvt_thread_pool_keyfree_t **kfree )
00872 {
00873        ldap_int_thread_userctx_t *ctx = xctx;
00874        int i;
00875 
00876        if ( !ctx || !key || !data ) return EINVAL;
00877 
00878        for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
00879               if ( ctx->ltu_key[i].ltk_key == key ) {
00880                      *data = ctx->ltu_key[i].ltk_data;
00881                      if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
00882                      return 0;
00883               }
00884        }
00885        return ENOENT;
00886 }
00887 
00888 static void
00889 clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
00890 {
00891        for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
00892               ctx->ltu_key[i] = ctx->ltu_key[i+1];
00893        ctx->ltu_key[i].ltk_key = NULL;
00894 }
00895 
00896 /*
00897  * Set or remove data for the key in the given context.
00898  * key can be any unique pointer.
00899  * kfree() is an optional function to free the data (but not the key):
00900  *   pool_context_reset() and pool_purgekey() call kfree(key, data),
00901  *   but pool_setkey() does not.  For pool_setkey() it is the caller's
00902  *   responsibility to free any existing data with the same key.
00903  *   kfree() must not call functions taking a tpool argument.
00904  */
00905 int ldap_pvt_thread_pool_setkey(
00906        void *xctx,
00907        void *key,
00908        void *data,
00909        ldap_pvt_thread_pool_keyfree_t *kfree,
00910        void **olddatap,
00911        ldap_pvt_thread_pool_keyfree_t **oldkfreep )
00912 {
00913        ldap_int_thread_userctx_t *ctx = xctx;
00914        int i, found;
00915 
00916        if ( !ctx || !key ) return EINVAL;
00917 
00918        for ( i=found=0; i<MAXKEYS; i++ ) {
00919               if ( ctx->ltu_key[i].ltk_key == key ) {
00920                      found = 1;
00921                      break;
00922               } else if ( !ctx->ltu_key[i].ltk_key ) {
00923                      break;
00924               }
00925        }
00926 
00927        if ( olddatap ) {
00928               if ( found ) {
00929                      *olddatap = ctx->ltu_key[i].ltk_data;
00930               } else {
00931                      *olddatap = NULL;
00932               }
00933        }
00934 
00935        if ( oldkfreep ) {
00936               if ( found ) {
00937                      *oldkfreep = ctx->ltu_key[i].ltk_free;
00938               } else {
00939                      *oldkfreep = 0;
00940               }
00941        }
00942 
00943        if ( data || kfree ) {
00944               if ( i>=MAXKEYS )
00945                      return ENOMEM;
00946               ctx->ltu_key[i].ltk_key = key;
00947               ctx->ltu_key[i].ltk_data = data;
00948               ctx->ltu_key[i].ltk_free = kfree;
00949        } else if ( found ) {
00950               clear_key_idx( ctx, i );
00951        }
00952 
00953        return 0;
00954 }
00955 
00956 /* Free all elements with this key, no matter which thread they're in.
00957  * May only be called while the pool is paused.
00958  */
00959 void ldap_pvt_thread_pool_purgekey( void *key )
00960 {
00961        int i, j;
00962        ldap_int_thread_userctx_t *ctx;
00963 
00964        assert ( key != NULL );
00965 
00966        for ( i=0; i<LDAP_MAXTHR; i++ ) {
00967               ctx = thread_keys[i].ctx;
00968               if ( ctx && ctx != DELETED_THREAD_CTX ) {
00969                      for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
00970                             if ( ctx->ltu_key[j].ltk_key == key ) {
00971                                    if (ctx->ltu_key[j].ltk_free)
00972                                           ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
00973                                           ctx->ltu_key[j].ltk_data );
00974                                    clear_key_idx( ctx, j );
00975                                    break;
00976                             }
00977                      }
00978               }
00979        }
00980 }
00981 
00982 /*
00983  * Find the context of the current thread.
00984  * This is necessary if the caller does not have access to the
00985  * thread context handle (for example, a slapd plugin calling
00986  * slapi_search_internal()). No doubt it is more efficient
00987  * for the application to keep track of the thread context
00988  * handles itself.
00989  */
00990 void *ldap_pvt_thread_pool_context( )
00991 {
00992        void *ctx = NULL;
00993 
00994        ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
00995        return ctx ? ctx : (void *) &ldap_int_main_thrctx;
00996 }
00997 
00998 /*
00999  * Free the context's keys.
01000  * Must not call functions taking a tpool argument (because this
01001  * thread already holds ltp_mutex when called from pool_wrapper()).
01002  */
01003 void ldap_pvt_thread_pool_context_reset( void *vctx )
01004 {
01005        ldap_int_thread_userctx_t *ctx = vctx;
01006        int i;
01007 
01008        for ( i=MAXKEYS-1; i>=0; i--) {
01009               if ( !ctx->ltu_key[i].ltk_key )
01010                      continue;
01011               if ( ctx->ltu_key[i].ltk_free )
01012                      ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
01013                      ctx->ltu_key[i].ltk_data );
01014               ctx->ltu_key[i].ltk_key = NULL;
01015        }
01016 }
01017 
01018 ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
01019 {
01020        ldap_int_thread_userctx_t *ctx = vctx;
01021 
01022        return ctx->ltu_id;
01023 }
01024 #endif /* LDAP_THREAD_HAVE_TPOOL */