如何让RxJava在NiFi内部工作?或者我如何获得NiFi和RxJava的好玩?它们看起来像是彼此完美的补充。如何在我的Apache NiFi onTrigger方法中使用RxJava?
我遇到了一个问题,我无法弄清楚如何解决。 NiFi不断抱怨IllegalStateException
或FlowFileHandlingException
,具体取决于我从FlowFile输入流中读取的位置和方式。
我正在学习Apache NiFi和RxJava 2(即Flowables)。我想创建一个类似于现有SplitText处理器的Apache NiFi处理器 - 只是比较简单。没有头文件处理,没有片段大小处理 - 只需提取每行数据 - 我称之为SplitLine。
这里没有花哨的线程 - 这意味着我没有试图用Flowable.observeOn()或Flowable.subscribeOn()做任何事情。一切都应该在一个线程上完成......当前线程。
我想我会通过使用RxJava来解决这个问题。我会从FlowFile中读取字符并使用移位缓冲区发布它们;例如...
Flowable<Tuple<Long, Integer>> chars =
Flowable.generate(
() -> 0L,
(cnt, emitter) -> {
int ch = flowStream.read();
emitter.onNext(new Tuple<>(cnt, ch);
if (ch == -1) emitter.onComplete();
return cnt++;
});
return chars.buffer(2, 1).publish().autoConnect();
我也尝试使用Flowable.create相当于...
Flowable<Tuple<Long, Integer>> chars =
Flowable.create(emitter -> {
try {
int ch;
long cnt = 0;
while ((ch = flowStream.read()) != -1) {
emitter.onNext(new Tuple<>(cnt, ch);
cnt++;
}
emitter.onComplete();
} catch (IOException ex) {
ex.printStackTrace();
emitter.onError(ex);
} finally {
flowStream.close();
}
}, BackpressureStrategy.BUFFER);
return chars.buffer(2, 1).publish().autoConnect();
在上述情况下,我传递的InputStream
从NiFi ProcessSession
在重写onTrigger
我的处理器类的方法。
InputStream stream = session.read(flowFile)
RxLineSplitter splitter = new RxLineSplitter(stream);
我也使用回调版本尝试,但勿庸置疑收到异常,因为流是从比读取回调其他回调访问。这是...
session.read(flowFile, stream -> {
RxLineSplitter splitter = new RxLineSplitter(stream);
// RxLineSplitter contains the code above which is the other callback it is complaining about...
}
为什么我发布字符流?为什么成对的字符?我有两个用户在char流上。一个查找一行的开始,另一个查找行的结尾。由于Windows,我需要查找[\ r; \ N;或\ r \ n)]。基本上,这对中的第二个字符是向前看的。
如果你有兴趣,我RxSplitLine的症结看起来像......
Flowable<Tuple<Long, Integer>> findLineMarkers(
Flowable<List<Tuple<Long, Integer>>> charPairs,
BiFunction<Tuple<Long, Integer>, Optional<Tuple<Long, Integer>>, Optional<Tuple<Long, Integer>>> strategy) {
return charPairs().map(pair -> {
Tuple<Long, Integer> fst = pair.get(0);
Optional<Tuple<Long, Integer>> snd = pair.size() > 1 ? Optional.of(pair.get(1)) : Optional.empty();
return strategy.apply(fst, snd);
}).filter(Optional::isPresent).map(Optional::get);
}
Flowable<SplitInfo> split(InputStream stream) throws IOException {
return findLineMarkers(stream, startingPositionStrategy)
.zipWith(findLineMarkers(stream, endingPositionStrategy),
(s, e) -> new Split(s.item1, e.item1 - s.item1))
.filter(split -> !removeEmptyLines || split.length > 0)
.zipWith(counter(), Tuple::new)
.timeInterval(TimeUnit.MILLISECONDS)
.map(x -> new SplitInfo(x.value().item1.start,
x.value().item1.length,
x.value().item2,
x.time(), x.unit()));
}
够散漫......我会任何帮助或指针在获得NiFi和RxJava 2〜感激和另一个玩好。
你能后的踪迹详细说明你上面提到的例外? – Andy
感谢您对Andy的关注。我使用了一个要点来复制我的SplitLine处理器的一部分nifi-app.log和onTrigger()。 https://gist.github.com/rkayman/60faa63723c9f54b20f619fb131ccb66 – Rob