13

gstreamer 文档中有很多关于构建和运行静态管道的示例。但是,在实时管道中更改/重新链接元素并不多——而媒体实际上是在流动的。这绝对是可能的,所以问题是:

  1. 在尝试此操作之前,我应该了解哪些 gstreamer 概念/机制?
  2. 有什么需要注意的陷阱吗?
  3. 什么是基本程序,或者一个很好的例子?

接受的答案将是勺子喂养的,全面的,并带有源代码

4

5 回答 5

14
  1. 我最喜欢的理解链接(和动态链接)的“概念”是将管道视为真正的管道,水流过它。一旦你这样做了,有些事情会变得非常明显。比如,“你在连接元素之前将源设置为播放吗?”,变成“你在连接软管之前打开水吗?”,它本身就是一个答案。使用动态链接更是如此,您如何确保没有水“泄漏”(这很糟糕,GStreamer 中的“泄漏”相当于获得 GST_FLOW_NOT_LINKED,并且会停止您的来源和乐趣)或被堵塞(可以导致丢包或拥塞)。

  2. 是的。许多。有一点免责声明,我目前仍在使用 0.10,其中一些可能已在 1.0 中修复,不幸的是,使用 GStreamer 0.10 进行动态链接和取消链接非常非常困难。让我解释一下:假设您正在使用 Tee,并且您想取消链接一个分支。您将从释放 Tees srcpad 开始(不要介意取消链接,这是释放垫的一部分),现在您应该能够安全地拆除该垫下游的元素。(相当于水是你在三通之后关闭了一个阀门,现在应该能够在阀门之后拆除管道,除非你想弄湿,否则你不会在不先关闭阀门的情况下开始拆除管道......)这个大部分时间都可以工作,但这里有一场比赛。因为在你释放垫子后,

  3. 我对此进行了很多实验,发现如果您需要稳定性,并且偶尔崩溃/冻结不是一种选择,您需要一个可以作为动态安全网的元素。一个元素,它将保证在您释放/取消链接后,pad 上绝对不会发生任何活动。做到这一点的唯一方法是打破另一个 GStreamer 在持有锁时不推送的范例:您需要在推送/发送事件/填充分配时持有锁。前段时间我在这里做了这样的事情. (当然,测试用例是最重要的事情,因为它允许您测试自己/其他元素的安全性)您还可以想象一个无锁元素可以吞下所有错误的 FlowReturns,并为它的上游,但是您需要绝对确定您的所有下游元素都将“在关闭时收到推送或填充分配”-安全,因为您的元素将无法保证一旦“停止流动” (release/unlink) 已经执行了,一点滴也挤不过去。

当然,您必须正确看待其中的一些。我所说的这些可怕的竞争条件的窗口实际上非常非常小,并且可能只会在您运行程序时每 1000 次或 10.000 次发生一次。但对于专业应用来说,这当然是不可接受的。我做了一个演讲,在这里我介绍了一些这些东西

于 2013-07-06T20:07:13.417 回答
8

