OpenDNSSEC-signer  1.3.9
worker.c
Go to the documentation of this file.
1 /*
2  * $Id: worker.c 6290 2012-04-25 07:42:07Z matthijs $
3  *
4  * Copyright (c) 2009 NLNet Labs. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in the
13  * documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
19  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
21  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
23  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
25  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  *
27  */
28 
34 #include "adapter/adapi.h"
35 #include "daemon/engine.h"
36 #include "daemon/worker.h"
37 #include "shared/allocator.h"
38 #include "scheduler/schedule.h"
39 #include "scheduler/task.h"
40 #include "shared/hsm.h"
41 #include "shared/locks.h"
42 #include "shared/log.h"
43 #include "shared/status.h"
44 #include "shared/util.h"
45 #include "signer/tools.h"
46 #include "signer/zone.h"
47 #include "signer/zonedata.h"
48 
49 #include <time.h> /* time() */
50 
52  { WORKER_WORKER, "worker" },
53  { WORKER_DRUDGER, "drudger" },
54  { 0, NULL }
55 };
56 
57 
64 {
    /*
     * NOTE(review): the signature lines of this function are absent from
     * this listing. The body reads `allocator`, `num` and `type`, logs
     * "create worker" and returns a worker_type*, so this is presumably the
     * worker constructor -- confirm against the real file.
     *
     * Returns NULL when no allocator is given or the allocation fails,
     * otherwise a worker with all counters and flags zeroed.
     */
65  worker_type* worker;
66 
67  if (!allocator) {
68  return NULL;
69  }
    /* already guaranteed by the check above */
70  ods_log_assert(allocator);
71 
72  worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
73  if (!worker) {
74  return NULL;
75  }
76 
    /* the thread number that is logged and stored is num + 1 */
77  ods_log_debug("create worker[%i]", num +1);
78  lock_basic_init(&worker->worker_lock);
79  lock_basic_set(&worker->worker_alarm);
    /*
     * The fields are filled in with worker_lock taken.
     * NOTE(review): no unlock is visible before the return; listing line 94
     * is absent and presumably holds it -- confirm.
     */
80  lock_basic_lock(&worker->worker_lock);
81  worker->allocator = allocator;
82  worker->thread_num = num +1;
83  worker->engine = NULL;
84  worker->task = NULL;
85  worker->working_with = TASK_NONE;
86  worker->need_to_exit = 0;
87  worker->type = type;
88  worker->clock_in = 0;
89  worker->jobs_appointed = 0;
90  worker->jobs_completed = 0;
91  worker->jobs_failed = 0;
92  worker->sleeping = 0;
93  worker->waiting = 0;
95  return worker;
96 }
97 
98 
103 static const char*
104 worker2str(worker_id type)
105 {
106  ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
107  if (lt) {
108  return lt->name;
109  }
110  return NULL;
111 }
112 
113 
118 static int
119 worker_fulfilled(worker_type* worker)
120 {
121  return (worker->jobs_completed + worker->jobs_failed) ==
122  worker->jobs_appointed;
123 }
124 
125 
/*
 * Perform the task attached to the worker and decide what to do next.
 *
 * The switch runs the step named by task->what. The READ, NSECIFY, SIGN,
 * AUDIT and WRITE cases have no break between them, so a successful step
 * continues straight into the following one; `fallthrough` records that
 * this happened. SIGNCONF, WRITE, NONE and the default case end with a
 * break.
 *
 * On return the task carries its next `what` and `when`:
 *  - normal exit: backoff is reset, the next step (or a pending interrupt)
 *    is stored, and the zone is backed up after a completed WRITE;
 *  - task_perform_fail: the task is retried later with a doubling backoff;
 *  - task_perform_continue: the previously halted task is resumed now.
 *
 * Nothing is done when the worker, its task, the task's zone or the
 * worker's engine is missing.
 *
 * NOTE(review): several listing lines are absent inside this function
 * (281, 289, 334, 336, 405, 408, 504), so some statements are not visible
 * here -- confirm against the real file.
 */
