I suggest you program in Scala for Spark. If you program in MapReduce, it is useful only for Hadoop, whereas programming in Scala for Spark lets you run on Spark as well as on Hadoop. Spark was started precisely to address the shortcomings of the MapReduce model. You can find many resources on this topic; one of them is this.
Regarding your question, I suggest you use DataFrames. The first task is to create a schema for the DataFrames:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// All fields are read as strings; cast them later if you need numeric types
val schema = StructType(Array(
  StructField("OgId", StringType),
  StructField("ItemId", StringType),
  StructField("segmentId", StringType),
  StructField("Sequence", StringType),
  StructField("Action", StringType)))
The next task is to read the two files and apply the schema above:
import org.apache.spark.sql.functions._
import sqlContext.implicits._   // for the $"column" syntax

// Read file 1, split each line on the |^| field delimiter and apply the schema
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
// Strip the |!| row terminator that ends up in the last column
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))

// Same for file 2
val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
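To see what the parsing does, here is a small sketch on one hypothetical input line (the sample line is an assumption based on the |^| field delimiter and the |!| row terminator implied by the code above):

val line = "4295877341|^|136|^|4|^|1|^|I|!|"   // hypothetical sample line
val fields = line.split("\\|\\^\\|", -1)
// fields: Array("4295877341", "136", "4", "1", "I|!|")
// regexp_replace with the character class "[|!|]" then strips every
// '|' and '!' character, turning the last field "I|!|" into just "I"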
The output of df1 is
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136 |4 |1 |I |
|4295877346|136 |4 |1 |I |
|4295877341|138 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877341|145 |14 |1 |I |
+----------+------+---------+--------+------+
and the output of the DataFrame created for df2 is
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
Now, per your requirement, you want to delete the rows from df1 whose OgId matches one in df2, and then append all of df2 to df1. That can be done as follows:
// Left-join df1 against the OgIds in df2; rows with a match get a
// non-null OgId_1, rows without a match keep OgId_1 = null
val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")
df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
// Keep only the rows whose OgId is not in df2, then append all of df2
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)   // on Spark 1.x use unionAll instead of union
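As a side note, on Spark 2.0+ the same delete-then-append logic can be written more compactly with a left anti join, which keeps only the left-side rows that have no match on the right. A minimal sketch:

// Rows of df1 whose OgId does not appear in df2, plus all of df2
val result = df1.join(df2, Seq("OgId"), "left_anti").union(df2)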
The final output is
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136 |4 |1 |I |
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
This final result can be saved to HDFS with
df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")
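If you want a header line or a different output delimiter, spark-csv accepts those as options; a small sketch (the path is still a placeholder):

df1.write.format("com.databricks.spark.csv")
  .option("header", "true")    // write the column names as the first line
  .option("delimiter", "|")    // spark-csv supports single-character delimiters
  .save("output file path in hdfs")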
I hope this helps.

Note: make sure you write the input paths and the output location correctly.
Why do you want to do this in MapReduce? May I suggest an answer in Scala for Spark and Hadoop? –
Yes please... some code would be great – SUDARSHAN
I guess you have some knowledge of Spark, Scala and DataFrames, right? –