OpenDNSSEC-signer  1.3.14
worker.c
Go to the documentation of this file.
1 /*
2  * $Id: worker.c 7124 2013-05-03 09:49:26Z matthijs $
3  *
4  * Copyright (c) 2009 NLNet Labs. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in the
13  * documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
19  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
21  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
23  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
25  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  *
27  */
28 
34 #include "adapter/adapi.h"
35 #include "daemon/engine.h"
36 #include "daemon/worker.h"
37 #include "shared/allocator.h"
38 #include "scheduler/schedule.h"
39 #include "scheduler/task.h"
40 #include "shared/hsm.h"
41 #include "shared/locks.h"
42 #include "shared/log.h"
43 #include "shared/status.h"
44 #include "shared/util.h"
45 #include "signer/tools.h"
46 #include "signer/zone.h"
47 #include "signer/zonedata.h"
48 
49 #include <time.h> /* time() */
50 
/* Entries of the worker-type name table (referenced as worker_str by
 * worker2str() below): each entry pairs a worker id with a printable name.
 * The final { 0, NULL } entry is presumably the terminator that
 * ods_lookup_by_id() stops at -- confirm against its implementation.
 * NOTE(review): the embedded numbering jumps from 50 to 52, so the line
 * that opens this initializer is not visible in this listing. */
52  { WORKER_WORKER, "worker" },
53  { WORKER_DRUDGER, "drudger" },
54  { 0, NULL }
55 };
56 
57 
/* Body of the worker constructor (presumably worker_create(); the
 * declarator lines 62-63 are not visible in this listing -- confirm).
 *
 * Uses three inputs: allocator (memory source, stored in the worker),
 * num (zero-based index; thread_num is stored as num + 1) and type
 * (stored in worker->type).
 *
 * Returns NULL when allocator is NULL or the allocation fails; otherwise
 * returns the new worker with its lock and alarm initialised and all
 * counters / flags zeroed. */
64 {
65  worker_type* worker;
66 
67  if (!allocator) {
68  return NULL;
69  }
70  ods_log_assert(allocator);
71 
72  worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
73  if (!worker) {
74  return NULL;
75  }
76 
77  ods_log_debug("create worker[%i]", num +1);
78  lock_basic_init(&worker->worker_lock);
79  lock_basic_set(&worker->worker_alarm);
80  lock_basic_lock(&worker->worker_lock);
/* NOTE(review): listing skips line 81 here; its content is not visible. */
82  worker->allocator = allocator;
83  worker->thread_num = num +1;
84  worker->engine = NULL;
85  worker->task = NULL;
86  worker->working_with = TASK_NONE;
87  worker->need_to_exit = 0;
88  worker->type = type;
89  worker->clock_in = 0;
90  worker->jobs_appointed = 0;
91  worker->jobs_completed = 0;
92  worker->jobs_failed = 0;
93  worker->sleeping = 0;
94  worker->waiting = 0;
/* NOTE(review): listing skips line 95 here. No unlock of worker_lock is
 * visible in this function; presumably it is on the missing line --
 * confirm against the upstream file. */
96  worker->worker_locked = 0;
97  return worker;
98 }
99 
100 
105 static const char*
106 worker2str(worker_id type)
107 {
108  ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
109  if (lt) {
110  return lt->name;
111  }
112  return NULL;
113 }
114 
115 
120 static int
121 worker_fulfilled(worker_type* worker)
122 {
123  return (worker->jobs_completed + worker->jobs_failed) ==
124  worker->jobs_appointed;
125 }
126 
127 
/*
 * Perform the task currently attached to the worker.
 *
 * Returns immediately when worker, worker->task, worker->task->zone or
 * worker->engine is NULL. Otherwise it switches on task->what. The cases
 * TASK_READ, TASK_NSECIFY, TASK_SIGN, TASK_AUDIT and TASK_WRITE have no
 * break between them, so a step that succeeds continues with the next one.
 *
 * Normal exit: task->backoff is reset to 0 and task->what / task->when are
 * set to the computed next step, unless a different task->interrupt is
 * pending and no fallthrough happened, in which case the interrupt is
 * scheduled now and the computed step is remembered in task->halted.
 * A zone backup is taken after a completed TASK_WRITE.
 *
 * Failure exits:
 *  - task_perform_fail: marks the zone processed and postpones the task
 *    by task->backoff (60 seconds at first, doubled on each further
 *    failure, capped at ODS_SE_MAX_BACKOFF).
 *  - task_perform_continue: taken when a halted task is recorded;
 *    reschedules task->halted for now and clears interrupt / halted.
 *
 * NOTE(review): the embedded line numbers skip inside this function
 * (194-197, 283-284, 292, 308, 342-343, 345, 415, 418, 514); the content
 * of those lines is not visible in this listing.
 */
