• Main Page
  • Data Structures
  • Files
  • File List
  • Globals

/build/buildd-opendnssec_1.3.2-1~bpo60+1-s390-eF9Mr1/opendnssec-1.3.2/signer/src/daemon/worker.c

Go to the documentation of this file.
00001 /*
00002  * $Id: worker.c 5432 2011-08-22 12:55:04Z matthijs $
00003  *
00004  * Copyright (c) 2009 NLNet Labs. All rights reserved.
00005  *
00006  * Redistribution and use in source and binary forms, with or without
00007  * modification, are permitted provided that the following conditions
00008  * are met:
00009  * 1. Redistributions of source code must retain the above copyright
00010  *    notice, this list of conditions and the following disclaimer.
00011  * 2. Redistributions in binary form must reproduce the above copyright
00012  *    notice, this list of conditions and the following disclaimer in the
00013  *    documentation and/or other materials provided with the distribution.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
00016  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00017  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
00019  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00020  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
00021  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
00023  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
00024  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
00025  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026  *
00027  */
00028 
00034 #include "adapter/adapi.h"
00035 #include "daemon/engine.h"
00036 #include "daemon/worker.h"
00037 #include "shared/allocator.h"
00038 #include "scheduler/schedule.h"
00039 #include "scheduler/task.h"
00040 #include "shared/locks.h"
00041 #include "shared/log.h"
00042 #include "shared/status.h"
00043 #include "shared/util.h"
00044 #include "signer/tools.h"
00045 #include "signer/zone.h"
00046 #include "signer/zonedata.h"
00047 
00048 #include <time.h> /* time() */
00049 
00050 ods_lookup_table worker_str[] = {
00051     { WORKER_WORKER, "worker" },
00052     { WORKER_DRUDGER, "drudger" },
00053     { 0, NULL }
00054 };
00055 
00056 
00061 worker_type*
00062 worker_create(allocator_type* allocator, int num, worker_id type)
00063 {
00064     worker_type* worker;
00065 
00066     if (!allocator) {
00067         return NULL;
00068     }
00069     ods_log_assert(allocator);
00070 
00071     worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
00072     if (!worker) {
00073         return NULL;
00074     }
00075 
00076     ods_log_debug("create worker[%i]", num +1);
00077     lock_basic_init(&worker->worker_lock);
00078     lock_basic_set(&worker->worker_alarm);
00079     lock_basic_lock(&worker->worker_lock);
00080     worker->allocator = allocator;
00081     worker->thread_num = num +1;
00082     worker->engine = NULL;
00083     worker->task = NULL;
00084     worker->working_with = TASK_NONE;
00085     worker->need_to_exit = 0;
00086     worker->type = type;
00087     worker->clock_in = 0;
00088     worker->jobs_appointed = 0;
00089     worker->jobs_completed = 0;
00090     worker->jobs_failed = 0;
00091     worker->sleeping = 0;
00092     worker->waiting = 0;
00093     lock_basic_unlock(&worker->worker_lock);
00094     return worker;
00095 }
00096 
00097 
00102 static const char*
00103 worker2str(worker_id type)
00104 {
00105     ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
00106     if (lt) {
00107         return lt->name;
00108     }
00109     return NULL;
00110 }
00111 
00112 
00117 static int
00118 worker_fulfilled(worker_type* worker)
00119 {
00120     return (worker->jobs_completed + worker->jobs_failed) ==
00121         worker->jobs_appointed;
00122 }
00123 
00124 
00129 static void
00130 worker_perform_task(worker_type* worker)
00131 {
00132     engine_type* engine = NULL;
00133     zone_type* zone = NULL;
00134     task_type* task = NULL;
00135     task_id what = TASK_NONE;
00136     time_t when = 0;
00137     time_t never = (3600*24*365);
00138     ods_status status = ODS_STATUS_OK;
00139     int fallthrough = 0;
00140     int backup = 0;
00141     char* working_dir = NULL;
00142     char* cfg_filename = NULL;
00143     uint32_t tmpserial = 0;
00144     time_t start = 0;
00145     time_t end = 0;
00146 
00147     /* sanity checking */
00148     if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
00149         return;
00150     }
00151     ods_log_assert(worker);
00152     ods_log_assert(worker->task);
00153     ods_log_assert(worker->task->zone);
00154 
00155     engine = (engine_type*) worker->engine;
00156     task = (task_type*) worker->task;
00157     zone = (zone_type*) worker->task->zone;
00158     ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
00159        worker2str(worker->type), worker->thread_num, task_what2str(task->what),
00160        task_who2str(task->who), (uint32_t) worker->clock_in);
00161 
00162     /* do what you have been told to do */
00163     switch (task->what) {
00164         case TASK_SIGNCONF:
00165             worker->working_with = TASK_SIGNCONF;
00166             /* perform 'load signconf' task */
00167             ods_log_verbose("[%s[%i]] load signconf for zone %s",
00168                 worker2str(worker->type), worker->thread_num,
00169                 task_who2str(task->who));
00170             status = zone_load_signconf(zone, &what);
00171             if (status == ODS_STATUS_UNCHANGED) {
00172                 if (!zone->signconf->last_modified) {
00173                     ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
00174                         worker2str(worker->type), worker->thread_num,
00175                         task_who2str(task->who));
00176                 }
00177                 status = ODS_STATUS_ERR;
00178             }
00179 
00180             /* what to do next */
00181             when = time_now();
00182             if (status == ODS_STATUS_UNCHANGED) {
00183                 if (task->halted != TASK_NONE) {
00184                     goto task_perform_continue;
00185                 } else {
00186                     status = ODS_STATUS_OK;
00187                 }
00188             }
00189 
00190             if (status == ODS_STATUS_OK) {
00191                 status = zone_publish_dnskeys(zone, 0);
00192             }
00193             if (status == ODS_STATUS_OK) {
00194                 status = zone_prepare_nsec3(zone, 0);
00195             }
00196             if (status == ODS_STATUS_OK) {
00197                 status = zonedata_commit(zone->zonedata);
00198             }
00199 
00200             if (status == ODS_STATUS_OK) {
00201                 zone->prepared = 1;
00202                 task->interrupt = TASK_NONE;
00203                 task->halted = TASK_NONE;
00204             } else {
00205                 if (task->halted == TASK_NONE) {
00206                     goto task_perform_fail;
00207                 }
00208                 goto task_perform_continue;
00209             }
00210             fallthrough = 0;
00211             break;
00212         case TASK_READ:
00213             worker->working_with = TASK_READ;
00214             /* perform 'read input adapter' task */
00215             ods_log_verbose("[%s[%i]] read zone %s",
00216                 worker2str(worker->type), worker->thread_num,
00217                 task_who2str(task->who));
00218             if (!zone->prepared) {
00219                 ods_log_debug("[%s[%i]] no valid signconf.xml for zone %s yet",
00220                     worker2str(worker->type), worker->thread_num,
00221                     task_who2str(task->who));
00222                 status = ODS_STATUS_ERR;
00223             } else {
00224                 status = tools_input(zone);
00225             }
00226 
00227             /* what to do next */
00228             what = TASK_NSECIFY;
00229             when = time_now();
00230             if (status != ODS_STATUS_OK) {
00231                 if (task->halted == TASK_NONE) {
00232                     goto task_perform_fail;
00233                 }
00234                 goto task_perform_continue;
00235             }
00236             fallthrough = 1;
00237         case TASK_NSECIFY:
00238             worker->working_with = TASK_NSECIFY;
00239             ods_log_verbose("[%s[%i]] nsecify zone %s",
00240                 worker2str(worker->type), worker->thread_num,
00241                 task_who2str(task->who));
00242             status = tools_nsecify(zone);
00243 
00244             /* what to do next */
00245             what = TASK_SIGN;
00246             when = time_now();
00247             if (status == ODS_STATUS_OK) {
00248                 if (task->interrupt > TASK_SIGNCONF) {
00249                     task->interrupt = TASK_NONE;
00250                     task->halted = TASK_NONE;
00251                 }
00252             } else {
00253                 if (task->halted == TASK_NONE) {
00254                     goto task_perform_fail;
00255                 }
00256                 goto task_perform_continue;
00257             }
00258             fallthrough = 1;
00259         case TASK_SIGN:
00260             worker->working_with = TASK_SIGN;
00261             ods_log_verbose("[%s[%i]] sign zone %s",
00262                 worker2str(worker->type), worker->thread_num,
00263                 task_who2str(task->who));
00264             tmpserial = zone->zonedata->internal_serial;
00265             status = zone_update_serial(zone);
00266             if (status != ODS_STATUS_OK) {
00267                 ods_log_error("[%s[%i]] unable to sign zone %s: "
00268                     "failed to increment serial",
00269                     worker2str(worker->type), worker->thread_num,
00270                     task_who2str(task->who));
00271             } else {
00272                 /* start timer */
00273                 start = time(NULL);
00274                 if (zone->stats) {
00275                     lock_basic_lock(&zone->stats->stats_lock);
00276                     if (!zone->stats->start_time) {
00277                         zone->stats->start_time = start;
00278                     }
00279                     zone->stats->sig_count = 0;
00280                     zone->stats->sig_soa_count = 0;
00281                     zone->stats->sig_reuse = 0;
00282                     zone->stats->sig_time = 0;
00283                     lock_basic_unlock(&zone->stats->stats_lock);
00284                 }
00285 
00286                 /* queue menial, hard signing work */
00287                 status = zonedata_queue(zone->zonedata, engine->signq, worker);
00288                 ods_log_debug("[%s[%i]] wait until drudgers are finished "
00289                     " signing zone %s, %u signatures queued",
00290                     worker2str(worker->type), worker->thread_num,
00291                     task_who2str(task->who), worker->jobs_appointed);
00292 
00293                 /* sleep until work is done */
00294                 if (!worker->need_to_exit) {
00295                     worker_sleep_unless(worker, 0);
00296                 }
00297                 if (worker->jobs_failed) {
00298                     ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
00299                         "signatures failed", worker2str(worker->type),
00300                         worker->thread_num, task_who2str(task->who),
00301                         worker->jobs_failed, worker->jobs_appointed);
00302                     status = ODS_STATUS_ERR;
00303                 } else if (!worker_fulfilled(worker)) {
00304                     ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
00305                         "signatures completed", worker2str(worker->type),
00306                         worker->thread_num, task_who2str(task->who),
00307                         worker->jobs_completed, worker->jobs_appointed);
00308                     status = ODS_STATUS_ERR;
00309                 } else {
00310                     ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u "
00311                         "signatures succeeded", worker2str(worker->type),
00312                         worker->thread_num, task_who2str(task->who),
00313                         worker->jobs_completed, worker->jobs_appointed);
00314                     ods_log_assert(worker->jobs_appointed ==
00315                         worker->jobs_completed);
00316                 }
00317                 worker->jobs_appointed = 0;
00318                 worker->jobs_completed = 0;
00319                 worker->jobs_failed = 0;
00320 
00321                 /* stop timer */
00322                 end = time(NULL);
00323                 if (status == ODS_STATUS_OK && zone->stats) {
00324                     lock_basic_lock(&zone->stats->stats_lock);
00325                     zone->stats->sig_time = (end-start);
00326                     lock_basic_unlock(&zone->stats->stats_lock);
00327                 }
00328             }
00329 
00330             /* what to do next */
00331             if (status != ODS_STATUS_OK) {
00332                 /* rollback serial */
00333                 zone->zonedata->internal_serial = tmpserial;
00334                 if (task->halted == TASK_NONE) {
00335                     goto task_perform_fail;
00336                 }
00337                 goto task_perform_continue;
00338             } else {
00339                 if (task->interrupt > TASK_SIGNCONF) {
00340                     task->interrupt = TASK_NONE;
00341                     task->halted = TASK_NONE;
00342                 }
00343             }
00344             what = TASK_AUDIT;
00345             when = time_now();
00346             fallthrough = 1;
00347         case TASK_AUDIT:
00348             worker->working_with = TASK_AUDIT;
00349             if (zone->signconf->audit) {
00350                 ods_log_verbose("[%s[%i]] audit zone %s",
00351                     worker2str(worker->type), worker->thread_num,
00352                     task_who2str(task->who));
00353                 working_dir = strdup(engine->config->working_dir);
00354                 cfg_filename = strdup(engine->config->cfg_filename);
00355                 status = tools_audit(zone, working_dir, cfg_filename);
00356                 if (working_dir)  { free((void*)working_dir); }
00357                 if (cfg_filename) { free((void*)cfg_filename); }
00358                 working_dir = NULL;
00359                 cfg_filename = NULL;
00360             } else {
00361                 status = ODS_STATUS_OK;
00362             }
00363 
00364             /* what to do next */
00365             if (status != ODS_STATUS_OK) {
00366                 if (task->halted == TASK_NONE) {
00367                     goto task_perform_fail;
00368                 }
00369                 goto task_perform_continue;
00370             }
00371             what = TASK_WRITE;
00372             when = time_now();
00373             fallthrough = 1;
00374         case TASK_WRITE:
00375             worker->working_with = TASK_WRITE;
00376             ods_log_verbose("[%s[%i]] write zone %s",
00377                 worker2str(worker->type), worker->thread_num,
00378                 task_who2str(task->who));
00379 
00380             status = tools_output(zone);
00381             zone->processed = 1;
00382 
00383             /* what to do next */
00384             if (status != ODS_STATUS_OK) {
00385                 if (task->halted == TASK_NONE) {
00386                     goto task_perform_fail;
00387                 }
00388                 goto task_perform_continue;
00389             } else {
00390                 if (task->interrupt > TASK_SIGNCONF) {
00391                     task->interrupt = TASK_NONE;
00392                     task->halted = TASK_NONE;
00393                 }
00394             }
00395             if (duration2time(zone->signconf->sig_resign_interval)) {
00396                 what = TASK_SIGN;
00397                 when = time_now() +
00398                     duration2time(zone->signconf->sig_resign_interval);
00399             } else {
00400                 what = TASK_NONE;
00401                 when = time_now() + never;
00402             }
00403             backup = 1;
00404             fallthrough = 0;
00405             break;
00406         case TASK_NONE:
00407             worker->working_with = TASK_NONE;
00408             ods_log_warning("[%s[%i]] none task for zone %s",
00409                 worker2str(worker->type), worker->thread_num,
00410                 task_who2str(task->who));
00411             when = time_now() + never;
00412             fallthrough = 0;
00413             break;
00414         default:
00415             ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
00416                 worker2str(worker->type), worker->thread_num,
00417                 task_who2str(task->who));
00418             what = TASK_SIGNCONF;
00419             when = time_now();
00420             fallthrough = 0;
00421             break;
00422     }
00423 
00424     /* no error, reset backoff */
00425     task->backoff = 0;
00426 
00427     /* set next task */
00428     if (fallthrough == 0 && task->interrupt != TASK_NONE &&
00429         task->interrupt != what) {
00430         ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
00431             worker2str(worker->type), worker->thread_num,
00432             task_what2str(what), task_who2str(task->who));
00433 
00434         task->what = task->interrupt;
00435         task->when = time_now();
00436         task->halted = what;
00437     } else {
00438         ods_log_debug("[%s[%i]] next task %s for zone %s",
00439             worker2str(worker->type), worker->thread_num,
00440             task_what2str(what), task_who2str(task->who));
00441 
00442         task->what = what;
00443         task->when = when;
00444         if (!fallthrough) {
00445             task->interrupt = TASK_NONE;
00446             task->halted = TASK_NONE;
00447         }
00448     }
00449 
00450     /* backup the last successful run */
00451     if (backup) {
00452         status = zone_backup(zone);
00453         if (status != ODS_STATUS_OK) {
00454             ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
00455             worker2str(worker->type), worker->thread_num,
00456             task_who2str(task->who), ods_status2str(status));
00457             /* just a warning */
00458             status = ODS_STATUS_OK;
00459         }
00460         backup = 0;
00461     }
00462     return;
00463 
00464 task_perform_fail:
00465     /* in case of failure, also mark zone processed (for single run usage) */
00466     zone->processed = 1;
00467 
00468     if (task->backoff) {
00469         task->backoff *= 2;
00470         if (task->backoff > ODS_SE_MAX_BACKOFF) {
00471             task->backoff = ODS_SE_MAX_BACKOFF;
00472         }
00473     } else {
00474         task->backoff = 60;
00475     }
00476     ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds",
00477         worker2str(worker->type), worker->thread_num,
00478         task_what2str(task->what), task_who2str(task->who), task->backoff);
00479 
00480     task->when = time_now() + task->backoff;
00481     return;
00482 
00483 task_perform_continue:
00484     ods_log_info("[%s[%i]] continue task %s for zone %s",
00485         worker2str(worker->type), worker->thread_num,
00486         task_what2str(task->halted), task_who2str(task->who));
00487 
00488     what = task->halted;
00489     task->what = what;
00490     task->when = time_now();
00491     task->interrupt = TASK_NONE;
00492     task->halted = TASK_NONE;
00493     if (zone->processed) {
00494         task->when += duration2time(zone->signconf->sig_resign_interval);
00495     }
00496     return;
00497 }
00498 
00499 
00504 static void
00505 worker_work(worker_type* worker)
00506 {
00507     time_t now, timeout = 1;
00508     zone_type* zone = NULL;
00509     ods_status status = ODS_STATUS_OK;
00510 
00511     ods_log_assert(worker);
00512     ods_log_assert(worker->type == WORKER_WORKER);
00513 
00514     while (worker->need_to_exit == 0) {
00515         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00516             worker->thread_num);
00517         lock_basic_lock(&worker->engine->taskq->schedule_lock);
00518         /* [LOCK] schedule */
00519         worker->task = schedule_pop_task(worker->engine->taskq);
00520         /* [UNLOCK] schedule */
00521         if (worker->task) {
00522             worker->working_with = worker->task->what;
00523             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00524 
00525             zone = worker->task->zone;
00526             lock_basic_lock(&zone->zone_lock);
00527             /* [LOCK] zone */
00528             ods_log_debug("[%s[%i]] start working on zone %s",
00529                 worker2str(worker->type), worker->thread_num, zone->name);
00530 
00531             worker->clock_in = time(NULL);
00532             worker_perform_task(worker);
00533 
00534             zone->task = worker->task;
00535 
00536             ods_log_debug("[%s[%i]] finished working on zone %s",
00537                 worker2str(worker->type), worker->thread_num, zone->name);
00538             /* [UNLOCK] zone */
00539 
00540             lock_basic_lock(&worker->engine->taskq->schedule_lock);
00541             /* [LOCK] zone, schedule */
00542             worker->task = NULL;
00543             worker->working_with = TASK_NONE;
00544             status = schedule_task(worker->engine->taskq, zone->task, 1);
00545             /* [UNLOCK] zone, schedule */
00546             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00547             lock_basic_unlock(&zone->zone_lock);
00548 
00549             timeout = 1;
00550         } else {
00551             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00552                 worker->thread_num);
00553 
00554             /* [LOCK] schedule */
00555             worker->task = schedule_get_first_task(worker->engine->taskq);
00556             /* [UNLOCK] schedule */
00557             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00558 
00559             now = time_now();
00560             if (worker->task && !worker->engine->taskq->loading) {
00561                 timeout = (worker->task->when - now);
00562             } else {
00563                 timeout *= 2;
00564                 if (timeout > ODS_SE_MAX_BACKOFF) {
00565                     timeout = ODS_SE_MAX_BACKOFF;
00566                 }
00567             }
00568             worker->task = NULL;
00569             worker_sleep(worker, timeout);
00570         }
00571     }
00572     return;
00573 }
00574 
00575 
00580 static void
00581 worker_drudge(worker_type* worker)
00582 {
00583     zone_type* zone = NULL;
00584     task_type* task = NULL;
00585     rrset_type* rrset = NULL;
00586     ods_status status = ODS_STATUS_OK;
00587     worker_type* chief = NULL;
00588     hsm_ctx_t* ctx = NULL;
00589 
00590     ods_log_assert(worker);
00591     ods_log_assert(worker->type == WORKER_DRUDGER);
00592 
00593     ctx = hsm_create_context();
00594     if (ctx == NULL) {
00595         ods_log_error("[%s[%i]] unable to drudge: error "
00596             "creating libhsm context", worker2str(worker->type),
00597             worker->thread_num);
00598     }
00599 
00600     while (worker->need_to_exit == 0) {
00601         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00602             worker->thread_num);
00603         chief = NULL;
00604         zone = NULL;
00605         task = NULL;
00606 
00607         lock_basic_lock(&worker->engine->signq->q_lock);
00608         /* [LOCK] schedule */
00609         rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief);
00610         /* [UNLOCK] schedule */
00611         lock_basic_unlock(&worker->engine->signq->q_lock);
00612         if (rrset) {
00613             /* set up the work */
00614             if (chief) {
00615                 task = chief->task;
00616             }
00617             if (task) {
00618                 zone = task->zone;
00619             }
00620             if (!zone) {
00621                 ods_log_error("[%s[%i]] unable to drudge: no zone reference",
00622                     worker2str(worker->type), worker->thread_num);
00623             }
00624             if (zone && ctx) {
00625                 ods_log_assert(rrset);
00626                 ods_log_assert(zone);
00627                 ods_log_assert(zone->dname);
00628                 ods_log_assert(zone->signconf);
00629                 ods_log_assert(ctx);
00630 
00631                 worker->clock_in = time(NULL);
00632                 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf,
00633                     chief->clock_in, zone->stats);
00634             } else {
00635                 status = ODS_STATUS_ASSERT_ERR;
00636             }
00637 
00638             if (chief) {
00639                 lock_basic_lock(&chief->worker_lock);
00640                 if (status == ODS_STATUS_OK) {
00641                     chief->jobs_completed += 1;
00642                 } else {
00643                     chief->jobs_failed += 1;
00644                     /* destroy context? */
00645                 }
00646                 lock_basic_unlock(&chief->worker_lock);
00647 
00648                 if (worker_fulfilled(chief) && chief->sleeping) {
00649                     ods_log_debug("[%s[%i]] wake up chief[%u], work is done",
00650                         worker2str(worker->type), worker->thread_num,
00651                         chief->thread_num);
00652                     worker_wakeup(chief);
00653                     chief = NULL;
00654                 }
00655             }
00656             rrset = NULL;
00657         } else {
00658             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00659                 worker->thread_num);
00660             worker_wait(&worker->engine->signq->q_lock,
00661                 &worker->engine->signq->q_threshold);
00662         }
00663     }
00664     /* wake up chief */
00665     if (chief && chief->sleeping) {
00666         ods_log_debug("[%s[%i]] wake up chief[%u], i am exiting",
00667             worker2str(worker->type), worker->thread_num, chief->thread_num);
00668          worker_wakeup(chief);
00669     }
00670 
00671     /* cleanup open HSM sessions */
00672     hsm_destroy_context(ctx);
00673     ctx = NULL;
00674     return;
00675 }
00676 
00677 
00682 void
00683 worker_start(worker_type* worker)
00684 {
00685     ods_log_assert(worker);
00686     switch (worker->type) {
00687         case WORKER_DRUDGER:
00688             worker_drudge(worker);
00689             break;
00690         case WORKER_WORKER:
00691             worker_work(worker);
00692             break;
00693         default:
00694             ods_log_error("[worker] illegal worker (id=%i)", worker->type);
00695             return;
00696     }
00697     return;
00698 }
00699 
00700 
00705 void
00706 worker_sleep(worker_type* worker, time_t timeout)
00707 {
00708     ods_log_assert(worker);
00709     lock_basic_lock(&worker->worker_lock);
00710     /* [LOCK] worker */
00711     worker->sleeping = 1;
00712     lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00713         timeout);
00714     /* [UNLOCK] worker */
00715     lock_basic_unlock(&worker->worker_lock);
00716     return;
00717 }
00718 
00719 
00724 void
00725 worker_sleep_unless(worker_type* worker, time_t timeout)
00726 {
00727     ods_log_assert(worker);
00728     lock_basic_lock(&worker->worker_lock);
00729     /* [LOCK] worker */
00730     while (!worker->need_to_exit && !worker_fulfilled(worker)) {
00731         worker->sleeping = 1;
00732         lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00733             timeout);
00734 
00735         ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u "
00736            "appointed, %u completed, %u failed", worker2str(worker->type),
00737            worker->thread_num, worker->jobs_appointed, worker->jobs_completed,
00738            worker->jobs_failed);
00739     }
00740     /* [UNLOCK] worker */
00741     lock_basic_unlock(&worker->worker_lock);
00742     return;
00743 }
00744 
00745 
00750 void
00751 worker_wakeup(worker_type* worker)
00752 {
00753     ods_log_assert(worker);
00754     if (worker && worker->sleeping && !worker->waiting) {
00755         ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
00756            worker->thread_num);
00757         lock_basic_lock(&worker->worker_lock);
00758         /* [LOCK] worker */
00759         lock_basic_alarm(&worker->worker_alarm);
00760         worker->sleeping = 0;
00761         /* [UNLOCK] worker */
00762         lock_basic_unlock(&worker->worker_lock);
00763     }
00764     return;
00765 }
00766 
00767 
00772 void
00773 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
00774 {
00775     lock_basic_lock(lock);
00776     /* [LOCK] worker */
00777     lock_basic_sleep(condition, lock, 0);
00778     /* [UNLOCK] worker */
00779     lock_basic_unlock(lock);
00780     return;
00781 }
00782 
00783 
00788 void
00789 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
00790 {
00791     lock_basic_lock(lock);
00792     /* [LOCK] lock */
00793     lock_basic_alarm(condition);
00794     /* [UNLOCK] lock */
00795     lock_basic_unlock(lock);
00796     return;
00797 }
00798 
00799 
00804 void
00805 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
00806 {
00807     lock_basic_lock(lock);
00808     /* [LOCK] lock */
00809     lock_basic_broadcast(condition);
00810     /* [UNLOCK] lock */
00811     lock_basic_unlock(lock);
00812     return;
00813 }
00814 
00815 
00820 void
00821 worker_cleanup(worker_type* worker)
00822 {
00823     allocator_type* allocator;
00824     cond_basic_type worker_cond;
00825     lock_basic_type worker_lock;
00826 
00827     if (!worker) {
00828         return;
00829     }
00830     allocator = worker->allocator;
00831     worker_cond = worker->worker_alarm;
00832     worker_lock = worker->worker_lock;
00833 
00834     allocator_deallocate(allocator, (void*) worker);
00835     lock_basic_destroy(&worker_lock);
00836     lock_basic_off(&worker_cond);
00837     return;
00838 }

Generated on Sat Dec 17 2011 09:57:56 for OpenDNSSEC-signer by  doxygen 1.7.1