我找到了一些关于如何测试 GenStage 生产者的资料,但找不到任何说明如何测试消费者的内容。
测试生产者时,我创建了一个虚拟消费者,一切正常;但在测试消费者时,我不知道该如何下手。
defmodule DataProducer do
  @moduledoc """
  GenStage producer that emits consecutive integers.

  The stage state is a single integer counter: the next number to emit.
  Events are broadcast to every subscriber via `GenStage.BroadcastDispatcher`.
  """
  use GenStage

  @doc """
  Starts the producer registered under the module name, counting from 0.
  """
  def start_link([]) do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  # The state is the next integer to be emitted.
  @impl true
  def init(counter) do
    {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
  end

  # Emits exactly `demand` consecutive integers starting at the current
  # counter and advances the counter by the same amount, so successive
  # batches neither overlap nor exceed what was asked for.
  @impl true
  def handle_demand(demand, state) when demand > 0 do
    next_state = state + demand
    events = Enum.to_list(state..(next_state - 1))
    # Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
    {:noreply, events, next_state}
  end
end
生产者测试:
defmodule DataProducerTest do
  use ExUnit.Case

  # Subscribes a forwarding consumer to the producer and verifies that the
  # first batch relayed back to the test process starts at the producer's
  # initial counter value (0) and contains only integers.
  test "check the results" do
    {:ok, stage} = DataProducer.start_link([])
    {:ok, _cons} = TestConsumer.start_link(stage)

    assert_receive {:received, events}
    assert [0 | _] = events
    assert Enum.all?(events, &is_integer/1)

    GenStage.stop(stage)
  end
end
defmodule TestConsumer do
  @moduledoc """
  Test helper consumer that forwards every batch of events it receives to
  the process that started it, as a `{:received, events}` message.
  """
  # Declares the GenStage behaviour and pulls in its default callbacks.
  use GenStage

  @doc """
  Starts a consumer subscribed to `producer`; the calling process becomes
  the owner that receives the forwarded events.
  """
  def start_link(producer) do
    GenStage.start_link(__MODULE__, {producer, self()})
  end

  # The state is the owner pid that events are forwarded to.
  @impl true
  def init({producer, owner}) do
    {:consumer, owner, subscribe_to: [producer]}
  end

  # Relays the batch to the owner; consumers never emit events themselves.
  @impl true
  def handle_events(events, _from, owner) do
    send(owner, {:received, events})
    {:noreply, [], owner}
  end
end
以下是我想要测试的消费者:
defmodule DataConsumer do
  @moduledoc """
  GenStage consumer that subscribes to `DataProducer` and logs each event
  it receives.

  The subscription uses a selector so that only events strictly between
  50 and 100 are delivered, with a `max_demand` of 10.
  """
  use GenStage
  # Logger.info/1 is a macro, so Logger must be required before use.
  require Logger

  @doc """
  Starts the consumer with a placeholder state.
  """
  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end

  @impl true
  def init(state) do
    subscription =
      {DataProducer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}

    {:consumer, state, subscribe_to: [subscription]}
  end

  # Logs every event together with this process's pid and the stage state.
  # Enum.each/2 is used because the iteration is purely for side effects.
  @impl true
  def handle_events(events, _from, state) do
    Enum.each(events, fn event ->
      # :timer.sleep(250)
      Logger.info(inspect({self(), event, state}))
    end)

    {:noreply, [], state}
  end
end
提前谢谢你。