132 static void
133 worker_perform_task(worker_type* worker)
134 {
135  engine_type* engine = NULL;
136  zone_type* zone = NULL;
137  task_type* task = NULL;
138  task_id what = TASK_NONE;
139  time_t when = 0;
140  time_t never = (3600*24*365);
141  ods_status status = ODS_STATUS_OK;
142  int fallthrough = 0;
143  int backup = 0;
144  char* working_dir = NULL;
145  char* cfg_filename = NULL;
146  uint32_t tmpserial = 0;
147  time_t start = 0;
148  time_t end = 0;
149 
150  /* sanity checking */
151  if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
152  return;
153  }
154  ods_log_assert(worker);
155  ods_log_assert(worker->task);
156  ods_log_assert(worker->task->zone);
157 
158  engine = (engine_type*) worker->engine;
159  task = (task_type*) worker->task;
160  zone = (zone_type*) worker->task->zone;
161  ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
162  worker2str(worker->type), worker->thread_num, task_what2str(task->what),
163  task_who2str(task->who), (uint32_t) worker->clock_in);
164 
165  /* do what you have been told to do */
166  switch (task->what) {
167  case TASK_SIGNCONF:
168  worker->working_with = TASK_SIGNCONF;
169  /* perform 'load signconf' task */
170  ods_log_verbose("[%s[%i]] load signconf for zone %s",
171  worker2str(worker->type), worker->thread_num,
172  task_who2str(task->who));
173  status = zone_load_signconf(zone, &what);
174  if (status == ODS_STATUS_UNCHANGED) {
175  if (!zone->signconf->last_modified) {
176  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
177  worker2str(worker->type), worker->thread_num,
178  task_who2str(task->who));
179  status = ODS_STATUS_ERR;
180  }
181  }
182 
183  /* what to do next */
184  when = time_now();
185  if (status == ODS_STATUS_UNCHANGED) {
186  if (task->halted != TASK_NONE && task->halted != TASK_SIGNCONF) {
187  goto task_perform_continue;
188  } else {
189  status = ODS_STATUS_OK;
190  }
191  }
192 
193  if (status == ODS_STATUS_OK) {
198  lhsm_check_connection((void*)engine);
199  status = zone_publish_dnskeys(zone, 0);
200  }
201  if (status == ODS_STATUS_OK) {
202  status = zone_prepare_nsec3(zone, 0);
203  }
204  if (status == ODS_STATUS_OK) {
205  status = zonedata_commit(zone->zonedata);
206  }
207 
208  if (status == ODS_STATUS_OK) {
209  zone->prepared = 1;
210  task->interrupt = TASK_NONE;
211  task->halted = TASK_NONE;
212  } else {
213  if (task->halted == TASK_NONE) {
214  goto task_perform_fail;
215  }
216  goto task_perform_continue;
217  }
218  fallthrough = 0;
219  break;
220  case TASK_READ:
221  worker->working_with = TASK_READ;
222  /* perform 'read input adapter' task */
223  ods_log_verbose("[%s[%i]] read zone %s",
224  worker2str(worker->type), worker->thread_num,
225  task_who2str(task->who));
226  if (!zone->prepared) {
227  ods_log_debug("[%s[%i]] no valid signconf.xml for zone %s yet",
228  worker2str(worker->type), worker->thread_num,
229  task_who2str(task->who));
230  status = ODS_STATUS_ERR;
231  } else {
232  status = tools_input(zone);
233  }
234 
235  /* what to do next */
236  what = TASK_NSECIFY;
237  when = time_now();
238  if (status != ODS_STATUS_OK) {
239  if (task->halted == TASK_NONE) {
240  goto task_perform_fail;
241  }
242  goto task_perform_continue;
243  }
244  fallthrough = 1;
/* no break: a successful read continues with the nsecify step */
245  case TASK_NSECIFY:
246  worker->working_with = TASK_NSECIFY;
247  ods_log_verbose("[%s[%i]] nsecify zone %s",
248  worker2str(worker->type), worker->thread_num,
249  task_who2str(task->who));
250  status = tools_nsecify(zone);
251 
252  /* what to do next */
253  what = TASK_SIGN;
254  when = time_now();
255  if (status == ODS_STATUS_OK) {
256  if (task->interrupt > TASK_SIGNCONF) {
257  task->interrupt = TASK_NONE;
258  task->halted = TASK_NONE;
259  }
260  } else {
261  if (task->halted == TASK_NONE) {
262  goto task_perform_fail;
263  }
264  goto task_perform_continue;
265  }
266  fallthrough = 1;
/* no break: a successful nsecify continues with the sign step */
267  case TASK_SIGN:
268  worker->working_with = TASK_SIGN;
269  ods_log_verbose("[%s[%i]] sign zone %s",
270  worker2str(worker->type), worker->thread_num,
271  task_who2str(task->who));
/* remember the serial so it can be restored if signing fails */
272  tmpserial = zone->zonedata->internal_serial;
273  status = zone_update_serial(zone);
274  if (status != ODS_STATUS_OK) {
275  ods_log_error("[%s[%i]] unable to sign zone %s: "
276  "failed to increment serial",
277  worker2str(worker->type), worker->thread_num,
278  task_who2str(task->who));
279  } else {
280  /* start timer */
281  start = time(NULL);
282  if (zone->stats) {
285  if (!zone->stats->start_time) {
286  zone->stats->start_time = start;
287  }
288  zone->stats->sig_count = 0;
289  zone->stats->sig_soa_count = 0;
290  zone->stats->sig_reuse = 0;
291  zone->stats->sig_time = 0;
293  zone->stats->stats_locked = 0;
294  }
295  /* check the HSM connection before queuing sign operations */
296  lhsm_check_connection((void*)engine);
297  /* queue menial, hard signing work */
298  status = zonedata_queue(zone->zonedata, engine->signq, worker);
299  ods_log_debug("[%s[%i]] wait until drudgers are finished "
300  "signing zone %s, %u signatures queued",
301  worker2str(worker->type), worker->thread_num,
302  task_who2str(task->who), worker->jobs_appointed);
303  /* sleep until work is done */
304  if (!worker->need_to_exit) {
305  worker_sleep_unless(worker, 0);
306  }
/* evaluate and reset the job counters under the worker lock */
307  lock_basic_lock(&worker->worker_lock);
309  if (worker->jobs_failed) {
310  ods_log_error("[%s[%i]] sign zone %s failed: %u "
311  "RRsets failed", worker2str(worker->type),
312  worker->thread_num, task_who2str(task->who),
313  worker->jobs_failed);
314  status = ODS_STATUS_ERR;
315  } else if (!worker_fulfilled(worker)) {
316  ods_log_error("[%s[%i]] sign zone %s failed: processed "
317  "%u of %u RRsets", worker2str(worker->type),
318  worker->thread_num, task_who2str(task->who),
319  worker->jobs_completed, worker->jobs_appointed);
320  status = ODS_STATUS_ERR;
321  } else if (worker->need_to_exit) {
322  ods_log_warning("[%s[%i]] sign zone %s failed: worker "
323  "needs to exit", worker2str(worker->type),
324  worker->thread_num, task_who2str(task->who));
325  status = ODS_STATUS_ERR;
326  } else {
327  ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u "
328  "RRsets succeeded", worker2str(worker->type),
329  worker->thread_num, task_who2str(task->who),
330  worker->jobs_completed, worker->jobs_appointed);
331  ods_log_assert(worker->jobs_appointed ==
332  worker->jobs_completed);
333  }
334  worker->jobs_appointed = 0;
335  worker->jobs_completed = 0;
336  worker->jobs_failed = 0;
337  lock_basic_unlock(&worker->worker_lock);
338  worker->worker_locked = 0;
339  /* stop timer */
340  end = time(NULL);
341  if (status == ODS_STATUS_OK && zone->stats) {
344  zone->stats->sig_time = (end-start);
346  zone->stats->stats_locked = 0;
347  }
348  }
349 
350  /* what to do next */
351  if (status != ODS_STATUS_OK) {
352  /* rollback serial */
353  zone->zonedata->internal_serial = tmpserial;
354  if (task->halted == TASK_NONE) {
355  goto task_perform_fail;
356  }
357  goto task_perform_continue;
358  } else {
359  if (task->interrupt > TASK_SIGNCONF) {
360  task->interrupt = TASK_NONE;
361  task->halted = TASK_NONE;
362  }
363  }
364  what = TASK_AUDIT;
365  when = time_now();
366  fallthrough = 1;
/* no break: a successful sign continues with the audit step */
367  case TASK_AUDIT:
368  worker->working_with = TASK_AUDIT;
369  if (zone->signconf->audit) {
370  ods_log_verbose("[%s[%i]] audit zone %s",
371  worker2str(worker->type), worker->thread_num,
372  task_who2str(task->who));
/* tools_audit() is given private copies, which are freed right after */
373  working_dir = strdup(engine->config->working_dir);
374  cfg_filename = strdup(engine->config->cfg_filename);
375  status = tools_audit(zone, working_dir, cfg_filename);
376  if (working_dir) { free((void*)working_dir); }
377  if (cfg_filename) { free((void*)cfg_filename); }
378  working_dir = NULL;
379  cfg_filename = NULL;
380  } else {
381  status = ODS_STATUS_OK;
382  }
383 
384  /* what to do next */
385  if (status != ODS_STATUS_OK) {
386  if (task->halted == TASK_NONE) {
387  goto task_perform_fail;
388  }
389  goto task_perform_continue;
390  }
391  what = TASK_WRITE;
392  when = time_now();
393  fallthrough = 1;
/* no break: a successful audit continues with the write step */
394  case TASK_WRITE:
395  worker->working_with = TASK_WRITE;
396  ods_log_verbose("[%s[%i]] write zone %s",
397  worker2str(worker->type), worker->thread_num,
398  task_who2str(task->who));
399 
400  status = tools_output(zone);
401  zone->processed = 1;
402 
403  /* what to do next */
404  if (status != ODS_STATUS_OK) {
405  if (task->halted == TASK_NONE) {
406  goto task_perform_fail;
407  }
408  goto task_perform_continue;
409  } else {
410  if (task->interrupt > TASK_SIGNCONF) {
411  task->interrupt = TASK_NONE;
412  task->halted = TASK_NONE;
413  }
414  }
/* NOTE(review): listing skips lines 415 and 418. The condition that
 * opens the if/else below and the right-hand operand of the addition
 * on line 417 are not visible -- restore them from the upstream file. */
