2016-11-15 86 views
1

弗林克管道如下:在弗林克使用神交流

  1. 读卡夫卡主题的消息(字符串)。
  2. 模式匹配通过grok转换为json格式。
  3. 从json提取的字段上的时间窗口上的聚合。

以下是使用grok进行模式匹配的代码。

SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance() 
        .map(new MapFunction<String, JSONObject>() {  
         private static final long serialVersionUID = 6; 

         @Override 
         public JSONObject map(String value) throws Exception { 
          JSONObject logJson = new JSONObject(); 
          grok.compile(pattern); //pattern is some pattern defined in the class 
          Match gm = grok.match(value); 
          gm.captures(); 
          logJson.putAll(gm.toMap()); 
          return logJson; 
         }}) 

在上面的代码编写grok.compile(pattern) map函数内部工作正常。不这样做提供了以下错误

的MapFunction的实现是不可序列

产生的原因:java.io.NotSerializableException:com.google.code.regexp.Pattern

是有什么方法可以将地图外部的grok.compile删除。根据我的理解,每条消息都不需要汇编模式,如果不是,则可能会造成瓶颈。的消息变得相当大。

PS:我已经导入了包oi.thekraken.grok.api.Grok

编辑:

我通过神交实施看着和神交类实现Serializable接口。 https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java

回答

0

您的代码并不显示在局部变量神交从何而来,但:

弗林克要求所有运营商可序列化,因为它们可能会在集群中四处移动。这也适用于所有运营商的成员。你能发布一个完整的非工作例子吗?这可能会更容易看到序列化可能失败的位置。

约弗林克系列化

更多信息,请弗林克文档在 https://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html

主要是ound,你可以,如果你需要运营商成员注册自定义类型一KRYO串行或实施(去)序列化自己,不是直接可序列化的。

顺便说一句:我想你是对试图减少时间图案编译

+0

'基本上,你可以自定义类型注册KRYO串行或实施(去)的序列化自己,如果你需要的数量不能直接序列化的操作员成员。“---我对如何执行相同操作有点困惑。 – user3351750