2015-07-19 121 views

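Kafka producer messages not available in topic on EC2 Linux

I have written a Kafka producer with a custom partitioner in Eclipse on a Windows machine. My Kafka cluster runs on an EC2 Linux instance. I can run the Kafka producer code from Eclipse, but I do not see the messages in the topic on the EC2 box.

Producer code: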
package com.panda.kafka.training; 
import java.util.*; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class PandaKafkaProducer { 
    public static void main(String[] args) { 
     long events = Long.parseLong(args[0]); 
     Random rnd = new Random(); 
     Properties props = new Properties(); 
     props.put("metadata.broker.list", "ec2-xx-yy-zzz-212.compute-1.amazonaws.com:9092"); 
     //props.put("producer.type", "sync"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("partitioner.class", "com.panda.kafka.training.PandaKafkaPartitioner"); 
     props.put("request.required.acks", "1"); 
     props.put("producer.type","async"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, String> producer = new Producer<String, String>(config); 
     for (long nEvents = 0; nEvents < events; nEvents++) { 
      System.out.println("creating event " + nEvents); 
      long runtime = new Date().getTime(); 
      String ip = "192.168.2." + rnd.nextInt(255); 
      String msg = runtime + ",www.vulab.com," + ip; 
      // The IP address is the message key, which the partitioner uses to pick a partition. 
      KeyedMessage<String, String> data = new KeyedMessage<String, String>("vulab123", ip, msg); 
      producer.send(data); 
     } 
     producer.close(); 
    } 

} 
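
The producer above names `com.panda.kafka.training.PandaKafkaPartitioner`, but that class is not shown in the post. As a rough illustration only, here is one way the key-to-partition logic could look, assuming it routes on the last octet of the IP key. The class name `IpOctetPartitioning` and the method `partitionFor` are hypothetical; in the 0.8 producer API this logic would live inside a class implementing `kafka.producer.Partitioner`, and that wiring is left out so the sketch has no Kafka dependency.

```java
// Hypothetical sketch: the original PandaKafkaPartitioner is not shown in the post.
final class IpOctetPartitioning {

    /** Maps a key such as "192.168.2.47" to a partition in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int dot = key.lastIndexOf('.');
        if (dot >= 0 && dot < key.length() - 1) {
            try {
                // Assumption: the text after the last dot is the final IP octet.
                int lastOctet = Integer.parseInt(key.substring(dot + 1));
                return Math.floorMod(lastOctet, numPartitions);
            } catch (NumberFormatException e) {
                // Not a numeric suffix; fall through to the hash-based default.
            }
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("192.168.2.47", 4));
    }
}
```

With a single-partition topic every key maps to partition 0, so the partitioner only matters once the topic has more than one partition.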


Server Properties file: 
# The port the socket server listens on 
port=9092 

# Hostname the broker will bind to. If not set, the server will bind to all interfaces 
host.name=localhost 

# Hostname the broker will advertise to producers and consumers. If not set, it uses the 
# value for "host.name" if configured. Otherwise, it will use the value returned from 
# java.net.InetAddress.getCanonicalHostName(). 
advertised.host.name=ec2-xx-yy-zzz-212.compute-1.amazonaws.com 

# The port to publish to ZooKeeper for clients to use. If this is not set, 
# it will publish the same port that the broker binds to. 
advertised.port=0000 


Producer properties file: 
# list of brokers used for bootstrapping knowledge about the rest of the cluster 
# format: host1:port1,host2:port2 ... 
metadata.broker.list=52.2.202.212:0000 

# name of the partitioner class for partitioning events; default partition spreads data randomly 
#partitioner.class= 

# specifies whether the messages are sent asynchronously (async) or synchronously (sync) 
producer.type=sync 

Output from Producer in ec2: 
$ sh kafka-console-producer.sh --broker-list xxxx:yyy.zzzz:9092 --topic vulab123 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 
SLF4J: Defaulting to no-operation (NOP) logger implementation 
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 

Any detailed explanation would be a great help.


Is port 9092 open on the EC2 node? – Sky

Answer


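Please find the detailed answer below. I have a Windows machine with Eclipse installed, and I created a Maven project for Kafka. I wanted to send some messages from Eclipse on Windows to the Kafka cluster on EC2. A single EC2 machine does all the Kafka work.

Note on the Java producer code: you must make sure to give the full DNS name here.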

props.put("metadata.broker.list", "ec2-52-xx-yyy-216.compute-1.amazonaws.com:9092"); 

props.put("advertised.host.name", "ec2-52-xx-yyy-216.compute-1.amazonaws.com:9092"); 

Step 1:

I started a new EC2 Linux machine and installed Kafka. 
wget http://mirror.sdunix.com/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz 

tar -xzf kafka_2.9.2-0.8.1.1.tgz 

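Step 2:

  1. Allow SSH and all traffic in the inbound rules of the EC2 security group.

  2. Add the EC2 public IP to the hosts file on the Windows machine where Eclipse is installed (on Windows this is C:\Windows\System32\drivers\etc\hosts rather than /etc/hosts).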
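
Both parts of step 2 come down to whether the Windows machine can open a TCP connection to the broker's host and port. A minimal sketch of such a check in plain Java follows; the class name `PortCheck` and the 3-second timeout are illustrative choices, not part of the original answer. A hostname that does not resolve is reported as unreachable, the same as a blocked or closed port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: tries a plain TCP connect to the broker address.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolved hostnames.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the broker's public DNS name and port, e.g. <ec2-dns-name> 9092.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```

A successful connect only shows that the port is open. The producer can still fail afterwards if the broker advertises a host name or port that the client cannot reach.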

Step 3:

Modified the kafka server properties files. I put the exact DNS name. 
# The port the socket server listens on 
port=9092 

# Hostname the broker will bind to. If not set, the server will bind to all interfaces 
host.name=ec2-52-xx-yyy-216.compute-1.amazonaws.com 

# Hostname the broker will advertise to producers and consumers. If not set, it uses the 
# value for "host.name" if configured. Otherwise, it will use the value returned from 
# java.net.InetAddress.getCanonicalHostName(). 
advertised.host.name=ec2-52-xx-yyy-216.compute-1.amazonaws.com 
zookeeper.connect=ec2-52-xx-yyy-216.compute-1.amazonaws.com:2181 
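
Compared with the configuration in the question, the file above no longer sets `host.name=localhost` (which binds the broker to the loopback interface only) and no longer sets `advertised.port=0000`. The sketch below flags those two settings in a properties file. It assumes the `0000` in the question is a literal value rather than a redaction, and the class name `BrokerConfigCheck` and its messages are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sanity check for two broker settings that affect remote clients.
final class BrokerConfigCheck {

    /** Returns a human-readable list of suspicious settings; empty if none are found. */
    static List<String> findProblems(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();

        // Values may carry trailing spaces, so trim before comparing.
        String host = props.getProperty("host.name", "").trim();
        if (host.equals("localhost") || host.startsWith("127.")) {
            problems.add("host.name=" + host + " binds the broker to loopback only");
        }

        String advertisedPort = props.getProperty("advertised.port");
        if (advertisedPort != null) {
            try {
                int port = Integer.parseInt(advertisedPort.trim());
                if (port <= 0 || port > 65535) {
                    problems.add("advertised.port=" + port + " is not a usable TCP port");
                }
            } catch (NumberFormatException e) {
                problems.add("advertised.port is not a number: " + advertisedPort.trim());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String fromQuestion = "port=9092\nhost.name=localhost\nadvertised.port=0000\n";
        findProblems(fromQuestion).forEach(System.out::println);
    }
}
```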

Step 4:

Commands executed:

    1. sh zookeeper-server-start.sh /home/ec2-user/kafka/kafka/config/zookeeper.properties 

    2. sh kafka-server-start.sh /home/ec2-user/kafka/kafka/config/server.properties 

    3. sh kafka-topics.sh --create --zookeeper ec2-52-xx-yyy-216.compute-1.amazonaws.com:2181 --replication-factor 1 --partitions 1 --topic spanda20 

    4. sh kafka-console-producer.sh --broker-list ec2-52-xx-yyy-216.compute-1.amazonaws.com:9092 --topic spanda20 --sync 

    5. sh kafka-console-consumer.sh --zookeeper ec2-52-xx-yyy-216.compute-1.amazonaws.com:2181 --topic spanda20 --from-beginning 

I can see all the messages in the consumer.