我倾向于根据情况使用输出选择器输入选择器箱,而不是焊盘阻塞的复杂性(我在另一篇文章http://gstreamer-devel.966125.n4.nabble.com/Dynamically-adding-中回答了焊盘阻塞and-removing-branches-of-a-tee-td973635.html#a4656812)。并在不使用时将选择器连接到 fakesrc 或 fakesink bin。在下面的示例中,如果使用 GTK,则可以将行替换为g_timeout_add (SWITCH_TIMEOUT, switch_cb, osel);并将gtk_toggle_button当前所有代码放入switch_cb函数转换为切换按钮回调函数。在此代码中,可以在两个图像接收器之间切换。我会用 fakesink 替换一个图像接收器以保持管道运行,以防我将来想添加一个带有文件接收器的 tee,我想在其中录制视频但为播放器提供打开(图像接收器上的选择器)/关闭(选择器)的选项在 fakesink 上)显示。这允许在运行时使用选择器添加/删除垃圾箱。

#include <gst/gst.h>

#define SWITCH_TIMEOUT 1000
#define NUM_VIDEO_BUFFERS 500

static GMainLoop *loop;

/* Output selector src pads */
static GstPad *osel_src1 = NULL;
static GstPad *osel_src2 = NULL;

static gboolean
my_bus_callback (GstBus * bus, GstMessage * message, gpointer data)
{
  g_print ("Got %s message\n", GST_MESSAGE_TYPE_NAME (message));

  switch (GST_MESSAGE_TYPE (message)) {
    case GST_MESSAGE_ERROR:{
      GError *err;
      gchar *debug;

      gst_message_parse_error (message, &err, &debug);
      g_print ("Error: %s\n", err->message);
      g_error_free (err);
      g_free (debug);

      g_main_loop_quit (loop);
      break;
    }
    case GST_MESSAGE_EOS:
      /* end-of-stream */
      g_main_loop_quit (loop);
      break;
    default:
      /* unhandled message */
      break;
  }
  /* we want to be notified again the next time there is a message
   * on the bus, so returning TRUE (FALSE means we want to stop watching
   * for messages on the bus and our callback should not be called again)
   */
  return TRUE;
}

static gboolean
switch_cb (gpointer user_data)
{
  GstElement *sel = GST_ELEMENT (user_data);
  GstPad *old_pad, *new_pad = NULL;

  g_object_get (G_OBJECT (sel), "active-pad", &old_pad, NULL);

  if (old_pad == osel_src1)
    new_pad = osel_src2;
  else
    new_pad = osel_src1;

  g_object_set (G_OBJECT (sel), "active-pad", new_pad, NULL);

  g_print ("switched from %s:%s to %s:%s\n", GST_DEBUG_PAD_NAME (old_pad),
      GST_DEBUG_PAD_NAME (new_pad));

  gst_object_unref (old_pad);

  return TRUE;

}

gint
main (gint argc, gchar * argv[])
{
  GstElement *pipeline, *src, *toverlay, *osel, *sink1, *sink2, *convert;
  GstPad *sinkpad1;
  GstPad *sinkpad2;
  GstBus *bus;

  /* init GStreamer */
  gst_init (&argc, &argv);
  loop = g_main_loop_new (NULL, FALSE);

  /* create elements */
  pipeline = gst_element_factory_make ("pipeline", "pipeline");
  src = gst_element_factory_make ("videotestsrc", "src");
  toverlay = gst_element_factory_make ("timeoverlay", "timeoverlay");
  osel = gst_element_factory_make ("output-selector", "osel");
  convert = gst_element_factory_make ("ffmpegcolorspace", "convert");
  sink1 = gst_element_factory_make ("xvimagesink", "sink1");
  sink2 = gst_element_factory_make ("ximagesink", "sink2");

  if (!pipeline || !src || !toverlay || !osel || !convert || !sink1 || !sink2) {
    g_print ("missing element\n");
    return -1;
  }

  /* add them to bin */
  gst_bin_add_many (GST_BIN (pipeline), src, toverlay, osel, convert, sink1,
      sink2, NULL);

  /* set properties */
  g_object_set (G_OBJECT (src), "is-live", TRUE, NULL);
  g_object_set (G_OBJECT (src), "do-timestamp", TRUE, NULL);
  g_object_set (G_OBJECT (src), "num-buffers", NUM_VIDEO_BUFFERS, NULL);
  g_object_set (G_OBJECT (sink1), "sync", FALSE, "async", FALSE, NULL);
  g_object_set (G_OBJECT (sink2), "sync", FALSE, "async", FALSE, NULL);
  g_object_set (G_OBJECT (osel), "resend-latest", TRUE, NULL);

  /* link src ! timeoverlay ! osel */
  if (!gst_element_link_many (src, toverlay, osel, NULL)) {
    g_print ("linking failed\n");
    return -1;
  }

  /* link output 1 */
  sinkpad1 = gst_element_get_static_pad (sink1, "sink");
  osel_src1 = gst_element_get_request_pad (osel, "src%d");
  if (gst_pad_link (osel_src1, sinkpad1) != GST_PAD_LINK_OK) {
    g_print ("linking output 1 failed\n");
    return -1;
  }
  gst_object_unref (sinkpad1);

  /* link output 2 */
  sinkpad2 = gst_element_get_static_pad (convert, "sink");
  osel_src2 = gst_element_get_request_pad (osel, "src%d");
  if (gst_pad_link (osel_src2, sinkpad2) != GST_PAD_LINK_OK) {
    g_print ("linking output 2 failed\n");
    return -1;
  }
  gst_object_unref (sinkpad2);

  if (!gst_element_link (convert, sink2)) {
    g_print ("linking output 2 failed\n");
    return -1;
  }

  /* add switch callback */
  g_timeout_add (SWITCH_TIMEOUT, switch_cb, osel);

  /* change to playing */
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  gst_bus_add_watch (bus, my_bus_callback, loop);
  gst_object_unref (bus);

  gst_element_set_state (pipeline, GST_STATE_PLAYING);

  /* now run */
  g_main_loop_run (loop);

  /* also clean up */
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_element_release_request_pad (osel, osel_src1);
  gst_element_release_request_pad (osel, osel_src2);
  gst_object_unref (GST_OBJECT (pipeline));

  return 0;
}
于 2012-11-14T21:24:15.133 回答
3

当我寻找动态修改任何 gstreamer 管道时,这篇文章首先出现。找到了一些链接,但它现在在手册中有很好的记录:http: //gstreamer.freedesktop.org/data/doc/gstreamer/head/manual/html/section-dynamic-pipelines.html

于 2012-11-07T13:19:04.440 回答
2

其实我也在尝试做同样的事情。还没有太多的运气:(

我通过在#gstreamer IRC 频道上询问获得了以下链接:http: //cgit.freedesktop.org/gstreamer/gstreamer/tree/docs/design/part-dynamic.txt

也许是对正确方向的提示。

当您找到其他文档时请告诉我...

于 2010-06-21T00:14:35.483 回答
0

我没有在multifilesinkoutput-selector之上为Gstreamer 0.10创建完整清晰的多路复用文件

在分析了许多替代方案之后,我的解决方案将以下示例作为代码库:http: //gstreamer.freedesktop.org/data/doc/gstreamer/head/manual/html/section-dynamic-pipelines.html

探测函数 API 已从 0.10 更改为 1.0,但下面的解决方案可以每 N 秒创建不同的 MP4 文件:

static GstElement *pipeline = NULL;

// Pipeline -> src                    -> dynamic pipeline
// Pipeline -> capsfilter(f264file)   -> mp4mux(mux0)                                 -> filesink(fsink0)
// Pipeline -> elem_before||blockpad| -> |elem_cur_sinkpad||elem_cur||elem_cur_srcpad -> |elem_after_sinkpad||elem_after
static gulong probe_id;             // probe ID
static GstElement *elem_before;     // SRC of dynamic pipeline
static GstElement *elem_after;      // SINK of dynamic pipeline
static GstElement *elem_cur;        // Main element of dynamic pipeline
static GstPad *blockpad;            // SRC pad to be blocked
static GstPad *elem_cur_srcpad;     // SRC pad where check EOS
static GstPad *elem_cur_sinkpad;    // SINK of dynamic pipeline
static GstPad *elem_after_sinkpad;  // SINK of SINK element

// Last Buffer Timestamp
static GstClockTime last_ts = 0;

typedef enum {
  NO_NEW_FILE,  // Keep current file destination
  NEW_FILE,     // Switch file destination
} NewFileStatus;
static NewFileStatus newfile = NO_NEW_FILE; // Switch File Flag

static int counter = 1; // Index filename

// EOS listener to switch to other file destination
static gboolean
event_probe_cb (GstPad * pad, GstEvent * event, gpointer user_data)
{
  g_print ("INSIDE event_probe_cb:%d type:%s\n",probe_id,
      GST_EVENT_TYPE (event)==GST_EVENT_EOS?"EOS":GST_EVENT_TYPE (event)==GST_EVENT_NEWSEGMENT?"NEWSEGMENT":"OTHER");

  if (GST_EVENT_TYPE (event) != GST_EVENT_EOS)
  {
    // Push the event in the pipe flow (false DROP)
    return TRUE;
  }

  // remove the probe first
  gst_pad_remove_event_probe (pad, probe_id);

  gst_object_unref (elem_cur_srcpad);
  gst_object_unref (elem_after_sinkpad);
  gst_element_release_request_pad(elem_cur, elem_cur_sinkpad);

  gst_element_set_state (elem_cur, GST_STATE_NULL);
  gst_element_set_state (elem_after, GST_STATE_NULL);

  // remove unlinks automatically
  GST_DEBUG_OBJECT (pipeline, "removing %" GST_PTR_FORMAT, elem_cur);
  gst_bin_remove (GST_BIN (pipeline), elem_cur);
  GST_DEBUG_OBJECT (pipeline, "removing %" GST_PTR_FORMAT, elem_after);
  gst_bin_remove (GST_BIN (pipeline), elem_after);

  GstElement * mux0 = gst_element_factory_make("mp4mux", "mux0");
  GstElement * fsink0 = gst_element_factory_make("filesink", "fsink0");
  elem_cur = mux0;
  elem_after = fsink0;

  if(!mux0 || !fsink0)
  {
    printf("mising elements\n");
  }

  GST_DEBUG_OBJECT (pipeline, "adding   %" GST_PTR_FORMAT, elem_cur);
  gst_bin_add (GST_BIN (pipeline), elem_cur);
  GST_DEBUG_OBJECT (pipeline, "adding   %" GST_PTR_FORMAT, elem_after);
  gst_bin_add (GST_BIN (pipeline), elem_after);

  char buffer[128];
  sprintf(buffer, "test_%d.mp4", counter++);
  g_print ("File Switching %s\n", buffer);
  g_object_set(G_OBJECT(elem_after), "location", buffer, NULL);

  GST_DEBUG_OBJECT (pipeline, "linking..");
  elem_cur_srcpad = gst_element_get_static_pad (elem_cur, "src");
  elem_cur_sinkpad = gst_element_get_request_pad (elem_cur, "video_%d");
  elem_after_sinkpad = gst_element_get_static_pad (elem_after, "sink");

  if(gst_pad_link(blockpad, elem_cur_sinkpad) != GST_PAD_LINK_OK)
  {
    printf("linking output 0 failed\n");
    return -1;
  }
  if(gst_pad_link(elem_cur_srcpad, elem_after_sinkpad) != GST_PAD_LINK_OK)
  {
    printf("linking output 1 failed\n");
    return -1;
  }

  g_print ("Moving to PLAYING\n");
  gst_element_set_state (elem_cur, GST_STATE_PLAYING);
  gst_element_set_state (elem_after, GST_STATE_PLAYING);

  GST_DEBUG_OBJECT (pipeline, "done");

  newfile = NO_NEW_FILE;
  // Push the event in the pipe flow (false DROP)
  return TRUE;
}

// Check if Buffer contains a KEY FRAME
static gboolean
is_sync_frame (GstBuffer * buffer)
{
  if (GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT)) 
  {
    return FALSE;
  }
  else if (!GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_IN_CAPS)) 
  {
    return TRUE;
  }
}

