2015-03-19 53 views
6

我开始学习药剂,并遇到了一个难以解决的挑战。Enumerable/Stream展望未来

我试图创建需要Enumerable.t并返回另一个Enumerable.t,其中包括下一个n项目的函数。它与Enum.chunk(e,n,1,[])的行为稍有不同,因为数字迭代计数总是等于原始可枚举计数。我还需要支持流

@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t 

这是最好的文档测试语法内所示:

iex> lookahead(1..6, 1) |> Enum.to_list 
[[1,2],[2,3],[3,4],[4,5],[5,6],[6]] 

iex> lookahead(1..4, 2) |> Enum.to_list 
[[1,2,3],[2,3,4],[3,4],[4]] 

iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5) 
[[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]] 

iex> {:ok,io} = StringIO.open("abcd") 
iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list 
[["a","b","c"],["b","c","d"],["c","d"],["d"]] 

我已经调查落实Enumerable.t协议,但还没有完全理解Enumerable.reduce接口。

有没有简洁/优雅的方式来做到这一点?

我的用例是针对二进制流中的一个小的固定n值(1或2),因此优化版本的额外点。但是,为了学习Elixir,我对许多用例的解决方案感兴趣。性能很重要。我将在解决方案和发布的各种n值上运行一些基准。

基准更新 - 2015年4月8日

6级可行的解决方案已经公布。基准的细节可在https://gist.github.com/spitsw/fce5304ec6941578e454获得。基准在500个不同的n值列表上运行。

对于n = 1以下结果:

PatrickSuspend.lookahead 104.90 µs/op 
Warren.lookahead   174.00 µs/op 
PatrickChunk.lookahead  310.60 µs/op 
PatrickTransform.lookahead 357.00 µs/op 
Jose.lookahead    647.60 µs/op 
PatrickUnfold.lookahead  1484000.00 µs/op 

对于n = 50以下结果:

PatrickSuspend.lookahead 220.80 µs/op 
Warren.lookahead   320.60 µs/op 
PatrickTransform.lookahead 518.60 µs/op 
Jose.lookahead    1390.00 µs/op 
PatrickChunk.lookahead  3058.00 µs/op 
PatrickUnfold.lookahead  1345000.00 µs/op (faster than n=1) 
+0

我觉得你的最后一个例子应该返回三个元素的列表,对不对? – 2015-03-19 13:39:32

+0

是的,@PatrickOscity,最后一个例子应该返回3个元素。我现在纠正了这个例子。 – Warren 2015-03-19 21:39:25

回答

5

正如在评论中所讨论的,我的第一次尝试有一些性能问题,并且不适用于具有副作用的流,例如IO流。我走上深入挖掘流库的时间,终于想出了这个解决方案:

defmodule MyStream 
    def lookahead(enum, n) do 
    step = fn val, _acc -> {:suspend, val} end 
    next = &Enumerable.reduce(enum, &1, step) 
    &do_lookahead(n, :buffer, [], next, &1, &2) 
    end 

    # stream suspended 
    defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do 
    {:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)} 
    end 

    # stream halted 
    defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do 
    {:halted, acc} 
    end 

    # initial buffering 
    defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do 
    case next.({:cont, []}) do 
     {:suspended, val, next} -> 
     new_state = if length(buf) < n, do: :buffer, else: :emit 
     do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun) 
     {_, _} -> 
     do_lookahead(n, :emit, buf, next, {:cont, acc}, fun) 
    end 
    end 

    # emitting 
    defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do 
    case next.({:cont, []}) do 
     {:suspended, val, next} -> 
     do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun) 
     {_, _} -> 
     do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun) 
    end 
    end 

    # buffer empty, halting 
    defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do 
    {:halted, acc} 
    end 
end 

这可以看第一个令人生畏,但实际上它并不难。我会尽力为你分解它,但是用这样一个完整的例子很难。

