3

今天我决定将我的基于 的小脚本gst-launch转换为真正的 Python/GStreamer 应用程序,以添加一些功能。

由于. _ shout2send_filesinktee

shout2send由于网络问题,有时可能会停止。我想每隔 N 秒重新启动一次这个元素,直到连接恢复,而不停止管道,因为本地音频文件不应该受到网络条件的影响。

这是我尝试过的:

  1. 网络错误后一秒停止/启动管道(结果:流工作,本地文件被截断)
  2. 取消链接tee,将shout2send状态设置为NULL并将其从管道中删除(结果:GStreamer 严重错误,如Trying to dispose element ... but it is in PLAYING instead of the NULL state
  3. 试图了解在这种情况下如何使用焊盘(结果:与上面相同,但涉及更多代码)

我应该怎么办?

这是我的代码的样子:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import GLib
from gi.repository import Gst
# [...]

def message_handler(bus, message):
    if message.type == Gst.MessageType.ERROR:
        if message.src == shout2send:
            pass # TODO: restart the element
        else:
            print(message.parse_error())
            pipeline.set_state(Gst.State.NULL)
            exit(1)
    else:
        print(message.type)

pipeline = Gst.Pipeline()
message_bus = pipeline.get_bus()
message_bus.add_signal_watch()
message_bus.connect('message', message_handler)

# [...]
tee.link(queue0)
queue0.link(filesink)
tee.link(queue1)
queue1.link(shout2send)

更新(2015 年 9 月 12 日):添加了非工作代码 + 日志

我尝试遵循GStreamer doc 中的“动态更改管道”,但我的代码不起作用。

def event_probe(pad, info, *args):
    Gst.Pad.remove_probe(pad, info)
    queue1.unlink(shout2send)
    tee.unlink(queue1)
    pipeline.remove(shout2send)
    pipeline.remove(queue1)
    return Gst.PadProbeReturn.OK

def message_handler(bus, message):
    if message.type == Gst.MessageType.ERROR:
        if message.src == shout2send:
            pad = queue1.get_static_pad('src')
            pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
        else:
            print(message.parse_error())
            pipeline.set_state(Gst.State.NULL)
            exit(1)
    else:
        print(message.type)

这是我在运行脚本GST_DEBUG=3并在流式传输时重新启动 Icecast 时看到的内容:

[...]
0:00:02.142033258  5462 0x55e414d900a0 WARN                  shout2 gstshout2.c:674:gst_shout2send_render:<shout2send> error: shout_send() failed: Socket error
0:00:02.658137998  5462 0x55e414d90140 WARN                 basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: Internal data flow error.
0:00:02.658169752  5462 0x55e414d90140 WARN                 basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: streaming task paused, reason error (-5)
(GLib.Error('Internal data flow error.', 'gst-stream-error-quark', 1), 'gstbasesrc.c(2943): gst_base_src_loop (): /GstPipeline:pipeline0/GstPulseSrc:pulsesrc:\nstreaming task paused, reason error (-5)')
0:00:02.658628129  5462 0x7f6ba8002a30 WARN                audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment
4

1 回答 1

4

感谢otopolsky的评论我做到了:)

我做错了什么:

  1. 元素必须设置为NULL:这很重要
  2. oggmux必须tee在两个子管道上保留在 之后:否则 Icecast 将列出流而无法为其提供服务。做同样的事情opusenc

建议:

  1. 不必取消链接您不需要的每个元素:只需在需要的地方断开即可
  2. 没有必要从管道中删除您不需要的每个元素:如果您想重用它们,请保留它们

最终代码(重新连接正常工作且独立于本地编码/录制):

def event_probe2(pad, info, *args):
    Gst.Pad.remove_probe(pad, info.id)
    tee.link(opusenc1)
    opusenc1.set_state(Gst.State.PLAYING)
    oggmux1.set_state(Gst.State.PLAYING)
    queue1.set_state(Gst.State.PLAYING)
    shout2send.set_state(Gst.State.PLAYING)
    return Gst.PadProbeReturn.OK

def reconnect():
    pad = tee.get_static_pad('src_1')
    pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe2, None)

def event_probe(pad, info, *args):
    Gst.Pad.remove_probe(pad, info.id)
    tee.unlink(opusenc1)
    opusenc1.set_state(Gst.State.NULL)
    oggmux1.set_state(Gst.State.NULL)
    queue1.set_state(Gst.State.NULL)
    shout2send.set_state(Gst.State.NULL)
    GLib.timeout_add_seconds(interval, reconnect)
    return Gst.PadProbeReturn.OK

def message_handler(bus, message):
    if message.type == Gst.MessageType.ERROR:
        if message.src == shout2send:
            pad = tee.get_static_pad('src_1')
            pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
        else:
            print(message.parse_error())
            pipeline.set_state(Gst.State.NULL)
            exit(1)
    else:
        print(message.type)

小问题:

  1. 我使用tee.get_static_pad('src_1'),但我想我可以在某处获得 src id,而不是使用固定值
  2. 可能整个事情可以写成更好的形式(但这是我第一个使用 Python+Gstreamer 的程序,它可以工作,所以我很好)
  3. 为了避免数据丢失,我pipeline.set_state(Gst.State.NULL)在一秒钟后打电话pipeline.send_event(Gst.Event.new_eos()),但我仍然收到类似的消息WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment

代码:https ://github.com/ViGLug/libre-streaming

于 2015-12-09T17:36:50.900 回答