Answers

0

Define Topology.java:

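import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class Topology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // ZooKeeper connect string; separate multiple hosts with commas.
        String zkHosts = "127.0.0.1";
        BrokerHosts hosts = new ZkHosts(zkHosts);

        // Arguments: broker hosts, Kafka topic, ZooKeeper root path for offsets, consumer id.
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "kafkaTopic_name", "/kafkaTopic_name", "kafkaGroup_name");
        // Emit each Kafka message as a single string field.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Placeholder value: set to true to ignore offsets already stored in ZooKeeper.
        boolean forceFromStart = false;
        spoutConfig.forceFromStart = forceFromStart;

        // Register the spout as "events" with a parallelism hint of 5 and 5 tasks.
        builder.setSpout("events", new KafkaSpout(spoutConfig), 5).setNumTasks(5);
        //...
    }
}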

Basically, to create a KafkaSpout you first need to create a SpoutConfig and pass it to the KafkaSpout constructor.

+0

Maybe I confused you, but what I need is KafkaSpout.java, not the topology. This is something new from you. – cutd