416  what = TASK_SIGN;
417  when = time_now() +
419  } else {
420  what = TASK_NONE;
421  when = time_now() + never;
422  }
423  backup = 1;
424  fallthrough = 0;
425  break;
426  case TASK_NONE:
427  worker->working_with = TASK_NONE;
428  ods_log_warning("[%s[%i]] none task for zone %s",
429  worker2str(worker->type), worker->thread_num,
430  task_who2str(task->who));
431  when = time_now() + never;
432  fallthrough = 0;
433  break;
434  default:
435  ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
436  worker2str(worker->type), worker->thread_num,
437  task_who2str(task->who));
438  what = TASK_SIGNCONF;
439  when = time_now();
440  fallthrough = 0;
441  break;
442  }
443 
444  /* no error, reset backoff */
445  task->backoff = 0;
446 
447  /* set next task */
448  if (fallthrough == 0 && task->interrupt != TASK_NONE &&
449  task->interrupt != what) {
450  ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
451  worker2str(worker->type), worker->thread_num,
452  task_what2str(what), task_who2str(task->who));
453 
/* run the interrupt now; keep the computed step in halted for later */
454  task->what = task->interrupt;
455  task->when = time_now();
456  task->halted = what;
457  } else {
458  ods_log_debug("[%s[%i]] next task %s for zone %s",
459  worker2str(worker->type), worker->thread_num,
460  task_what2str(what), task_who2str(task->who));
461 
462  task->what = what;
463  task->when = when;
464  if (!fallthrough) {
465  task->interrupt = TASK_NONE;
466  task->halted = TASK_NONE;
467  }
468  }
469 
470  /* backup the last successful run */
471  if (backup) {
472  status = zone_backup(zone);
473  if (status != ODS_STATUS_OK) {
474  ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
475  worker2str(worker->type), worker->thread_num,
476  task_who2str(task->who), ods_status2str(status));
477  /* just a warning */
478  status = ODS_STATUS_OK;
479  }
480  backup = 0;
481  }
482  return;
483 
484 task_perform_fail:
485  /* in case of failure, also mark zone processed (for single run usage) */
486  zone->processed = 1;
487 
/* exponential backoff: 60 seconds first, doubled afterwards, capped */
488  if (task->backoff) {
489  task->backoff *= 2;
490  } else {
491  task->backoff = 60;
492  }
493  if (task->backoff > ODS_SE_MAX_BACKOFF) {
494  task->backoff = ODS_SE_MAX_BACKOFF;
495  }
496  ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds",
497  worker2str(worker->type), worker->thread_num,
498  task_what2str(task->what), task_who2str(task->who), task->backoff);
499 
500  task->when = time_now() + task->backoff;
501  return;
502 
503 task_perform_continue:
504  ods_log_info("[%s[%i]] continue task %s for zone %s",
505  worker2str(worker->type), worker->thread_num,
506  task_what2str(task->halted), task_who2str(task->who));
507 
508  what = task->halted;
509  task->what = what;
510  task->when = time_now();
511  task->interrupt = TASK_NONE;
512  task->halted = TASK_NONE;
/* NOTE(review): listing skips line 514, the body of this if. */
513  if (zone->processed) {
515  }
516  return;
517 }
518 
519 
/*
 * Main loop of a WORKER_WORKER thread.
 *
 * Until worker->need_to_exit is set: pop a task from engine->taskq. When
 * a task is available, take the zone lock, perform the task, store it in
 * zone->task and hand it back to the scheduler. When no task is available,
 * sleep for the time until the first scheduled task is due, or, if there
 * is none or the queue is loading, for a timeout that doubles on each
 * idle round. Both timeouts are capped at ODS_SE_MAX_BACKOFF.
 * After the loop the engine's sign queue is wiped.
 *
 * NOTE(review): the embedded line numbers skip inside this function
 * (537-538, 542, 559-560, 564, 566, 574). No locking of the task queue
 * and no unlock of zone->zone_lock is visible here; presumably those
 * calls are on the missing lines -- confirm against the upstream file.
 */