让我们从一个更简单的例子开始:代替无限循环重复给定的值。为了发射流,我们可以返回一个将累加器和函数作为参数的函数。为了发出一个值,我们使用两个参数调用该函数:要发射的值和累加器。 acc累加器是一个由命令(:cont,:suspend:halt)组成的元组,并告诉我们消费者希望我们做什么;我们需要返回的结果取决于操作。如果流应该被挂起,我们返回原子:suspended的三元素元组,累加器和枚举继续时将调用的函数(有时称为“继续”)。对于:halt命令,我们只需返回{:halted, acc};对于:cont,我们通过执行上述递归步骤来发出值。整个事情看起来像这样:

defmodule MyStream do 
    def repeat(val) do 
    &do_repeat(val, &1, &2) 
    end 

    defp do_repeat(val, {:suspend, acc}, fun) do 
    {:suspended, acc, &do_repeat(val, &1, fun)} 
    end 

    defp do_repeat(_val, {:halt, acc}, _fun) do 
    {:halted, acc} 
    end 

    defp do_repeat(val, {:cont, acc}, fun) do 
    do_repeat(val, fun.(val, acc), fun) 
    end 
end 

现在,这只是谜题的一部分。我们可以发射流,但是我们不处理流入流。再次,为了解释如何工作,构建一个更简单的例子是有意义的。在这里,我将构建一个函数,该函数接受一个枚举值,并为每个值暂停和重新发射。

defmodule MyStream do 
    def passthrough(enum) do 
    step = fn val, _acc -> {:suspend, val} end 
    next = &Enumerable.reduce(enum, &1, step) 
    &do_passthrough(next, &1, &2) 
    end 

    defp do_passthrough(next, {:suspend, acc}, fun) do 
    {:suspended, acc, &do_passthrough(next, &1, fun)} 
    end 

    defp do_passthrough(_next, {:halt, acc}, _fun) do 
    {:halted, acc} 
    end 

    defp do_passthrough(next, {:cont, acc}, fun) do 
    case next.({:cont, []}) do 
     {:suspended, val, next} -> 
     do_passthrough(next, fun.(val, acc), fun) 
     {_, _} -> 
     {:halted, acc} 
    end 
    end 
end 

第一句设置的是被向下传递到do_passthrough功能next功能。它用于从传入流获取下一个值。内部使用的step函数定义我们暂停流中的每个项目。除了最后一个条款外,其余部分非常相似。在这里,我们使用{:cont, []}来调用下一个函数来获得一个新值并通过case语句处理结果。如果有价值,我们会返回{:suspended, val, next},如果没有,则流停止,我们将其传递给消费者。

我希望澄清一些关于如何在Elixir中手动构建流的问题。不幸的是,有很多需要使用流的样板文件。如果您现在回到lookahead实施,您会看到只有微小的差异,这是实际上有趣的部分。有两个附加参数:state,其用于区分:buffer:emit步骤,以及buffer,其在初始缓冲步骤中预先填充有n+1项目。在发射阶段,当前的缓冲区被发射,然后在每次迭代中向左移动。当输入流停止或我们的流直接停止时,我们完成了。


我在这里留下我原来的答案以供参考:

下面是一个使用Stream.unfold/2根据您的规格发射值 的真实流的解决方案。这意味着您需要在前两个示例的末尾添加Enum.to_list到 以获取实际值。

defmodule MyStream do 
    def lookahead(stream, n) do 
    Stream.unfold split(stream, n+1), fn 
     {[], stream} -> 
     nil 
     {[_ | buf] = current, stream} -> 
     {value, stream} = split(stream, 1) 
     {current, {buf ++ value, stream}} 
    end 
    end 

    defp split(stream, n) do 
    {Enum.take(stream, n), Stream.drop(stream, n)} 
    end 
end 

一般的想法是我们保留前面迭代的buf。在每次迭代中,我们发出当前的buf,从流中取一个值并将其附加到buf的末尾。这一直重复,直到buf是空的。

实施例:

iex> MyStream.lookahead(1..6, 1) |> Enum.to_list 
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]] 

