2017-10-17 89 views
1

我正尝试使用Zeppelin创建SnappyData流表。 我与参数 'rowConverter'SnappyData + Zeppelin + Kafka streaming - 创建流表时出错

齐柏林笔记本流表定义的问题被分为几段:

第1款:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.streaming.{SchemaDStream, StreamToRowsConverter} 

class RowsConverter extends StreamToRowsConverter with Serializable { 

override def toRows(message: Any): Seq[Row] = { 
val log = message.asInstanceOf[String] 
val fields = log.split(",") 
val rows = Seq(Row.fromSeq(Seq(new java.sql.Timestamp(fields(0).toLong), 
    fields(1), 
    fields(2), 
    fields(3), 
    fields(4), 
    fields(5).toDouble, 
    fields(6) 
))) 

rows 
} 
} 

条第2款:

snsc.sql(
"CREATE STREAM TABLE adImpressionStream if not exists ("sensor_id string, metric 
metric string) using kafka_stream 
options (storagelevel 'MEMORY_AND_DISK_SER_2', 
rowConverter 'RowsConverter', 
zkQuorum 'localhost:2181', 
groupId 'streamConsumer', topics 'test'");" 
) 

第一段返回错误:

error: not found: type StreamToRowsConverter 
class RowsConverter extends StreamToRowsConverter with Serializable { 
          ^
<console>:13: error: not found: type Row 
    override def toRows(message: Any): Seq[Row] = { 
              ^
<console>:16: error: not found: value Row 
     val rows = Seq(Row.fromSeq(Seq(new java.sql.Timestamp(fields(0).toLong), 

第二段:

java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: RowsConverter 

我一直在尝试使用默认代码的git:

snsc.sql("create stream table streamTable (userId string, clickStreamLog string) " + 
"using kafka_stream options (" + 
"storagelevel 'MEMORY_AND_DISK_SER_2', " + 
" rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter' ," + 
"kafkaParams 'zookeeper.connect->localhost:2181;auto.offset.reset->smallest;group.id->myGroupId', " + 
"topics 'test')") 

,但我也有类似的错误:

java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: io.snappydata.app.streaming.KafkaStreamToRowsConverter 

你能不能帮我这个问题? 非常感谢。

回答

0

您需要在类路径中提供特定于应用程序的类。请参考这里设置classpath的步骤。齐柏林会拿起设置CLASSPATH在spark-env.sh https://github.com/SnappyDataInc/snappy-poc#lets-get-this-going

+0

有用的东西:D我从https://github.com/SnappyDataInc/snappy-poc#lets-get-this-going一步步做了一些动作。现在没有Zeppelin,只是检查它是否工作。流式处理 - 我可以从表中选择数据,但我在Spark上看不到任何应用程序。另外,在端口4040上的webUI不可用。 – Tomtom

+0

你可以尝试端口5050吗? –