在我们的spark-streaming作业中,我们从kafka读取消息流。从spark中的kafka消息中获得主题
为此,我们使用KafkaUtils.createDirectStream
API,它返回JavaPairInputDStreamfrom
。
的消息是从卡夫卡读取(从三个主题 - TEST1,TEST2,TEST3)以下列方式:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
我们要处理来自不同的方式,每个主题的消息,并以实现这一点,我们需要知道每个消息的主题名称。
,所以我们做到以下几点:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
,这是SplitToLinesFunction
执行:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
的问题是,tuple2._1
为空,我们假设tuple2._1
会包含一些元数据,例如消息来自何处的主题/分区的名称。
但是,当我们打印tuple2._1
时,它为空。
我们的问题 - 有没有办法在kafka中发送主题名称,以便在spark-streaming代码中,tuple2._1
将包含它(而不是空)?
请注意,我们也尝试作为spark-streaming kafka-integration tutorial提到从DSTREAM获得主题名称:
但它返回所有被送往KafkaUtils.createDirectStream
的主题,而不是从那里的邮件的特定主题(属于当前RDD)来自。
因此,它无法帮助我们确定RDD中的消息从哪里发出的主题的名称。
编辑
响应大卫的回答 - 我尝试使用MessageAndMetadata
这样的:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
的问题是,什么也没有在MessageAndMetadataFunction.call
方法打印。我应该如何解决以获得MessageAndMetadataFunction.call
方法中RDD的相关主题?
你是什么意思“什么都打印在这里”?甚至不是“topic =”部分,也不是那个部分打印,但值是空的。 –
如果没有,那么你应该看看你的'YARN'日志,或者你正在运行的任何集群。对我来说,'/ usr/local/hadoop/logs/userLogs /'中的日志文件可以从你的执行程序中捕获'stdout'。 –
对不起 - 我现在知道这个问题。这是因为你的'MessageAndMetadataFunction'必须将主题和消息一起返回到一条记录中。现在你只返回话题,而不是消息本身。这就是为什么你反复打印主题的原因 - 因为这是你从MessageAndMetadataFunction返回的结果 - 既返回,你将拥有两个。 –