iex> MyStream.lookahead(1..4, 2) |> Enum.to_list 
[[1, 2, 3], [2, 3, 4], [3, 4], [4]] 

iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5) 
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]] 
+0

这也是一个很好的解决方案!我认为,对Stream.drop/1的多次调用也会影响性能,因为每次向“drop”流添加越来越多的步骤时都会如此。也许基于你的一个解决方案是使用我们可以暂停流的事实。所以你得到你需要的物品并暂停它。 – 2015-03-19 21:50:54

+1

我试着针对IO.stream的上述解决方案。不幸的是IO.streams有一个副作用,在随后的调用中不会返回相同的项目。所以我认为拆分功能会导致项目被删除。我会在问题中添加一个示例。 – Warren 2015-03-20 00:21:45

+0

@JoséValim谢谢你的建议,我会研究这一点并尝试改进我的答案。在累加器中传递流时感觉错误。我想这意味着我必须完全手动构建它,或者是否存在帮助我构建低层流构建的函数? – 2015-03-20 05:22:22

1

您应该能够使用Stream.chunk/4

将看起来像这样:

defmodule MyMod do 
    def lookahead(enum, amount) do 
    Stream.chunk(enum, amount + 1, 1, []) 
    end 
end 

随着您的输入:

iex(2)> MyMod.lookahead(1..6, 1) |> Enum.to_list 
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]] 

iex(3)> MyMod.lookahead(1..4, 2) |> Enum.to_list 
[[1, 2, 3], [2, 3, 4], [3, 4]] 

iex(4)> Stream.cycle(1..3) |> MyMod.lookahead(1) |> Enum.take(5) 
[[1, 2], [2, 3], [3, 1], [1, 2], [2, 3]] 
+0

其实,好奇为什么第二个例子没有最后的[4] .... – hahuang65 2015-03-19 06:35:41

+0

嗯这可能不是一个完美的解决方案。我认为当使用Stream.chunk时,它会消耗Enumerable的其余部分... – hahuang65 2015-03-19 06:44:58

+0

填充仅在块中发生一次,而不是多次。所以你确实不会看到最后一个'[4]'。 – 2015-03-19 07:29:47

3

这里是一个低效率的实现这样的功能:

defmodule Lookahead do 
    def lookahead(enumerable, n) when n > 0 do 
    enumerable 
    |> Stream.chunk(n + 1, 1, []) 
    |> Stream.flat_map(fn list -> 
     length = length(list) 
     if length < n + 1 do 
      [list|Enum.scan(1..n-1, list, fn _, acc -> Enum.drop(acc, 1) end)] 
     else 
      [list] 
     end 
     end) 
    end 
end 

它建立在@ hahuang65实施的顶部,除了我们使用Stream.flat_map/2检查每个发射的长度项目,只要我们检测到发射的物品变短,就加上丢失的物品。

从头开始的手写实现会更快,因为我们不需要在每次迭代时调用length(list)。上面的实现可能没有问题,但如果n很小。如果n是固定的,你甚至可以明确地在生成的列表上模式匹配。

+0

这似乎运作良好。尽管如此,该函数在某些情况下确实会返回一些额外的空列表([])。例如lookahead([1,2],1)返回[[1,2],[2],[],[]]。我很享受每一个回应! – Warren 2015-03-20 00:55:12

1

以下解决方案使用Stream.resource和暂停Enumerable.reduce的能力。所有的例子都通过了。

总之,它使用Enumerable.reduce来建立一个列表。然后在每次迭代中暂挂减速器,删除列表的头部,并在列表的尾部添加最新的项目。最后,当reducer被完成或停止时,它会产生流的预告片。所有这些都使用Stream.resource进行协调。

如果使用FIFO队列而不是每个迭代的列表,这将更有效。

请任何简化,效率或错误