524 static void
525 worker_work(worker_type* worker)
526 {
527  time_t now, timeout = 1;
528  zone_type* zone = NULL;
529 
530  ods_log_assert(worker);
531  ods_log_assert(worker->type == WORKER_WORKER);
532 
533  while (worker->need_to_exit == 0) {
534  ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
535  worker->thread_num);
536  now = time_now();
539  worker->task = schedule_pop_task(worker->engine->taskq);
540  if (worker->task) {
541  worker->working_with = worker->task->what;
543  worker->engine->taskq->schedule_locked = 0;
544 
545  zone = worker->task->zone;
546  lock_basic_lock(&zone->zone_lock);
547  zone->zone_locked = LOCKED_ZONE_WORKER(worker->thread_num);
548  ods_log_debug("[%s[%i]] start working on zone %s",
549  worker2str(worker->type), worker->thread_num, zone->name);
550 
551  worker->clock_in = time(NULL);
552  worker_perform_task(worker);
553 
554  zone->task = worker->task;
555 
556  ods_log_debug("[%s[%i]] finished working on zone %s",
557  worker2str(worker->type), worker->thread_num, zone->name);
558 
561  worker->task = NULL;
562  worker->working_with = TASK_NONE;
/* hand the (updated) task back to the scheduler; result is ignored */
563  (void) schedule_task(worker->engine->taskq, zone->task, 1);
565  worker->engine->taskq->schedule_locked = 0;
567  zone->zone_locked = 0;
568 
/* work was done, so restart the idle backoff */
569  timeout = 1;
570  } else {
571  ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
572  worker->thread_num);
/* peek at the first scheduled task only to compute the sleep time */
573  worker->task = schedule_get_first_task(worker->engine->taskq);
575  if (worker->task && !worker->engine->taskq->loading) {
576  timeout = (worker->task->when - now);
577  } else {
578  timeout *= 2;
579  }
580  if (timeout > ODS_SE_MAX_BACKOFF) {
581  timeout = ODS_SE_MAX_BACKOFF;
582  }
583  worker->task = NULL;
584  worker_sleep(worker, timeout);
585  }
586  }
587  /* stop worker, wipe queue */
588  fifoq_wipe(worker->engine->signq);
589  return;
590 }
591 
592 
/*
 * Main loop of a WORKER_DRUDGER thread.
 *
 * Until worker->need_to_exit is set: pop an RRset from engine->signq
 * together with the worker that queued it (the "chief"). When the queue
 * is empty, wait on the queue's q_threshold condition and try once more.
 * Each RRset is signed with rrset_sign() using an HSM context that is
 * created on first use and reused afterwards. The result is recorded in
 * the chief's jobs_completed / jobs_failed counters, and the chief is
 * woken when all of its appointed jobs are accounted for and it sleeps.
 * If no HSM context can be created, the engine is flagged for reload and
 * the job counts as failed.
 * On exit a sleeping chief from the last round is woken and the HSM
 * context is destroyed.
 *
 * NOTE(review): the embedded line numbers skip inside this function
 * (646, 650, 660, 666). chief->worker_lock is locked twice in the visible
 * code with no unlock shown; presumably the unlock calls are on the
 * missing lines -- confirm against the upstream file.
 */
