gstreamer 文档中有很多关于构建和运行静态管道的示例。但是,在实时管道中更改/重新链接元素并不多——而媒体实际上是在流动的。这绝对是可能的,所以问题是:
- 在尝试此操作之前,我应该了解哪些 gstreamer 概念/机制?
- 有什么需要注意的陷阱吗?
- 什么是基本程序,或者一个很好的例子?
接受的答案将是勺子喂养的,全面的,并带有源代码
我最喜欢的理解链接(和动态链接)的“概念”是将管道视为真正的管道,水流过它。一旦你这样做了,有些事情会变得非常明显。比如,“你在连接元素之前将源设置为播放吗?”,变成“你在连接软管之前打开水吗?”,它本身就是一个答案。使用动态链接更是如此,您如何确保没有水“泄漏”(这很糟糕,GStreamer 中的“泄漏”相当于获得 GST_FLOW_NOT_LINKED,并且会停止您的来源和乐趣)或被堵塞(可以导致丢包或拥塞)。
是的。许多。有一点免责声明,我目前仍在使用 0.10,其中一些可能已在 1.0 中修复,不幸的是,使用 GStreamer 0.10 进行动态链接和取消链接非常非常困难。让我解释一下:假设您正在使用 Tee,并且您想取消链接一个分支。您将从释放 Tees srcpad 开始(不要介意取消链接,这是释放垫的一部分),现在您应该能够安全地拆除该垫下游的元素。(相当于水是你在三通之后关闭了一个阀门,现在应该能够在阀门之后拆除管道,除非你想弄湿,否则你不会在不先关闭阀门的情况下开始拆除管道......)这个大部分时间都可以工作,但这里有一场比赛。因为在你释放垫子后,
我对此进行了很多实验,发现如果您需要稳定性,并且偶尔崩溃/冻结不是一种选择,您需要一个可以作为动态安全网的元素。一个元素,它将保证在您释放/取消链接后,pad 上绝对不会发生任何活动。做到这一点的唯一方法是打破另一个 GStreamer 在持有锁时不推送的范例:您需要在推送/发送事件/填充分配时持有锁。前段时间我在这里做了这样的事情. (当然,测试用例是最重要的事情,因为它允许您测试自己/其他元素的安全性)您还可以想象一个无锁元素可以吞下所有错误的 FlowReturns,并为它的上游,但是您需要绝对确定您的所有下游元素都将“在关闭时收到推送或填充分配”-安全,因为您的元素将无法保证一旦“停止流动” (release/unlink) 已经执行了,一点滴也挤不过去。
当然,您必须正确看待其中的一些。我所说的这些可怕的竞争条件的窗口实际上非常非常小,并且可能只会在您运行程序时每 1000 次或 10.000 次发生一次。但对于专业应用来说,这当然是不可接受的。我做了一个演讲,在这里我介绍了一些这些东西
我倾向于根据情况使用输出选择器或输入选择器箱,而不是焊盘阻塞的复杂性(我在另一篇文章http://gstreamer-devel.966125.n4.nabble.com/Dynamically-adding-中回答了焊盘阻塞and-removing-branches-of-a-tee-td973635.html#a4656812)。并在不使用时将选择器连接到 fakesrc 或 fakesink bin。在下面的示例中,如果使用 GTK,则可以将行替换为g_timeout_add (SWITCH_TIMEOUT, switch_cb, osel);
并将gtk_toggle_button
当前所有代码放入switch_cb
函数转换为切换按钮回调函数。在此代码中,可以在两个图像接收器之间切换。我会用 fakesink 替换一个图像接收器以保持管道运行,以防我将来想添加一个带有文件接收器的 tee,我想在其中录制视频但为播放器提供打开(图像接收器上的选择器)/关闭(选择器)的选项在 fakesink 上)显示。这允许在运行时使用选择器添加/删除垃圾箱。
#include <gst/gst.h>
#define SWITCH_TIMEOUT 1000
#define NUM_VIDEO_BUFFERS 500
static GMainLoop *loop;
/* Output selector src pads */
static GstPad *osel_src1 = NULL;
static GstPad *osel_src2 = NULL;
static gboolean
my_bus_callback (GstBus * bus, GstMessage * message, gpointer data)
{
g_print ("Got %s message\n", GST_MESSAGE_TYPE_NAME (message));
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ERROR:{
GError *err;
gchar *debug;
gst_message_parse_error (message, &err, &debug);
g_print ("Error: %s\n", err->message);
g_error_free (err);
g_free (debug);
g_main_loop_quit (loop);
break;
}
case GST_MESSAGE_EOS:
/* end-of-stream */
g_main_loop_quit (loop);
break;
default:
/* unhandled message */
break;
}
/* we want to be notified again the next time there is a message
* on the bus, so returning TRUE (FALSE means we want to stop watching
* for messages on the bus and our callback should not be called again)
*/
return TRUE;
}
static gboolean
switch_cb (gpointer user_data)
{
GstElement *sel = GST_ELEMENT (user_data);
GstPad *old_pad, *new_pad = NULL;
g_object_get (G_OBJECT (sel), "active-pad", &old_pad, NULL);
if (old_pad == osel_src1)
new_pad = osel_src2;
else
new_pad = osel_src1;
g_object_set (G_OBJECT (sel), "active-pad", new_pad, NULL);
g_print ("switched from %s:%s to %s:%s\n", GST_DEBUG_PAD_NAME (old_pad),
GST_DEBUG_PAD_NAME (new_pad));
gst_object_unref (old_pad);
return TRUE;
}
gint
main (gint argc, gchar * argv[])
{
GstElement *pipeline, *src, *toverlay, *osel, *sink1, *sink2, *convert;
GstPad *sinkpad1;
GstPad *sinkpad2;
GstBus *bus;
/* init GStreamer */
gst_init (&argc, &argv);
loop = g_main_loop_new (NULL, FALSE);
/* create elements */
pipeline = gst_element_factory_make ("pipeline", "pipeline");
src = gst_element_factory_make ("videotestsrc", "src");
toverlay = gst_element_factory_make ("timeoverlay", "timeoverlay");
osel = gst_element_factory_make ("output-selector", "osel");
convert = gst_element_factory_make ("ffmpegcolorspace", "convert");
sink1 = gst_element_factory_make ("xvimagesink", "sink1");
sink2 = gst_element_factory_make ("ximagesink", "sink2");
if (!pipeline || !src || !toverlay || !osel || !convert || !sink1 || !sink2) {
g_print ("missing element\n");
return -1;
}
/* add them to bin */
gst_bin_add_many (GST_BIN (pipeline), src, toverlay, osel, convert, sink1,
sink2, NULL);
/* set properties */
g_object_set (G_OBJECT (src), "is-live", TRUE, NULL);
g_object_set (G_OBJECT (src), "do-timestamp", TRUE, NULL);
g_object_set (G_OBJECT (src), "num-buffers", NUM_VIDEO_BUFFERS, NULL);
g_object_set (G_OBJECT (sink1), "sync", FALSE, "async", FALSE, NULL);
g_object_set (G_OBJECT (sink2), "sync", FALSE, "async", FALSE, NULL);
g_object_set (G_OBJECT (osel), "resend-latest", TRUE, NULL);
/* link src ! timeoverlay ! osel */
if (!gst_element_link_many (src, toverlay, osel, NULL)) {
g_print ("linking failed\n");
return -1;
}
/* link output 1 */
sinkpad1 = gst_element_get_static_pad (sink1, "sink");
osel_src1 = gst_element_get_request_pad (osel, "src%d");
if (gst_pad_link (osel_src1, sinkpad1) != GST_PAD_LINK_OK) {
g_print ("linking output 1 failed\n");
return -1;
}
gst_object_unref (sinkpad1);
/* link output 2 */
sinkpad2 = gst_element_get_static_pad (convert, "sink");
osel_src2 = gst_element_get_request_pad (osel, "src%d");
if (gst_pad_link (osel_src2, sinkpad2) != GST_PAD_LINK_OK) {
g_print ("linking output 2 failed\n");
return -1;
}
gst_object_unref (sinkpad2);
if (!gst_element_link (convert, sink2)) {
g_print ("linking output 2 failed\n");
return -1;
}
/* add switch callback */
g_timeout_add (SWITCH_TIMEOUT, switch_cb, osel);
/* change to playing */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
gst_bus_add_watch (bus, my_bus_callback, loop);
gst_object_unref (bus);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
/* now run */
g_main_loop_run (loop);
/* also clean up */
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_element_release_request_pad (osel, osel_src1);
gst_element_release_request_pad (osel, osel_src2);
gst_object_unref (GST_OBJECT (pipeline));
return 0;
}
当我寻找动态修改任何 gstreamer 管道时,这篇文章首先出现。找到了一些链接,但它现在在手册中有很好的记录:http: //gstreamer.freedesktop.org/data/doc/gstreamer/head/manual/html/section-dynamic-pipelines.html
其实我也在尝试做同样的事情。还没有太多的运气:(
我通过在#gstreamer IRC 频道上询问获得了以下链接:http: //cgit.freedesktop.org/gstreamer/gstreamer/tree/docs/design/part-dynamic.txt
也许是对正确方向的提示。
当您找到其他文档时请告诉我...
我没有在multifilesink或output-selector之上为Gstreamer 0.10创建完整清晰的多路复用文件。
在分析了许多替代方案之后,我的解决方案将以下示例作为代码库:http: //gstreamer.freedesktop.org/data/doc/gstreamer/head/manual/html/section-dynamic-pipelines.html
探测函数 API 已从 0.10 更改为 1.0,但下面的解决方案可以每 N 秒创建不同的 MP4 文件:
static GstElement *pipeline = NULL;
// Pipeline -> src -> dynamic pipeline
// Pipeline -> capsfilter(f264file) -> mp4mux(mux0) -> filesink(fsink0)
// Pipeline -> elem_before||blockpad| -> |elem_cur_sinkpad||elem_cur||elem_cur_srcpad -> |elem_after_sinkpad||elem_after
static gulong probe_id; // probe ID
static GstElement *elem_before; // SRC of dynamic pipeline
static GstElement *elem_after; // SINK of dynamic pipeline
static GstElement *elem_cur; // Main element of dynamic pipeline
static GstPad *blockpad; // SRC pad to be blocked
static GstPad *elem_cur_srcpad; // SRC pad where check EOS
static GstPad *elem_cur_sinkpad; // SINK of dynamic pipeline
static GstPad *elem_after_sinkpad; // SINK of SINK element
// Last Buffer Timestamp
static GstClockTime last_ts = 0;
typedef enum {
NO_NEW_FILE, // Keep current file destination
NEW_FILE, // Switch file destination
} NewFileStatus;
static NewFileStatus newfile = NO_NEW_FILE; // Switch File Flag
static int counter = 1; // Index filename
// EOS listener to switch to other file destination
static gboolean
event_probe_cb (GstPad * pad, GstEvent * event, gpointer user_data)
{
g_print ("INSIDE event_probe_cb:%d type:%s\n",probe_id,
GST_EVENT_TYPE (event)==GST_EVENT_EOS?"EOS":GST_EVENT_TYPE (event)==GST_EVENT_NEWSEGMENT?"NEWSEGMENT":"OTHER");
if (GST_EVENT_TYPE (event) != GST_EVENT_EOS)
{
// Push the event in the pipe flow (false DROP)
return TRUE;
}
// remove the probe first
gst_pad_remove_event_probe (pad, probe_id);
gst_object_unref (elem_cur_srcpad);
gst_object_unref (elem_after_sinkpad);
gst_element_release_request_pad(elem_cur, elem_cur_sinkpad);
gst_element_set_state (elem_cur, GST_STATE_NULL);
gst_element_set_state (elem_after, GST_STATE_NULL);
// remove unlinks automatically
GST_DEBUG_OBJECT (pipeline, "removing %" GST_PTR_FORMAT, elem_cur);
gst_bin_remove (GST_BIN (pipeline), elem_cur);
GST_DEBUG_OBJECT (pipeline, "removing %" GST_PTR_FORMAT, elem_after);
gst_bin_remove (GST_BIN (pipeline), elem_after);
GstElement * mux0 = gst_element_factory_make("mp4mux", "mux0");
GstElement * fsink0 = gst_element_factory_make("filesink", "fsink0");
elem_cur = mux0;
elem_after = fsink0;
if(!mux0 || !fsink0)
{
printf("mising elements\n");
}
GST_DEBUG_OBJECT (pipeline, "adding %" GST_PTR_FORMAT, elem_cur);
gst_bin_add (GST_BIN (pipeline), elem_cur);
GST_DEBUG_OBJECT (pipeline, "adding %" GST_PTR_FORMAT, elem_after);
gst_bin_add (GST_BIN (pipeline), elem_after);
char buffer[128];
sprintf(buffer, "test_%d.mp4", counter++);
g_print ("File Switching %s\n", buffer);
g_object_set(G_OBJECT(elem_after), "location", buffer, NULL);
GST_DEBUG_OBJECT (pipeline, "linking..");
elem_cur_srcpad = gst_element_get_static_pad (elem_cur, "src");
elem_cur_sinkpad = gst_element_get_request_pad (elem_cur, "video_%d");
elem_after_sinkpad = gst_element_get_static_pad (elem_after, "sink");
if(gst_pad_link(blockpad, elem_cur_sinkpad) != GST_PAD_LINK_OK)
{
printf("linking output 0 failed\n");
return -1;
}
if(gst_pad_link(elem_cur_srcpad, elem_after_sinkpad) != GST_PAD_LINK_OK)
{
printf("linking output 1 failed\n");
return -1;
}
g_print ("Moving to PLAYING\n");
gst_element_set_state (elem_cur, GST_STATE_PLAYING);
gst_element_set_state (elem_after, GST_STATE_PLAYING);
GST_DEBUG_OBJECT (pipeline, "done");
newfile = NO_NEW_FILE;
// Push the event in the pipe flow (false DROP)
return TRUE;
}
// Check if Buffer contains a KEY FRAME
static gboolean
is_sync_frame (GstBuffer * buffer)
{
if (GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT))
{
return FALSE;
}
else if (!GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_IN_CAPS))
{
return TRUE;
}
}
// Block source and launch EOS to MUXER to achieve a full muxed file
static gboolean
pad_probe_cb (GstPad * pad, GstBuffer * buffer, gpointer user_data)
{
g_print ("\n\tINSIDE pad_probe_cb:%d %s %s\n",probe_id, (newfile?"newfile":"thesame"),
(is_sync_frame (buffer)?"KEYframe":"frame"));
GST_DEBUG_OBJECT (pad, "pad is blocked now");
last_ts = GST_BUFFER_TIMESTAMP(buffer);
if(!GST_CLOCK_TIME_IS_VALID(last_ts))
last_ts=0;
if((newfile==NO_NEW_FILE) || !is_sync_frame (buffer))
return TRUE;
/* remove the probe first */
gst_pad_remove_buffer_probe (pad, probe_id);
/* install new probe for EOS */
probe_id = gst_pad_add_event_probe (elem_after_sinkpad, G_CALLBACK(event_probe_cb), user_data);
/* push EOS into the element, the probe will be fired when the
* EOS leaves the effect and it has thus drained all of its data */
gst_pad_send_event (elem_cur_sinkpad, gst_event_new_eos ());
// Wait til the EOS have been processed the Buffer with the Key frame will be the FIRST
while(newfile != NO_NEW_FILE)
Sleep(1);
// Push the buffer in the pipe flow (false DROP)
return TRUE;
}
// this timeout is periodically run as part of the mainloop
static gboolean timeout (gpointer user_data)
{
g_print ("TIMEOUT\n");
if(!playing)
return false;
newfile = NEW_FILE;
/* install new probe for Keyframe and New File */
probe_id = gst_pad_add_buffer_probe (blockpad, G_CALLBACK(pad_probe_cb), pipeline);
return true;
}