def Module 
    def lookahead(enum, n) when n >= 0 do 
    reducer = fn -> Enumerable.reduce(enum, {:cont, {0, []}}, fn 
     item, {c, list} when c < n -> {:cont, {c+1, list ++ [item]}} # Build up the first list 
     item, {c, list} when c == n -> {:suspend, {c+1, list ++ [item]}} # Suspend on first full list 
     item, {c, [_|list]} -> {:suspend, {c, list ++ [item]}} # Remove the first item and emit 
     end) 
    end 

    Stream.resource(reducer, 
     fn 
     {:suspended, {_, list} = acc , fun} -> {[list], fun.({:cont, acc})} 
     {:halted, _} = result -> lookahead_trail(n, result) # Emit the trailing items 
     {:done, _} = result -> lookahead_trail(n, result) # Emit the trailing items 
     end, 
     fn 
     {:suspended, acc, fun} -> fun.({:halt, acc}) # Ensure the reducer is halted after suspend 
     _ -> 
     end) 
    end 

    defp lookahead_trail(n, acc) do 
    case acc do 
     {action, {c, [_|rest]}} when c > n -> {[], {action, {c-1, rest}}} # List already emitted here 
     {action, {c, [_|rest] = list}} -> {[list], {action, {c-1, rest}}} # Emit the next tail item 
     acc -> {:halt, acc } # Finish of the stream 
    end 
    end 
end 
+0

我真的很喜欢你的方法比较简洁。竖起大拇指! – 2015-03-23 08:23:08

3

I had started a discussion about my proposed Stream.mutate method on the elixir core mailing list,彼得汉密尔顿提出解决这个问题的另一种方式提供反馈。通过使用make_ref to create a globally unique reference,我们可以创建一个填充流并将其与原始可枚举连接,以在原始流停止后继续发射。这个过程既可以配合使用Stream.chunk,这意味着我们需要删除的最后一步不需要的引用:

def lookahead(enum, n) do 
    stop = make_ref 
    enum 
    |> Stream.concat(List.duplicate(stop, n)) 
    |> Stream.chunk(n+1, 1) 
    |> Stream.map(&Enum.reject(&1, fn x -> x == stop end)) 
end 

我觉得这是最漂亮的解决方案还没有,从一个语法点。或者,我们可以使用Stream.transform手工打造的缓冲,这是相当类似于我前面提出的手动解决方案:

def lookahead(enum, n) do 
    stop = make_ref 
    enum 
    |> Stream.concat(List.duplicate(stop, n+1)) 
    |> Stream.transform([], fn val, acc -> 
    case {val, acc} do 
     {^stop, []}       -> {[] , []   } 
     {^stop, [_|rest] = buf}    -> {[buf], rest   } 
     {val , buf} when length(buf) < n+1 -> {[] , buf ++ [val] } 
     {val , [_|rest] = buf}    -> {[buf], rest ++ [val]} 
    end 
    end) 
end 

我没有基准这些解决方案,但我想第二个,虽然略显笨重,应因为它不必遍历每个块,所以执行得更好一点。

顺便说一句,第二个解决方案,可以不写case语句once Elixir allows to use the pin operator in function heads (probably in v1.1.0)

def lookahead(enum, n) do 
    stop = make_ref 
    enum 
    |> Stream.concat(List.duplicate(stop, n+1)) 
    |> Stream.transform([], fn 
    ^stop, []       -> {[] , []   } 
    ^stop, [_|rest] = buf    -> {[buf], rest   } 
    val , buf when length(buf) < n+1 -> {[] , buf ++ [val] } 
    val , [_|rest] = buf    -> {[buf], rest ++ [val]} 
    end) 
end 
+0

两个非常好的解决方案。我已经添加了基准。 – Warren 2015-04-08 01:38:25

+0

@Warren看到如何以多种不同的方式解决这个问题真的很有趣。感谢您分享您的基准! – 2015-04-08 06:23:03

0

从沃伦汲取灵感后,我做了这个。基本用法:

ex> {peek, enum} = StreamSplit.peek 1..10, 3 
{[1, 2, 3], #Function<57.77324385/2 in Stream.transform/3>} 
iex> Enum.take(enum, 5) 
[1, 2, 3, 4, 5] 

https://hex.pm/packages/stream_split