130 static void
131 worker_perform_task(worker_type* worker)
132 {
133  engine_type* engine = NULL;
134  zone_type* zone = NULL;
135  task_type* task = NULL;
136  task_id what = TASK_NONE;
137  time_t when = 0;
    /* 365 days in seconds; added to "now" to park a task far in the future */
138  time_t never = (3600*24*365);
139  ods_status status = ODS_STATUS_OK;
140  int fallthrough = 0;
141  int backup = 0;
142  char* working_dir = NULL;
143  char* cfg_filename = NULL;
144  uint32_t tmpserial = 0;
145  time_t start = 0;
146  time_t end = 0;
147 
148  /* sanity checking */
149  if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
150  return;
151  }
    /* these three are already guaranteed by the check above */
152  ods_log_assert(worker);
153  ods_log_assert(worker->task);
154  ods_log_assert(worker->task->zone);
155 
156  engine = (engine_type*) worker->engine;
157  task = (task_type*) worker->task;
158  zone = (zone_type*) worker->task->zone;
159  ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
160  worker2str(worker->type), worker->thread_num, task_what2str(task->what),
161  task_who2str(task->who), (uint32_t) worker->clock_in);
162 
163  /* do what you have been told to do */
164  switch (task->what) {
165  case TASK_SIGNCONF:
166  worker->working_with = TASK_SIGNCONF;
167  /* perform 'load signconf' task */
168  ods_log_verbose("[%s[%i]] load signconf for zone %s",
169  worker2str(worker->type), worker->thread_num,
170  task_who2str(task->who));
    /* zone_load_signconf also receives &what, the follow-up task variable */
171  status = zone_load_signconf(zone, &what);
    /* "unchanged" while last_modified is still zero is turned into an error */
172  if (status == ODS_STATUS_UNCHANGED) {
173  if (!zone->signconf->last_modified) {
174  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
175  worker2str(worker->type), worker->thread_num,
176  task_who2str(task->who));
177  status = ODS_STATUS_ERR;
178  }
179  }
180 
181  /* what to do next */
182  when = time_now();
    /*
     * Unchanged signconf: resume the halted task when there is one,
     * otherwise treat the result as OK and go through the steps below.
     */
183  if (status == ODS_STATUS_UNCHANGED) {
184  if (task->halted != TASK_NONE) {
185  goto task_perform_continue;
186  } else {
187  status = ODS_STATUS_OK;
188  }
189  }
190 
    /* each following step only runs while status is still OK */
191  if (status == ODS_STATUS_OK) {
196  lhsm_check_connection((void*)engine);
197  status = zone_publish_dnskeys(zone, 0);
198  }
199  if (status == ODS_STATUS_OK) {
200  status = zone_prepare_nsec3(zone, 0);
201  }
202  if (status == ODS_STATUS_OK) {
203  status = zonedata_commit(zone->zonedata);
204  }
205 
206  if (status == ODS_STATUS_OK) {
    /* TASK_READ checks this flag before reading the zone */
207  zone->prepared = 1;
208  task->interrupt = TASK_NONE;
209  task->halted = TASK_NONE;
210  } else {
211  if (task->halted == TASK_NONE) {
212  goto task_perform_fail;
213  }
214  goto task_perform_continue;
215  }
216  fallthrough = 0;
217  break;
218  case TASK_READ:
219  worker->working_with = TASK_READ;
220  /* perform 'read input adapter' task */
221  ods_log_verbose("[%s[%i]] read zone %s",
222  worker2str(worker->type), worker->thread_num,
223  task_who2str(task->who));
224  if (!zone->prepared) {
225  ods_log_debug("[%s[%i]] no valid signconf.xml for zone %s yet",
226  worker2str(worker->type), worker->thread_num,
227  task_who2str(task->who));
228  status = ODS_STATUS_ERR;
229  } else {
230  status = tools_input(zone);
231  }
232 
233  /* what to do next */
234  what = TASK_NSECIFY;
235  when = time_now();
236  if (status != ODS_STATUS_OK) {
237  if (task->halted == TASK_NONE) {
238  goto task_perform_fail;
239  }
240  goto task_perform_continue;
241  }
    /* no break: execution continues in TASK_NSECIFY */
