From 6edd6884b5a8380979649d80ccf3454684497f3b Mon Sep 17 00:00:00 2001 From: Fabrice Bellard Date: Wed, 20 Nov 2002 18:05:45 +0000 Subject: [PATCH] RTP multicast begins to work in MPEG1 - simplified stream bandwidth computation (no need to recompute it at each request) Originally committed as revision 1260 to svn://svn.ffmpeg.org/ffmpeg/trunk --- ffserver.c | 218 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 165 insertions(+), 53 deletions(-) diff --git a/ffserver.c b/ffserver.c index 2779e280e1..a9250cad31 100644 --- a/ffserver.c +++ b/ffserver.c @@ -120,7 +120,6 @@ typedef struct HTTPContext { AVFormatContext fmt_ctx; /* instance of FFStream for one user */ int last_packet_sent; /* true if last data packet was sent */ int suppress_log; - int bandwidth; DataRateData datarate; int wmp_client_id; char protocol[16]; @@ -190,12 +189,15 @@ typedef struct FFStream { time_t pid_start; /* Of ffmpeg process */ char **child_argv; struct FFStream *next; + int bandwidth; /* bandwidth, in kbits/s */ /* RTSP options */ char *rtsp_option; /* multicast specific */ int is_multicast; struct in_addr multicast_ip; int multicast_port; /* first port used for multicast */ + int multicast_ttl; + int loop; /* if true, send the stream in loops (only meaningful if file) */ /* feed specific */ int feed_opened; /* true if someone is writing to the feed */ @@ -247,7 +249,7 @@ static int prepare_sdp_description(FFStream *stream, UINT8 **pbuffer, struct in_addr my_ip); /* RTP handling */ -static HTTPContext *rtp_new_connection(HTTPContext *rtsp_c, +static HTTPContext *rtp_new_connection(struct sockaddr_in *from_addr, FFStream *stream, const char *session_id); static int rtp_new_av_stream(HTTPContext *c, int stream_index, struct sockaddr_in *dest_addr); @@ -263,8 +265,8 @@ static int need_to_start_children; int nb_max_connections; int nb_connections; -int nb_max_bandwidth; -int nb_bandwidth; +int max_bandwidth; +int current_bandwidth; static long cur_time; // Making this global saves on passing it around everywhere @@ -290,25 +292,31 @@ static void http_log(char *fmt, ...) va_end(ap); } -static void log_connection(HTTPContext *c) +static char *ctime1(char *buf2) { - char buf1[32], buf2[32], *p; time_t ti; + char *p; - if (c->suppress_log) - return; - - /* XXX: reentrant function ? */ - p = inet_ntoa(c->from_addr.sin_addr); - strcpy(buf1, p); ti = time(NULL); p = ctime(&ti); strcpy(buf2, p); p = buf2 + strlen(p) - 1; if (*p == '\n') *p = '\0'; + return buf2; +} + +static void log_connection(HTTPContext *c) +{ + char buf2[32]; + + if (c->suppress_log) + return; + http_log("%s - - [%s] \"%s %s %s\" %d %lld\n", - buf1, buf2, c->method, c->url, c->protocol, (c->http_error ? c->http_error : 200), c->data_count); + inet_ntoa(c->from_addr.sin_addr), + ctime1(buf2), c->method, c->url, + c->protocol, (c->http_error ? c->http_error : 200), c->data_count); } static void update_datarate(DataRateData *drd, INT64 count) @@ -331,7 +339,7 @@ static int compute_datarate(DataRateData *drd, INT64 count) { if (cur_time == drd->time1) return 0; - + return ((count - drd->count1) * 1000) / (cur_time - drd->time1); } @@ -340,7 +348,6 @@ static int get_longterm_datarate(DataRateData *drd, INT64 count) /* You get the first 3 seconds flat out */ if (cur_time - drd->time1 < 3000) return 0; - return compute_datarate(drd, count); } @@ -431,6 +438,61 @@ static int socket_open_listen(struct sockaddr_in *my_addr) return server_fd; } +/* start all multicast streams */ +static void start_multicast(void) +{ + FFStream *stream; + char session_id[32]; + HTTPContext *rtp_c; + struct sockaddr_in dest_addr; + int default_port, stream_index; + + default_port = 6000; + for(stream = first_stream; stream != NULL; stream = stream->next) { + if (stream->is_multicast) { + /* open the RTP connection */ + snprintf(session_id, sizeof(session_id), + "%08x%08x", (int)random(), (int)random()); + + /* choose a port if none given */ + if (stream->multicast_port == 0) { + stream->multicast_port = default_port; + default_port += 100; + } + + dest_addr.sin_family = AF_INET; + dest_addr.sin_addr = stream->multicast_ip; + dest_addr.sin_port = htons(stream->multicast_port); + + rtp_c = rtp_new_connection(&dest_addr, stream, session_id); + if (!rtp_c) { + continue; + } + if (open_input_stream(rtp_c, "") < 0) { + fprintf(stderr, "Could not open input stream for stream '%s'\n", + stream->filename); + continue; + } + + rtp_c->rtp_protocol = RTSP_PROTOCOL_RTP_UDP_MULTICAST; + + /* open each RTP stream */ + for(stream_index = 0; stream_index < stream->nb_streams; + stream_index++) { + dest_addr.sin_port = htons(stream->multicast_port + + 2 * stream_index); + if (rtp_new_av_stream(rtp_c, stream_index, &dest_addr) < 0) { + fprintf(stderr, "Could not open input stream %d for stream '%s'\n", + stream_index, stream->filename); + continue; + } + } + + /* change state to send data */ + rtp_c->state = HTTPSTATE_SEND_DATA; + } + } +} /* main loop of the http server */ static int http_server(void) @@ -454,6 +516,9 @@ static int http_server(void) first_http_ctx = NULL; nb_connections = 0; first_http_ctx = NULL; + + start_multicast(); + for(;;) { poll_entry = poll_table; poll_entry->fd = server_fd; @@ -669,7 +734,8 @@ static void close_connection(HTTPContext *c) } } - nb_bandwidth -= c->bandwidth; + if (c->stream) + current_bandwidth -= c->stream->bandwidth; av_freep(&c->pb_buffer); av_free(c->buffer); av_free(c); @@ -1125,9 +1191,7 @@ static int http_parse_request(HTTPContext *c) compute_real_filename(filename, sizeof(url) - 1); } else if (match_ext(filename, "sdp")) { redir_type = REDIR_SDP; - printf("before %s\n", filename); compute_real_filename(filename, sizeof(url) - 1); - printf("after %s\n", filename); } stream = first_stream; @@ -1174,26 +1238,10 @@ static int http_parse_request(HTTPContext *c) } if (post == 0 && stream->stream_type == STREAM_TYPE_LIVE) { - /* See if we meet the bandwidth requirements */ - for(i=0;inb_streams;i++) { - AVStream *st = stream->streams[i]; - switch(st->codec.codec_type) { - case CODEC_TYPE_AUDIO: - c->bandwidth += st->codec.bit_rate; - break; - case CODEC_TYPE_VIDEO: - c->bandwidth += st->codec.bit_rate; - break; - default: - av_abort(); - } - } + current_bandwidth += stream->bandwidth; } - - c->bandwidth /= 1000; - nb_bandwidth += c->bandwidth; - - if (post == 0 && nb_max_bandwidth < nb_bandwidth) { + + if (post == 0 && max_bandwidth < current_bandwidth) { c->http_error = 200; q = c->buffer; q += sprintf(q, "HTTP/1.0 200 Server too busy\r\n"); @@ -1202,7 +1250,7 @@ static int http_parse_request(HTTPContext *c) q += sprintf(q, "Too busy\r\n"); q += sprintf(q, "The server is too busy to serve your request at this time.

