|
|
|
@ -16,17 +16,21 @@ |
|
|
|
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
#include <stdatomic.h> |
|
|
|
|
#include <stdio.h> |
|
|
|
|
#include <string.h> |
|
|
|
|
|
|
|
|
|
#include "ffmpeg.h" |
|
|
|
|
#include "objpool.h" |
|
|
|
|
#include "sync_queue.h" |
|
|
|
|
#include "thread_queue.h" |
|
|
|
|
|
|
|
|
|
#include "libavutil/fifo.h" |
|
|
|
|
#include "libavutil/intreadwrite.h" |
|
|
|
|
#include "libavutil/log.h" |
|
|
|
|
#include "libavutil/mem.h" |
|
|
|
|
#include "libavutil/timestamp.h" |
|
|
|
|
#include "libavutil/thread.h" |
|
|
|
|
|
|
|
|
|
#include "libavcodec/packet.h" |
|
|
|
|
|
|
|
|
@ -51,13 +55,18 @@ typedef struct MuxStream { |
|
|
|
|
struct Muxer { |
|
|
|
|
AVFormatContext *fc; |
|
|
|
|
|
|
|
|
|
pthread_t thread; |
|
|
|
|
ThreadQueue *tq; |
|
|
|
|
|
|
|
|
|
MuxStream *streams; |
|
|
|
|
|
|
|
|
|
AVDictionary *opts; |
|
|
|
|
|
|
|
|
|
int thread_queue_size; |
|
|
|
|
|
|
|
|
|
/* filesize limit expressed in bytes */ |
|
|
|
|
int64_t limit_filesize; |
|
|
|
|
int64_t final_filesize; |
|
|
|
|
atomic_int_least64_t last_filesize; |
|
|
|
|
int header_written; |
|
|
|
|
|
|
|
|
|
AVPacket *sq_pkt; |
|
|
|
@ -65,15 +74,6 @@ struct Muxer { |
|
|
|
|
|
|
|
|
|
static int want_sdp = 1; |
|
|
|
|
|
|
|
|
|
static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream, OSTFinished others) |
|
|
|
|
{ |
|
|
|
|
int i; |
|
|
|
|
for (i = 0; i < nb_output_streams; i++) { |
|
|
|
|
OutputStream *ost2 = output_streams[i]; |
|
|
|
|
ost2->finished |= ost == ost2 ? this_stream : others; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) |
|
|
|
|
{ |
|
|
|
|
MuxStream *ms = &of->mux->streams[ost->index]; |
|
|
|
@ -116,13 +116,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) |
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int64_t filesize(AVIOContext *pb) |
|
|
|
|
{ |
|
|
|
|
int64_t ret = -1; |
|
|
|
|
|
|
|
|
|
if (pb) { |
|
|
|
|
ret = avio_size(pb); |
|
|
|
|
if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
|
|
|
|
|
ret = avio_tell(pb); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) |
|
|
|
|
{ |
|
|
|
|
MuxStream *ms = &of->mux->streams[ost->index]; |
|
|
|
|
AVFormatContext *s = of->mux->fc; |
|
|
|
|
AVStream *st = ost->st; |
|
|
|
|
int64_t fs; |
|
|
|
|
int ret; |
|
|
|
|
|
|
|
|
|
fs = filesize(s->pb); |
|
|
|
|
atomic_store(&of->mux->last_filesize, fs); |
|
|
|
|
if (fs >= of->mux->limit_filesize) |
|
|
|
|
return AVERROR_EOF; |
|
|
|
|
|
|
|
|
|
if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->vsync_method == VSYNC_DROP) || |
|
|
|
|
(st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0)) |
|
|
|
|
pkt->pts = pkt->dts = AV_NOPTS_VALUE; |
|
|
|
@ -175,7 +194,7 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) |
|
|
|
|
ms->last_mux_dts = pkt->dts; |
|
|
|
|
|
|
|
|
|
ost->data_size += pkt->size; |
|
|
|
|
ost->packets_written++; |
|
|
|
|
atomic_fetch_add(&ost->packets_written, 1); |
|
|
|
|
|
|
|
|
|
pkt->stream_index = ost->index; |
|
|
|
|
|
|
|
|
@ -193,66 +212,81 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) |
|
|
|
|
ret = av_interleaved_write_frame(s, pkt); |
|
|
|
|
if (ret < 0) { |
|
|
|
|
print_error("av_interleaved_write_frame()", ret); |
|
|
|
|
main_return_code = 1; |
|
|
|
|
close_all_output_streams(ost, MUXER_FINISHED | ENCODER_FINISHED, ENCODER_FINISHED); |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) |
|
|
|
|
static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt) |
|
|
|
|
{ |
|
|
|
|
if (ost->sq_idx_mux >= 0) { |
|
|
|
|
int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt)); |
|
|
|
|
if (ret < 0) { |
|
|
|
|
if (pkt) |
|
|
|
|
av_packet_unref(pkt); |
|
|
|
|
if (ret == AVERROR_EOF) { |
|
|
|
|
ost->finished |= MUXER_FINISHED; |
|
|
|
|
return 0; |
|
|
|
|
} else |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
if (ret < 0) |
|
|
|
|
return ret; |
|
|
|
|
|
|
|
|
|
while (1) { |
|
|
|
|
ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt)); |
|
|
|
|
if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) |
|
|
|
|
return 0; |
|
|
|
|
else if (ret < 0) |
|
|
|
|
return ret; |
|
|
|
|
if (ret < 0) |
|
|
|
|
return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret; |
|
|
|
|
|
|
|
|
|
ret = write_packet(of, output_streams[of->ost_index + ret], |
|
|
|
|
of->mux->sq_pkt); |
|
|
|
|
if (ret < 0) |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (pkt) |
|
|
|
|
return write_packet(of, ost, pkt); |
|
|
|
|
|
|
|
|
|
ost->finished |= MUXER_FINISHED; |
|
|
|
|
} |
|
|
|
|
} else if (pkt) |
|
|
|
|
return write_packet(of, ost, pkt); |
|
|
|
|
|
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) |
|
|
|
|
static void *muxer_thread(void *arg) |
|
|
|
|
{ |
|
|
|
|
int ret; |
|
|
|
|
OutputFile *of = arg; |
|
|
|
|
Muxer *mux = of->mux; |
|
|
|
|
AVPacket *pkt = NULL; |
|
|
|
|
int ret = 0; |
|
|
|
|
|
|
|
|
|
pkt = av_packet_alloc(); |
|
|
|
|
if (!pkt) { |
|
|
|
|
ret = AVERROR(ENOMEM); |
|
|
|
|
goto finish; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (of->mux->header_written) { |
|
|
|
|
return submit_packet(of, ost, pkt); |
|
|
|
|
} else { |
|
|
|
|
/* the muxer is not initialized yet, buffer the packet */ |
|
|
|
|
ret = queue_packet(of, ost, pkt); |
|
|
|
|
if (ret < 0) { |
|
|
|
|
av_packet_unref(pkt); |
|
|
|
|
return ret; |
|
|
|
|
while (1) { |
|
|
|
|
OutputStream *ost; |
|
|
|
|
int stream_idx; |
|
|
|
|
|
|
|
|
|
ret = tq_receive(mux->tq, &stream_idx, pkt); |
|
|
|
|
if (stream_idx < 0) { |
|
|
|
|
av_log(NULL, AV_LOG_VERBOSE, |
|
|
|
|
"All streams finished for output file #%d\n", of->index); |
|
|
|
|
ret = 0; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ost = output_streams[of->ost_index + stream_idx]; |
|
|
|
|
ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt); |
|
|
|
|
av_packet_unref(pkt); |
|
|
|
|
if (ret == AVERROR_EOF) |
|
|
|
|
tq_receive_finish(mux->tq, stream_idx); |
|
|
|
|
else if (ret < 0) { |
|
|
|
|
av_log(NULL, AV_LOG_ERROR, |
|
|
|
|
"Error muxing a packet for output file #%d\n", of->index); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return 0; |
|
|
|
|
finish: |
|
|
|
|
av_packet_free(&pkt); |
|
|
|
|
|
|
|
|
|
for (unsigned int i = 0; i < mux->fc->nb_streams; i++) |
|
|
|
|
tq_receive_finish(mux->tq, i); |
|
|
|
|
|
|
|
|
|
av_log(NULL, AV_LOG_VERBOSE, "Terminating muxer thread %d\n", of->index); |
|
|
|
|
|
|
|
|
|
return (void*)(intptr_t)ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int print_sdp(void) |
|
|
|
@ -303,11 +337,125 @@ static int print_sdp(void) |
|
|
|
|
av_freep(&sdp_filename); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SDP successfully written, allow muxer threads to start
|
|
|
|
|
ret = 1; |
|
|
|
|
|
|
|
|
|
fail: |
|
|
|
|
av_freep(&avc); |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) |
|
|
|
|
{ |
|
|
|
|
Muxer *mux = of->mux; |
|
|
|
|
int ret = 0; |
|
|
|
|
|
|
|
|
|
if (!pkt || ost->finished & MUXER_FINISHED) |
|
|
|
|
goto finish; |
|
|
|
|
|
|
|
|
|
ret = tq_send(mux->tq, ost->index, pkt); |
|
|
|
|
if (ret < 0) |
|
|
|
|
goto finish; |
|
|
|
|
|
|
|
|
|
return 0; |
|
|
|
|
|
|
|
|
|
finish: |
|
|
|
|
if (pkt) |
|
|
|
|
av_packet_unref(pkt); |
|
|
|
|
|
|
|
|
|
ost->finished |= MUXER_FINISHED; |
|
|
|
|
tq_send_finish(mux->tq, ost->index); |
|
|
|
|
return ret == AVERROR_EOF ? 0 : ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) |
|
|
|
|
{ |
|
|
|
|
int ret; |
|
|
|
|
|
|
|
|
|
if (of->mux->tq) { |
|
|
|
|
return submit_packet(of, ost, pkt); |
|
|
|
|
} else { |
|
|
|
|
/* the muxer is not initialized yet, buffer the packet */ |
|
|
|
|
ret = queue_packet(of, ost, pkt); |
|
|
|
|
if (ret < 0) { |
|
|
|
|
av_packet_unref(pkt); |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int thread_stop(OutputFile *of) |
|
|
|
|
{ |
|
|
|
|
Muxer *mux = of->mux; |
|
|
|
|
void *ret; |
|
|
|
|
|
|
|
|
|
if (!mux || !mux->tq) |
|
|
|
|
return 0; |
|
|
|
|
|
|
|
|
|
for (unsigned int i = 0; i < mux->fc->nb_streams; i++) |
|
|
|
|
tq_send_finish(mux->tq, i); |
|
|
|
|
|
|
|
|
|
pthread_join(mux->thread, &ret); |
|
|
|
|
|
|
|
|
|
tq_free(&mux->tq); |
|
|
|
|
|
|
|
|
|
return (int)(intptr_t)ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pkt_move(void *dst, void *src) |
|
|
|
|
{ |
|
|
|
|
av_packet_move_ref(dst, src); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int thread_start(OutputFile *of) |
|
|
|
|
{ |
|
|
|
|
Muxer *mux = of->mux; |
|
|
|
|
AVFormatContext *fc = mux->fc; |
|
|
|
|
ObjPool *op; |
|
|
|
|
int ret; |
|
|
|
|
|
|
|
|
|
op = objpool_alloc_packets(); |
|
|
|
|
if (!op) |
|
|
|
|
return AVERROR(ENOMEM); |
|
|
|
|
|
|
|
|
|
mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move); |
|
|
|
|
if (!mux->tq) { |
|
|
|
|
objpool_free(&op); |
|
|
|
|
return AVERROR(ENOMEM); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of); |
|
|
|
|
if (ret) { |
|
|
|
|
tq_free(&mux->tq); |
|
|
|
|
return AVERROR(ret); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* flush the muxing queues */ |
|
|
|
|
for (int i = 0; i < fc->nb_streams; i++) { |
|
|
|
|
MuxStream *ms = &of->mux->streams[i]; |
|
|
|
|
OutputStream *ost = output_streams[of->ost_index + i]; |
|
|
|
|
AVPacket *pkt; |
|
|
|
|
|
|
|
|
|
/* try to improve muxing time_base (only possible if nothing has been written yet) */ |
|
|
|
|
if (!av_fifo_can_read(ms->muxing_queue)) |
|
|
|
|
ost->mux_timebase = ost->st->time_base; |
|
|
|
|
|
|
|
|
|
while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { |
|
|
|
|
ret = submit_packet(of, ost, pkt); |
|
|
|
|
if (pkt) { |
|
|
|
|
ms->muxing_queue_data_size -= pkt->size; |
|
|
|
|
av_packet_free(&pkt); |
|
|
|
|
} |
|
|
|
|
if (ret < 0) |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* open the muxer when all the streams are initialized */ |
|
|
|
|
int of_check_init(OutputFile *of) |
|
|
|
|
{ |
|
|
|
@ -339,28 +487,19 @@ int of_check_init(OutputFile *of) |
|
|
|
|
if (ret < 0) { |
|
|
|
|
av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* flush the muxing queues */ |
|
|
|
|
for (i = 0; i < fc->nb_streams; i++) { |
|
|
|
|
MuxStream *ms = &of->mux->streams[i]; |
|
|
|
|
OutputStream *ost = output_streams[of->ost_index + i]; |
|
|
|
|
AVPacket *pkt; |
|
|
|
|
|
|
|
|
|
/* try to improve muxing time_base (only possible if nothing has been written yet) */ |
|
|
|
|
if (!av_fifo_can_read(ms->muxing_queue)) |
|
|
|
|
ost->mux_timebase = ost->st->time_base; |
|
|
|
|
|
|
|
|
|
while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { |
|
|
|
|
ret = submit_packet(of, ost, pkt); |
|
|
|
|
if (pkt) { |
|
|
|
|
ms->muxing_queue_data_size -= pkt->size; |
|
|
|
|
av_packet_free(&pkt); |
|
|
|
|
} else if (ret == 1) { |
|
|
|
|
/* SDP is written only after all the muxers are ready, so now we
|
|
|
|
|
* start ALL the threads */ |
|
|
|
|
for (i = 0; i < nb_output_files; i++) { |
|
|
|
|
ret = thread_start(output_files[i]); |
|
|
|
|
if (ret < 0) |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
if (ret < 0) |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
ret = thread_start(of); |
|
|
|
|
if (ret < 0) |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return 0; |
|
|
|
@ -371,7 +510,7 @@ int of_write_trailer(OutputFile *of) |
|
|
|
|
AVFormatContext *fc = of->mux->fc; |
|
|
|
|
int ret; |
|
|
|
|
|
|
|
|
|
if (!of->mux->header_written) { |
|
|
|
|
if (!of->mux->tq) { |
|
|
|
|
av_log(NULL, AV_LOG_ERROR, |
|
|
|
|
"Nothing was written into output file %d (%s), because " |
|
|
|
|
"at least one of its streams received no packets.\n", |
|
|
|
@ -379,13 +518,17 @@ int of_write_trailer(OutputFile *of) |
|
|
|
|
return AVERROR(EINVAL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ret = thread_stop(of); |
|
|
|
|
if (ret < 0) |
|
|
|
|
main_return_code = ret; |
|
|
|
|
|
|
|
|
|
ret = av_write_trailer(fc); |
|
|
|
|
if (ret < 0) { |
|
|
|
|
av_log(NULL, AV_LOG_ERROR, "Error writing trailer of %s: %s\n", fc->url, av_err2str(ret)); |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
of->mux->final_filesize = of_filesize(of); |
|
|
|
|
of->mux->last_filesize = filesize(fc->pb); |
|
|
|
|
|
|
|
|
|
if (!(of->format->flags & AVFMT_NOFILE)) { |
|
|
|
|
ret = avio_closep(&fc->pb); |
|
|
|
@ -448,6 +591,8 @@ void of_close(OutputFile **pof) |
|
|
|
|
if (!of) |
|
|
|
|
return; |
|
|
|
|
|
|
|
|
|
thread_stop(of); |
|
|
|
|
|
|
|
|
|
sq_free(&of->sq_encode); |
|
|
|
|
sq_free(&of->sq_mux); |
|
|
|
|
|
|
|
|
@ -457,7 +602,8 @@ void of_close(OutputFile **pof) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int of_muxer_init(OutputFile *of, AVFormatContext *fc, |
|
|
|
|
AVDictionary *opts, int64_t limit_filesize) |
|
|
|
|
AVDictionary *opts, int64_t limit_filesize, |
|
|
|
|
int thread_queue_size) |
|
|
|
|
{ |
|
|
|
|
Muxer *mux = av_mallocz(sizeof(*mux)); |
|
|
|
|
int ret = 0; |
|
|
|
@ -487,6 +633,7 @@ int of_muxer_init(OutputFile *of, AVFormatContext *fc, |
|
|
|
|
ms->last_mux_dts = AV_NOPTS_VALUE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
mux->thread_queue_size = thread_queue_size > 0 ? thread_queue_size : 8; |
|
|
|
|
mux->limit_filesize = limit_filesize; |
|
|
|
|
mux->opts = opts; |
|
|
|
|
|
|
|
|
@ -515,25 +662,9 @@ fail: |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int of_finished(OutputFile *of) |
|
|
|
|
{ |
|
|
|
|
return of_filesize(of) >= of->mux->limit_filesize; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int64_t of_filesize(OutputFile *of) |
|
|
|
|
{ |
|
|
|
|
AVIOContext *pb = of->mux->fc->pb; |
|
|
|
|
int64_t ret = -1; |
|
|
|
|
|
|
|
|
|
if (of->mux->final_filesize) |
|
|
|
|
ret = of->mux->final_filesize; |
|
|
|
|
else if (pb) { |
|
|
|
|
ret = avio_size(pb); |
|
|
|
|
if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
|
|
|
|
|
ret = avio_tell(pb); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return ret; |
|
|
|
|
return atomic_load(&of->mux->last_filesize); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
AVChapter * const * |
|
|
|
|