2016-08-02 167 views
0

我想创建一个方法,在流上执行一些复杂的操作(例如替换第7个元素,删除最后一个元素,删除相邻的重复项等)而不缓存整个流。java 8个流:复杂的流处理

但什么流api让我插入此方法?我是否必须创建我自己的收集器,同时收集发射物品到其他流?但是这会改变数据流的方向,从拉到推,对吧?

这种方法的可能签名是什么?

Stream<T> process(Stream<T> in) 

大概是不可能的(在单线程代码),因为结果只能收集整个输入流后返还

另一个想法:

void process(Stream<T> in, Stream<T> out) 

也似乎有点瑕疵因为Java不允许发射插入项目到现有的流(提供out参数)。

所以我怎么能做一些复杂的流处理在java中?

+1

你的榜样需要有一个收集不流 - 如果你不知道,你可以删除最后一个元素的数量,流可以被多个线程处理,所以再次相邻的重复是不可能的。我认为你可以以某种方式去除例如仅限第七元素。 –

+0

好吧,'distinct'以某种方式删除相邻的重复项,显然这是可能的。但我同意删除最后一个元素可能没有正确定义 – piotrek

+0

'distinct'是一种简单的算法,它的工作方式与Linux'uniq'命令相同。所有你需要做的就是跟踪你之前看到的价值。如果当前值不同,请将其记录为先前的值。如果相同,则跳过此元素并继续。最多的情况下,您在任何时候都会查看两个连续的元素。您的要求假设“流”可能并非如此,直到您处理完“流”后才能发现。 – nickb

回答

1

您可以调用并返回任何标准流操作,如filter,map,reduce等,并让它们执行一些复杂的操作,例如,一个需要外部数据。例如,filterAdjacentDuplicatesreplaceNthElement可以实现这样的:

public static <T> Stream<T> filterAdjacentDupes(Stream<T> stream) { 
    AtomicReference<T> last = new AtomicReference<>(); 
    return stream.filter(t -> ! t.equals(last.getAndSet(t))); 
} 

public static <T> Stream<T> replaceNthElement(Stream<T> stream, int n, T repl) { 
    AtomicInteger count = new AtomicInteger(); 
    return stream.map(t -> count.incrementAndGet() == n ? repl : t); 
} 

用法示例:

List<String> lst = Arrays.asList("foo", "bar", "bar", "bar", "blub", "foo"); 
replaceNthElement(filterAdjacentDupes(lst.stream()), 3, "BAR").forEach(System.out::println); 
// Output: foo bar BAR foo 

然而,正如评论指出,这是不是真正的流API应该如何使用。特别是,如果给定并行流,这两种操作将会失败。

+2

请注意,当输入流并行时,'filterAdjacentDupes'和'replaceNthElement'都会中断。 – Tunaki

+2

请注意,流文档建议针对有状态的非终端操作。可以忽略该建议(如在此答案中),但结果不是由语言定义的。因此,不能保证这将适用于所有实现或将来的版本。 – sprinter

+0

我同意这可能不是完美的,但它可能足以满足OP的要求。尽管感谢您的警告。 –

4

您用作示例的复杂操作都遵循流中一个元素上操作的模式,具体取决于流中的其他元素。 Java流专门设计为不允许这些类型的操作没有收集或减少。 Streams操作不允许直接访问其他成员,并且通常情况下,带有副作用的非终端操作是一个坏主意。

注意从Stream的javadoc如下:

集合和数据流,同时承载一些表面上的相似,有不同的目标。收藏主要关注其元素的有效管理和访问。相比之下,流不提供直接访问或操作元素的手段,而是关注于声明性地描述它们的来源以及将在该来源上进行聚合的计算操作。

更具体地说:

大部分流操作接受描述用户指定的行为参数...为了保持正确的行为,这些行为参数:

必须是无干扰的(它们不修改流源);而在大多数情况下, 必须是无状态的(它们的结果不应取决于执行流管道期间可能会更改的任何状态)。如果行为参数的流操作状态

溪管道的结果可能是不确定的或不正确的。有状态的λ(或其他物体执行适当的功能接口)是其结果取决于流流水线的执行期间可能会改变任何状态

所有itermediate和终端无状态和状态操作的复杂性是公在https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.htmlhttp://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html

描述这种方法既有优点也有缺点。一个显着的优点是它允许并行处理流。 Java的一个显着缺点是在其他一些语言中很容易的操作(比如跳过流中的每个第三个元素)都很困难。

注意,你会看到很多的代码(包括SO接受应答)忽视了意见,流操作的行为参数应该是无状态的。为了工作,这段代码依赖于未由语言规范定义的Java实现的行为:即按顺序处理流。在说明书中有没有任何停止以相反顺序或随机顺序执行Java处理元件。这样的实现将使任何有状态的流操作立即表现不同。无状态操作将继续表现完全一样。因此,要总结,状态操作依赖于Java的实施而非规范的细节。