242  fallthrough = 1;
243  case TASK_NSECIFY:
244  worker->working_with = TASK_NSECIFY;
245  ods_log_verbose("[%s[%i]] nsecify zone %s",
246  worker2str(worker->type), worker->thread_num,
247  task_who2str(task->who));
248  status = tools_nsecify(zone);
249 
250  /* what to do next */
251  what = TASK_SIGN;
252  when = time_now();
253  if (status == ODS_STATUS_OK) {
254  if (task->interrupt > TASK_SIGNCONF) {
255  task->interrupt = TASK_NONE;
256  task->halted = TASK_NONE;
257  }
258  } else {
259  if (task->halted == TASK_NONE) {
260  goto task_perform_fail;
261  }
262  goto task_perform_continue;
263  }
    /* no break: execution continues in TASK_SIGN */
264  fallthrough = 1;
265  case TASK_SIGN:
266  worker->working_with = TASK_SIGN;
267  ods_log_verbose("[%s[%i]] sign zone %s",
268  worker2str(worker->type), worker->thread_num,
269  task_who2str(task->who));
    /* remember the serial so it can be restored if signing fails */
270  tmpserial = zone->zonedata->internal_serial;
271  status = zone_update_serial(zone);
272  if (status != ODS_STATUS_OK) {
273  ods_log_error("[%s[%i]] unable to sign zone %s: "
274  "failed to increment serial",
275  worker2str(worker->type), worker->thread_num,
276  task_who2str(task->who));
277  } else {
278  /* start timer */
279  start = time(NULL);
280  if (zone->stats) {
    /* start_time is only set while it is still zero */
282  if (!zone->stats->start_time) {
283  zone->stats->start_time = start;
284  }
285  zone->stats->sig_count = 0;
286  zone->stats->sig_soa_count = 0;
287  zone->stats->sig_reuse = 0;
288  zone->stats->sig_time = 0;
290  }
291  /* check the HSM connection before queuing sign operations */
292  lhsm_check_connection((void*)engine);
293  /* queue menial, hard signing work */
294  status = zonedata_queue(zone->zonedata, engine->signq, worker);
295  ods_log_debug("[%s[%i]] wait until drudgers are finished "
296  "signing zone %s, %u signatures queued",
297  worker2str(worker->type), worker->thread_num,
298  task_who2str(task->who), worker->jobs_appointed);
299  /* sleep until work is done */
300  if (!worker->need_to_exit) {
301  worker_sleep_unless(worker, 0);
302  }
    /*
     * Any failed job, an incomplete job count or a pending exit request
     * turns the result into an error; otherwise status keeps the value
     * returned by zonedata_queue().
     */
303  if (worker->jobs_failed) {
304  ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
305  "signatures failed", worker2str(worker->type),
306  worker->thread_num, task_who2str(task->who),
307  worker->jobs_failed, worker->jobs_appointed);
308  status = ODS_STATUS_ERR;
309  } else if (!worker_fulfilled(worker)) {
310  ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
311  "signatures completed", worker2str(worker->type),
312  worker->thread_num, task_who2str(task->who),
313  worker->jobs_completed, worker->jobs_appointed);
314  status = ODS_STATUS_ERR;
315  } else if (worker->need_to_exit) {
316  ods_log_debug("[%s[%i]] sign zone %s failed: worker "
317  "needs to exit", worker2str(worker->type),
318  worker->thread_num, task_who2str(task->who));
319  status = ODS_STATUS_ERR;
320  } else {
321  ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u "
322  "signatures succeeded", worker2str(worker->type),
323  worker->thread_num, task_who2str(task->who),
324  worker->jobs_completed, worker->jobs_appointed);
325  ods_log_assert(worker->jobs_appointed ==
326  worker->jobs_completed);
327  }
    /* the job counters are cleared whatever the outcome */