597 static void
598 worker_drudge(worker_type* worker)
599 {
600  engine_type* engine = NULL;
601  zone_type* zone = NULL;
602  task_type* task = NULL;
603  rrset_type* rrset = NULL;
604  ods_status status = ODS_STATUS_OK;
605  worker_type* chief = NULL;
606  hsm_ctx_t* ctx = NULL;
607 
608  ods_log_assert(worker);
609  ods_log_assert(worker->type == WORKER_DRUDGER);
610 
611  engine = (engine_type*) worker->engine;
612  while (worker->need_to_exit == 0) {
613  ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
614  worker->thread_num);
615  chief = NULL;
616  zone = NULL;
617  task = NULL;
618 
619  lock_basic_lock(&worker->engine->signq->q_lock);
620  worker->engine->signq->q_locked = LOCKED_Q_DRUDGER(worker->thread_num);
621  rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief);
622  if (!rrset) {
623  ods_log_deeebug("[%s[%i]] nothing to do", worker2str(worker->type),
624  worker->thread_num);
/* queue lock is already held, hence the _locked wait variant */
625  worker_wait_locked(&engine->signq->q_lock,
626  &engine->signq->q_threshold);
627  rrset = (rrset_type*) fifoq_pop(engine->signq, &chief);
628  }
629  lock_basic_unlock(&worker->engine->signq->q_lock);
630  worker->engine->signq->q_locked = 0;
631  if (rrset) {
632  ods_log_assert(chief);
633  ods_log_debug("[%s[%i]] create hsm context",
634  worker2str(worker->type), worker->thread_num);
/* the context is created once and kept across loop iterations */
635  if (!ctx) {
636  ctx = hsm_create_context();
637  }
638  if (!ctx) {
639  ods_log_crit("[%s[%i]] error creating libhsm context",
640  worker2str(worker->type), worker->thread_num);
641  engine->need_to_reload = 1;
642  chief->jobs_failed++;
643  } else {
644  ods_log_assert(ctx);
645  lock_basic_lock(&chief->worker_lock);
647  task = chief->task;
648  ods_log_assert(task);
649  zone = task->zone;
651  chief->worker_locked = 0;
652  ods_log_assert(zone);
653  ods_log_assert(zone->signconf);
654  ods_log_assert(rrset);
655 
656  worker->clock_in = time(NULL);
657  status = rrset_sign(ctx, rrset, zone->dname, zone->signconf,
658  chief->clock_in, zone->stats);
659  lock_basic_lock(&chief->worker_lock);
661  if (status == ODS_STATUS_OK) {
662  chief->jobs_completed += 1;
663  } else {
664  chief->jobs_failed += 1;
665  }
667  worker->worker_locked = 0;
668  }
669  if (worker_fulfilled(chief) && chief->sleeping) {
670  ods_log_debug("[%s[%i]] wake up chief[%u], work is done",
671  worker2str(worker->type), worker->thread_num,
672  chief->thread_num);
673  worker_wakeup(chief);
674  }
675  }
676  }
677  /* stop drudger */
678 
679  if (chief && chief->sleeping) {
680  /* wake up chief */
681  ods_log_debug("[%s[%i]] wake up chief[%u], i am exiting",
682  worker2str(worker->type), worker->thread_num, chief->thread_num);
683  worker_wakeup(chief);
684  }
685  if (ctx) {
686  /* cleanup open HSM sessions */
687  ods_log_debug("[%s[%i]] destroy hsm context",
688  worker2str(worker->type), worker->thread_num);
689  hsm_destroy_context(ctx);
690  }
691  return;
692 }
693 
694 
/* Thread entry point for a worker (presumably worker_start(); the
 * declarator on line 700 is not visible in this listing -- confirm).
 * Dispatches on worker->type: drudgers run worker_drudge(), workers run
 * worker_work(); any other type is logged as an error and the function
 * returns without doing work. */
