From 35d9185f6c3dc1c34ea7d46859ee89827e2fe74a Mon Sep 17 00:00:00 2001 From: Romain Vimont Date: Wed, 31 Jul 2019 01:55:40 +0200 Subject: [PATCH] Record asynchronously The record file was written from the stream thread. As a consequence, any blocking I/O to write the file delayed the decoder. For maximum performance even when recording is enabled, send (refcounted) packets to a separate recording thread. --- app/src/recorder.c | 184 +++++++++++++++++++++++++++++++++++++++++++++ app/src/recorder.h | 30 +++++++- app/src/stream.c | 27 +++++-- 3 files changed, 233 insertions(+), 8 deletions(-) diff --git a/app/src/recorder.c b/app/src/recorder.c index c14394a3..c36e3f91 100644 --- a/app/src/recorder.c +++ b/app/src/recorder.c @@ -5,6 +5,7 @@ #include "compat.h" #include "config.h" +#include "lock_util.h" #include "log.h" static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us @@ -26,6 +27,82 @@ find_muxer(const char *name) { return oformat; } +static struct record_packet * +record_packet_new(const AVPacket *packet) { + struct record_packet *rec = SDL_malloc(sizeof(*rec)); + if (!rec) { + return NULL; + } + if (av_packet_ref(&rec->packet, packet)) { + SDL_free(rec); + return NULL; + } + rec->next = NULL; + return rec; +} + +static void +record_packet_delete(struct record_packet *rec) { + av_packet_unref(&rec->packet); + SDL_free(rec); +} + +static void +recorder_queue_init(struct recorder_queue *queue) { + queue->first = NULL; + // queue->last is undefined if queue->first == NULL +} + +static inline bool +recorder_queue_is_empty(struct recorder_queue *queue) { + return !queue->first; +} + +static bool +recorder_queue_push(struct recorder_queue *queue, const AVPacket *packet) { + struct record_packet *rec = record_packet_new(packet); + if (!rec) { + LOGC("Could not allocate record packet"); + return false; + } + rec->next = NULL; + + if (recorder_queue_is_empty(queue)) { + queue->first = queue->last = rec; + } else { + // chain rec after the (current) last packet + queue->last->next = rec; + // the last packet is now rec + queue->last = rec; + } + return true; +} + +static inline struct record_packet * +recorder_queue_take(struct recorder_queue *queue) { + SDL_assert(!recorder_queue_is_empty(queue)); + + struct record_packet *rec = queue->first; + SDL_assert(rec); + + queue->first = rec->next; + // no need to update queue->last if the queue is left empty: + // queue->last is undefined if queue->first == NULL + + return rec; +} + +static void +recorder_queue_clear(struct recorder_queue *queue) { + struct record_packet *rec = queue->first; + while (rec) { + struct record_packet *current = rec; + rec = rec->next; + record_packet_delete(current); + } + queue->first = NULL; +} + bool recorder_init(struct recorder *recorder, const char *filename, @@ -37,6 +114,24 @@ recorder_init(struct recorder *recorder, return false; } + recorder->mutex = SDL_CreateMutex(); + if (!recorder->mutex) { + LOGC("Could not create mutex"); + SDL_free(recorder->filename); + return false; + } + + recorder->queue_cond = SDL_CreateCond(); + if (!recorder->queue_cond) { + LOGC("Could not create cond"); + SDL_DestroyMutex(recorder->mutex); + SDL_free(recorder->filename); + return false; + } + + recorder_queue_init(&recorder->queue); + recorder->stopped = false; + recorder->failed = false; recorder->format = format; recorder->declared_frame_size = declared_frame_size; recorder->header_written = false; @@ -46,6 +141,8 @@ recorder_init(struct recorder *recorder, void recorder_destroy(struct recorder *recorder) { + SDL_DestroyCond(recorder->queue_cond); + SDL_DestroyMutex(recorder->mutex); SDL_free(recorder->filename); } @@ -186,3 +283,90 @@ recorder_write(struct recorder *recorder, AVPacket *packet) { recorder_rescale_packet(recorder, packet); return av_write_frame(recorder->ctx, packet) >= 0; } + +static int +run_recorder(void *data) { + struct recorder *recorder = data; + + for (;;) { + mutex_lock(recorder->mutex); + + while (!recorder->stopped && + recorder_queue_is_empty(&recorder->queue)) { + cond_wait(recorder->queue_cond, recorder->mutex); + } + + // if stopped is set, continue to process the remaining events (to + // finish the recording) before actually stopping + + if (recorder->stopped && recorder_queue_is_empty(&recorder->queue)) { + mutex_unlock(recorder->mutex); + break; + } + + struct record_packet *rec = recorder_queue_take(&recorder->queue); + + mutex_unlock(recorder->mutex); + + bool ok = recorder_write(recorder, &rec->packet); + record_packet_delete(rec); + if (!ok) { + LOGE("Could not record packet"); + + mutex_lock(recorder->mutex); + recorder->failed = true; + // discard pending packets + recorder_queue_clear(&recorder->queue); + mutex_unlock(recorder->mutex); + break; + } + + } + + LOGD("Recorder thread ended"); + + return 0; +} + +bool +recorder_start(struct recorder *recorder) { + LOGD("Starting recorder thread"); + + recorder->thread = SDL_CreateThread(run_recorder, "recorder", recorder); + if (!recorder->thread) { + LOGC("Could not start recorder thread"); + return false; + } + + return true; +} + +void +recorder_stop(struct recorder *recorder) { + mutex_lock(recorder->mutex); + recorder->stopped = true; + cond_signal(recorder->queue_cond); + mutex_unlock(recorder->mutex); +} + +void +recorder_join(struct recorder *recorder) { + SDL_WaitThread(recorder->thread, NULL); +} + +bool +recorder_push(struct recorder *recorder, const AVPacket *packet) { + mutex_lock(recorder->mutex); + SDL_assert(!recorder->stopped); + + if (recorder->failed) { + // reject any new packet (this will stop the stream) + return false; + } + + bool ok = recorder_queue_push(&recorder->queue, packet); + cond_signal(recorder->queue_cond); + + mutex_unlock(recorder->mutex); + return ok; +} diff --git a/app/src/recorder.h b/app/src/recorder.h index 8a8e3310..8d1f575d 100644 --- a/app/src/recorder.h +++ b/app/src/recorder.h @@ -3,6 +3,8 @@ #include #include +#include +#include #include "common.h" @@ -11,12 +13,29 @@ enum recorder_format { RECORDER_FORMAT_MKV, }; +struct record_packet { + AVPacket packet; + struct record_packet *next; +}; + +struct recorder_queue { + struct record_packet *first; + struct record_packet *last; // undefined if first is NULL +}; + struct recorder { char *filename; enum recorder_format format; AVFormatContext *ctx; struct size declared_frame_size; bool header_written; + + SDL_Thread *thread; + SDL_mutex *mutex; + SDL_cond *queue_cond; + bool stopped; // set on recorder_stop() by the stream reader + bool failed; // set on packet write failure + struct recorder_queue queue; }; bool @@ -33,6 +52,15 @@ void recorder_close(struct recorder *recorder); bool -recorder_write(struct recorder *recorder, AVPacket *packet); +recorder_start(struct recorder *recorder); + +void +recorder_stop(struct recorder *recorder); + +void +recorder_join(struct recorder *recorder); + +bool +recorder_push(struct recorder *recorder, const AVPacket *packet); #endif diff --git a/app/src/stream.c b/app/src/stream.c index 0396bf60..bca89f71 100644 --- a/app/src/stream.c +++ b/app/src/stream.c @@ -71,7 +71,7 @@ notify_stopped(void) { static bool process_config_packet(struct stream *stream, AVPacket *packet) { - if (stream->recorder && !recorder_write(stream->recorder, packet)) { + if (stream->recorder && !recorder_push(stream->recorder, packet)) { LOGE("Could not send config packet to recorder"); return false; } @@ -87,8 +87,8 @@ process_frame(struct stream *stream, AVPacket *packet) { if (stream->recorder) { packet->dts = packet->pts; - if (!recorder_write(stream->recorder, packet)) { - LOGE("Could not write frame to output file"); + if (!recorder_push(stream->recorder, packet)) { + LOGE("Could not send packet to recorder"); return false; } } @@ -201,15 +201,22 @@ run_stream(void *data) { goto finally_free_codec_ctx; } - if (stream->recorder && !recorder_open(stream->recorder, codec)) { - LOGE("Could not open recorder"); - goto finally_close_decoder; + if (stream->recorder) { + if (!recorder_open(stream->recorder, codec)) { + LOGE("Could not open recorder"); + goto finally_close_decoder; + } + + if (!recorder_start(stream->recorder)) { + LOGE("Could not start recorder"); + goto finally_close_recorder; + } } stream->parser = av_parser_init(AV_CODEC_ID_H264); if (!stream->parser) { LOGE("Could not initialize parser"); - goto finally_close_recorder; + goto finally_stop_and_join_recorder; } // We must only pass complete frames to av_parser_parse2()! @@ -239,6 +246,12 @@ run_stream(void *data) { } av_parser_close(stream->parser); +finally_stop_and_join_recorder: + if (stream->recorder) { + recorder_stop(stream->recorder); + LOGI("Finishing recording..."); + recorder_join(stream->recorder); + } finally_close_recorder: if (stream->recorder) { recorder_close(stream->recorder);