328  worker->jobs_appointed = 0;
329  worker->jobs_completed = 0;
330  worker->jobs_failed = 0;
331  /* stop timer */
332  end = time(NULL);
333  if (status == ODS_STATUS_OK && zone->stats) {
335  zone->stats->sig_time = (end-start);
337  }
338  }
339 
340  /* what to do next */
341  if (status != ODS_STATUS_OK) {
342  /* rollback serial */
343  zone->zonedata->internal_serial = tmpserial;
344  if (task->halted == TASK_NONE) {
345  goto task_perform_fail;
346  }
347  goto task_perform_continue;
348  } else {
349  if (task->interrupt > TASK_SIGNCONF) {
350  task->interrupt = TASK_NONE;
351  task->halted = TASK_NONE;
352  }
353  }
354  what = TASK_AUDIT;
355  when = time_now();
    /* no break: execution continues in TASK_AUDIT */
356  fallthrough = 1;
357  case TASK_AUDIT:
358  worker->working_with = TASK_AUDIT;
    /* the audit only runs when the signconf asks for it */
359  if (zone->signconf->audit) {
360  ods_log_verbose("[%s[%i]] audit zone %s",
361  worker2str(worker->type), worker->thread_num,
362  task_who2str(task->who));
    /*
     * tools_audit gets private copies of the two config strings.
     * NOTE(review): the strdup results are passed on without a NULL check.
     */
363  working_dir = strdup(engine->config->working_dir);
364  cfg_filename = strdup(engine->config->cfg_filename);
365  status = tools_audit(zone, working_dir, cfg_filename);
366  if (working_dir) { free((void*)working_dir); }
367  if (cfg_filename) { free((void*)cfg_filename); }
368  working_dir = NULL;
369  cfg_filename = NULL;
370  } else {
371  status = ODS_STATUS_OK;
372  }
373 
374  /* what to do next */
375  if (status != ODS_STATUS_OK) {
376  if (task->halted == TASK_NONE) {
377  goto task_perform_fail;
378  }
379  goto task_perform_continue;
380  }
381  what = TASK_WRITE;
382  when = time_now();
    /* no break: execution continues in TASK_WRITE */
383  fallthrough = 1;
384  case TASK_WRITE:
385  worker->working_with = TASK_WRITE;
386  ods_log_verbose("[%s[%i]] write zone %s",
387  worker2str(worker->type), worker->thread_num,
388  task_who2str(task->who));
389 
390  status = tools_output(zone);
    /* set whether or not the output step succeeded */
391  zone->processed = 1;
392 
393  /* what to do next */
394  if (status != ODS_STATUS_OK) {
395  if (task->halted == TASK_NONE) {
396  goto task_perform_fail;
397  }
398  goto task_perform_continue;
399  } else {
400  if (task->interrupt > TASK_SIGNCONF) {
401  task->interrupt = TASK_NONE;
402  task->halted = TASK_NONE;
403  }
404  }
    /*
     * NOTE(review): listing lines 405 and 408 are absent, so the condition
     * that opens this if/else and the value added to time_now() are not
     * visible here. The else branch parks the task one year ahead with
     * TASK_NONE.
     */
406  what = TASK_SIGN;
407  when = time_now() +
409  } else {
410  what = TASK_NONE;
411  when = time_now() + never;
412  }
    /* a completed write triggers zone_backup() after the switch */
413  backup = 1;
414  fallthrough = 0;
415  break;
416  case TASK_NONE:
417  worker->working_with = TASK_NONE;
418  ods_log_warning("[%s[%i]] none task for zone %s",
419  worker2str(worker->type), worker->thread_num,
420  task_who2str(task->who));
421  when = time_now() + never;
422  fallthrough = 0;
423  break;
424  default:
425  ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
426  worker2str(worker->type), worker->thread_num,
427  task_who2str(task->who));
428  what = TASK_SIGNCONF;
429  when = time_now();
430  fallthrough = 0;
431  break;
432  }
433 
434  /* no error, reset backoff */
435  task->backoff = 0;
436 
437  /* set next task */
    /*
     * When no step fell through and a different task is waiting in
     * task->interrupt, that task runs now and the planned one is kept in
     * task->halted. Otherwise the planned task and time are stored.
     */