另请注意,可以进行安全的有状态中间操作。他们需要进行设计,以便他们不依赖处理元素的顺序。 Stream.distinctStream.sorted就是很好的例子。他们需要维护国家的工作,但他们的设计工作与处理元素的顺序无关。

因此,要回答你的问题,这些类型的操作都可能在Java中这样做,但他们都不是简单的,安全的(在前面的段落中给出的原因)或天作之合的语言设计。我建议使用简化或收集或(见塔吉尔瓦列耶夫的回答)分裂者创造一个新的流。或者使用传统的迭代。

+0

*作为示例使用的复杂操作都遵循流中的一个元素上的操作模式,具体取决于流中的其他元素。*“替换”元素可以实现为无状态映射操作。 * Java流专门设计为不允许这些类型的操作没有收集或减少*标准流API提供有状态的操作,例如'distinct()'和'sorted()',它定义依赖于比较一个元素另一个。 – shmosel

+0

@shmosel我简化了我的回答,以避免在https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html中重新提供所有细节。有状态的中间操作退出是正确的。如该文档中所述,有状态操作(例如排序)可能需要收集整个流才能工作。这就是我'没有集体或减少'的意思。我会编辑我的答案,使之更清晰。 – sprinter

1

正确的(虽然不是很容易)的方式来做到这一点是写自己的Spliterator。常见的算法如下:

  1. 使用stream.spliterator()
  2. 写自己的Spliterator推进可能做一些额外的操作时可能会占用现有的元素以现有的流Spliterator。
  3. 通过StreamSupport.stream(spliterator, stream.isParallel())
  4. 委派创建基于您的spliterator一个新的流状.onClose(stream::close)close()调用原始数据流。

编写好的并行处理好的分割器通常是非常不平凡的任务。但是,如果你不关心并行化,你可以继承AbstractSpliterator这更简单。下面是一个例子,如何写一个新的流操作以给定位置删除一个元素:

public static <T> Stream<T> removeAt(Stream<T> src, int idx) { 
    Spliterator<T> spltr = src.spliterator(); 
    Spliterator<T> res = new AbstractSpliterator<T>(Math.max(0, spltr.estimateSize()-1), 
      spltr.characteristics()) { 
     long cnt = 0; 

     @Override 
     public boolean tryAdvance(Consumer<? super T> action) { 
      if(cnt++ == idx && !spltr.tryAdvance(x -> {})) 
       return false; 
      return spltr.tryAdvance(action); 
     } 
    }; 
    return StreamSupport.stream(res, src.isParallel()).onClose(src::close); 
} 

这是最小的实现,它可以改善,表现出更好的性能和并行。

在我的StreamEx库中,我尝试通过headTail来简化这种自定义流操作的添加。以下是如何使用StreamEx做同样的:

public static <T> StreamEx<T> removeAt(StreamEx<T> src, int idx) { 
    // head is the first stream element 
    // tail is the stream of the rest elements 
    // want to remove first element? ok, just remove tail 
    // otherwise call itself with decremented idx and prepend the head element to the result 
    return src.headTail(
     (head, tail) -> idx == 0 ? tail : removeAt(tail, idx-1).prepend(head)); 
} 

你甚至可以支持与chain()方法链接:

public static <T> Function<StreamEx<T>, StreamEx<T>> removeAt(int idx) { 
    return s -> removeAt(s, idx); 
} 

用例:

StreamEx.of("Java 8", "Stream", "API", "is", "not", "great") 
     .chain(removeAt(4)).forEach(System.out::println); 

最后请注意,即使没有headTail有一些使用StreamEx解决问题的方法。要删除你可以用越来越多的拉链具体指标,然后过滤和删除索引是这样的:

StreamEx.of(stream) 
     .zipWith(IntStreamEx.ints().boxed()) 
     .removeValues(pos -> pos == idx) 
     .keys(); 

要折叠相邻重复还有的专门collapse方法(它甚至并行化,相当不错!):

StreamEx.of(stream).collapse(Object::equals); 
0

建立在tobias_k答案和this question/update 2表达的想法上,我们可能会返回捕获其局部变量的适当谓词和映射函数。 (因为这些函数是有状态的,这对流并不理想,但Stream API中的distinct()方法可能也是有状态的)。

下面是修改代码:

public class Foo { 
    public static void run() { 
     List<String> lst = Arrays.asList("foo", "bar", "bar", "bar", "blub", "foo"); 
     lst.stream() 
       .filter(Foo.filterAdjacentDupes()) 
       .map(Foo.replaceNthElement(3, "BAR")) 
       .forEach(System.out::println); 
     // Output: foo bar BAR foo 
    } 

    public static <T> Predicate<T> filterAdjacentDupes() { 
     final AtomicReference<T> last = new AtomicReference<>(); 
     return t -> ! t.equals(last.getAndSet(t)); 
    } 

    public static <T> UnaryOperator<T> replaceNthElement(int n, T repl) { 
     final AtomicInteger count = new AtomicInteger(); 
     return t -> count.incrementAndGet() == n ? repl : t; 
    } 
}