我正尝试使用Zeppelin创建SnappyData流表。 我与参数 'rowConverter'SnappyData + Zeppelin + Kafka streaming - 创建流表时出错
齐柏林笔记本流表定义的问题被分为几段:
第1款:
import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.{SchemaDStream, StreamToRowsConverter}
class RowsConverter extends StreamToRowsConverter with Serializable {
override def toRows(message: Any): Seq[Row] = {
val log = message.asInstanceOf[String]
val fields = log.split(",")
val rows = Seq(Row.fromSeq(Seq(new java.sql.Timestamp(fields(0).toLong),
fields(1),
fields(2),
fields(3),
fields(4),
fields(5).toDouble,
fields(6)
)))
rows
}
}
条第2款:
snsc.sql(
"CREATE STREAM TABLE adImpressionStream if not exists ("sensor_id string, metric
metric string) using kafka_stream
options (storagelevel 'MEMORY_AND_DISK_SER_2',
rowConverter 'RowsConverter',
zkQuorum 'localhost:2181',
groupId 'streamConsumer', topics 'test'");"
)
第一段返回错误:
error: not found: type StreamToRowsConverter
class RowsConverter extends StreamToRowsConverter with Serializable {
^
<console>:13: error: not found: type Row
override def toRows(message: Any): Seq[Row] = {
^
<console>:16: error: not found: value Row
val rows = Seq(Row.fromSeq(Seq(new java.sql.Timestamp(fields(0).toLong),
第二段:
java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: RowsConverter
我一直在尝试使用默认代码的git:
snsc.sql("create stream table streamTable (userId string, clickStreamLog string) " +
"using kafka_stream options (" +
"storagelevel 'MEMORY_AND_DISK_SER_2', " +
" rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter' ," +
"kafkaParams 'zookeeper.connect->localhost:2181;auto.offset.reset->smallest;group.id->myGroupId', " +
"topics 'test')")
,但我也有类似的错误:
java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: io.snappydata.app.streaming.KafkaStreamToRowsConverter
你能不能帮我这个问题? 非常感谢。
有用的东西:D我从https://github.com/SnappyDataInc/snappy-poc#lets-get-this-going一步步做了一些动作。现在没有Zeppelin,只是检查它是否工作。流式处理 - 我可以从表中选择数据,但我在Spark上看不到任何应用程序。另外,在端口4040上的webUI不可用。 – Tomtom
你可以尝试端口5050吗? –