我载入我的CSV数据框使用我,然后转换为数据集,但它显示像这行这个如何使用Scala在Spark中使用DataSet?
多个标记:
- 无法找到存储在数据集型编码器。通过导入
spark.implicits._支持原始类型(Int,字符串等)和产品类型(案例类别)。将来版本中将添加对序列化其他类型的支持。
- 方法的参数不够:(隐式证据$ 2:
org.apache.spark.sql.Encoder [DataSet.spark.aacsv])org.apache.spark.sql.Dataset [DataSet.spark.aacsv] 。未指定值参数证据$ 2
如何解决此问题? 我的代码 -
case class aaCSV(
a: String,
b: String
)
object WorkShop {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val customSchema = StructType(Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("/xx/vv/ss.csv")
df.printSchema()
df.show()
val googleDS = df.as[aaCSV]
googleDS.show()
}
}
现在我改变的主要功能是这样 -
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;
val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
sa.printSchema()
sa.show()
}
但它抛出错误 - 异常线程 “main” org.apache.spark.sql.AnalysisException:不能给定输入列:[_c1,_c2,_c5,_c4,_c6,_c3,_c0]解析'Adj_Close
';第1行pos 7。我该怎么办 ?
现在我执行我的方法使用基于给定的时间间隔使用火花调度。但我提到这个链接 - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application。请帮助我们。
“没有足够的论据法” ......有什么办法?你的代码在哪里? –
嗯。请不要使用注释代码。编辑你的问题并适当地格式化它。谢谢 –
@Sarathkumar Vulchi:在将df转换为ds之前,您是否可以尝试添加此行'sqlContext.implicits._'。 – Shankar