
I have streaming data in JSON delivered over a WebSocket; its size varies between 1 MB and 60 MB per second. I am consuming this big data with Kafka and Spark.

I have to decode the data, then parse it, and finally write it to MySQL.

I have two ideas:

1) Read the data from the socket, decode it, and have the producer send it to the consumer via Avro; then, in the consumer, fetch the data and write it to MySQL with Spark map/reduce.

2) Read the data from the socket and have the producer send the raw data to the consumer; then, in the consumer, fetch the data, decode and parse it in Spark, and let the Spark job write the result to MySQL.

Do you have any suggestions?

Producer

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.tan;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

/**
 *
 * @author Tan
 */
public class MainKafkaProducer {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //props.put("group.id", "mygroup");
        //props.put("max.partition.fetch.bytes", "100000000");
        //props.put("serializer.class", "kafka.serializer.StringEncoder");
        //props.put("partitioner.class","kafka.producer.DefaultPartitioner");
        //props.put("request.required.acks", "1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Read one line of JSON from a local file (stand-in for the websocket feed)
        // and send it to the topic 100 times.
        String fileName = "/Users/Tan/Desktop/feed.json";
        try (BufferedReader file = new BufferedReader(new FileReader(fileName))) {
            String st = file.readLine();
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", st);
                producer.send(record);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        /*
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record2 = new ProducerRecord<>("mytopic", "Hasan-" + i);
            producer.send(record2);
        }
        */

        producer.close();
    }

}
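
For idea 1 above, the producer would Avro-encode each message before it reaches Kafka. As a rough sketch of what that could look like (the schema, the "payload" field name, and the sample JSON string are placeholders I made up, not taken from the question), one option is to serialize an Avro GenericRecord to bytes and send it with a ByteArraySerializer:

// Sketch only: Avro-encoding variant of the producer from idea 1.
// The schema and the field name "payload" are assumptions for illustration.
package com.tan;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

public class AvroProducerSketch {

    // Minimal schema wrapping the raw JSON string in a single field.
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Feed\",\"fields\":"
          + "[{\"name\":\"payload\",\"type\":\"string\"}]}");

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            String json = "{\"example\":true}"; // would come from the websocket feed
            producer.send(new ProducerRecord<>("mytopic", encode(json)));
        }
    }

    // Serialize one record to Avro binary.
    private static byte[] encode(String json) throws IOException {
        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("payload", json);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}

The consumer side would then use a ByteArrayDeserializer plus a GenericDatumReader with the same schema; whether this extra step is worth it for 1-60 MB messages is exactly what the answer below addresses.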

Consumer

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.tan;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 *
 * @author Tan
 */
public class MainKafkaConsumer {
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName(MainKafkaConsumer.class.getName())
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Set<String> topics = Collections.singleton("mytopic");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");

        // Direct (receiver-less) stream over the topic; keys and values are strings.
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        // Print every message value; from Java, Tuple2 fields are accessed as methods.
        directKafkaStream.foreachRDD(rdd -> {
            rdd.foreach(record -> {
                System.out.println(record._2());
            });
        });

        /*
        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> {
                System.out.println(record._2());
            });
        });
        */

        ssc.start();
        ssc.awaitTermination();

    }

}
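
The consumer above only prints the messages. For the final step (parse the JSON and write it to MySQL), a common pattern is to do the work per partition inside foreachRDD so each executor opens its own JDBC connection. The sketch below is only an outline under assumed names: the table feed_data and its columns, the JSON field names, and the JDBC URL and credentials are placeholders, and Jackson is used for parsing purely as an example.

// Sketch only: parse each JSON message and batch-insert it into MySQL.
// Table/column names, JSON fields, and the JDBC URL below are assumptions.
package com.tan;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MysqlSinkSketch {

    public static void attach(JavaPairInputDStream<String, String> stream) {
        stream.foreachRDD(rdd -> {
            // One JDBC connection per partition, opened on the executor.
            rdd.foreachPartition(records -> {
                ObjectMapper mapper = new ObjectMapper();
                try (Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/feeds", "user", "password");
                     PreparedStatement stmt = conn.prepareStatement(
                        "INSERT INTO feed_data (symbol, price) VALUES (?, ?)")) {
                    while (records.hasNext()) {
                        String json = records.next()._2();      // message value
                        JsonNode node = mapper.readTree(json);  // decode/parse the JSON
                        stmt.setString(1, node.path("symbol").asText());
                        stmt.setDouble(2, node.path("price").asDouble());
                        stmt.addBatch();
                    }
                    stmt.executeBatch();                        // write to MySQL
                }
            });
        });
    }
}

This would be called as MysqlSinkSketch.attach(directKafkaStream) in place of the print-only foreachRDD above, with the MySQL JDBC driver on the executor classpath.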

Answer


Your overall process is fine; the only question is the Avro conversion. Your data is not that big, 1 MB to 60 MB.

I work with a similar pipeline here: read data from an MQ, convert it to Avro, send it to Kafka, consume it from Kafka, parse the data, and publish it to another MQ.

Avro helps a lot when the data is really large, say >= 1 GB. But in cases where the data is quite small, like < 10 MB, Avro makes the processing slightly slower and gives no real gain in network transfer.

My suggestion: if your network is good enough, don't convert to Avro; you are better off without it. To improve performance on the Spark side, configure the Kafka topic with a good number of partitions, because with only one partition Spark cannot parallelize properly. Check this article, it can help you.
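
For reference, a topic with more than one partition can be created programmatically. The sketch below uses Kafka's AdminClient (available from client version 0.11 onward); the partition count of 4 and replication factor of 1 are just example values, and the same thing can be done with the kafka-topics command-line tool's --partitions option.

// Sketch only: create "mytopic" with several partitions so Spark can parallelize.
// The partition count (4) and replication factor (1) are example values.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("mytopic", 4, (short) 1);
            // Block until the broker acknowledges the creation.
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}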


Thanks for your comment. I removed Avro and I am sending the data to Kafka, but I cannot consume the data with Spark (JSON format, about 3 MB of data). I have added my code. – Tan