正如在评论中所讨论的,我的第一次尝试有一些性能问题,并且不适用于具有副作用的流,例如IO流。我走上深入挖掘流库的时间,终于想出了这个解决方案:
defmodule MyStream
def lookahead(enum, n) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_lookahead(n, :buffer, [], next, &1, &2)
end
# stream suspended
defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
{:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
end
# stream halted
defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do
{:halted, acc}
end
# initial buffering
defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
new_state = if length(buf) < n, do: :buffer, else: :emit
do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)
{_, _} ->
do_lookahead(n, :emit, buf, next, {:cont, acc}, fun)
end
end
# emitting
defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)
{_, _} ->
do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun)
end
end
# buffer empty, halting
defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
{:halted, acc}
end
end
这可以看第一个令人生畏,但实际上它并不难。我会尽力为你分解它,但是用这样一个完整的例子很难。
让我们从一个更简单的例子开始:代替无限循环重复给定的值。为了发射流,我们可以返回一个将累加器和函数作为参数的函数。为了发出一个值,我们使用两个参数调用该函数:要发射的值和累加器。 acc
累加器是一个由命令(:cont
,:suspend
或:halt
)组成的元组,并告诉我们消费者希望我们做什么;我们需要返回的结果取决于操作。如果流应该被挂起,我们返回原子:suspended
的三元素元组,累加器和枚举继续时将调用的函数(有时称为“继续”)。对于:halt
命令,我们只需返回{:halted, acc}
;对于:cont
,我们通过执行上述递归步骤来发出值。整个事情看起来像这样:
defmodule MyStream do
def repeat(val) do
&do_repeat(val, &1, &2)
end
defp do_repeat(val, {:suspend, acc}, fun) do
{:suspended, acc, &do_repeat(val, &1, fun)}
end
defp do_repeat(_val, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_repeat(val, {:cont, acc}, fun) do
do_repeat(val, fun.(val, acc), fun)
end
end
现在,这只是谜题的一部分。我们可以发射流,但是我们不处理流入流。再次,为了解释如何工作,构建一个更简单的例子是有意义的。在这里,我将构建一个函数,该函数接受一个枚举值,并为每个值暂停和重新发射。
defmodule MyStream do
def passthrough(enum) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_passthrough(next, &1, &2)
end
defp do_passthrough(next, {:suspend, acc}, fun) do
{:suspended, acc, &do_passthrough(next, &1, fun)}
end
defp do_passthrough(_next, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_passthrough(next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_passthrough(next, fun.(val, acc), fun)
{_, _} ->
{:halted, acc}
end
end
end
第一句设置的是被向下传递到do_passthrough
功能next
功能。它用于从传入流获取下一个值。内部使用的step函数定义我们暂停流中的每个项目。除了最后一个条款外,其余部分非常相似。在这里,我们使用{:cont, []}
来调用下一个函数来获得一个新值并通过case语句处理结果。如果有价值,我们会返回{:suspended, val, next}
,如果没有,则流停止,我们将其传递给消费者。
我希望澄清一些关于如何在Elixir中手动构建流的问题。不幸的是,有很多需要使用流的样板文件。如果您现在回到lookahead
实施,您会看到只有微小的差异,这是实际上有趣的部分。有两个附加参数:state
,其用于区分:buffer
和:emit
步骤,以及buffer
,其在初始缓冲步骤中预先填充有n+1
项目。在发射阶段,当前的缓冲区被发射,然后在每次迭代中向左移动。当输入流停止或我们的流直接停止时,我们完成了。
我在这里留下我原来的答案以供参考:
下面是一个使用Stream.unfold/2
根据您的规格发射值 的真实流的解决方案。这意味着您需要在前两个示例的末尾添加Enum.to_list
到 以获取实际值。
defmodule MyStream do
def lookahead(stream, n) do
Stream.unfold split(stream, n+1), fn
{[], stream} ->
nil
{[_ | buf] = current, stream} ->
{value, stream} = split(stream, 1)
{current, {buf ++ value, stream}}
end
end
defp split(stream, n) do
{Enum.take(stream, n), Stream.drop(stream, n)}
end
end
一般的想法是我们保留前面迭代的buf。在每次迭代中,我们发出当前的buf,从流中取一个值并将其附加到buf的末尾。这一直重复,直到buf是空的。
实施例:
iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]
iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
我觉得你的最后一个例子应该返回三个元素的列表,对不对? – 2015-03-19 13:39:32
是的,@PatrickOscity,最后一个例子应该返回3个元素。我现在纠正了这个例子。 – Warren 2015-03-19 21:39:25