0

我正在尝试编写一种算法,用于下载视频直播流。具体来说,我尝试获取的相应流基于动态.m3u8播放列表文件,该文件定期提供新视频文件的 URI。主要目标是将这些单独的媒体文件组合成一个连贯的 InputStream。
我实际上成功地让它工作:我定期检查出现在播放列表中的新媒体文件,并将它们的 HTTP 流传递给自定义 InputStream 实现,即InputStreamChain。既然是直播,我认为它是无止境的,至少目前是这样。因此,我想要我InputStreamChainread()永远不要发送-1。不幸的是,它做到了;每次当所有排队的媒体流被消耗时,InputStreamChain结束。相反,我希望它阻止 I/O,直到新的媒体文件到达。所以,我想出了一个可行的解决方案:我调整了read()循环方法,直到有一个新的流可用(aTimerTask将提供新文件)。在循环中,我内置了一个Thread.sleep(),以减少 CPU 负载:

public int read() throws IOException {  
    int bit = current.read();  
    if (bit == -1 && streams.size() > 0) {  
        // left out due to lacking relevance
    } else if(bit == -1 && streams.size() == 0) {
        while(streams.size() == 0) {
            Thread.currentThread().sleep(50);
        }
        return read();
    }
    return bit;
}

虽然它似乎有效,但我有一种感觉,我没有按照我应该做的那样做。我也尝试过Lock与 with 一起使用Condition.await(),但是当我TimerTask尝试触发时Condition.signal(),它只是抛出了一个IllegalMonitorStateException.
这就是我问这个问题的原因:

我应该以什么方式延迟/阻止 InputStream 的read()方法,尤其是在我的场景中?

编辑:

为了完整起见,我也将提供我失败Lock的方法:

private ReentrantLock ioLock;
private Condition ioCond;
private boolean waitingForStream = false;

public InputStreamChain() {
    ioLock = new ReentrantLock();
    ioCond = ioLock.newCondition();
}

public synchronized InputStreamChain addInputStream(final InputStream stream) {
    streams.addLast(stream);
    if (current == null) {
        current = streams.removeFirst();
    }
    if(waitingForStream) {
        ioCond.signal();
    }
    return this;
}

public int read() throws IOException {
    int bit = current.read();
    if (bit == -1 && streams.size() > 0) {
        // do stuff
    } else if(bit == -1) {
        waitingForStream = true;
        ioLock.lock();
        try {
            ioCond.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            waitingForStream = false;
            ioLock.unlock();
        }
        return read();
    }
    return bit;
}
4

1 回答 1

1

可能您没有使用同步块。这是一个例子:

class MyReader
{
     public int read() throws IOException {  
        int bit = current.read();  
        if (bit == -1 && streams.size() > 0) {  
            // left out due to lacking relevance
        } else if(bit == -1 && streams.size() == 0) {
            waitForNextStream();
            return read();
        }
        return bit;
    }

    private synchronized void waitForNextStream()
    {
        // TODO add close handling, set current here
        while (streams.isEmpty())
        {
            wait();
        }
    }


    public synchronized void addNextStream(InputStream is)
    {
        streams.add(is);
        notify();
    }
}
于 2013-05-31T12:00:39.117 回答