OpenDNSSEC-signer 1.3.0
|
00001 /* 00002 * $Id: worker.c 5320 2011-07-12 10:42:26Z jakob $ 00003 * 00004 * Copyright (c) 2009 NLNet Labs. All rights reserved. 00005 * 00006 * Redistribution and use in source and binary forms, with or without 00007 * modification, are permitted provided that the following conditions 00008 * are met: 00009 * 1. Redistributions of source code must retain the above copyright 00010 * notice, this list of conditions and the following disclaimer. 00011 * 2. Redistributions in binary form must reproduce the above copyright 00012 * notice, this list of conditions and the following disclaimer in the 00013 * documentation and/or other materials provided with the distribution. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 00016 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00017 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY 00019 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 00020 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 00021 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 00023 * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 00024 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN 00025 * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00026 * 00027 */ 00028 00034 #include "adapter/adapi.h" 00035 #include "daemon/engine.h" 00036 #include "daemon/worker.h" 00037 #include "shared/allocator.h" 00038 #include "scheduler/schedule.h" 00039 #include "scheduler/task.h" 00040 #include "shared/locks.h" 00041 #include "shared/log.h" 00042 #include "shared/status.h" 00043 #include "shared/util.h" 00044 #include "signer/tools.h" 00045 #include "signer/zone.h" 00046 #include "signer/zonedata.h" 00047 00048 #include <time.h> /* time() */ 00049 00050 ods_lookup_table worker_str[] = { 00051 { WORKER_WORKER, "worker" }, 00052 { WORKER_DRUDGER, "drudger" }, 00053 { 0, NULL } 00054 }; 00055 00056 00061 worker_type* 00062 worker_create(allocator_type* allocator, int num, worker_id type) 00063 { 00064 worker_type* worker; 00065 00066 if (!allocator) { 00067 return NULL; 00068 } 00069 ods_log_assert(allocator); 00070 00071 worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type)); 00072 if (!worker) { 00073 return NULL; 00074 } 00075 00076 ods_log_debug("create worker[%i]", num +1); 00077 lock_basic_init(&worker->worker_lock); 00078 lock_basic_set(&worker->worker_alarm); 00079 lock_basic_lock(&worker->worker_lock); 00080 worker->allocator = allocator; 00081 worker->thread_num = num +1; 00082 worker->engine = NULL; 00083 worker->task = NULL; 00084 worker->working_with = TASK_NONE; 00085 worker->need_to_exit = 0; 00086 worker->type = type; 00087 worker->clock_in = 0; 00088 worker->jobs_appointed = 0; 00089 worker->jobs_completed = 0; 00090 worker->jobs_failed = 0; 00091 worker->sleeping = 0; 00092 worker->waiting = 0; 00093 lock_basic_unlock(&worker->worker_lock); 00094 return worker; 00095 } 00096 00097 00102 static const char* 00103 worker2str(worker_id type) 00104 { 00105 ods_lookup_table *lt = ods_lookup_by_id(worker_str, type); 00106 if (lt) { 00107 return lt->name; 00108 } 00109 return NULL; 00110 } 00111 00112 00117 static int 00118 worker_fulfilled(worker_type* worker) 00119 { 00120 return (worker->jobs_completed + worker->jobs_failed) == 00121 worker->jobs_appointed; 00122 } 00123 00124 00129 static void 00130 worker_perform_task(worker_type* worker) 00131 { 00132 engine_type* engine = NULL; 00133 zone_type* zone = NULL; 00134 task_type* task = NULL; 00135 task_id what = TASK_NONE; 00136 time_t when = 0; 00137 time_t never = (3600*24*365); 00138 ods_status status = ODS_STATUS_OK; 00139 int fallthrough = 0; 00140 int backup = 0; 00141 char* working_dir = NULL; 00142 char* cfg_filename = NULL; 00143 uint32_t tmpserial = 0; 00144 time_t start = 0; 00145 time_t end = 0; 00146 00147 /* sanity checking */ 00148 if (!worker || !worker->task || !worker->task->zone || !worker->engine) { 00149 return; 00150 } 00151 ods_log_assert(worker); 00152 ods_log_assert(worker->task); 00153 ods_log_assert(worker->task->zone); 00154 00155 engine = (engine_type*) worker->engine; 00156 task = (task_type*) worker->task; 00157 zone = (zone_type*) worker->task->zone; 00158 ods_log_debug("[%s[%i]] perform task %s for zone %s at %u", 00159 worker2str(worker->type), worker->thread_num, task_what2str(task->what), 00160 task_who2str(task->who), (uint32_t) worker->clock_in); 00161 00162 /* do what you have been told to do */ 00163 switch (task->what) { 00164 case TASK_SIGNCONF: 00165 worker->working_with = TASK_SIGNCONF; 00166 /* perform 'load signconf' task */ 00167 ods_log_verbose("[%s[%i]] load signconf for zone %s", 00168 worker2str(worker->type), worker->thread_num, 00169 task_who2str(task->who)); 00170 status = zone_load_signconf(zone, &what); 00171 00172 /* what to do next */ 00173 when = time_now(); 00174 if (status == ODS_STATUS_UNCHANGED) { 00175 if (task->halted != TASK_NONE) { 00176 goto task_perform_continue; 00177 } else { 00178 status = ODS_STATUS_OK; 00179 } 00180 } 00181 00182 if (status == ODS_STATUS_OK) { 00183 status = zone_publish_dnskeys(zone, 0); 00184 } 00185 if (status == ODS_STATUS_OK) { 00186 status = zone_prepare_nsec3(zone, 0); 00187 } 00188 if (status == ODS_STATUS_OK) { 00189 status = zonedata_commit(zone->zonedata); 00190 } 00191 00192 if (status == ODS_STATUS_OK) { 00193 task->interrupt = TASK_NONE; 00194 task->halted = TASK_NONE; 00195 } else { 00196 if (task->halted == TASK_NONE) { 00197 goto task_perform_fail; 00198 } 00199 goto task_perform_continue; 00200 } 00201 fallthrough = 0; 00202 break; 00203 case TASK_READ: 00204 worker->working_with = TASK_READ; 00205 /* perform 'read input adapter' task */ 00206 ods_log_verbose("[%s[%i]] read zone %s", 00207 worker2str(worker->type), worker->thread_num, 00208 task_who2str(task->who)); 00209 status = tools_input(zone); 00210 00211 /* what to do next */ 00212 what = TASK_NSECIFY; 00213 when = time_now(); 00214 if (status != ODS_STATUS_OK) { 00215 if (task->halted == TASK_NONE) { 00216 goto task_perform_fail; 00217 } 00218 goto task_perform_continue; 00219 } 00220 fallthrough = 1; 00221 case TASK_NSECIFY: 00222 worker->working_with = TASK_NSECIFY; 00223 ods_log_verbose("[%s[%i]] nsecify zone %s", 00224 worker2str(worker->type), worker->thread_num, 00225 task_who2str(task->who)); 00226 status = tools_nsecify(zone); 00227 00228 /* what to do next */ 00229 what = TASK_SIGN; 00230 when = time_now(); 00231 if (status == ODS_STATUS_OK) { 00232 if (task->interrupt > TASK_SIGNCONF) { 00233 task->interrupt = TASK_NONE; 00234 task->halted = TASK_NONE; 00235 } 00236 } else { 00237 if (task->halted == TASK_NONE) { 00238 goto task_perform_fail; 00239 } 00240 goto task_perform_continue; 00241 } 00242 fallthrough = 1; 00243 case TASK_SIGN: 00244 worker->working_with = TASK_SIGN; 00245 ods_log_verbose("[%s[%i]] sign zone %s", 00246 worker2str(worker->type), worker->thread_num, 00247 task_who2str(task->who)); 00248 tmpserial = zone->zonedata->internal_serial; 00249 status = zone_update_serial(zone); 00250 if (status != ODS_STATUS_OK) { 00251 ods_log_error("[%s[%i]] unable to sign zone %s: " 00252 "failed to increment serial", 00253 worker2str(worker->type), worker->thread_num, 00254 task_who2str(task->who)); 00255 } else { 00256 /* start timer */ 00257 start = time(NULL); 00258 if (zone->stats) { 00259 lock_basic_lock(&zone->stats->stats_lock); 00260 if (!zone->stats->start_time) { 00261 zone->stats->start_time = start; 00262 } 00263 zone->stats->sig_count = 0; 00264 zone->stats->sig_soa_count = 0; 00265 zone->stats->sig_reuse = 0; 00266 zone->stats->sig_time = 0; 00267 lock_basic_unlock(&zone->stats->stats_lock); 00268 } 00269 00270 /* queue menial, hard signing work */ 00271 status = zonedata_queue(zone->zonedata, engine->signq, worker); 00272 ods_log_debug("[%s[%i]] wait until drudgers are finished " 00273 " signing zone %s", worker2str(worker->type), 00274 worker->thread_num, task_who2str(task->who)); 00275 00276 /* sleep until work is done */ 00277 worker_sleep_unless(worker, 0); 00278 if (worker->jobs_failed > 0) { 00279 ods_log_error("[%s[%i]] sign zone %s failed: %u of %u " 00280 "signatures failed", worker2str(worker->type), 00281 worker->thread_num, task_who2str(task->who), 00282 worker->jobs_failed, worker->jobs_appointed); 00283 status = ODS_STATUS_ERR; 00284 } 00285 worker->jobs_appointed = 0; 00286 worker->jobs_completed = 0; 00287 worker->jobs_failed = 0; 00288 00289 /* stop timer */ 00290 end = time(NULL); 00291 if (status == ODS_STATUS_OK && zone->stats) { 00292 lock_basic_lock(&zone->stats->stats_lock); 00293 zone->stats->sig_time = (end-start); 00294 lock_basic_unlock(&zone->stats->stats_lock); 00295 } 00296 } 00297 00298 /* what to do next */ 00299 if (status != ODS_STATUS_OK) { 00300 /* rollback serial */ 00301 zone->zonedata->internal_serial = tmpserial; 00302 if (task->halted == TASK_NONE) { 00303 goto task_perform_fail; 00304 } 00305 goto task_perform_continue; 00306 } else { 00307 if (task->interrupt > TASK_SIGNCONF) { 00308 task->interrupt = TASK_NONE; 00309 task->halted = TASK_NONE; 00310 } 00311 } 00312 what = TASK_AUDIT; 00313 when = time_now(); 00314 fallthrough = 1; 00315 case TASK_AUDIT: 00316 worker->working_with = TASK_AUDIT; 00317 if (zone->signconf->audit) { 00318 ods_log_verbose("[%s[%i]] audit zone %s", 00319 worker2str(worker->type), worker->thread_num, 00320 task_who2str(task->who)); 00321 working_dir = strdup(engine->config->working_dir); 00322 cfg_filename = strdup(engine->config->cfg_filename); 00323 status = tools_audit(zone, working_dir, cfg_filename); 00324 if (working_dir) { free((void*)working_dir); } 00325 if (cfg_filename) { free((void*)cfg_filename); } 00326 working_dir = NULL; 00327 cfg_filename = NULL; 00328 } else { 00329 status = ODS_STATUS_OK; 00330 } 00331 00332 /* what to do next */ 00333 if (status != ODS_STATUS_OK) { 00334 if (task->halted == TASK_NONE) { 00335 goto task_perform_fail; 00336 } 00337 goto task_perform_continue; 00338 } 00339 what = TASK_WRITE; 00340 when = time_now(); 00341 fallthrough = 1; 00342 case TASK_WRITE: 00343 worker->working_with = TASK_WRITE; 00344 ods_log_verbose("[%s[%i]] write zone %s", 00345 worker2str(worker->type), worker->thread_num, 00346 task_who2str(task->who)); 00347 00348 status = tools_output(zone); 00349 zone->processed = 1; 00350 00351 /* what to do next */ 00352 if (status != ODS_STATUS_OK) { 00353 if (task->halted == TASK_NONE) { 00354 goto task_perform_fail; 00355 } 00356 goto task_perform_continue; 00357 } else { 00358 if (task->interrupt > TASK_SIGNCONF) { 00359 task->interrupt = TASK_NONE; 00360 task->halted = TASK_NONE; 00361 } 00362 } 00363 if (duration2time(zone->signconf->sig_resign_interval)) { 00364 what = TASK_SIGN; 00365 when = time_now() + 00366 duration2time(zone->signconf->sig_resign_interval); 00367 } else { 00368 what = TASK_NONE; 00369 when = time_now() + never; 00370 } 00371 backup = 1; 00372 fallthrough = 0; 00373 break; 00374 case TASK_NONE: 00375 worker->working_with = TASK_NONE; 00376 ods_log_warning("[%s[%i]] none task for zone %s", 00377 worker2str(worker->type), worker->thread_num, 00378 task_who2str(task->who)); 00379 when = time_now() + never; 00380 fallthrough = 0; 00381 break; 00382 default: 00383 ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s", 00384 worker2str(worker->type), worker->thread_num, 00385 task_who2str(task->who)); 00386 what = TASK_SIGNCONF; 00387 when = time_now(); 00388 fallthrough = 0; 00389 break; 00390 } 00391 00392 /* no error, reset backoff */ 00393 task->backoff = 0; 00394 00395 /* set next task */ 00396 if (fallthrough == 0 && task->interrupt != TASK_NONE && 00397 task->interrupt != what) { 00398 ods_log_debug("[%s[%i]] interrupt task %s for zone %s", 00399 worker2str(worker->type), worker->thread_num, 00400 task_what2str(what), task_who2str(task->who)); 00401 00402 task->what = task->interrupt; 00403 task->when = time_now(); 00404 task->halted = what; 00405 } else { 00406 ods_log_debug("[%s[%i]] next task %s for zone %s", 00407 worker2str(worker->type), worker->thread_num, 00408 task_what2str(what), task_who2str(task->who)); 00409 00410 task->what = what; 00411 task->when = when; 00412 if (!fallthrough) { 00413 task->interrupt = TASK_NONE; 00414 task->halted = TASK_NONE; 00415 } 00416 } 00417 00418 /* backup the last successful run */ 00419 if (backup) { 00420 status = zone_backup(zone); 00421 if (status != ODS_STATUS_OK) { 00422 ods_log_warning("[%s[%i]] unable to backup zone %s: %s", 00423 worker2str(worker->type), worker->thread_num, 00424 task_who2str(task->who), ods_status2str(status)); 00425 /* just a warning */ 00426 status = ODS_STATUS_OK; 00427 } 00428 backup = 0; 00429 } 00430 return; 00431 00432 task_perform_fail: 00433 /* in case of failure, also mark zone processed (for single run usage) */ 00434 zone->processed = 1; 00435 00436 if (task->backoff) { 00437 task->backoff *= 2; 00438 if (task->backoff > ODS_SE_MAX_BACKOFF) { 00439 task->backoff = ODS_SE_MAX_BACKOFF; 00440 } 00441 } else { 00442 task->backoff = 60; 00443 } 00444 ods_log_error("[%s[%i]] backoff task %s for zone %s with %u seconds", 00445 worker2str(worker->type), worker->thread_num, 00446 task_what2str(task->what), task_who2str(task->who), task->backoff); 00447 00448 task->when = time_now() + task->backoff; 00449 return; 00450 00451 task_perform_continue: 00452 ods_log_info("[%s[%i]] continue task %s for zone %s", 00453 worker2str(worker->type), worker->thread_num, 00454 task_what2str(task->halted), task_who2str(task->who)); 00455 00456 what = task->halted; 00457 task->what = what; 00458 task->when = time_now(); 00459 task->interrupt = TASK_NONE; 00460 task->halted = TASK_NONE; 00461 if (zone->processed) { 00462 task->when += duration2time(zone->signconf->sig_resign_interval); 00463 } 00464 return; 00465 } 00466 00467 00472 static void 00473 worker_work(worker_type* worker) 00474 { 00475 time_t now, timeout = 1; 00476 zone_type* zone = NULL; 00477 ods_status status = ODS_STATUS_OK; 00478 00479 ods_log_assert(worker); 00480 ods_log_assert(worker->type == WORKER_WORKER); 00481 00482 while (worker->need_to_exit == 0) { 00483 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type), 00484 worker->thread_num); 00485 lock_basic_lock(&worker->engine->taskq->schedule_lock); 00486 /* [LOCK] schedule */ 00487 worker->task = schedule_pop_task(worker->engine->taskq); 00488 /* [UNLOCK] schedule */ 00489 if (worker->task) { 00490 worker->working_with = worker->task->what; 00491 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00492 00493 zone = worker->task->zone; 00494 lock_basic_lock(&zone->zone_lock); 00495 /* [LOCK] zone */ 00496 ods_log_debug("[%s[%i]] start working on zone %s", 00497 worker2str(worker->type), worker->thread_num, zone->name); 00498 00499 worker->clock_in = time(NULL); 00500 worker_perform_task(worker); 00501 00502 zone->task = worker->task; 00503 00504 ods_log_debug("[%s[%i]] finished working on zone %s", 00505 worker2str(worker->type), worker->thread_num, zone->name); 00506 /* [UNLOCK] zone */ 00507 00508 lock_basic_lock(&worker->engine->taskq->schedule_lock); 00509 /* [LOCK] zone, schedule */ 00510 worker->task = NULL; 00511 worker->working_with = TASK_NONE; 00512 status = schedule_task(worker->engine->taskq, zone->task, 1); 00513 /* [UNLOCK] zone, schedule */ 00514 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00515 lock_basic_unlock(&zone->zone_lock); 00516 00517 timeout = 1; 00518 } else { 00519 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type), 00520 worker->thread_num); 00521 00522 /* [LOCK] schedule */ 00523 worker->task = schedule_get_first_task(worker->engine->taskq); 00524 /* [UNLOCK] schedule */ 00525 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00526 00527 now = time_now(); 00528 if (worker->task && !worker->engine->taskq->loading) { 00529 timeout = (worker->task->when - now); 00530 } else { 00531 timeout *= 2; 00532 if (timeout > ODS_SE_MAX_BACKOFF) { 00533 timeout = ODS_SE_MAX_BACKOFF; 00534 } 00535 } 00536 worker->task = NULL; 00537 worker_sleep(worker, timeout); 00538 } 00539 } 00540 return; 00541 } 00542 00543 00548 static void 00549 worker_drudge(worker_type* worker) 00550 { 00551 zone_type* zone = NULL; 00552 task_type* task = NULL; 00553 rrset_type* rrset = NULL; 00554 ods_status status = ODS_STATUS_OK; 00555 worker_type* chief = NULL; 00556 hsm_ctx_t* ctx = NULL; 00557 00558 ods_log_assert(worker); 00559 ods_log_assert(worker->type == WORKER_DRUDGER); 00560 00561 ctx = hsm_create_context(); 00562 if (ctx == NULL) { 00563 ods_log_error("[%s[%i]] unable to drudge: error " 00564 "creating libhsm context", worker2str(worker->type), 00565 worker->thread_num); 00566 } 00567 00568 while (worker->need_to_exit == 0) { 00569 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type), 00570 worker->thread_num); 00571 00572 lock_basic_lock(&worker->engine->signq->q_lock); 00573 /* [LOCK] schedule */ 00574 rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief); 00575 /* [UNLOCK] schedule */ 00576 lock_basic_unlock(&worker->engine->signq->q_lock); 00577 if (rrset) { 00578 /* set up the work */ 00579 if (chief) { 00580 task = chief->task; 00581 } 00582 if (task) { 00583 zone = task->zone; 00584 } 00585 if (!zone) { 00586 ods_log_error("[%s[%i]] unable to drudge: no zone reference", 00587 worker2str(worker->type), worker->thread_num); 00588 } 00589 if (zone && ctx) { 00590 ods_log_assert(rrset); 00591 ods_log_assert(zone); 00592 ods_log_assert(zone->dname); 00593 ods_log_assert(zone->signconf); 00594 ods_log_assert(ctx); 00595 00596 worker->clock_in = time(NULL); 00597 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf, 00598 chief->clock_in, zone->stats); 00599 } else { 00600 status = ODS_STATUS_ASSERT_ERR; 00601 } 00602 00603 if (chief) { 00604 lock_basic_lock(&chief->worker_lock); 00605 if (status == ODS_STATUS_OK) { 00606 chief->jobs_completed += 1; 00607 } else { 00608 chief->jobs_failed += 1; 00609 /* destroy context? */ 00610 } 00611 lock_basic_unlock(&chief->worker_lock); 00612 00613 if (worker_fulfilled(chief) && chief->sleeping) { 00614 worker_wakeup(chief); 00615 } 00616 } 00617 rrset = NULL; 00618 zone = NULL; 00619 task = NULL; 00620 chief = NULL; 00621 } else { 00622 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type), 00623 worker->thread_num); 00624 worker_wait(&worker->engine->signq->q_lock, 00625 &worker->engine->signq->q_threshold); 00626 } 00627 } 00628 00629 /* cleanup open HSM sessions */ 00630 hsm_destroy_context(ctx); 00631 ctx = NULL; 00632 return; 00633 } 00634 00635 00640 void 00641 worker_start(worker_type* worker) 00642 { 00643 ods_log_assert(worker); 00644 switch (worker->type) { 00645 case WORKER_DRUDGER: 00646 worker_drudge(worker); 00647 break; 00648 case WORKER_WORKER: 00649 worker_work(worker); 00650 break; 00651 default: 00652 ods_log_error("[worker] illegal worker (id=%i)", worker->type); 00653 return; 00654 } 00655 return; 00656 } 00657 00658 00663 void 00664 worker_sleep(worker_type* worker, time_t timeout) 00665 { 00666 ods_log_assert(worker); 00667 lock_basic_lock(&worker->worker_lock); 00668 /* [LOCK] worker */ 00669 worker->sleeping = 1; 00670 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock, 00671 timeout); 00672 /* [UNLOCK] worker */ 00673 lock_basic_unlock(&worker->worker_lock); 00674 return; 00675 } 00676 00677 00682 void 00683 worker_sleep_unless(worker_type* worker, time_t timeout) 00684 { 00685 ods_log_assert(worker); 00686 lock_basic_lock(&worker->worker_lock); 00687 /* [LOCK] worker */ 00688 if (!worker_fulfilled(worker)) { 00689 worker->sleeping = 1; 00690 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock, 00691 timeout); 00692 } 00693 /* [UNLOCK] worker */ 00694 lock_basic_unlock(&worker->worker_lock); 00695 return; 00696 } 00697 00698 00703 void 00704 worker_wakeup(worker_type* worker) 00705 { 00706 ods_log_assert(worker); 00707 if (worker && worker->sleeping && !worker->waiting) { 00708 ods_log_debug("[%s[%i]] wake up", worker2str(worker->type), 00709 worker->thread_num); 00710 lock_basic_lock(&worker->worker_lock); 00711 /* [LOCK] worker */ 00712 lock_basic_alarm(&worker->worker_alarm); 00713 worker->sleeping = 0; 00714 /* [UNLOCK] worker */ 00715 lock_basic_unlock(&worker->worker_lock); 00716 } 00717 return; 00718 } 00719 00720 00725 void 00726 worker_wait(lock_basic_type* lock, cond_basic_type* condition) 00727 { 00728 lock_basic_lock(lock); 00729 /* [LOCK] worker */ 00730 lock_basic_sleep(condition, lock, 0); 00731 /* [UNLOCK] worker */ 00732 lock_basic_unlock(lock); 00733 return; 00734 } 00735 00736 00741 void 00742 worker_notify(lock_basic_type* lock, cond_basic_type* condition) 00743 { 00744 lock_basic_lock(lock); 00745 /* [LOCK] lock */ 00746 lock_basic_alarm(condition); 00747 /* [UNLOCK] lock */ 00748 lock_basic_unlock(lock); 00749 return; 00750 } 00751 00752 00757 void 00758 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition) 00759 { 00760 lock_basic_lock(lock); 00761 /* [LOCK] lock */ 00762 lock_basic_broadcast(condition); 00763 /* [UNLOCK] lock */ 00764 lock_basic_unlock(lock); 00765 return; 00766 } 00767 00768 00773 void 00774 worker_cleanup(worker_type* worker) 00775 { 00776 allocator_type* allocator; 00777 cond_basic_type worker_cond; 00778 lock_basic_type worker_lock; 00779 00780 if (!worker) { 00781 return; 00782 } 00783 allocator = worker->allocator; 00784 worker_cond = worker->worker_alarm; 00785 worker_lock = worker->worker_lock; 00786 00787 allocator_deallocate(allocator, (void*) worker); 00788 lock_basic_destroy(&worker_lock); 00789 lock_basic_off(&worker_cond); 00790 return; 00791 }