/* * This file is part of FFmpeg. * * FFmpeg is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * FFmpeg is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with FFmpeg; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include <stdatomic.h> #include "cpu.h" #include "slicethread.h" #include "mem.h" #include "thread.h" #include "avassert.h" #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS typedef struct WorkerContext { AVSliceThread *ctx; pthread_mutex_t mutex; pthread_cond_t cond; pthread_t thread; int done; } WorkerContext; struct AVSliceThread { WorkerContext *workers; int nb_threads; int nb_active_threads; int nb_jobs; atomic_uint first_job; atomic_uint current_job; pthread_mutex_t done_mutex; pthread_cond_t done_cond; int done; int finished; void *priv; void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads); void (*main_func)(void *priv); }; static int run_jobs(AVSliceThread *ctx) { unsigned nb_jobs = ctx->nb_jobs; unsigned nb_active_threads = ctx->nb_active_threads; unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel); unsigned current_job = first_job; do { ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads); } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs); return current_job == nb_jobs + nb_active_threads - 1; } static void *attribute_align_arg thread_worker(void *v) { WorkerContext *w = v; AVSliceThread *ctx = w->ctx; pthread_mutex_lock(&w->mutex); pthread_cond_signal(&w->cond); while (1) { w->done = 1; while (w->done) pthread_cond_wait(&w->cond, &w->mutex); if (ctx->finished) { pthread_mutex_unlock(&w->mutex); return NULL; } if (run_jobs(ctx)) { pthread_mutex_lock(&ctx->done_mutex); ctx->done = 1; pthread_cond_signal(&ctx->done_cond); pthread_mutex_unlock(&ctx->done_mutex); } } } int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), void (*main_func)(void *priv), int nb_threads) { AVSliceThread *ctx; int nb_workers, i; av_assert0(nb_threads >= 0); if (!nb_threads) { int nb_cpus = av_cpu_count(); if (nb_cpus > 1) nb_threads = nb_cpus + 1; else nb_threads = 1; } nb_workers = nb_threads; if (!main_func) nb_workers--; *pctx = ctx = av_mallocz(sizeof(*ctx)); if (!ctx) return AVERROR(ENOMEM); if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) { av_freep(pctx); return AVERROR(ENOMEM); } ctx->priv = priv; ctx->worker_func = worker_func; ctx->main_func = main_func; ctx->nb_threads = nb_threads; ctx->nb_active_threads = 0; ctx->nb_jobs = 0; ctx->finished = 0; atomic_init(&ctx->first_job, 0); atomic_init(&ctx->current_job, 0); pthread_mutex_init(&ctx->done_mutex, NULL); pthread_cond_init(&ctx->done_cond, NULL); ctx->done = 0; for (i = 0; i < nb_workers; i++) { WorkerContext *w = &ctx->workers[i]; int ret; w->ctx = ctx; pthread_mutex_init(&w->mutex, NULL); pthread_cond_init(&w->cond, NULL); pthread_mutex_lock(&w->mutex); w->done = 0; if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) { ctx->nb_threads = main_func ? i : i + 1; pthread_mutex_unlock(&w->mutex); pthread_cond_destroy(&w->cond); pthread_mutex_destroy(&w->mutex); avpriv_slicethread_free(pctx); return AVERROR(ret); } while (!w->done) pthread_cond_wait(&w->cond, &w->mutex); pthread_mutex_unlock(&w->mutex); } return nb_threads; } void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main) { int nb_workers, i, is_last = 0; av_assert0(nb_jobs > 0); ctx->nb_jobs = nb_jobs; ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads); atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed); atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed); nb_workers = ctx->nb_active_threads; if (!ctx->main_func || !execute_main) nb_workers--; for (i = 0; i < nb_workers; i++) { WorkerContext *w = &ctx->workers[i]; pthread_mutex_lock(&w->mutex); w->done = 0; pthread_cond_signal(&w->cond); pthread_mutex_unlock(&w->mutex); } if (ctx->main_func && execute_main) ctx->main_func(ctx->priv); else is_last = run_jobs(ctx); if (!is_last) { pthread_mutex_lock(&ctx->done_mutex); while (!ctx->done) pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex); ctx->done = 0; pthread_mutex_unlock(&ctx->done_mutex); } } void avpriv_slicethread_free(AVSliceThread **pctx) { AVSliceThread *ctx; int nb_workers, i; if (!pctx || !*pctx) return; ctx = *pctx; nb_workers = ctx->nb_threads; if (!ctx->main_func) nb_workers--; ctx->finished = 1; for (i = 0; i < nb_workers; i++) { WorkerContext *w = &ctx->workers[i]; pthread_mutex_lock(&w->mutex); w->done = 0; pthread_cond_signal(&w->cond); pthread_mutex_unlock(&w->mutex); } for (i = 0; i < nb_workers; i++) { WorkerContext *w = &ctx->workers[i]; pthread_join(w->thread, NULL); pthread_cond_destroy(&w->cond); pthread_mutex_destroy(&w->mutex); } pthread_cond_destroy(&ctx->done_cond); pthread_mutex_destroy(&ctx->done_mutex); av_freep(&ctx->workers); av_freep(pctx); } #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */ int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), void (*main_func)(void *priv), int nb_threads) { *pctx = NULL; return AVERROR(ENOSYS); } void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main) { av_assert0(0); } void avpriv_slicethread_free(AVSliceThread **pctx) { av_assert0(!pctx || !*pctx); } #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */