弗林克管道如下:在弗林克使用神交流
- 读卡夫卡主题的消息(字符串)。
- 模式匹配通过grok转换为json格式。
- 从json提取的字段上的时间窗口上的聚合。
以下是使用grok进行模式匹配的代码。
SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance()
.map(new MapFunction<String, JSONObject>() {
private static final long serialVersionUID = 6;
@Override
public JSONObject map(String value) throws Exception {
JSONObject logJson = new JSONObject();
grok.compile(pattern); //pattern is some pattern defined in the class
Match gm = grok.match(value);
gm.captures();
logJson.putAll(gm.toMap());
return logJson;
}})
在上面的代码编写grok.compile(pattern)
map函数内部工作正常。不这样做提供了以下错误
的MapFunction的实现是不可序列
产生的原因:java.io.NotSerializableException:com.google.code.regexp.Pattern
是有什么方法可以将地图外部的grok.compile删除。根据我的理解,每条消息都不需要汇编模式,如果不是,则可能会造成瓶颈。的消息变得相当大。
PS:我已经导入了包oi.thekraken.grok.api.Grok
编辑:
我通过神交实施看着和神交类实现Serializable接口。 https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java
'基本上,你可以自定义类型注册KRYO串行或实施(去)的序列化自己,如果你需要的数量不能直接序列化的操作员成员。“---我对如何执行相同操作有点困惑。 – user3351750