2015-09-07 86 views
0

我一直在向Kafka主题发送消息,这些消息在主题中使用JSON,我使用KafkaSpout从Kafka获取消息并使用shuffle将消息发送给螺栓分组。现在我想要在KafkaSpout和bolt之间实现字段组合。请任何人都可以帮我解决这个问题。如何实现KafkaSpout与螺栓之间的区域分组。我如何实现KafkaSpout和螺栓之间的字段分组

回答

2

您需要实现backtype.storm.spout.scheme接口,基本上它看起来是这样的:

public class FooScheme implements Scheme { 

    public Values deserialize(final byte[] _line) { 

    try{ 
      Values values = new Values(); 
      JSONObject msg = (JSONObject) JSONValue.parseWithException(new String(_line)); 
      values.add((String) msg.get("a")); 
      values.add((String) msg.get("b")) 
      values.add(msg) 
     } 
     catch(ParseException e) { 
      //handle the exception 
      return null; 
     } 

    } 

    public Fields getOutputFields() { 
    return new Fields("a", "b", "json");} 
} 

,你与你的嘴像这样使用:

SpoutConfig spoutConfig = new SpoutConfig(... your config here ...); 
spoutConfig.scheme = new SchemeAsMultiScheme(new FooScheme()); 
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 
topology.setSpout("kafka-spout", 1).setNumTasks(1); 

,现在你可以准备好使用“a”或“b”或两者的字段分组。

FooBolt bolt = new FooBolt(); 
topology.setBolt("foo-bolt", new FooBolt(), 1).setNumtasks(1) 
     .fieldsGrouping("kafka-spout", new Fields("a","b")); 

享受

+0

我曾试图用你的代码示例实现,但我可以得到下面的错误--- java.lang.IllegalArgumentException异常:有错号码字段创建的元组。预期1个字段,但在backtype.storm.tuple.TupleImpl处获得2个字段 。 (TupleImpl.java:55)〜[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] –

+0

@RishiArora getOutputFields中的字段数与反序列化中的值的数量? –

+0

公共值反序列化(最后一个字节[] _line){ \t \t // TODO自动生成方法存根 \t \t尝试{ \t \t \t值的值=新值(); \t \t \t的JSONObject味精=(的JSONObject)JSONValue \t \t \t \t \t .parseWithException(新字符串(_line)); \t \t \t //values.add((String)msg.get(“id”)); \t \t \t values.add(msg); \t \t \t返回值; –