// Block source and launch EOS to MUXER to achieve a full muxed file
static gboolean
pad_probe_cb (GstPad * pad, GstBuffer * buffer, gpointer user_data)
{
  g_print ("\n\tINSIDE pad_probe_cb:%d %s %s\n",probe_id, (newfile?"newfile":"thesame"), 
      (is_sync_frame (buffer)?"KEYframe":"frame"));
  GST_DEBUG_OBJECT (pad, "pad is blocked now");

  last_ts = GST_BUFFER_TIMESTAMP(buffer);
  if(!GST_CLOCK_TIME_IS_VALID(last_ts))
      last_ts=0;

  if((newfile==NO_NEW_FILE) || !is_sync_frame (buffer))
    return TRUE;

  /* remove the probe first */
  gst_pad_remove_buffer_probe (pad, probe_id);

  /* install new probe for EOS */
  probe_id = gst_pad_add_event_probe (elem_after_sinkpad, G_CALLBACK(event_probe_cb), user_data);

  /* push EOS into the element, the probe will be fired when the
   * EOS leaves the effect and it has thus drained all of its data */
  gst_pad_send_event (elem_cur_sinkpad, gst_event_new_eos ());

  // Wait til the EOS have been processed the Buffer with the Key frame will be the FIRST
  while(newfile != NO_NEW_FILE)
      Sleep(1);

  // Push the buffer in the pipe flow (false DROP)
  return TRUE;
}

// this timeout is periodically run as part of the mainloop
static gboolean timeout (gpointer user_data)
{
  g_print ("TIMEOUT\n");
  if(!playing)
      return false;
  newfile = NEW_FILE;
  /* install new probe for Keyframe and New File */
  probe_id = gst_pad_add_buffer_probe (blockpad, G_CALLBACK(pad_probe_cb), pipeline);
  return true;
}

于 2013-06-05T10:00:46.647 回答