438  if (fallthrough == 0 && task->interrupt != TASK_NONE &&
439  task->interrupt != what) {
440  ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
441  worker2str(worker->type), worker->thread_num,
442  task_what2str(what), task_who2str(task->who));
443 
444  task->what = task->interrupt;
445  task->when = time_now();
446  task->halted = what;
447  } else {
448  ods_log_debug("[%s[%i]] next task %s for zone %s",
449  worker2str(worker->type), worker->thread_num,
450  task_what2str(what), task_who2str(task->who));
451 
452  task->what = what;
453  task->when = when;
454  if (!fallthrough) {
455  task->interrupt = TASK_NONE;
456  task->halted = TASK_NONE;
457  }
458  }
459 
460  /* backup the last successful run */
461  if (backup) {
462  status = zone_backup(zone);
463  if (status != ODS_STATUS_OK) {
464  ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
465  worker2str(worker->type), worker->thread_num,
466  task_who2str(task->who), ods_status2str(status));
467  /* just a warning */
468  status = ODS_STATUS_OK;
469  }
470  backup = 0;
471  }
472  return;
473 
474 task_perform_fail:
475  /* in case of failure, also mark zone processed (for single run usage) */
476  zone->processed = 1;
477 
    /* first failure waits 60 seconds; later ones double up to the maximum */
478  if (task->backoff) {
479  task->backoff *= 2;
480  if (task->backoff > ODS_SE_MAX_BACKOFF) {
481  task->backoff = ODS_SE_MAX_BACKOFF;
482  }
483  } else {
484  task->backoff = 60;
485  }
486  ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds",
487  worker2str(worker->type), worker->thread_num,
488  task_what2str(task->what), task_who2str(task->who), task->backoff);
489 
    /* task->what is left unchanged, so the same task is retried */
490  task->when = time_now() + task->backoff;
491  return;
492 
493 task_perform_continue:
494  ods_log_info("[%s[%i]] continue task %s for zone %s",
495  worker2str(worker->type), worker->thread_num,
496  task_what2str(task->halted), task_who2str(task->who));
497 
    /* the halted task becomes the current one and is due immediately */
498  what = task->halted;
499  task->what = what;
500  task->when = time_now();
501  task->interrupt = TASK_NONE;
502  task->halted = TASK_NONE;
    /* NOTE(review): listing line 504 (the body of this if) is absent */
503  if (zone->processed) {
505  }
506  return;
507 }
508 
509 
/*
 * Main loop of a WORKER_WORKER thread.
 *
 * Until need_to_exit is set the worker pops a task from the engine's task
 * queue, performs it with worker_perform_task(), and schedules the task
 * again. When no task is available it sleeps, either until the first
 * queued task is due or for a doubling timeout. On exit the sign queue is
 * wiped.
 *
 * NOTE(review): the [LOCK]/[UNLOCK] comments mark critical sections, but
 * listing lines 526, 532, 549, 555-556 and 566 are absent, so most of the
 * matching lock and unlock calls are not visible here -- confirm against
 * the real file.
 */
