2017-09-08 120 views
0

我有以下代码:Spark ml streaming predictOnValues如何保存结果?

StreamingLinearRegressionWithSGD regressionWithSGD = 
     new StreamingLinearRegressionWithSGD() 
       .setInitialWeights(Vectors.zeros(featuresNumber)); 

JavaDStream<LabeledPoint> trainingData = streamingContext.textFileStream(model.getTrainPath()).map(LabeledPoint::parse).cache(); 
JavaDStream<LabeledPoint> testData = streamingContext.textFileStream(model.getPredictPath()).map(LabeledPoint::parse); 
regressionWithSGD.trainOn(trainingData); 
regressionWithSGD.predictOnValues(testData.mapToPair(lp -> new Tuple2<>(lp.label(), lp.features()))).print(); 

我希望把结果给某些文件/数据库/队列等,而不是print()这可能吗?

回答

0

我已经想通了

StreamingLinearRegressionWithSGD regressionWithSGD = 
       new StreamingLinearRegressionWithSGD() 
         .setInitialWeights(Vectors.zeros(featuresNumber)); 

     JavaDStream<LabeledPoint> trainingData = streamingContext.textFileStream(model.getTrainPath()).map(LabeledPoint::parse).cache(); 
     JavaDStream<LabeledPoint> testData = streamingContext.textFileStream(model.getPredictPath()).map(LabeledPoint::parse); 
     regressionWithSGD.trainOn(trainingData); 
     JavaDStream<Double> doubleJavaDStream=regressionWithSGD.predictOn(testData.map(labeledPoint -> labeledPoint.features())); 
     doubleJavaDStream.dstream().saveAsTextFiles("result","out"); 

因此,作为一个结果,我们正在result- {}时间戳的文件夹的.out。