2

我找到了一些关于如何测试生产者的资源,但是我找不到任何东西来显示如何测试消费者。

在生产者中,我创建了一个虚拟消费者并且一切正常,但是在消费者中我正在努力进行测试。

defmodule DataProducer do
      use GenStage

      def start_link([]) do
        GenStage.start_link(__MODULE__, 0, name: __MODULE__)
      end

      # {:queue.new, demand, size}
      def init(counter) do
        {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
      end

      def handle_demand(demand, state) do 
        events = Enum.to_list(state..state + demand + 1)
        # Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
        {:noreply, events, (state + demand)}
      end
    end

生产者测试:

 defmodule DataProducerTest do
      use ExUnit.Case

      test "check the results" do
        {:ok, stage} = DataProducer.start_link([])
        {:ok, _cons} = TestConsumer.start_link(stage)
        assert_receive {:received, events}
        GenStage.stop(stage)
      end

    end

    defmodule TestConsumer do
      def start_link(producer) do
        GenStage.start_link(__MODULE__, {producer, self()})
      end
      def init({producer, owner}) do
        {:consumer, owner, subscribe_to: [producer]}
      end
      def handle_events(events, _from, owner) do
        send(owner, {:received, events})
        {:noreply, [], owner}
      end
    end

和消费者:

defmodule DataConsumer do
  use GenStage
  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end
  def init(state) do
    {:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
  end
  def handle_events(events, _from, state) do
    for event <- events do
      # :timer.sleep(250)
      Logger.info inspect( {self(), event, state} )
    end
    {:noreply, [], state}
  end
end

提前谢谢你。

4

2 回答 2

1

没有理由在ex_mock这里使用。如果你让生产者成为你的消费者订阅这样的论点,那就容易多了:

defmodule DataConsumer do
  use GenStage

  def start_link(producer) do
    GenStage.start_link(__MODULE__, producer)
  end

  def init(producer) do
    {:consumer, state, subscribe_to: [{producer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
  end
end

然后你可以有一个TestProducer

defmodule TestProducer
  use GenStage

  def notify(pid, event) do
    GenServer.cast(pid, {:notify, event})
  end

  def start_link do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end

  def handle_cast({:notify, event}, state) do
    {:noreply, [event], state}
  end
end

并在您的测试中订阅它并断言预期的结果:

defmodule DataConsumerTest do
  use ExUnit.Case

  test "consumes events" do
    {:ok, pid} = TestProducer.start_link()
    DataConsumer.start_link(pid)
    TestProducer.notify(%{data: :event_data})

    # assert thing you expected to happen happens
  end
end

TLDR;如果您在代码库中与许多不同的消费者一起工作,那么手动/测试事件生产者是必须的。消费者并不真正关心生产者做什么来产生事件,只关心它可以订阅和消费它们。因此,您的测试只需要确保消费者能够接收来自任何生产者的事件,并且您可以向他们发送它在测试中寻找的正确事件。

于 2018-06-13T17:42:39.933 回答
0

在对消费者的测试中:

 test "should behave like consumer" do
    {:ok, producer} = DummyProducer.start_link(1)
    {:ok, consumer} = Consumer.start_link(producer)
    Process.register self, :test
    assert_receive {:called_back, 10}
  end

现在DummyProducer

defmodule DummyProducer do
  use GenStage

  def start_link(demand) do
    GenStage.start_link(__MODULE__, demand)
  end

  def init(demand) do
    {:producer, demand}
  end

  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..counter+demand-1)
    Process.send_after(self(), {:stop, demand}, 1)
    {:noreply, events, demand + counter}
  end

  def handle_info({:stop, demand}, state) do
    send :test, {:called_back, demand}
    {:stop, :normal, demand}
  end
end

我认为,

测试消费者的重点是检查消费者是否可以发送需求并坚持订阅中分配的最大需求。

于 2018-06-17T13:05:41.740 回答