699 void
701 {
702  ods_log_assert(worker);
703  switch (worker->type) {
704  case WORKER_DRUDGER:
705  worker_drudge(worker);
706  break;
707  case WORKER_WORKER:
708  worker_work(worker);
709  break;
710  default:
711  ods_log_error("[worker] illegal worker (id=%i)", worker->type);
712  return;
713  }
714  return;
715 }
716 
717 
/* Put the worker to sleep on its own alarm condition.
 * Takes worker->worker_lock, marks the worker as sleeping and calls
 * lock_basic_sleep() with the given timeout, then releases the lock.
 * The sleeping flag is not cleared here; worker_wakeup() clears it.
 * NOTE(review): timeout is presumably in seconds, with 0 meaning "no
 * timeout" -- confirm against lock_basic_sleep().
 * NOTE(review): listing skips line 727 (between the lock and the
 * sleeping assignment); its content is not visible. */
722 void
723 worker_sleep(worker_type* worker, time_t timeout)
724 {
725  ods_log_assert(worker);
726  lock_basic_lock(&worker->worker_lock);
728  worker->sleeping = 1;
729  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
730  timeout);
731  lock_basic_unlock(&worker->worker_lock);
732  worker->worker_locked = 0;
733  return;
734 }
735 
736 
/* Sleep on the worker's alarm until its work is accounted for.
 * Under worker->worker_lock, keeps sleeping while need_to_exit is unset
 * and worker_fulfilled() is false, so a wake-up that arrives before all
 * appointed jobs are completed or failed sends the worker back to sleep.
 * Returns at once (without sleeping) if the condition already holds.
 * NOTE(review): timeout is presumably in seconds, with 0 meaning "no
 * timeout" -- confirm against lock_basic_sleep().
 * NOTE(review): listing skips line 746 (right after the lock call); its
 * content is not visible. */