514 static void
515 worker_work(worker_type* worker)
516 {
517  time_t now, timeout = 1;
518  zone_type* zone = NULL;
519 
520  ods_log_assert(worker);
521  ods_log_assert(worker->type == WORKER_WORKER);
522 
523  while (worker->need_to_exit == 0) {
524  ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
525  worker->thread_num);
527  /* [LOCK] schedule */
528  worker->task = schedule_pop_task(worker->engine->taskq);
529  /* [UNLOCK] schedule */
530  if (worker->task) {
531  worker->working_with = worker->task->what;
533 
534  zone = worker->task->zone;
535  lock_basic_lock(&zone->zone_lock);
536  /* [LOCK] zone */
537  ods_log_debug("[%s[%i]] start working on zone %s",
538  worker2str(worker->type), worker->thread_num, zone->name);
539 
    /* clock_in is the start time that worker_perform_task logs */
540  worker->clock_in = time(NULL);
541  worker_perform_task(worker);
542 
    /* hand the (updated) task back to the zone before detaching it */
543  zone->task = worker->task;
544 
545  ods_log_debug("[%s[%i]] finished working on zone %s",
546  worker2str(worker->type), worker->thread_num, zone->name);
547  /* [UNLOCK] zone */
548 
550  /* [LOCK] zone, schedule */
551  worker->task = NULL;
552  worker->working_with = TASK_NONE;
    /* the result of schedule_task is deliberately ignored */
553  (void) schedule_task(worker->engine->taskq, zone->task, 1);
554  /* [UNLOCK] zone, schedule */
557 
    /* work was done, so the idle timeout starts over */
558  timeout = 1;
559  } else {
560  ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
561  worker->thread_num);
562 
563  /* [LOCK] schedule */
564  worker->task = schedule_get_first_task(worker->engine->taskq);
565  /* [UNLOCK] schedule */
567 
568  now = time_now();
    /*
     * Sleep until the first queued task is due, unless the queue is
     * flagged as loading or empty; then double the timeout up to
     * ODS_SE_MAX_BACKOFF.
     * NOTE(review): when - now is zero or negative for a task that is
     * already due; how worker_sleep treats such a timeout depends on
     * lock_basic_sleep -- confirm.
     */
569  if (worker->task && !worker->engine->taskq->loading) {
570  timeout = (worker->task->when - now);
571  } else {
572  timeout *= 2;
573  if (timeout > ODS_SE_MAX_BACKOFF) {
574  timeout = ODS_SE_MAX_BACKOFF;
575  }
576  }
    /* the task was only inspected, not taken */
577  worker->task = NULL;
578  worker_sleep(worker, timeout);
579  }
580  }
581  /* stop worker, wipe queue */
582  fifoq_wipe(worker->engine->signq);
583  return;
584 }
585 
586 
/*
 * Main loop of a WORKER_DRUDGER thread.
 *
 * Creates an HSM context, then until need_to_exit is set pops rrsets from
 * the engine's sign queue and signs them with rrset_sign(). Each result is
 * counted on the "chief" worker that queued the rrset (jobs_completed or
 * jobs_failed), and the chief is woken once all its jobs are accounted for.
 * With an empty queue the drudger waits on the queue's q_threshold
 * condition. The HSM context is destroyed on exit.
 *
 * NOTE(review): listing line 651 is absent; chief->worker_lock is taken at
 * line 644 and no unlock is visible here -- confirm against the real file.
 */
