2017-05-25 139 views
2

我已经结构化HDF基础文本文件,其中有这样的数据(file.txt的):如何处理增量更新在Hadoop的HDFS地图,减少

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!| 

4295877341|^|136|^|4|^|1|^|I|!| 
4295877346|^|136|^|4|^|1|^|I|!| 
4295877341|^|138|^|2|^|1|^|I|!| 
4295877341|^|141|^|4|^|1|^|I|!| 
4295877341|^|143|^|2|^|1|^|I|!| 
4295877341|^|145|^|14|^|1|^|I|!| 
123456789|^|145|^|14|^|1|^|I|!| 

的file.txt的的尺寸为30 GB。

我有大小的增量数据FILE1.TXT约2 GB即将在同一格式HFDS象下面这样:

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!| 

4295877341|^|213|^|4|^|1|^|I|!| 
4295877341|^|213|^|4|^|1|^|I|!| 
4295877341|^|215|^|2|^|1|^|I|!| 
4295877341|^|141|^|4|^|1|^|I|!| 
4295877341|^|143|^|2|^|1|^|I|!| 
4295877343|^|149|^|14|^|2|^|I|!| 
123456789|^|145|^|14|^|1|^|D|!| 

现在我必须结合file.txt的和FILE1.TXT和创建最终包含所有唯一记录的文本文件。

这两个文件中的关键都是OrgId。如果在第一个文件中找到相同的OrgId,那么我必须用新的OrgId替换,如果不是,那么我必须插入新的OrgId。

最终输出是这样的。

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!| 

4295877346|^|136|^|4|^|1|^|I|!| 
4295877341|^|213|^|4|^|1|^|I|!| 
4295877341|^|215|^|2|^|1|^|I|!| 
4295877341|^|141|^|4|^|1|^|I|!| 
4295877341|^|143|^|2|^|1|^|I|!| 
4295877343|^|149|^|14|^|2|^|I|!| 

我该怎么做mapreduce?

我不打算为HIVE解决方案,因为我有这么多独特的文件,这样大约10,000,所以我必须在HIVE中创建10,000个分区。

对此用例有任何建议使用Spark?

+0

为什么你想在mapreduce?我可以建议你在Spark和Hadoop的scala中回答吗? –

+0

是的请...一些代码会很好 – SUDARSHAN

+0

我猜你对Spark和Scala和dataFrame有所了解,是吗? –

回答

3

我建议你编程scalaspark。如果您在mapreduce中编程,仅对hadoop有用,但在scala中编程为spark将使您能够在spark以及hadoop中处理。 Spark已启动,以解决mapreduce模型中的缺陷。你可以在这个主题上找到许多资源。其中之一是this

关于你的问题,我建议你使用dataframe

首要任务是创建schema为dataframes。

val schema = StructType(Array(StructField("OgId", StringType), 
    StructField("ItemId", StringType), 
    StructField("segmentId", StringType), 
    StructField("Sequence", StringType), 
    StructField("Action", StringType))) 

下一个任务是读取两个文件,并使用上述模式

import org.apache.spark.sql.functions._ 
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs") 
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1))) 
var df1 = sqlContext.createDataFrame(rowRdd1, schema) 
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", "")) 

val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs") 
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1))) 
var df2 = sqlContext.createDataFrame(rowRdd2, schema) 
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", "")) 

df1输出是

+----------+------+---------+--------+------+ 
|OgId  |ItemId|segmentId|Sequence|Action| 
+----------+------+---------+--------+------+ 
|4295877341|136 |4  |1  |I  | 
|4295877346|136 |4  |1  |I  | 
|4295877341|138 |2  |1  |I  | 
|4295877341|141 |4  |1  |I  | 
|4295877341|143 |2  |1  |I  | 
|4295877341|145 |14  |1  |I  | 
+----------+------+---------+--------+------+ 

df2输出创建数据帧是

+----------+------+---------+--------+------+ 
|OgId  |ItemId|segmentId|Sequence|Action| 
+----------+------+---------+--------+------+ 
|4295877341|213 |4  |1  |I  | 
|4295877341|215 |2  |1  |I  | 
|4295877341|141 |4  |1  |I  | 
|4295877341|143 |2  |1  |I  | 
|4295877343|149 |14  |2  |I  | 
+----------+------+---------+--------+------+ 

现在根据您的要求,如果OgIddf2匹配并且将df2的所有附加到df1,则要从df1删除rows。这些要求可以做如下

val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1") 

df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left") 
df1 = df1.filter("OgId_1 is null").drop("OgId_1") 
df1 = df1.union(df2) 

最终输出

+----------+------+---------+--------+------+ 
|OgId  |ItemId|segmentId|Sequence|Action| 
+----------+------+---------+--------+------+ 
|4295877346|136 |4  |1  |I  | 
|4295877341|213 |4  |1  |I  | 
|4295877341|215 |2  |1  |I  | 
|4295877341|141 |4  |1  |I  | 
|4295877341|143 |2  |1  |I  | 
|4295877343|149 |14  |2  |I  | 
+----------+------+---------+--------+------+ 

这最后的结果可以保存在hdfs作为

df1.write.format("com.databricks.spark.csv").save("output file path in hdfs") 

我希望这是有益

注意:确保你写入输入路径d输出位置正确

+0

非常感谢你,我会实现这一目标..非常适合我.. – SUDARSHAN

+0

如果您遇到任何问题,请让我知道。 :)如果你认为答案值得一个,请点赞。 ;)谢谢 –

+0

嗨Ramesh只是一个问题,如果我将有增量文件,没有相同的头文件作为基本文件,那么这种解决方案将工作?而且我的增量文件是为了所以如果我们加入,然后更新顺序将是保存? – SUDARSHAN