2016-04-21 98 views
5

在我们的spark-streaming作业中,我们从kafka读取消息流。从spark中的kafka消息中获得主题

为此,我们使用KafkaUtils.createDirectStream API,它返回JavaPairInputDStreamfrom

的消息是从卡夫卡读取(从三个主题 - TEST1,TEST2,TEST3)以下列方式:

private static final String TOPICS = "test1,test2,test3"; 
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(","))); 

HashMap<String, String> kafkaParams = new HashMap<>(); 
kafkaParams.put("metadata.broker.list", BROKERS); 

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
       streamingContext, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, 
       topicsSet 
       ); 

我们要处理来自不同的方式,每个主题的消息,并以实现这一点,我们需要知道每个消息的主题名称。

,所以我们做到以下几点:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction()); 

,这是SplitToLinesFunction执行:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> { 
    @Override 
    public String call(Tuple2<String, String> tuple2) 
    { 
     System.out.println(tuple2._1); 
     return tuple2._2(); 
    } 
} 

的问题是,tuple2._1为空,我们假设tuple2._1会包含一些元数据,例如消息来自何处的主题/分区的名称。

但是,当我们打印tuple2._1时,它为空。

我们的问题 - 有没有办法在kafka中发送主题名称,以便在spark-streaming代码中,tuple2._1将包含它(而不是空)?

请注意,我们也尝试作为spark-streaming kafka-integration tutorial提到从DSTREAM获得主题名称:

但它返回所有被送往KafkaUtils.createDirectStream的主题,而不是从那里的邮件的特定主题(属于当前RDD)来自。

因此,它无法帮助我们确定RDD中的消息从哪里发出的主题的名称。

编辑

响应大卫的回答 - 我尝试使用MessageAndMetadata这样的:

 Map<TopicAndPartition, Long> topicAndPartition = new HashMap(); 
     topicAndPartition.put(new TopicAndPartition("test1", 0), 1L); 
     topicAndPartition.put(new TopicAndPartition("test2", 0), 1L); 
     topicAndPartition.put(new TopicAndPartition("test3", 0), 1L); 

     class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String> 
     { 

      @Override 
      public String call(MessageAndMetadata<String, String> v1) 
        throws Exception { 
       // nothing is printed here 
       System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition()); 
       return v1.topic(); 
      } 

     } 

     JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction()); 
     messages.foreachRDD(new VoidFunction() { 

      @Override 
      public void call(Object t) throws Exception { 
       JavaRDD<String> rdd = (JavaRDD<String>)t; 
       OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
       // here all the topics kafka listens to are printed, but that doesn't help 
       for (OffsetRange offset : offsets) { 
        System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset()); 
       } 
      } 
     }); 

的问题是,什么也没有在MessageAndMetadataFunction.call方法打印。我应该如何解决以获得MessageAndMetadataFunction.call方法中RDD的相关主题?

+0

你是什么意思“什么都打印在这里”?甚至不是“topic =”部分,也不是那个部分打印,但值是空的。 –

+0

如果没有,那么你应该看看你的'YARN'日志,或者你正在运行的任何集群。对我来说,'/ usr/local/hadoop/logs/userLogs /'中的日志文件可以从你的执行程序中捕获'stdout'。 –

+0

对不起 - 我现在知道这个问题。这是因为你的'MessageAndMetadataFunction'必须将主题和消息一起返回到一条记录中。现在你只返回话题,而不是消息本身。这就是为什么你反复打印主题的原因 - 因为这是你从MessageAndMetadataFunction返回的结果 - 既返回,你将拥有两个。 –

回答

6

使用以messageHandler函数作为参数的版本createDirectStream之一。这是我做的:

val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte]](
    ssc, 
    kafkaParams, 
    getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1).toMap, 
    (msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)} 
) 

有东西存在,这并不意味着什么吗 - 相关的部分是

(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)} 

如果你不熟悉Scala,所有的功能确实是回报包含msg.topicmsg.messageTuple2。您的功能需要返回这两个以便您在下游使用它们。您可以返回整个MessageAndMetadata对象,这会给您一些其他有趣的字段。但是,如果您只想要topicmessage,请使用上述内容。

+0

嘿看起来像你有一个额外的支架,请你纠正它。 –

+1

@David可以请你提供一个在斯卡拉工作或详细的例子。因为我对来自Offset,messageHandler的这些参数感到困惑。感谢您! –

1

Kafka integration guide的底部,有一个从消息中提取主题的示例。

相关的代码在Java中:

// Hold a reference to the current offset ranges, so it can be used downstream 
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); 

directKafkaStream.transformToPair(
    new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { 
    @Override 
    public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { 
     OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
     offsetRanges.set(offsets); 
     return rdd; 
    } 
    } 
).map(
    ... 
).foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() { 
    @Override 
    public Void call(JavaPairRDD<String, String> rdd) throws IOException { 
     for (OffsetRange o : offsetRanges.get()) { 
     System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() 
     ); 
     } 
     ... 
     return null; 
    } 
    } 
); 

这或许可以被折叠成更紧凑刚刚询问的话题,没有别的。

+0

我试过了,它会打印所有kafka监听的主题,而不仅仅是与当前RDD相关的主题。例如 - 如果我听3个主题 - test1,test2,test3 - 并且消息仅从test1到达,那么此代码将为每个RDD打印test1,test2,test3。所以这段代码并没有帮助我。 –

+0

这不起作用 – User3

相关问题