591 static void
592 worker_drudge(worker_type* worker)
593 {
594  zone_type* zone = NULL;
595  rrset_type* rrset = NULL;
596  ods_status status = ODS_STATUS_OK;
597  worker_type* chief = NULL;
598  hsm_ctx_t* ctx = NULL;
599 
600  ods_log_assert(worker);
601  ods_log_assert(worker->type == WORKER_DRUDGER);
602 
603  ods_log_debug("[%s[%i]] create hsm context",
604  worker2str(worker->type), worker->thread_num);
605  ctx = hsm_create_context();
    /*
     * A missing context is only logged: the loop below still runs, and
     * every popped rrset then takes the ODS_STATUS_ASSERT_ERR branch.
     */
606  if (!ctx) {
607  ods_log_crit("[%s[%i]] error creating libhsm context",
608  worker2str(worker->type), worker->thread_num);
609  }
610 
611  while (worker->need_to_exit == 0) {
612  ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
613  worker->thread_num);
614  chief = NULL;
615  zone = NULL;
616 
617  lock_basic_lock(&worker->engine->signq->q_lock);
618  /* [LOCK] schedule */
    /* fifoq_pop also receives &chief, the worker to report the result to */
619  rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief);
620  /* [UNLOCK] schedule */
621  lock_basic_unlock(&worker->engine->signq->q_lock);
622  if (rrset) {
623  /* set up the work */
624  if (chief && chief->task) {
625  zone = chief->task->zone;
626  }
627  if (!zone) {
628  ods_log_error("[%s[%i]] unable to drudge: no zone reference",
629  worker2str(worker->type), worker->thread_num);
630  }
    /* zone is only set through chief, so chief is non-NULL in this branch */
631  if (zone && ctx) {
632  ods_log_assert(rrset);
633  ods_log_assert(zone->dname);
634  ods_log_assert(zone->signconf);
635 
636  worker->clock_in = time(NULL);
    /* the chief's clock_in, not the drudger's, is passed to rrset_sign */
637  status = rrset_sign(ctx, rrset, zone->dname, zone->signconf,
638  chief->clock_in, zone->stats);
639  } else {
640  status = ODS_STATUS_ASSERT_ERR;
641  }
642 
    /* report the outcome on the chief's job counters */
643  if (chief) {
644  lock_basic_lock(&chief->worker_lock);
645  if (status == ODS_STATUS_OK) {
646  chief->jobs_completed += 1;
647  } else {
648  chief->jobs_failed += 1;
649  /* destroy context? */
650  }
652 
653  if (worker_fulfilled(chief) && chief->sleeping) {
654  ods_log_debug("[%s[%i]] wake up chief[%u], work is done",
655  worker2str(worker->type), worker->thread_num,
656  chief->thread_num);
657  worker_wakeup(chief);
658  chief = NULL;
659  }
660  }
661  rrset = NULL;
662  } else {
663  ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
664  worker->thread_num);
665 
    /* worker_wait takes q_lock itself before waiting on q_threshold */
666  worker_wait(&worker->engine->signq->q_lock,
667  &worker->engine->signq->q_threshold);
668  }
669  }
670  /* stop drudger */
671 
    /* chief still holds whatever the last loop iteration left in it */
672  if (chief && chief->sleeping) {
673  /* wake up chief */
674  ods_log_debug("[%s[%i]] wake up chief[%u], i am exiting",
675  worker2str(worker->type), worker->thread_num, chief->thread_num);
676  worker_wakeup(chief);
677  }
678  if (ctx) {
679  /* cleanup open HSM sessions */
680  ods_log_debug("[%s[%i]] destroy hsm context",
681  worker2str(worker->type), worker->thread_num);
682  hsm_destroy_context(ctx);
683  }
684  return;
685 }
686 
687 
/*
 * Run the loop that matches the worker's type: worker_drudge() for
 * WORKER_DRUDGER, worker_work() for WORKER_WORKER. Any other type is logged
 * as an error and the function returns without doing work.
 *
 * NOTE(review): listing line 693, which carries the function name and
 * parameter list, is absent; the body uses a `worker` pointer. The name is
 * presumably worker_start -- confirm against the real file.
 */