741 void
742 worker_sleep_unless(worker_type* worker, time_t timeout)
743 {
744  ods_log_assert(worker);
745  lock_basic_lock(&worker->worker_lock);
747  while (!worker->need_to_exit && !worker_fulfilled(worker)) {
748  worker->sleeping = 1;
749  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
750  timeout);
751 
752  ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u "
753  "appointed, %u completed, %u failed", worker2str(worker->type),
754  worker->thread_num, worker->jobs_appointed, worker->jobs_completed,
755  worker->jobs_failed);
756  }
757  lock_basic_unlock(&worker->worker_lock);
758  worker->worker_locked = 0;
759  return;
760 }
761 
762 
/* Wake up a sleeping worker (called as worker_wakeup(chief) from
 * worker_drudge(); the declarator on line 768 is not visible in this
 * listing). Does nothing unless worker->sleeping is set; otherwise
 * signals the worker's alarm and clears the sleeping flag while holding
 * worker->worker_lock.
 * NOTE(review): the sleeping flag is read before the lock is taken.
 * NOTE(review): listing skips line 775 (right after the lock call). */
767 void
769 {
770  ods_log_assert(worker);
771  if (worker->sleeping) {
772  ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
773  worker->thread_num);
774  lock_basic_lock(&worker->worker_lock);
776  lock_basic_alarm(&worker->worker_alarm);
777  worker->sleeping = 0;
778  lock_basic_unlock(&worker->worker_lock);
779  worker->worker_locked = 0;
780  }
781  return;
782 }
783 
784 
789 void
790 worker_wait_timeout(lock_basic_type* lock, cond_basic_type* condition,
791  time_t timeout)
792 {
793  lock_basic_lock(lock);
794  lock_basic_sleep(condition, lock, timeout);
795  lock_basic_unlock(lock);
796  return;
797 }
798 
799 
804 void
805 worker_wait_timeout_locked(lock_basic_type* lock, cond_basic_type* condition,
806  time_t timeout)
807 {
808  lock_basic_sleep(condition, lock, timeout);
809  return;
810 }
811 
812 
817 void
818 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
819 {
820  worker_wait_timeout(lock, condition, 0);
821  return;
822 }
823 
824 
829 void
830 worker_wait_locked(lock_basic_type* lock, cond_basic_type* condition)
831 {
832  worker_wait_timeout_locked(lock, condition, 0);
833  return;
834 }
835 
836 
841 void
842 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
843 {
844  lock_basic_lock(lock);
845  lock_basic_alarm(condition);
846  lock_basic_unlock(lock);
847  return;
848 }
849 
850 
855 void
856 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
857 {
858  lock_basic_lock(lock);
859  lock_basic_broadcast(condition);
860  lock_basic_unlock(lock);
861  return;
862 }
863 
864 
/* Release a worker (presumably worker_cleanup(); the declarator on line
 * 870 and the declaration of the local `allocator` on line 872 are not
 * visible in this listing -- confirm). A NULL worker is ignored.
 * The allocator, alarm and lock are copied out of the worker first
 * because the worker memory is deallocated before the lock and the
 * condition are destroyed.
 * NOTE(review): the destroy calls therefore operate on by-value copies
 * of the lock and condition objects, not on the originals -- check that
 * the underlying lock implementation permits this. */
869 void
871 {
873  cond_basic_type worker_cond;
874  lock_basic_type worker_lock;
875 
876  if (!worker) {
877  return;
878  }
879  allocator = worker->allocator;
880  worker_cond = worker->worker_alarm;
881  worker_lock = worker->worker_lock;
882 
883  allocator_deallocate(allocator, (void*) worker);
884  lock_basic_destroy(&worker_lock);
885  lock_basic_off(&worker_cond);
886  return;
887 }