2017-04-06 68 views
1

如何让RxJava在NiFi内部工作?或者我如何获得NiFi和RxJava的好玩?它们看起来像是彼此完美的补充。如何在我的Apache NiFi onTrigger方法中使用RxJava?

我遇到了一个问题,我无法弄清楚如何解决。 NiFi不断抱怨IllegalStateExceptionFlowFileHandlingException,具体取决于我从FlowFile输入流中读取的位置和方式。

我正在学习Apache NiFi和RxJava 2(即Flowables)。我想创建一个类似于现有SplitText处理器的Apache NiFi处理器 - 只是比较简单。没有头文件处理,没有片段大小处理 - 只需提取每行数据 - 我称之为SplitLine。

这里没有花哨的线程 - 这意味着我没有试图用Flowable.observeOn()或Flowable.subscribeOn()做任何事情。一切都应该在一个线程上完成......当前线程。

我想我会通过使用RxJava来解决这个问题。我会从FlowFile中读取字符并使用移位缓冲区发布它们;例如...

Flowable<Tuple<Long, Integer>> chars = 
    Flowable.generate(
     () -> 0L, 
     (cnt, emitter) -> { 
      int ch = flowStream.read(); 
      emitter.onNext(new Tuple<>(cnt, ch); 
      if (ch == -1) emitter.onComplete(); 
      return cnt++; 
     }); 

return chars.buffer(2, 1).publish().autoConnect(); 

我也尝试使用Flowable.create相当于...

Flowable<Tuple<Long, Integer>> chars = 
    Flowable.create(emitter -> { 
     try { 
      int ch; 
      long cnt = 0; 
      while ((ch = flowStream.read()) != -1) { 
       emitter.onNext(new Tuple<>(cnt, ch); 
       cnt++; 
      } 
      emitter.onComplete(); 
     } catch (IOException ex) { 
      ex.printStackTrace(); 
      emitter.onError(ex); 
     } finally { 
      flowStream.close(); 
     } 
    }, BackpressureStrategy.BUFFER); 

return chars.buffer(2, 1).publish().autoConnect(); 

在上述情况下,我传递的InputStream从NiFi ProcessSession在重写onTrigger我的处理器类的方法。

InputStream stream = session.read(flowFile) 
RxLineSplitter splitter = new RxLineSplitter(stream); 

我也使用回调版本尝试,但勿庸置疑收到异常,因为流是从比读取回调其他回调访问。这是...

session.read(flowFile, stream -> { 
    RxLineSplitter splitter = new RxLineSplitter(stream); 

    // RxLineSplitter contains the code above which is the other callback it is complaining about... 
} 

为什么我发布字符流?为什么成对的字符?我有两个用户在char流上。一个查找一行的开始,另一个查找行的结尾。由于Windows,我需要查找[\ r; \ N;或\ r \ n)]。基本上,这对中的第二个字符是向前看的。

如果你有兴趣,我RxSplitLine的症结看起来像......

Flowable<Tuple<Long, Integer>> findLineMarkers(
    Flowable<List<Tuple<Long, Integer>>> charPairs, 
    BiFunction<Tuple<Long, Integer>, Optional<Tuple<Long, Integer>>, Optional<Tuple<Long, Integer>>> strategy) { 

    return charPairs().map(pair -> { 
      Tuple<Long, Integer> fst = pair.get(0); 
      Optional<Tuple<Long, Integer>> snd = pair.size() > 1 ? Optional.of(pair.get(1)) : Optional.empty(); 

      return strategy.apply(fst, snd); 
    }).filter(Optional::isPresent).map(Optional::get); 
} 

Flowable<SplitInfo> split(InputStream stream) throws IOException { 

    return findLineMarkers(stream, startingPositionStrategy) 
       .zipWith(findLineMarkers(stream, endingPositionStrategy), 
         (s, e) -> new Split(s.item1, e.item1 - s.item1)) 
       .filter(split -> !removeEmptyLines || split.length > 0) 
       .zipWith(counter(), Tuple::new) 
       .timeInterval(TimeUnit.MILLISECONDS) 
       .map(x -> new SplitInfo(x.value().item1.start, 
             x.value().item1.length, 
             x.value().item2, 
             x.time(), x.unit())); 
} 

够散漫......我会任何帮助或指针在获得NiFi和RxJava 2〜感激和另一个玩好。

+0

你能后的踪迹详细说明你上面提到的例外? – Andy

+0

感谢您对Andy的关注。我使用了一个要点来复制我的SplitLine处理器的一部分nifi-app.log和onTrigger()。 https://gist.github.com/rkayman/60faa63723c9f54b20f619fb131ccb66 – Rob

回答

0

我相信我已经找到了答案......至少我的SplitLine处理器显示它已经收到流文件,读取的字节也是准确的!

如果您打算读取或做与输入流的东西正常InputStreamCallback外,该NiFi文档指导您使用其他重载一个在ProcessSession.read专门InputStream input = session.read(flowFile)。该文档还声明您有责任正确关闭流。对于那些试图这样做,我可以补充... 尽可能快速和尽可能快地关闭流

在RxJava2中,这意味着我的Flowable.create方法很接近但不够。您需要围绕您的Flowable.create包装Flowable.using。下面是我修改的构造函数和方法,工作...

几个亮点值得注意:

你可能想通过ProcessSession周围,并用它在Flowable.usingresourceSupplier ...那给我造成了无数的麻烦,ymmv,我不推荐它(但是如果你找到了方法,请告诉我)。

我利用了Flowable.using过载,使您可以指定eager参数。我将我设置为true以便切实关闭/处理资源(InputStream)。

RxLineSplitter(InputStream input, boolean removeEmptyLines) { 

    this.inputStream = input; 
    this.removeEmptyLines = removeEmptyLines; 
} 

private Flowable<List<Tuple<Long, Integer>>> getCharacters() { 

    Flowable<Tuple<Long, Integer>> chars = 
     Flowable.using(
      () -> this.inputStream, 
      input -> Flowable.create(emitter -> { 

       try { 
        long cnt = 0; 
        while (true) { 
         int ch = input.read(); 
         if (isEOF.test(ch)) break; 
         emitter.onNext(new Tuple<>(cnt, ch)); 
         ++cnt; 
        } 
        emitter.onComplete(); 

       } catch (Exception ex) { 
        emitter.onError(ex); 
       } 

      }, BackpressureStrategy.BUFFER), 
      InputStream::close, 
      true); 

    return chars.buffer(2, 1); 
} 

最后的思考:

  • 我喜欢这样的支持RxLineSplitter类对NiFi没有依赖关系。减少耦合。

  • 我不喜欢NiFi Processor.onTrigger方法获取InputStream,但需要RxLineSplitter关闭&处置。这在文档中有所讨论,但它对我来说感觉很脏并且容易出错。缓解上述情况,InputStream仅在一种方法中使用,并通过Flowable.using以相当明显且清晰的方式进行清理。

希望这可以帮助别人...时间看看我与NiFi和Rx遇到的其他[学习]障碍。