692 void
694 {
695  ods_log_assert(worker);
696  switch (worker->type) {
697  case WORKER_DRUDGER:
698  worker_drudge(worker);
699  break;
700  case WORKER_WORKER:
701  worker_work(worker);
702  break;
703  default:
704  ods_log_error("[worker] illegal worker (id=%i)", worker->type);
705  return;
706  }
707  return;
708 }
709 
710 
715 void
716 worker_sleep(worker_type* worker, time_t timeout)
717 {
718  ods_log_assert(worker);
719  lock_basic_lock(&worker->worker_lock);
720  /* [LOCK] worker */
721  worker->sleeping = 1;
722  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
723  timeout);
724  /* [UNLOCK] worker */
725  lock_basic_unlock(&worker->worker_lock);
726  return;
727 }
728 
729 
734 void
735 worker_sleep_unless(worker_type* worker, time_t timeout)
736 {
737  ods_log_assert(worker);
738  lock_basic_lock(&worker->worker_lock);
739  /* [LOCK] worker */
740  while (!worker->need_to_exit && !worker_fulfilled(worker)) {
741  worker->sleeping = 1;
742  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
743  timeout);
744 
745  ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u "
746  "appointed, %u completed, %u failed", worker2str(worker->type),
747  worker->thread_num, worker->jobs_appointed, worker->jobs_completed,
748  worker->jobs_failed);
749  }
750  /* [UNLOCK] worker */
751  lock_basic_unlock(&worker->worker_lock);
752  return;
753 }
754 
755 
/*
 * Wake a sleeping worker: when the worker's sleeping flag is set, raise its
 * worker_alarm with worker_lock taken and clear the flag. Does nothing for
 * a worker that is not marked as sleeping.
 *
 * NOTE(review): listing line 761, which carries the function name and
 * parameter list, is absent; the body uses a `worker` pointer. The name is
 * presumably worker_wakeup (that name is called from the drudger loop) --
 * confirm against the real file.
 */
760 void
762 {
763  ods_log_assert(worker);
    /* NOTE(review): the flag is read here before worker_lock is taken */
764  if (worker->sleeping) {
765  ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
766  worker->thread_num);
767  lock_basic_lock(&worker->worker_lock);
768  /* [LOCK] worker */
769  lock_basic_alarm(&worker->worker_alarm);
770  worker->sleeping = 0;
771  /* [UNLOCK] worker */
772  lock_basic_unlock(&worker->worker_lock);
773  }
774  return;
775 }
776 
777 
782 void
783 worker_wait_timeout(lock_basic_type* lock, cond_basic_type* condition,
784  time_t timeout)
785 {
786  lock_basic_lock(lock);
787  /* [LOCK] worker */
788  lock_basic_sleep(condition, lock, timeout);
789  /* [UNLOCK] worker */
790  lock_basic_unlock(lock);
791  return;
792 }
793 
794 
799 void
800 worker_wait_timeout_locked(lock_basic_type* lock, cond_basic_type* condition,
801  time_t timeout)
802 {
803  lock_basic_sleep(condition, lock, timeout);
804  return;
805 }
806 
807 
812 void
813 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
814 {
815  worker_wait_timeout(lock, condition, 0);
816  return;
817 }
818 
819 
824 void
825 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
826 {
827  lock_basic_lock(lock);
828  /* [LOCK] lock */
829  lock_basic_alarm(condition);
830  /* [UNLOCK] lock */
831  lock_basic_unlock(lock);
832  return;
833 }
834 
835 
840 void
841 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
842 {
843  lock_basic_lock(lock);
844  /* [LOCK] lock */
845  lock_basic_broadcast(condition);
846  /* [UNLOCK] lock */
847  lock_basic_unlock(lock);
848  return;
849 }
850 
851 
/*
 * Release a worker: give its memory back to the allocator it was created
 * with, then destroy its lock and switch off its alarm. A NULL worker is
 * ignored.
 *
 * NOTE(review): listing line 857 (function name and parameter list) and
 * line 859 (presumably the declaration of `allocator`) are absent; the
 * body uses a `worker` pointer. The name is presumably worker_cleanup --
 * confirm against the real file.
 */
856 void
858 {
860  cond_basic_type worker_cond;
861  lock_basic_type worker_lock;
862 
863  if (!worker) {
864  return;
865  }
    /*
     * Copy out what is still needed, because the worker structure is
     * deallocated before the lock and the alarm are torn down.
     * NOTE(review): the teardown calls therefore act on by-value copies of
     * the lock and the condition, not on the originals -- confirm this is
     * valid for the underlying lock implementation.
     */
866  allocator = worker->allocator;
867  worker_cond = worker->worker_alarm;
868  worker_lock = worker->worker_lock;
869 
870  allocator_deallocate(allocator, (void*) worker);
871  lock_basic_destroy(&worker_lock);
872  lock_basic_off(&worker_cond);
873  return;
874 }