2017-10-10 111 views
2

我想重写Clojure中的Spark结构化流示例。在Clojure中写Spark结构化流示例时出现错误

的例子是用Scala编写如下:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

(ns flambo-example.streaming-example 
    (:import [org.apache.spark.sql Encoders SparkSession Dataset Row] 
      [org.apache.spark.sql.functions] 
      )) 

(def spark 
    (-> 
    (SparkSession/builder) 
    (.appName "sample") 
    (.master "local[*]") 
    .getOrCreate) 
) 


(def lines 
    (-> spark 
     .readStream 
     (.format "socket") 
     (.option "host" "localhost") 
     (.option "port" 9999) 
     .load  
    ) 
) 

(def words 
    (-> lines 
     (.as (Encoders/STRING))  
     (.flatMap #(clojure.string/split % #" "))  
    )) 

上述代码导致以下例外。

;;由java.lang.IllegalArgumentException引发 ;;找不到匹配的方法:flatMap for class ;; org.apache.spark.sql.Dataset

我该如何避免错误?

回答

1

您必须按照签名。 Java的Dataset API提供的Dataset.flatMap两种实现方式,一是这需要scala.Function1

def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] 

,第二个这需要星火自己o.a.s.api.java.function.FlatMapFunction

def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] 

前者是没有用处的,但是你应该能够使用后者。 RDD API flambouses macros to create Spark friendly adapters可以通过flambo.api/fn进行访问 - 我不确定这些是否可以直接与Datasets一起使用,但如果需要,您应该可以调整它们。

由于您不能依赖隐式Encoders您还必须提供与返回类型相匹配的显式编码器。

总体而言,你需要的东西左右:

(def words 
    (-> lines 
    (.as (Encoders/STRING))  
    (.flatMap f e)  
)) 

其中f实现FlatMapFunctioneEncoder。一个示例实现:

(def words 
    (-> lines 
     (.as (Encoders/STRING))  
     (.flatMap 
     (proxy [FlatMapFunction] [] 
      (call [s] (.iterator (clojure.string/split s #" ")))) 
     (Encoders/STRING)))) 

,但我想这是有可能找到一个更好的。

在实践中,我会避​​免输入Dataset,并专注于DataFrameDataset[Row])。

相关问题