/* Libav 0.7.1 */
00001 /* 00002 * Copyright (c) 2004 Roman Shaposhnik 00003 * Copyright (c) 2008 Alexander Strange (astrange@ithinksw.com) 00004 * 00005 * Many thanks to Steven M. Schultz for providing clever ideas and 00006 * to Michael Niedermayer <michaelni@gmx.at> for writing initial 00007 * implementation. 00008 * 00009 * This file is part of Libav. 00010 * 00011 * Libav is free software; you can redistribute it and/or 00012 * modify it under the terms of the GNU Lesser General Public 00013 * License as published by the Free Software Foundation; either 00014 * version 2.1 of the License, or (at your option) any later version. 00015 * 00016 * Libav is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00019 * Lesser General Public License for more details. 00020 * 00021 * You should have received a copy of the GNU Lesser General Public 00022 * License along with Libav; if not, write to the Free Software 00023 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 00024 */ 00025 00032 #include <pthread.h> 00033 00034 #include "avcodec.h" 00035 #include "thread.h" 00036 00037 typedef int (action_func)(AVCodecContext *c, void *arg); 00038 typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr); 00039 00040 typedef struct ThreadContext { 00041 pthread_t *workers; 00042 action_func *func; 00043 action_func2 *func2; 00044 void *args; 00045 int *rets; 00046 int rets_count; 00047 int job_count; 00048 int job_size; 00049 00050 pthread_cond_t last_job_cond; 00051 pthread_cond_t current_job_cond; 00052 pthread_mutex_t current_job_lock; 00053 int current_job; 00054 int done; 00055 } ThreadContext; 00056 00058 #define MAX_BUFFERS (32+1) 00059 00063 typedef struct PerThreadContext { 00064 struct FrameThreadContext *parent; 00065 00066 pthread_t thread; 00067 pthread_cond_t input_cond; 00068 
pthread_cond_t progress_cond; 00069 pthread_cond_t output_cond; 00070 00071 pthread_mutex_t mutex; 00072 pthread_mutex_t progress_mutex; 00073 00074 AVCodecContext *avctx; 00075 00076 AVPacket avpkt; 00077 int allocated_buf_size; 00078 00079 AVFrame frame; 00080 int got_frame; 00081 int result; 00082 00083 enum { 00084 STATE_INPUT_READY, 00085 STATE_SETTING_UP, 00086 STATE_GET_BUFFER, 00090 STATE_SETUP_FINISHED 00091 } state; 00092 00097 AVFrame released_buffers[MAX_BUFFERS]; 00098 int num_released_buffers; 00099 00103 int progress[MAX_BUFFERS][2]; 00104 uint8_t progress_used[MAX_BUFFERS]; 00105 00106 AVFrame *requested_frame; 00107 } PerThreadContext; 00108 00112 typedef struct FrameThreadContext { 00113 PerThreadContext *threads; 00114 PerThreadContext *prev_thread; 00115 00116 pthread_mutex_t buffer_mutex; 00117 00118 int next_decoding; 00119 int next_finished; 00120 00121 int delaying; 00126 int die; 00127 } FrameThreadContext; 00128 00129 static void* attribute_align_arg worker(void *v) 00130 { 00131 AVCodecContext *avctx = v; 00132 ThreadContext *c = avctx->thread_opaque; 00133 int our_job = c->job_count; 00134 int thread_count = avctx->thread_count; 00135 int self_id; 00136 00137 pthread_mutex_lock(&c->current_job_lock); 00138 self_id = c->current_job++; 00139 for (;;){ 00140 while (our_job >= c->job_count) { 00141 if (c->current_job == thread_count + c->job_count) 00142 pthread_cond_signal(&c->last_job_cond); 00143 00144 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock); 00145 our_job = self_id; 00146 00147 if (c->done) { 00148 pthread_mutex_unlock(&c->current_job_lock); 00149 return NULL; 00150 } 00151 } 00152 pthread_mutex_unlock(&c->current_job_lock); 00153 00154 c->rets[our_job%c->rets_count] = c->func ? 
c->func(avctx, (char*)c->args + our_job*c->job_size): 00155 c->func2(avctx, c->args, our_job, self_id); 00156 00157 pthread_mutex_lock(&c->current_job_lock); 00158 our_job = c->current_job++; 00159 } 00160 } 00161 00162 static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count) 00163 { 00164 pthread_cond_wait(&c->last_job_cond, &c->current_job_lock); 00165 pthread_mutex_unlock(&c->current_job_lock); 00166 } 00167 00168 static void thread_free(AVCodecContext *avctx) 00169 { 00170 ThreadContext *c = avctx->thread_opaque; 00171 int i; 00172 00173 pthread_mutex_lock(&c->current_job_lock); 00174 c->done = 1; 00175 pthread_cond_broadcast(&c->current_job_cond); 00176 pthread_mutex_unlock(&c->current_job_lock); 00177 00178 for (i=0; i<avctx->thread_count; i++) 00179 pthread_join(c->workers[i], NULL); 00180 00181 pthread_mutex_destroy(&c->current_job_lock); 00182 pthread_cond_destroy(&c->current_job_cond); 00183 pthread_cond_destroy(&c->last_job_cond); 00184 av_free(c->workers); 00185 av_freep(&avctx->thread_opaque); 00186 } 00187 00188 static int avcodec_thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size) 00189 { 00190 ThreadContext *c= avctx->thread_opaque; 00191 int dummy_ret; 00192 00193 if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1) 00194 return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size); 00195 00196 if (job_count <= 0) 00197 return 0; 00198 00199 pthread_mutex_lock(&c->current_job_lock); 00200 00201 c->current_job = avctx->thread_count; 00202 c->job_count = job_count; 00203 c->job_size = job_size; 00204 c->args = arg; 00205 c->func = func; 00206 if (ret) { 00207 c->rets = ret; 00208 c->rets_count = job_count; 00209 } else { 00210 c->rets = &dummy_ret; 00211 c->rets_count = 1; 00212 } 00213 pthread_cond_broadcast(&c->current_job_cond); 00214 00215 avcodec_thread_park_workers(c, avctx->thread_count); 00216 00217 return 0; 00218 
} 00219 00220 static int avcodec_thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count) 00221 { 00222 ThreadContext *c= avctx->thread_opaque; 00223 c->func2 = func2; 00224 return avcodec_thread_execute(avctx, NULL, arg, ret, job_count, 0); 00225 } 00226 00227 static int thread_init(AVCodecContext *avctx) 00228 { 00229 int i; 00230 ThreadContext *c; 00231 int thread_count = avctx->thread_count; 00232 00233 if (thread_count <= 1) 00234 return 0; 00235 00236 c = av_mallocz(sizeof(ThreadContext)); 00237 if (!c) 00238 return -1; 00239 00240 c->workers = av_mallocz(sizeof(pthread_t)*thread_count); 00241 if (!c->workers) { 00242 av_free(c); 00243 return -1; 00244 } 00245 00246 avctx->thread_opaque = c; 00247 c->current_job = 0; 00248 c->job_count = 0; 00249 c->job_size = 0; 00250 c->done = 0; 00251 pthread_cond_init(&c->current_job_cond, NULL); 00252 pthread_cond_init(&c->last_job_cond, NULL); 00253 pthread_mutex_init(&c->current_job_lock, NULL); 00254 pthread_mutex_lock(&c->current_job_lock); 00255 for (i=0; i<thread_count; i++) { 00256 if(pthread_create(&c->workers[i], NULL, worker, avctx)) { 00257 avctx->thread_count = i; 00258 pthread_mutex_unlock(&c->current_job_lock); 00259 ff_thread_free(avctx); 00260 return -1; 00261 } 00262 } 00263 00264 avcodec_thread_park_workers(c, thread_count); 00265 00266 avctx->execute = avcodec_thread_execute; 00267 avctx->execute2 = avcodec_thread_execute2; 00268 return 0; 00269 } 00270 00278 static attribute_align_arg void *frame_worker_thread(void *arg) 00279 { 00280 PerThreadContext *p = arg; 00281 FrameThreadContext *fctx = p->parent; 00282 AVCodecContext *avctx = p->avctx; 00283 AVCodec *codec = avctx->codec; 00284 00285 while (1) { 00286 if (p->state == STATE_INPUT_READY && !fctx->die) { 00287 pthread_mutex_lock(&p->mutex); 00288 while (p->state == STATE_INPUT_READY && !fctx->die) 00289 pthread_cond_wait(&p->input_cond, &p->mutex); 00290 pthread_mutex_unlock(&p->mutex); 00291 } 00292 00293 
if (fctx->die) break; 00294 00295 if (!codec->update_thread_context && avctx->thread_safe_callbacks) 00296 ff_thread_finish_setup(avctx); 00297 00298 pthread_mutex_lock(&p->mutex); 00299 avcodec_get_frame_defaults(&p->frame); 00300 p->got_frame = 0; 00301 p->result = codec->decode(avctx, &p->frame, &p->got_frame, &p->avpkt); 00302 00303 if (p->state == STATE_SETTING_UP) ff_thread_finish_setup(avctx); 00304 00305 p->state = STATE_INPUT_READY; 00306 00307 pthread_mutex_lock(&p->progress_mutex); 00308 pthread_cond_signal(&p->output_cond); 00309 pthread_mutex_unlock(&p->progress_mutex); 00310 00311 pthread_mutex_unlock(&p->mutex); 00312 } 00313 00314 return NULL; 00315 } 00316 00324 static int update_context_from_thread(AVCodecContext *dst, AVCodecContext *src, int for_user) 00325 { 00326 int err = 0; 00327 00328 if (dst != src) { 00329 dst->sub_id = src->sub_id; 00330 dst->time_base = src->time_base; 00331 dst->width = src->width; 00332 dst->height = src->height; 00333 dst->pix_fmt = src->pix_fmt; 00334 00335 dst->has_b_frames = src->has_b_frames; 00336 dst->idct_algo = src->idct_algo; 00337 dst->slice_count = src->slice_count; 00338 00339 dst->bits_per_coded_sample = src->bits_per_coded_sample; 00340 dst->sample_aspect_ratio = src->sample_aspect_ratio; 00341 dst->dtg_active_format = src->dtg_active_format; 00342 00343 dst->profile = src->profile; 00344 dst->level = src->level; 00345 00346 dst->bits_per_raw_sample = src->bits_per_raw_sample; 00347 dst->ticks_per_frame = src->ticks_per_frame; 00348 dst->color_primaries = src->color_primaries; 00349 00350 dst->color_trc = src->color_trc; 00351 dst->colorspace = src->colorspace; 00352 dst->color_range = src->color_range; 00353 dst->chroma_sample_location = src->chroma_sample_location; 00354 } 00355 00356 if (for_user) { 00357 dst->coded_frame = src->coded_frame; 00358 dst->has_b_frames += src->thread_count - 1; 00359 } else { 00360 if (dst->codec->update_thread_context) 00361 err = dst->codec->update_thread_context(dst, 
src); 00362 } 00363 00364 return err; 00365 } 00366 00373 static void update_context_from_user(AVCodecContext *dst, AVCodecContext *src) 00374 { 00375 #define copy_fields(s, e) memcpy(&dst->s, &src->s, (char*)&dst->e - (char*)&dst->s); 00376 dst->flags = src->flags; 00377 00378 dst->draw_horiz_band= src->draw_horiz_band; 00379 dst->get_buffer = src->get_buffer; 00380 dst->release_buffer = src->release_buffer; 00381 00382 dst->opaque = src->opaque; 00383 dst->dsp_mask = src->dsp_mask; 00384 dst->debug = src->debug; 00385 dst->debug_mv = src->debug_mv; 00386 00387 dst->slice_flags = src->slice_flags; 00388 dst->flags2 = src->flags2; 00389 00390 copy_fields(skip_loop_filter, bidir_refine); 00391 00392 dst->frame_number = src->frame_number; 00393 dst->reordered_opaque = src->reordered_opaque; 00394 #undef copy_fields 00395 } 00396 00397 static void free_progress(AVFrame *f) 00398 { 00399 PerThreadContext *p = f->owner->thread_opaque; 00400 int *progress = f->thread_opaque; 00401 00402 p->progress_used[(progress - p->progress[0]) / 2] = 0; 00403 } 00404 00406 static void release_delayed_buffers(PerThreadContext *p) 00407 { 00408 FrameThreadContext *fctx = p->parent; 00409 00410 while (p->num_released_buffers > 0) { 00411 AVFrame *f; 00412 00413 pthread_mutex_lock(&fctx->buffer_mutex); 00414 f = &p->released_buffers[--p->num_released_buffers]; 00415 free_progress(f); 00416 f->thread_opaque = NULL; 00417 00418 f->owner->release_buffer(f->owner, f); 00419 pthread_mutex_unlock(&fctx->buffer_mutex); 00420 } 00421 } 00422 00423 static int submit_packet(PerThreadContext *p, AVPacket *avpkt) 00424 { 00425 FrameThreadContext *fctx = p->parent; 00426 PerThreadContext *prev_thread = fctx->prev_thread; 00427 AVCodec *codec = p->avctx->codec; 00428 uint8_t *buf = p->avpkt.data; 00429 00430 if (!avpkt->size && !(codec->capabilities & CODEC_CAP_DELAY)) return 0; 00431 00432 pthread_mutex_lock(&p->mutex); 00433 00434 release_delayed_buffers(p); 00435 00436 if (prev_thread) { 00437 int 
err; 00438 if (prev_thread->state == STATE_SETTING_UP) { 00439 pthread_mutex_lock(&prev_thread->progress_mutex); 00440 while (prev_thread->state == STATE_SETTING_UP) 00441 pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex); 00442 pthread_mutex_unlock(&prev_thread->progress_mutex); 00443 } 00444 00445 err = update_context_from_thread(p->avctx, prev_thread->avctx, 0); 00446 if (err) { 00447 pthread_mutex_unlock(&p->mutex); 00448 return err; 00449 } 00450 } 00451 00452 av_fast_malloc(&buf, &p->allocated_buf_size, avpkt->size + FF_INPUT_BUFFER_PADDING_SIZE); 00453 p->avpkt = *avpkt; 00454 p->avpkt.data = buf; 00455 memcpy(buf, avpkt->data, avpkt->size); 00456 memset(buf + avpkt->size, 0, FF_INPUT_BUFFER_PADDING_SIZE); 00457 00458 p->state = STATE_SETTING_UP; 00459 pthread_cond_signal(&p->input_cond); 00460 pthread_mutex_unlock(&p->mutex); 00461 00462 /* 00463 * If the client doesn't have a thread-safe get_buffer(), 00464 * then decoding threads call back to the main thread, 00465 * and it calls back to the client here. 
00466 */ 00467 00468 if (!p->avctx->thread_safe_callbacks && 00469 p->avctx->get_buffer != avcodec_default_get_buffer) { 00470 while (p->state != STATE_SETUP_FINISHED && p->state != STATE_INPUT_READY) { 00471 pthread_mutex_lock(&p->progress_mutex); 00472 while (p->state == STATE_SETTING_UP) 00473 pthread_cond_wait(&p->progress_cond, &p->progress_mutex); 00474 00475 if (p->state == STATE_GET_BUFFER) { 00476 p->result = p->avctx->get_buffer(p->avctx, p->requested_frame); 00477 p->state = STATE_SETTING_UP; 00478 pthread_cond_signal(&p->progress_cond); 00479 } 00480 pthread_mutex_unlock(&p->progress_mutex); 00481 } 00482 } 00483 00484 fctx->prev_thread = p; 00485 00486 return 0; 00487 } 00488 00489 int ff_thread_decode_frame(AVCodecContext *avctx, 00490 AVFrame *picture, int *got_picture_ptr, 00491 AVPacket *avpkt) 00492 { 00493 FrameThreadContext *fctx = avctx->thread_opaque; 00494 int finished = fctx->next_finished; 00495 PerThreadContext *p; 00496 int err; 00497 00498 /* 00499 * Submit a packet to the next decoding thread. 00500 */ 00501 00502 p = &fctx->threads[fctx->next_decoding]; 00503 update_context_from_user(p->avctx, avctx); 00504 err = submit_packet(p, avpkt); 00505 if (err) return err; 00506 00507 fctx->next_decoding++; 00508 00509 /* 00510 * If we're still receiving the initial packets, don't return a frame. 00511 */ 00512 00513 if (fctx->delaying && avpkt->size) { 00514 if (fctx->next_decoding >= (avctx->thread_count-1)) fctx->delaying = 0; 00515 00516 *got_picture_ptr=0; 00517 return 0; 00518 } 00519 00520 /* 00521 * Return the next available frame from the oldest thread. 00522 * If we're at the end of the stream, then we have to skip threads that 00523 * didn't output a frame, because we don't want to accidentally signal 00524 * EOF (avpkt->size == 0 && *got_picture_ptr == 0). 
00525 */ 00526 00527 do { 00528 p = &fctx->threads[finished++]; 00529 00530 if (p->state != STATE_INPUT_READY) { 00531 pthread_mutex_lock(&p->progress_mutex); 00532 while (p->state != STATE_INPUT_READY) 00533 pthread_cond_wait(&p->output_cond, &p->progress_mutex); 00534 pthread_mutex_unlock(&p->progress_mutex); 00535 } 00536 00537 *picture = p->frame; 00538 *got_picture_ptr = p->got_frame; 00539 picture->pkt_dts = p->avpkt.dts; 00540 00541 /* 00542 * A later call with avkpt->size == 0 may loop over all threads, 00543 * including this one, searching for a frame to return before being 00544 * stopped by the "finished != fctx->next_finished" condition. 00545 * Make sure we don't mistakenly return the same frame again. 00546 */ 00547 p->got_frame = 0; 00548 00549 if (finished >= avctx->thread_count) finished = 0; 00550 } while (!avpkt->size && !*got_picture_ptr && finished != fctx->next_finished); 00551 00552 update_context_from_thread(avctx, p->avctx, 1); 00553 00554 if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0; 00555 00556 fctx->next_finished = finished; 00557 00558 return p->result; 00559 } 00560 00561 void ff_thread_report_progress(AVFrame *f, int n, int field) 00562 { 00563 PerThreadContext *p; 00564 int *progress = f->thread_opaque; 00565 00566 if (!progress || progress[field] >= n) return; 00567 00568 p = f->owner->thread_opaque; 00569 00570 if (f->owner->debug&FF_DEBUG_THREADS) 00571 av_log(f->owner, AV_LOG_DEBUG, "%p finished %d field %d\n", progress, n, field); 00572 00573 pthread_mutex_lock(&p->progress_mutex); 00574 progress[field] = n; 00575 pthread_cond_broadcast(&p->progress_cond); 00576 pthread_mutex_unlock(&p->progress_mutex); 00577 } 00578 00579 void ff_thread_await_progress(AVFrame *f, int n, int field) 00580 { 00581 PerThreadContext *p; 00582 int *progress = f->thread_opaque; 00583 00584 if (!progress || progress[field] >= n) return; 00585 00586 p = f->owner->thread_opaque; 00587 00588 if (f->owner->debug&FF_DEBUG_THREADS) 
00589 av_log(f->owner, AV_LOG_DEBUG, "thread awaiting %d field %d from %p\n", n, field, progress); 00590 00591 pthread_mutex_lock(&p->progress_mutex); 00592 while (progress[field] < n) 00593 pthread_cond_wait(&p->progress_cond, &p->progress_mutex); 00594 pthread_mutex_unlock(&p->progress_mutex); 00595 } 00596 00597 void ff_thread_finish_setup(AVCodecContext *avctx) { 00598 PerThreadContext *p = avctx->thread_opaque; 00599 00600 if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return; 00601 00602 pthread_mutex_lock(&p->progress_mutex); 00603 p->state = STATE_SETUP_FINISHED; 00604 pthread_cond_broadcast(&p->progress_cond); 00605 pthread_mutex_unlock(&p->progress_mutex); 00606 } 00607 00609 static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count) 00610 { 00611 int i; 00612 00613 for (i = 0; i < thread_count; i++) { 00614 PerThreadContext *p = &fctx->threads[i]; 00615 00616 if (p->state != STATE_INPUT_READY) { 00617 pthread_mutex_lock(&p->progress_mutex); 00618 while (p->state != STATE_INPUT_READY) 00619 pthread_cond_wait(&p->output_cond, &p->progress_mutex); 00620 pthread_mutex_unlock(&p->progress_mutex); 00621 } 00622 } 00623 } 00624 00625 static void frame_thread_free(AVCodecContext *avctx, int thread_count) 00626 { 00627 FrameThreadContext *fctx = avctx->thread_opaque; 00628 AVCodec *codec = avctx->codec; 00629 int i; 00630 00631 park_frame_worker_threads(fctx, thread_count); 00632 00633 if (fctx->prev_thread) 00634 update_context_from_thread(fctx->threads->avctx, fctx->prev_thread->avctx, 0); 00635 00636 fctx->die = 1; 00637 00638 for (i = 0; i < thread_count; i++) { 00639 PerThreadContext *p = &fctx->threads[i]; 00640 00641 pthread_mutex_lock(&p->mutex); 00642 pthread_cond_signal(&p->input_cond); 00643 pthread_mutex_unlock(&p->mutex); 00644 00645 pthread_join(p->thread, NULL); 00646 00647 if (codec->close) 00648 codec->close(p->avctx); 00649 00650 avctx->codec = NULL; 00651 00652 release_delayed_buffers(p); 00653 } 00654 00655 for (i = 
0; i < thread_count; i++) { 00656 PerThreadContext *p = &fctx->threads[i]; 00657 00658 avcodec_default_free_buffers(p->avctx); 00659 00660 pthread_mutex_destroy(&p->mutex); 00661 pthread_mutex_destroy(&p->progress_mutex); 00662 pthread_cond_destroy(&p->input_cond); 00663 pthread_cond_destroy(&p->progress_cond); 00664 pthread_cond_destroy(&p->output_cond); 00665 av_freep(&p->avpkt.data); 00666 00667 if (i) 00668 av_freep(&p->avctx->priv_data); 00669 00670 av_freep(&p->avctx); 00671 } 00672 00673 av_freep(&fctx->threads); 00674 pthread_mutex_destroy(&fctx->buffer_mutex); 00675 av_freep(&avctx->thread_opaque); 00676 } 00677 00678 static int frame_thread_init(AVCodecContext *avctx) 00679 { 00680 int thread_count = avctx->thread_count; 00681 AVCodec *codec = avctx->codec; 00682 AVCodecContext *src = avctx; 00683 FrameThreadContext *fctx; 00684 int i, err = 0; 00685 00686 if (thread_count <= 1) { 00687 avctx->active_thread_type = 0; 00688 return 0; 00689 } 00690 00691 avctx->thread_opaque = fctx = av_mallocz(sizeof(FrameThreadContext)); 00692 00693 fctx->threads = av_mallocz(sizeof(PerThreadContext) * thread_count); 00694 pthread_mutex_init(&fctx->buffer_mutex, NULL); 00695 fctx->delaying = 1; 00696 00697 for (i = 0; i < thread_count; i++) { 00698 AVCodecContext *copy = av_malloc(sizeof(AVCodecContext)); 00699 PerThreadContext *p = &fctx->threads[i]; 00700 00701 pthread_mutex_init(&p->mutex, NULL); 00702 pthread_mutex_init(&p->progress_mutex, NULL); 00703 pthread_cond_init(&p->input_cond, NULL); 00704 pthread_cond_init(&p->progress_cond, NULL); 00705 pthread_cond_init(&p->output_cond, NULL); 00706 00707 p->parent = fctx; 00708 p->avctx = copy; 00709 00710 *copy = *src; 00711 copy->thread_opaque = p; 00712 copy->pkt = &p->avpkt; 00713 00714 if (!i) { 00715 src = copy; 00716 00717 if (codec->init) 00718 err = codec->init(copy); 00719 00720 update_context_from_thread(avctx, copy, 1); 00721 } else { 00722 copy->is_copy = 1; 00723 copy->priv_data = 
av_malloc(codec->priv_data_size); 00724 memcpy(copy->priv_data, src->priv_data, codec->priv_data_size); 00725 00726 if (codec->init_thread_copy) 00727 err = codec->init_thread_copy(copy); 00728 } 00729 00730 if (err) goto error; 00731 00732 pthread_create(&p->thread, NULL, frame_worker_thread, p); 00733 } 00734 00735 return 0; 00736 00737 error: 00738 frame_thread_free(avctx, i+1); 00739 00740 return err; 00741 } 00742 00743 void ff_thread_flush(AVCodecContext *avctx) 00744 { 00745 FrameThreadContext *fctx = avctx->thread_opaque; 00746 00747 if (!avctx->thread_opaque) return; 00748 00749 park_frame_worker_threads(fctx, avctx->thread_count); 00750 if (fctx->prev_thread) { 00751 if (fctx->prev_thread != &fctx->threads[0]) 00752 update_context_from_thread(fctx->threads[0].avctx, fctx->prev_thread->avctx, 0); 00753 if (avctx->codec->flush) 00754 avctx->codec->flush(fctx->threads[0].avctx); 00755 } 00756 00757 fctx->next_decoding = fctx->next_finished = 0; 00758 fctx->delaying = 1; 00759 fctx->prev_thread = NULL; 00760 } 00761 00762 static int *allocate_progress(PerThreadContext *p) 00763 { 00764 int i; 00765 00766 for (i = 0; i < MAX_BUFFERS; i++) 00767 if (!p->progress_used[i]) break; 00768 00769 if (i == MAX_BUFFERS) { 00770 av_log(p->avctx, AV_LOG_ERROR, "allocate_progress() overflow\n"); 00771 return NULL; 00772 } 00773 00774 p->progress_used[i] = 1; 00775 00776 return p->progress[i]; 00777 } 00778 00779 int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f) 00780 { 00781 PerThreadContext *p = avctx->thread_opaque; 00782 int *progress, err; 00783 00784 f->owner = avctx; 00785 00786 if (!(avctx->active_thread_type&FF_THREAD_FRAME)) { 00787 f->thread_opaque = NULL; 00788 return avctx->get_buffer(avctx, f); 00789 } 00790 00791 if (p->state != STATE_SETTING_UP && 00792 (avctx->codec->update_thread_context || !avctx->thread_safe_callbacks)) { 00793 av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n"); 00794 return -1; 00795 
} 00796 00797 pthread_mutex_lock(&p->parent->buffer_mutex); 00798 f->thread_opaque = progress = allocate_progress(p); 00799 00800 if (!progress) { 00801 pthread_mutex_unlock(&p->parent->buffer_mutex); 00802 return -1; 00803 } 00804 00805 progress[0] = 00806 progress[1] = -1; 00807 00808 if (avctx->thread_safe_callbacks || 00809 avctx->get_buffer == avcodec_default_get_buffer) { 00810 err = avctx->get_buffer(avctx, f); 00811 } else { 00812 p->requested_frame = f; 00813 p->state = STATE_GET_BUFFER; 00814 pthread_mutex_lock(&p->progress_mutex); 00815 pthread_cond_signal(&p->progress_cond); 00816 00817 while (p->state != STATE_SETTING_UP) 00818 pthread_cond_wait(&p->progress_cond, &p->progress_mutex); 00819 00820 err = p->result; 00821 00822 pthread_mutex_unlock(&p->progress_mutex); 00823 00824 if (!avctx->codec->update_thread_context) 00825 ff_thread_finish_setup(avctx); 00826 } 00827 00828 pthread_mutex_unlock(&p->parent->buffer_mutex); 00829 00830 /* 00831 * Buffer age is difficult to keep track of between 00832 * multiple threads, and the optimizations it allows 00833 * are not worth the effort. It is disabled for now. 
00834 */ 00835 f->age = INT_MAX; 00836 00837 return err; 00838 } 00839 00840 void ff_thread_release_buffer(AVCodecContext *avctx, AVFrame *f) 00841 { 00842 PerThreadContext *p = avctx->thread_opaque; 00843 FrameThreadContext *fctx; 00844 00845 if (!(avctx->active_thread_type&FF_THREAD_FRAME)) { 00846 avctx->release_buffer(avctx, f); 00847 return; 00848 } 00849 00850 if (p->num_released_buffers >= MAX_BUFFERS) { 00851 av_log(p->avctx, AV_LOG_ERROR, "too many thread_release_buffer calls!\n"); 00852 return; 00853 } 00854 00855 if(avctx->debug & FF_DEBUG_BUFFERS) 00856 av_log(avctx, AV_LOG_DEBUG, "thread_release_buffer called on pic %p, %d buffers used\n", 00857 f, f->owner->internal_buffer_count); 00858 00859 fctx = p->parent; 00860 pthread_mutex_lock(&fctx->buffer_mutex); 00861 p->released_buffers[p->num_released_buffers++] = *f; 00862 pthread_mutex_unlock(&fctx->buffer_mutex); 00863 memset(f->data, 0, sizeof(f->data)); 00864 } 00865 00875 static void validate_thread_parameters(AVCodecContext *avctx) 00876 { 00877 int frame_threading_supported = (avctx->codec->capabilities & CODEC_CAP_FRAME_THREADS) 00878 && !(avctx->flags & CODEC_FLAG_TRUNCATED) 00879 && !(avctx->flags & CODEC_FLAG_LOW_DELAY) 00880 && !(avctx->flags2 & CODEC_FLAG2_CHUNKS); 00881 if (avctx->thread_count == 1) { 00882 avctx->active_thread_type = 0; 00883 } else if (frame_threading_supported && (avctx->thread_type & FF_THREAD_FRAME)) { 00884 avctx->active_thread_type = FF_THREAD_FRAME; 00885 } else if (avctx->codec->capabilities & CODEC_CAP_SLICE_THREADS && 00886 avctx->thread_type & FF_THREAD_SLICE) { 00887 avctx->active_thread_type = FF_THREAD_SLICE; 00888 } 00889 } 00890 00891 int ff_thread_init(AVCodecContext *avctx) 00892 { 00893 if (avctx->thread_opaque) { 00894 av_log(avctx, AV_LOG_ERROR, "avcodec_thread_init is ignored after avcodec_open\n"); 00895 return -1; 00896 } 00897 00898 if (avctx->codec) { 00899 validate_thread_parameters(avctx); 00900 00901 if 
(avctx->active_thread_type&FF_THREAD_SLICE) 00902 return thread_init(avctx); 00903 else if (avctx->active_thread_type&FF_THREAD_FRAME) 00904 return frame_thread_init(avctx); 00905 } 00906 00907 return 0; 00908 } 00909 00910 void ff_thread_free(AVCodecContext *avctx) 00911 { 00912 if (avctx->active_thread_type&FF_THREAD_FRAME) 00913 frame_thread_free(avctx, avctx->thread_count); 00914 else 00915 thread_free(avctx); 00916 }