From 129bb238430ec45a3b5f8f1d384df590ddf7b62f Mon Sep 17 00:00:00 2001 From: Anton Khirnov Date: Sat, 11 May 2013 20:41:46 +0200 Subject: [PATCH] lavfi: add a slice threading infrastructure Mostly based on libavcodec's --- Changelog | 1 + cmdutils.c | 3 + doc/APIchanges | 7 ++ libavfilter/Makefile | 2 + libavfilter/avfilter.c | 46 ++++++++ libavfilter/avfilter.h | 63 ++++++++++ libavfilter/avfiltergraph.c | 50 ++++++++ libavfilter/internal.h | 12 ++ libavfilter/pthread.c | 229 ++++++++++++++++++++++++++++++++++++ libavfilter/thread.h | 31 +++++ libavfilter/version.h | 2 +- 11 files changed, 445 insertions(+), 1 deletion(-) create mode 100644 libavfilter/pthread.c create mode 100644 libavfilter/thread.h diff --git a/Changelog b/Changelog index 49bb2363f3..b8fbca2c4a 100644 --- a/Changelog +++ b/Changelog @@ -20,6 +20,7 @@ version 10: - avconv -deinterlace option removed, the yadif filter should be used instead - Apple Intermediate Codec decoder - Escape 130 video decoder +- support for slice multithreading in libavfilter version 9: diff --git a/cmdutils.c b/cmdutils.c index 1ff964b521..ea86b83415 100644 --- a/cmdutils.c +++ b/cmdutils.c @@ -1284,6 +1284,9 @@ static void show_help_filter(const char *name) printf("Filter %s [%s]:\n", f->name, f->description); + if (f->flags & AVFILTER_FLAG_SLICE_THREADS) + printf(" slice threading supported\n"); + printf(" Inputs:\n"); count = avfilter_pad_count(f->inputs); for (i = 0; i < count; i++) { diff --git a/doc/APIchanges b/doc/APIchanges index f7279c6c52..0957f9e246 100644 --- a/doc/APIchanges +++ b/doc/APIchanges @@ -13,6 +13,13 @@ libavutil: 2012-10-22 API changes, most recent first: +2013-05-xx - xxxxxxx - lavfi 3.10.0 - avfilter.h + Add support for slice multithreading to lavfi. Filters supporting threading + are marked with AVFILTER_FLAG_SLICE_THREADS. + New fields AVFilterContext.thread_type, AVFilterGraph.thread_type and + AVFilterGraph.nb_threads (accessible directly or through AVOptions) may be + used to configure multithreading. + 2013-xx-xx - xxxxxxx - lavu 52.12.0 - cpu.h Add av_cpu_count() function for getting the number of logical CPUs. diff --git a/libavfilter/Makefile b/libavfilter/Makefile index 7555b49428..96fa8c0c79 100644 --- a/libavfilter/Makefile +++ b/libavfilter/Makefile @@ -92,5 +92,7 @@ OBJS-$(CONFIG_TESTSRC_FILTER) += vsrc_testsrc.o OBJS-$(CONFIG_NULLSINK_FILTER) += vsink_nullsink.o +OBJS-$(HAVE_THREADS) += pthread.o + TOOLS = graph2dot TESTPROGS = filtfmts diff --git a/libavfilter/avfilter.c b/libavfilter/avfilter.c index b7913a12cc..fd01f19de4 100644 --- a/libavfilter/avfilter.c +++ b/libavfilter/avfilter.c @@ -352,14 +352,37 @@ static const AVClass *filter_child_class_next(const AVClass *prev) return NULL; } +#define OFFSET(x) offsetof(AVFilterContext, x) +#define FLAGS AV_OPT_FLAG_VIDEO_PARAM +static const AVOption options[] = { + { "thread_type", "Allowed thread types", OFFSET(thread_type), AV_OPT_TYPE_FLAGS, + { .i64 = AVFILTER_THREAD_SLICE }, 0, INT_MAX, FLAGS, "thread_type" }, + { "slice", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = AVFILTER_THREAD_SLICE }, .unit = "thread_type" }, + { NULL }, +}; + static const AVClass avfilter_class = { .class_name = "AVFilter", .item_name = filter_name, .version = LIBAVUTIL_VERSION_INT, .child_next = filter_child_next, .child_class_next = filter_child_class_next, + .option = options, }; +static int default_execute(AVFilterContext *ctx, action_func *func, void *arg, + int *ret, int nb_jobs) +{ + int i; + + for (i = 0; i < nb_jobs; i++) { + int r = func(ctx, arg, i, nb_jobs); + if (ret) + ret[i] = r; + } + return 0; +} + AVFilterContext *ff_filter_alloc(const AVFilter *filter, const char *inst_name) { AVFilterContext *ret; @@ -380,11 +403,17 @@ AVFilterContext *ff_filter_alloc(const AVFilter *filter, const char *inst_name) goto err; } + av_opt_set_defaults(ret); if (filter->priv_class) { *(const AVClass**)ret->priv = filter->priv_class; av_opt_set_defaults(ret->priv); } + ret->internal = av_mallocz(sizeof(*ret->internal)); + if (!ret->internal) + goto err; + ret->internal->execute = default_execute; + ret->nb_inputs = avfilter_pad_count(filter->inputs); if (ret->nb_inputs ) { ret->input_pads = av_malloc(sizeof(AVFilterPad) * ret->nb_inputs); @@ -421,6 +450,7 @@ err: av_freep(&ret->output_pads); ret->nb_outputs = 0; av_freep(&ret->priv); + av_freep(&ret->internal); av_free(ret); return NULL; } @@ -478,6 +508,7 @@ void avfilter_free(AVFilterContext *filter) av_freep(&filter->inputs); av_freep(&filter->outputs); av_freep(&filter->priv); + av_freep(&filter->internal); av_free(filter); } @@ -525,6 +556,21 @@ int avfilter_init_dict(AVFilterContext *ctx, AVDictionary **options) { int ret = 0; + ret = av_opt_set_dict(ctx, options); + if (ret < 0) { + av_log(ctx, AV_LOG_ERROR, "Error applying generic filter options.\n"); + return ret; + } + + if (ctx->filter->flags & AVFILTER_FLAG_SLICE_THREADS && + ctx->thread_type & ctx->graph->thread_type & AVFILTER_THREAD_SLICE && + ctx->graph->internal->thread_execute) { + ctx->thread_type = AVFILTER_THREAD_SLICE; + ctx->internal->execute = ctx->graph->internal->thread_execute; + } else { + ctx->thread_type = 0; + } + if (ctx->filter->priv_class) { ret = av_opt_set_dict(ctx->priv, options); if (ret < 0) { diff --git a/libavfilter/avfilter.h b/libavfilter/avfilter.h index 9baf64e941..5717774098 100644 --- a/libavfilter/avfilter.h +++ b/libavfilter/avfilter.h @@ -401,6 +401,11 @@ enum AVMediaType avfilter_pad_get_type(const AVFilterPad *pads, int pad_idx); * the options supplied to it. */ #define AVFILTER_FLAG_DYNAMIC_OUTPUTS (1 << 1) +/** + * The filter supports multithreading by splitting frames into multiple parts + * and processing them concurrently. + */ +#define AVFILTER_FLAG_SLICE_THREADS (1 << 2) /** * Filter definition. This defines the pads a filter contains, and all the @@ -472,6 +477,13 @@ typedef struct AVFilter { struct AVFilter *next; } AVFilter; +/** + * Process multiple parts of the frame concurrently. + */ +#define AVFILTER_THREAD_SLICE (1 << 0) + +typedef struct AVFilterInternal AVFilterInternal; + /** An instance of a filter */ struct AVFilterContext { const AVClass *av_class; ///< needed for av_log() @@ -497,6 +509,29 @@ struct AVFilterContext { void *priv; ///< private data for use by the filter struct AVFilterGraph *graph; ///< filtergraph this filter belongs to + + /** + * Type of multithreading being allowed/used. A combination of + * AVFILTER_THREAD_* flags. + * + * May be set by the caller before initializing the filter to forbid some + * or all kinds of multithreading for this filter. The default is allowing + * everything. + * + * When the filter is initialized, this field is combined using bit AND with + * AVFilterGraph.thread_type to get the final mask used for determining + * allowed threading types. I.e. a threading type needs to be set in both + * to be allowed. + * + * After the filter is initialzed, libavfilter sets this field to the + * threading type that is actually used (0 for no multithreading). + */ + int thread_type; + + /** + * An opaque struct for libavfilter internal use. + */ + AVFilterInternal *internal; }; /** @@ -793,6 +828,8 @@ int avfilter_copy_buf_props(AVFrame *dst, const AVFilterBufferRef *src); */ const AVClass *avfilter_get_class(void); +typedef struct AVFilterGraphInternal AVFilterGraphInternal; + typedef struct AVFilterGraph { const AVClass *av_class; #if FF_API_FOO_COUNT @@ -809,6 +846,32 @@ typedef struct AVFilterGraph { #if FF_API_FOO_COUNT unsigned nb_filters; #endif + + /** + * Type of multithreading allowed for filters in this graph. A combination + * of AVFILTER_THREAD_* flags. + * + * May be set by the caller at any point, the setting will apply to all + * filters initialized after that. The default is allowing everything. + * + * When a filter in this graph is initialized, this field is combined using + * bit AND with AVFilterContext.thread_type to get the final mask used for + * determining allowed threading types. I.e. a threading type needs to be + * set in both to be allowed. + */ + int thread_type; + + /** + * Maximum number of threads used by filters in this graph. May be set by + * the caller before adding any filters to the filtergraph. Zero (the + * default) means that the number of threads is determined automatically. + */ + int nb_threads; + + /** + * Opaque object for libavfilter internal use. + */ + AVFilterGraphInternal *internal; } AVFilterGraph; /** diff --git a/libavfilter/avfiltergraph.c b/libavfilter/avfiltergraph.c index 9e4c407b1e..e93a5bb415 100644 --- a/libavfilter/avfiltergraph.c +++ b/libavfilter/avfiltergraph.c @@ -20,6 +20,8 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include "config.h" + #include #include "libavutil/avassert.h" @@ -27,22 +29,59 @@ #include "libavutil/channel_layout.h" #include "libavutil/common.h" #include "libavutil/log.h" +#include "libavutil/opt.h" + #include "avfilter.h" #include "formats.h" #include "internal.h" +#include "thread.h" + +#define OFFSET(x) offsetof(AVFilterGraph, x) +#define FLAGS AV_OPT_FLAG_VIDEO_PARAM +static const AVOption filtergraph_options[] = { + { "thread_type", "Allowed thread types", OFFSET(thread_type), AV_OPT_TYPE_FLAGS, + { .i64 = AVFILTER_THREAD_SLICE }, 0, INT_MAX, FLAGS, "thread_type" }, + { "slice", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = AVFILTER_THREAD_SLICE }, .flags = FLAGS, .unit = "thread_type" }, + { "threads", "Maximum number of threads", OFFSET(nb_threads), + AV_OPT_TYPE_INT, { .i64 = 0 }, 0, INT_MAX, FLAGS }, + { NULL }, +}; static const AVClass filtergraph_class = { .class_name = "AVFilterGraph", .item_name = av_default_item_name, .version = LIBAVUTIL_VERSION_INT, + .option = filtergraph_options, }; +#if !HAVE_THREADS +void ff_graph_thread_free(AVFilterGraph *graph) +{ +} + +int ff_graph_thread_init(AVFilterGraph *graph) +{ + graph->thread_type = 0; + graph->nb_threads = 1; + return 0; +} +#endif + AVFilterGraph *avfilter_graph_alloc(void) { AVFilterGraph *ret = av_mallocz(sizeof(*ret)); if (!ret) return NULL; + + ret->internal = av_mallocz(sizeof(*ret->internal)); + if (!ret->internal) { + av_freep(&ret); + return NULL; + } + ret->av_class = &filtergraph_class; + av_opt_set_defaults(ret); + return ret; } @@ -67,9 +106,12 @@ void avfilter_graph_free(AVFilterGraph **graph) while ((*graph)->nb_filters) avfilter_free((*graph)->filters[0]); + ff_graph_thread_free(*graph); + av_freep(&(*graph)->scale_sws_opts); av_freep(&(*graph)->resample_lavr_opts); av_freep(&(*graph)->filters); + av_freep(&(*graph)->internal); av_freep(graph); } @@ -123,6 +165,14 @@ AVFilterContext *avfilter_graph_alloc_filter(AVFilterGraph *graph, { AVFilterContext **filters, *s; + if (graph->thread_type && !graph->internal->thread) { + int ret = ff_graph_thread_init(graph); + if (ret < 0) { + av_log(graph, AV_LOG_ERROR, "Error initializing threading.\n"); + return NULL; + } + } + s = ff_filter_alloc(filter, name); if (!s) return NULL; diff --git a/libavfilter/internal.h b/libavfilter/internal.h index 8e8a13f084..bdbbe4498a 100644 --- a/libavfilter/internal.h +++ b/libavfilter/internal.h @@ -25,6 +25,7 @@ */ #include "avfilter.h" +#include "thread.h" #if !FF_API_AVFILTERPAD_PUBLIC /** @@ -117,6 +118,17 @@ struct AVFilterPad { }; #endif +struct AVFilterGraphInternal { + void *thread; + int (*thread_execute)(AVFilterContext *ctx, action_func *func, void *arg, + int *ret, int nb_jobs); +}; + +struct AVFilterInternal { + int (*execute)(AVFilterContext *ctx, action_func *func, void *arg, + int *ret, int nb_jobs); +}; + /** default handler for freeing audio/video buffer when there are no references left */ void ff_avfilter_default_free_buffer(AVFilterBuffer *buf); diff --git a/libavfilter/pthread.c b/libavfilter/pthread.c new file mode 100644 index 0000000000..374c1c32c3 --- /dev/null +++ b/libavfilter/pthread.c @@ -0,0 +1,229 @@ +/* + * + * This file is part of Libav. + * + * Libav is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * Libav is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with Libav; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file + * Libavfilter multithreading support + */ + +#include "config.h" + +#include "libavutil/common.h" +#include "libavutil/cpu.h" +#include "libavutil/mem.h" + +#include "avfilter.h" +#include "internal.h" +#include "thread.h" + +#if HAVE_PTHREADS +#include +#elif HAVE_W32THREADS +#include "compat/w32pthreads.h" +#endif + +typedef struct ThreadContext { + AVFilterGraph *graph; + + int nb_threads; + pthread_t *workers; + action_func *func; + + /* per-execute perameters */ + AVFilterContext *ctx; + void *arg; + int *rets; + int nb_rets; + int nb_jobs; + + pthread_cond_t last_job_cond; + pthread_cond_t current_job_cond; + pthread_mutex_t current_job_lock; + int current_job; + int done; +} ThreadContext; + +static void* attribute_align_arg worker(void *v) +{ + ThreadContext *c = v; + int our_job = c->nb_jobs; + int nb_threads = c->nb_threads; + int self_id; + + pthread_mutex_lock(&c->current_job_lock); + self_id = c->current_job++; + for (;;) { + while (our_job >= c->nb_jobs) { + if (c->current_job == nb_threads + c->nb_jobs) + pthread_cond_signal(&c->last_job_cond); + + pthread_cond_wait(&c->current_job_cond, &c->current_job_lock); + our_job = self_id; + + if (c->done) { + pthread_mutex_unlock(&c->current_job_lock); + return NULL; + } + } + pthread_mutex_unlock(&c->current_job_lock); + + c->rets[our_job % c->nb_rets] = c->func(c->ctx, c->arg, our_job, c->nb_jobs); + + pthread_mutex_lock(&c->current_job_lock); + our_job = c->current_job++; + } +} + +static void slice_thread_uninit(ThreadContext *c) +{ + int i; + + pthread_mutex_lock(&c->current_job_lock); + c->done = 1; + pthread_cond_broadcast(&c->current_job_cond); + pthread_mutex_unlock(&c->current_job_lock); + + for (i = 0; i < c->nb_threads; i++) + pthread_join(c->workers[i], NULL); + + pthread_mutex_destroy(&c->current_job_lock); + pthread_cond_destroy(&c->current_job_cond); + pthread_cond_destroy(&c->last_job_cond); + av_freep(&c->workers); +} + +static void slice_thread_park_workers(ThreadContext *c) +{ + pthread_cond_wait(&c->last_job_cond, &c->current_job_lock); + pthread_mutex_unlock(&c->current_job_lock); +} + +static int thread_execute(AVFilterContext *ctx, action_func *func, + void *arg, int *ret, int nb_jobs) +{ + ThreadContext *c = ctx->graph->internal->thread; + int dummy_ret; + + if (nb_jobs <= 0) + return 0; + + pthread_mutex_lock(&c->current_job_lock); + + c->current_job = c->nb_threads; + c->nb_jobs = nb_jobs; + c->ctx = ctx; + c->arg = arg; + c->func = func; + if (ret) { + c->rets = ret; + c->nb_rets = nb_jobs; + } else { + c->rets = &dummy_ret; + c->nb_rets = 1; + } + pthread_cond_broadcast(&c->current_job_cond); + + slice_thread_park_workers(c); + + return 0; +} + +static int thread_init(ThreadContext *c, int nb_threads) +{ + int i, ret; + + if (!nb_threads) { + int nb_cpus = av_cpu_count(); + av_log(c->graph, AV_LOG_DEBUG, "Detected %d logical cores.\n", nb_cpus); + // use number of cores + 1 as thread count if there is more than one + if (nb_cpus > 1) + nb_threads = nb_cpus + 1; + else + nb_threads = 1; + } + + if (nb_threads <= 1) + return 1; + + c->nb_threads = nb_threads; + c->workers = av_mallocz(sizeof(*c->workers) * nb_threads); + if (!c->workers) + return AVERROR(ENOMEM); + + c->current_job = 0; + c->nb_jobs = 0; + c->done = 0; + + pthread_cond_init(&c->current_job_cond, NULL); + pthread_cond_init(&c->last_job_cond, NULL); + + pthread_mutex_init(&c->current_job_lock, NULL); + pthread_mutex_lock(&c->current_job_lock); + for (i = 0; i < nb_threads; i++) { + ret = pthread_create(&c->workers[i], NULL, worker, c); + if (ret) { + pthread_mutex_unlock(&c->current_job_lock); + c->nb_threads = i; + slice_thread_uninit(c); + return AVERROR(ret); + } + } + + slice_thread_park_workers(c); + + return c->nb_threads; +} + +int ff_graph_thread_init(AVFilterGraph *graph) +{ + int ret; + +#if HAVE_W32THREADS + w32thread_init(); +#endif + + if (graph->nb_threads == 1) { + graph->thread_type = 0; + return 0; + } + + graph->internal->thread = av_mallocz(sizeof(ThreadContext)); + if (!graph->internal->thread) + return AVERROR(ENOMEM); + + ret = thread_init(graph->internal->thread, graph->nb_threads); + if (ret <= 1) { + av_freep(&graph->internal->thread); + graph->thread_type = 0; + graph->nb_threads = 1; + return (ret < 0) ? ret : 0; + } + graph->nb_threads = ret; + + graph->internal->thread_execute = thread_execute; + + return 0; +} + +void ff_graph_thread_free(AVFilterGraph *graph) +{ + if (graph->internal->thread) + slice_thread_uninit(graph->internal->thread); + av_freep(&graph->internal->thread); +} diff --git a/libavfilter/thread.h b/libavfilter/thread.h new file mode 100644 index 0000000000..49134d948a --- /dev/null +++ b/libavfilter/thread.h @@ -0,0 +1,31 @@ +/* + * + * This file is part of Libav. + * + * Libav is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * Libav is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with Libav; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef AVFILTER_THREAD_H +#define AVFILTER_THREAD_H + +#include "avfilter.h" + +typedef int (action_func)(AVFilterContext *ctx, void *arg, int jobnr, int nb_jobs); + +int ff_graph_thread_init(AVFilterGraph *graph); + +void ff_graph_thread_free(AVFilterGraph *graph); + +#endif /* AVFILTER_THREAD_H */ diff --git a/libavfilter/version.h b/libavfilter/version.h index c8e968996f..cb69228213 100644 --- a/libavfilter/version.h +++ b/libavfilter/version.h @@ -30,7 +30,7 @@ #include "libavutil/avutil.h" #define LIBAVFILTER_VERSION_MAJOR 3 -#define LIBAVFILTER_VERSION_MINOR 9 +#define LIBAVFILTER_VERSION_MINOR 10 #define LIBAVFILTER_VERSION_MICRO 0 #define LIBAVFILTER_VERSION_INT AV_VERSION_INT(LIBAVFILTER_VERSION_MAJOR, \