0
我一直在向Kafka主题发送消息,这些消息在主题中使用JSON,我使用KafkaSpout
从Kafka获取消息并使用shuffle将消息发送给螺栓分组。现在我想要在KafkaSpout
和bolt之间实现字段组合。请任何人都可以帮我解决这个问题。如何实现KafkaSpout
与螺栓之间的区域分组。我如何实现KafkaSpout和螺栓之间的字段分组
我一直在向Kafka主题发送消息,这些消息在主题中使用JSON,我使用KafkaSpout
从Kafka获取消息并使用shuffle将消息发送给螺栓分组。现在我想要在KafkaSpout
和bolt之间实现字段组合。请任何人都可以帮我解决这个问题。如何实现KafkaSpout
与螺栓之间的区域分组。我如何实现KafkaSpout和螺栓之间的字段分组
您需要实现backtype.storm.spout.scheme
接口,基本上它看起来是这样的:
public class FooScheme implements Scheme {
public Values deserialize(final byte[] _line) {
try{
Values values = new Values();
JSONObject msg = (JSONObject) JSONValue.parseWithException(new String(_line));
values.add((String) msg.get("a"));
values.add((String) msg.get("b"))
values.add(msg)
}
catch(ParseException e) {
//handle the exception
return null;
}
}
public Fields getOutputFields() {
return new Fields("a", "b", "json");}
}
,你与你的嘴像这样使用:
SpoutConfig spoutConfig = new SpoutConfig(... your config here ...);
spoutConfig.scheme = new SchemeAsMultiScheme(new FooScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
topology.setSpout("kafka-spout", 1).setNumTasks(1);
,现在你可以准备好使用“a”或“b”或两者的字段分组。
FooBolt bolt = new FooBolt();
topology.setBolt("foo-bolt", new FooBolt(), 1).setNumtasks(1)
.fieldsGrouping("kafka-spout", new Fields("a","b"));
享受
我曾试图用你的代码示例实现,但我可以得到下面的错误--- java.lang.IllegalArgumentException异常:有错号码字段创建的元组。预期1个字段,但在backtype.storm.tuple.TupleImpl处获得2个字段 。(TupleImpl.java:55)〜[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] –
@RishiArora getOutputFields中的字段数与反序列化中的值的数量? –
公共值反序列化(最后一个字节[] _line){ \t \t // TODO自动生成方法存根 \t \t尝试{ \t \t \t值的值=新值(); \t \t \t的JSONObject味精=(的JSONObject)JSONValue \t \t \t \t \t .parseWithException(新字符串(_line)); \t \t \t //values.add((String)msg.get(“id”)); \t \t \t values.add(msg); \t \t \t返回值; –