avformat/libamqp: add option delivery_mode

Reviewed-by: Andriy Gelman <andriy.gelman@gmail.com>
Signed-off-by: Levis Florian <levis.florian@gmail.com>
pull/342/head
Levis Florian 5 years ago committed by Andriy Gelman
parent 24d1781cbd
commit bd6ae462f8
  1. 15
      doc/protocols.texi
  2. 6
      libavformat/libamqp.c

@ -109,6 +109,21 @@ the received message may be truncated causing decoding errors.
The timeout in seconds during the initial connection to the broker. The The timeout in seconds during the initial connection to the broker. The
default value is rw_timeout, or 5 seconds if rw_timeout is not set. default value is rw_timeout, or 5 seconds if rw_timeout is not set.
@item delivery_mode @var{mode}
Sets the delivery mode of each message sent to broker.
The following values are accepted:
@table @samp
@item persistent
Delivery mode set to "persistent" (2). This is the default value.
Messages may be written to the broker's disk depending on its setup.
@item non-persistent
Delivery mode set to "non-persistent" (1).
Messages will stay in broker's memory unless the broker is under memory
pressure.
@end table
@end table @end table
@section async @section async

@ -39,6 +39,7 @@ typedef struct AMQPContext {
int pkt_size; int pkt_size;
int64_t connection_timeout; int64_t connection_timeout;
int pkt_size_overflow; int pkt_size_overflow;
int delivery_mode;
} AMQPContext; } AMQPContext;
#define STR_LEN 1024 #define STR_LEN 1024
@ -52,6 +53,9 @@ static const AVOption options[] = {
{ "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E }, { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
{ "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E }, { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
{ "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E}, { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E},
{ "delivery_mode", "Delivery mode", OFFSET(delivery_mode), AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags = E, "delivery_mode"},
{ "persistent", "Persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0, E, "delivery_mode" },
{ "non-persistent", "Non-persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0, E, "delivery_mode" },
{ NULL } { NULL }
}; };
@ -222,7 +226,7 @@ static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("octet/stream"); props.content_type = amqp_cstring_bytes("octet/stream");
props.delivery_mode = 2; /* persistent delivery mode */ props.delivery_mode = s->delivery_mode;
ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange), ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
amqp_cstring_bytes(s->routing_key), 0, 0, amqp_cstring_bytes(s->routing_key), 0, 0,

Loading…
Cancel
Save