\r\n"); q += sprintf(q, "The bandwidth being served (including your stream) is %dkbit/sec, and this exceeds the limit of %dkbit/sec\r\n", - nb_bandwidth, nb_max_bandwidth); + current_bandwidth, max_bandwidth); q += sprintf(q, "\r\n"); /* prepare output buffer */ @@ -1580,7 +1628,7 @@ static void compute_stats(HTTPContext *c) } url_fprintf(pb, " %s %d %d %s %s %d %s %s", stream->fmt->name, - (audio_bit_rate + video_bit_rate) / 1000, + stream->bandwidth, video_bit_rate / 1000, video_codec_name, video_codec_name_extra, audio_bit_rate / 1000, audio_codec_name, audio_codec_name_extra); if (stream->feed) { @@ -1702,7 +1750,7 @@ static void compute_stats(HTTPContext *c) nb_connections, nb_max_connections); url_fprintf(pb, "Bandwidth in use: %dk / %dk
\n", - nb_bandwidth, nb_max_bandwidth); + current_bandwidth, max_bandwidth); url_fprintf(pb, "\n"); url_fprintf(pb, "
#FileIPProtoStateTarget bits/secActual bits/secBytes transferred\n"); @@ -1999,7 +2047,7 @@ static int compute_send_delay(HTTPContext *c) { int datarate = 8 * get_longterm_datarate(&c->datarate, c->data_count); - if (datarate > c->bandwidth * 2000) { + if (datarate > c->stream->bandwidth * 2000) { return 1000; } return 0; @@ -2082,6 +2130,7 @@ static int http_prepare_data(HTTPContext *c) return 1; /* state changed */ } } + redo: if (av_read_frame(c->fmt_in, &pkt) < 0) { if (c->stream->feed && c->stream->feed->feed_opened) { /* if coming from feed, it means we reached the end of the @@ -2089,8 +2138,17 @@ static int http_prepare_data(HTTPContext *c) c->state = HTTPSTATE_WAIT_FEED; return 1; /* state changed */ } else { - /* must send trailer now because eof or error */ - c->state = HTTPSTATE_SEND_DATA_TRAILER; + if (c->stream->loop) { + av_close_input_file(c->fmt_in); + c->fmt_in = NULL; + if (open_input_stream(c, "") < 0) + goto no_loop; + goto redo; + } else { + no_loop: + /* must send trailer now because eof or error */ + c->state = HTTPSTATE_SEND_DATA_TRAILER; + } } } else { /* update first pts if needed */ @@ -2143,6 +2201,8 @@ static int http_prepare_data(HTTPContext *c) c->packet_stream_index = pkt.stream_index; ctx = c->rtp_ctx[c->packet_stream_index]; codec = &ctx->streams[0]->codec; + /* only one stream per RTP connection */ + pkt.stream_index = 0; } else { ctx = &c->fmt_ctx; /* Fudge here */ @@ -2721,7 +2781,7 @@ static void rtsp_cmd_setup(HTTPContext *c, const char *url, /* find rtp session, and create it if none found */ rtp_c = find_rtp_session(h->session_id); if (!rtp_c) { - rtp_c = rtp_new_connection(c, stream, h->session_id); + rtp_c = rtp_new_connection(&c->from_addr, stream, h->session_id); if (!rtp_c) { rtsp_reply_error(c, RTSP_STATUS_BANDWIDTH); return; @@ -2923,7 +2983,7 @@ static void rtsp_cmd_teardown(HTTPContext *c, const char *url, RTSPHeader *h) /********************************************************************/ /* RTP handling */ -static HTTPContext *rtp_new_connection(HTTPContext *rtsp_c, +static HTTPContext *rtp_new_connection(struct sockaddr_in *from_addr, FFStream *stream, const char *session_id) { HTTPContext *c = NULL; @@ -2940,7 +3000,7 @@ static HTTPContext *rtp_new_connection(HTTPContext *rtsp_c, c->fd = -1; c->poll_entry = NULL; - c->from_addr = rtsp_c->from_addr; + c->from_addr = *from_addr; c->buffer_size = IOBUFFER_INIT_SIZE; c->buffer = av_malloc(c->buffer_size); if (!c->buffer) @@ -2953,6 +3013,8 @@ static HTTPContext *rtp_new_connection(HTTPContext *rtsp_c, /* protocol is shown in statistics */ pstrcpy(c->protocol, sizeof(c->protocol), "RTP"); + current_bandwidth += stream->bandwidth; + c->next = first_http_ctx; first_http_ctx = c; return c; @@ -2976,7 +3038,8 @@ static int rtp_new_av_stream(HTTPContext *c, char *ipaddr; URLContext *h; UINT8 *dummy_buf; - + char buf2[32]; + /* now we can open the relevant output stream */ ctx = av_mallocz(sizeof(AVFormatContext)); if (!ctx) @@ -3002,10 +3065,19 @@ static int rtp_new_av_stream(HTTPContext *c, /* build destination RTP address */ ipaddr = inet_ntoa(dest_addr->sin_addr); - snprintf(ctx->filename, sizeof(ctx->filename), - "rtp://%s:%d", ipaddr, ntohs(dest_addr->sin_port)); - - printf("open %s\n", ctx->filename); + /* XXX: also pass as parameter to function ? */ + if (c->stream->is_multicast) { + int ttl; + ttl = c->stream->multicast_ttl; + if (!ttl) + ttl = 16; + snprintf(ctx->filename, sizeof(ctx->filename), + "rtp://%s:%d?multicast=1&ttl=%d", + ipaddr, ntohs(dest_addr->sin_port), ttl); + } else { + snprintf(ctx->filename, sizeof(ctx->filename), + "rtp://%s:%d", ipaddr, ntohs(dest_addr->sin_port)); + } if (url_open(&h, ctx->filename, URL_WRONLY) < 0) goto fail; @@ -3014,6 +3086,11 @@ static int rtp_new_av_stream(HTTPContext *c, goto fail; } + http_log("%s:%d - - [%s] \"RTPSTART %s/streamid=%d\"\n", + ipaddr, ntohs(dest_addr->sin_port), + ctime1(buf2), + c->stream->filename, stream_index); + /* normally, no packets should be output here, but the packet size may be checked */ if (url_open_dyn_packet_buf(&ctx->pb, url_get_max_packet_size(h)) < 0) { @@ -3286,6 +3363,29 @@ void build_feed_streams(void) } } +/* compute the bandwidth used by each stream */ +static void compute_bandwidth(void) +{ + int bandwidth, i; + FFStream *stream; + + for(stream = first_stream; stream != NULL; stream = stream->next) { + bandwidth = 0; + for(i=0;inb_streams;i++) { + AVStream *st = stream->streams[i]; + switch(st->codec.codec_type) { + case CODEC_TYPE_AUDIO: + case CODEC_TYPE_VIDEO: + bandwidth += st->codec.bit_rate; + break; + default: + break; + } + } + stream->bandwidth = (bandwidth + 999) / 1000; + } +} + static void get_arg(char *buf, int buf_size, const char **pp) { const char *p; @@ -3519,7 +3619,7 @@ int parse_ffconfig(const char *filename) filename, line_num, arg); errors++; } else { - nb_max_bandwidth = val; + max_bandwidth = val; } } else if (!strcasecmp(cmd, "CustomLog")) { get_arg(logfilename, sizeof(logfilename), &p); @@ -3952,12 +4052,22 @@ int parse_ffconfig(const char *filename) errors++; } stream->is_multicast = 1; + stream->loop = 1; /* default is looping */ } } else if (!strcasecmp(cmd, "MulticastPort")) { get_arg(arg, sizeof(arg), &p); if (stream) { stream->multicast_port = atoi(arg); } + } else if (!strcasecmp(cmd, "MulticastTTL")) { + get_arg(arg, sizeof(arg), &p); + if (stream) { + stream->multicast_ttl = atoi(arg); + } + } else if (!strcasecmp(cmd, "NoLoop")) { + if (stream) { + stream->loop = 0; + } } else if (!strcasecmp(cmd, "")) { if (!stream) { fprintf(stderr, "%s:%d: No corresponding for \n", @@ -4162,7 +4272,7 @@ int main(int argc, char **argv) my_rtsp_addr.sin_addr.s_addr = htonl (INADDR_ANY); nb_max_connections = 5; - nb_max_bandwidth = 1000; + max_bandwidth = 1000; first_stream = NULL; logfilename[0] = '\0'; @@ -4180,6 +4290,8 @@ int main(int argc, char **argv) build_feed_streams(); + compute_bandwidth(); + /* put the process in background and detach it from its TTY */ if (ffserver_daemon) { int pid;