
In my application I need to receive streaming data from Kafka and apply a time series model to that streaming data in Spark. How can I apply a time series model to Spark streaming data?

I am able to read the streaming data from Kafka, but I don't know how to apply a time series model on top of it.

Could anyone please show me how to do the time-series engineering and what a typical use case looks like?

Dataset (sample below):

725030:14732,2008,01,01,00,5.0,-3.9,1020.4,270,4.6,2,0.0,0.0 
725030:14732,2008,01,01,01,5.0,-3.3,1020.6,290,4.1,2,0.0,0.0 
725030:14732,2008,01,01,02,5.0,-3.3,1020.0,310,3.1,2,0.0,0.0 
725030:14732,2008,01,01,03,4.4,-2.8,1020.1,300,1.5,2,0.0,0.0 
725030:14732,2008,01,01,04,3.3,-4.4,1020.5,240,2.6,0,0.0,0.0 

The Spark Java code is given below:

import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Set; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.VoidFunction; 
import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SparkSession; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import kafka.serializer.StringDecoder; 
import scala.Tuple2; 

//Kafka-to-Spark-Streaming working code
/* This code converts the Kafka stream into a Dataset using the Spark Java API. */
public class KafkaToSparkStreaming {
    public static void main(String arr[]) throws InterruptedException 
    { 


     SparkConf conf = new SparkConf(); 
     conf.set("spark.app.name", "SparkReceiver"); //The name of application. This will appear in the UI and in log data. 
     //conf.set("spark.ui.port", "7077"); //Port for application's dashboard, which shows memory and workload data. 
     conf.set("dynamicAllocation.enabled","false"); //Which scales the number of executors registered with this application up and down based on the workload 
     //conf.set("spark.cassandra.connection.host", "localhost"); //Cassandra Host Adddress/IP 
     conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer"); //For serializing objects that will be sent over the network or need to be cached in serialized form. 
     conf.setMaster("local"); 
     conf.set("spark.streaming.stopGracefullyOnShutdown", "true"); 

     JavaSparkContext sc = new JavaSparkContext(conf); 
     // Create the context with 2 seconds batch size 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     Map<String, String> kafkaParams = new HashMap<String, String>(); 

     kafkaParams.put("zookeeper.connect", "localhost:2181"); //Make all kafka data for this cluster appear under a particular path. 
     kafkaParams.put("group.id", "testgroup"); //String that uniquely identifies the group of consumer processes to which this consumer belongs 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); //Producer can find a one or more Brokers to determine the Leader for each topic. 
     kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); //Serializer to use when preparing the message for transmission to the Broker. 
     kafkaParams.put("request.required.acks", "1"); //Producer to require an acknowledgement from the Broker that the message was received. 

     Set<String> topics = Collections.singleton("ny-2008.csv"); 

     //Create an input DStream for receiving data from Kafka
     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, topics); 

     //Create JavaDStream<String> 
     JavaDStream<String> msgDataStream = directKafkaStream.map(new Function<Tuple2<String, String>, String>() { 
      @Override 
      public String call(Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
      }); 
     //Create JavaRDD<Row> 
     msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { 
       @Override 
       public void call(JavaRDD<String> rdd) { 
        JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() { 
         @Override 
         public Row call(String msg) { 
         Row row = RowFactory.create(msg); 
         return row; 
         } 
        }); 
         //Create the schema: a single string column holding the raw CSV line
         StructType schema = DataTypes.createStructType(new StructField[] {DataTypes.createStructField("Message", DataTypes.StringType, true)});
         //Get the Spark 2.0 session
         SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
         Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
         msgDataFrame.show();
         msgDataFrame.createOrReplaceTempView("weatherTemporaryData");
         //The only column defined in the schema is "Message"
         msgDataFrame.select("Message").show();



       } 
     }); 

     ssc.start();    
     ssc.awaitTermination(); 
    } 

} 

class JavaSparkSessionSingleton { 
     private static transient SparkSession instance = null; 
     public static SparkSession getInstance(SparkConf sparkConf) { 
     if (instance == null) { 
      instance = SparkSession 
      .builder() 
      .config(sparkConf) 
      .getOrCreate(); 
     } 
     return instance; 
     } 
    } 

What is a "time series model"? What does "apply a time series model" mean? –


Aggregate them over time, for example every 24 hours, every 48 hours, or weekly, or whatever makes sense for your data. That is the time series model. – kumar
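For reference, a minimal sketch of that kind of fixed time-window aggregation, assuming the streamed CSV lines have already been parsed into a Dataset with an eventTime timestamp column and a numeric temperature column (both column names are placeholders, not something the code above produces):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

public class WindowAggregationSketch {
    //Group the records into fixed 24-hour windows over the event time and
    //compute one aggregate (here the mean temperature) per window.
    public static Dataset<Row> dailyAverages(Dataset<Row> weather) {
        return weather
            .groupBy(window(col("eventTime"), "24 hours"))
            .agg(avg(col("temperature")).alias("avgTemperature"));
    }
}

Changing "24 hours" to "48 hours" or "1 week" gives the other granularities mentioned above.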

Answer


From a statistical point of view, a time series model works on a sliding time window: for example, you try to predict a future value from the N previous values. If that is what you want to achieve, you should look at window functions. Jacek has written a good explanation of this: Window Aggregate Operators.

In short, to make sense of what is happening, you have to create a WindowSpec instance that describes:

  • the partitioning scheme, for example based on the record's event class
  • the local ordering within each partition, in your case the event timestamp

Then you use the lag function to fetch the events that occurred before a given event.
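A minimal sketch of that, assuming the per-batch data has been parsed into a Dataset with stationId, eventTime and temperature columns (these names are assumptions, not something the question's code produces):

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lag;

public class LagFeaturesSketch {
    public static Dataset<Row> withLagFeatures(Dataset<Row> weather, int numLags) {
        //Partition by the "event class" (here the station) and order each
        //partition by the event timestamp, as described above.
        WindowSpec spec = Window.partitionBy(col("stationId")).orderBy(col("eventTime"));

        //Add one column per past value: lag 1 .. numLags.
        Dataset<Row> result = weather;
        for (int i = 1; i <= numLags; i++) {
            Column lagged = lag(col("temperature"), i).over(spec);
            result = result.withColumn("temperature_lag_" + i, lagged);
        }
        return result;
    }
}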

For time series model inference, since consecutive events are not independent, you have to be careful about stationarity. In some cases you can get better stationarity through differencing (removing the trend): instead of the event values themselves, you consider their differences from the previous event. The lag function is again your friend here, but you will need N + 1 values.
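A sketch of that differencing step, under the same assumed column names:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lag;

public class DifferencingSketch {
    public static Dataset<Row> firstDifference(Dataset<Row> weather) {
        WindowSpec spec = Window.partitionBy(col("stationId")).orderBy(col("eventTime"));
        //temperature(t) - temperature(t-1); the first row of each partition
        //becomes null because it has no previous value to subtract.
        return weather.withColumn("temperature_diff",
            col("temperature").minus(lag(col("temperature"), 1).over(spec)));
    }
}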

Once you have done the differencing and the lagging, you can try to apply any of the statistical modelling approaches in the Spark ML library.
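For example, a minimal sketch of an autoregressive-style linear regression on the lag columns produced above, using Spark ML's VectorAssembler and LinearRegression (the column names are still the assumed placeholders):

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ArRegressionSketch {
    public static LinearRegressionModel fit(Dataset<Row> lagged) {
        //Rows whose lag columns are null (the start of each partition)
        //cannot be used for training, so drop them.
        Dataset<Row> training = lagged.na().drop();

        //Pack the lagged values into a single feature vector.
        VectorAssembler assembler = new VectorAssembler()
            .setInputCols(new String[] {"temperature_lag_1", "temperature_lag_2", "temperature_lag_3"})
            .setOutputCol("features");

        //Predict the current value from the N previous values.
        LinearRegression lr = new LinearRegression()
            .setFeaturesCol("features")
            .setLabelCol("temperature");

        return lr.fit(assembler.transform(training));
    }
}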