2015-09-09 155 views
2

我正在写一个spark工作,尝试使用scala读取文本文件,以下工作在我的本地机器上正常工作。Spark:使用scala从s3读取csv文件

val myFile = "myLocalPath/myFile.csv" 
    for (line <- Source.fromFile(myFile).getLines()) { 
    val data = line.split(",") 
    myHashMap.put(data(0), data(1).toDouble) 
    } 

然后我试图使它在AWS上工作,我做了以下内容,但它似乎没有正确读取整个文件。在s3上读取这样的文本文件的正确方法是什么?非常感谢!

val credentials = new BasicAWSCredentials("myKey", "mySecretKey"); 
val s3Client = new AmazonS3Client(credentials); 
val s3Object = s3Client.getObject(new GetObjectRequest("myBucket", "myFile.csv")); 

val reader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent())); 

var line = "" 
while ((line = reader.readLine()) != null) { 
     val data = line.split(",") 
     myHashMap.put(data(0), data(1).toDouble) 
     println(line); 
} 

回答

0

我想我得到了它像以下工作:

val s3Object= s3Client.getObject(new GetObjectRequest("myBucket", "myPath/myFile.csv")); 

    val myData = Source.fromInputStream(s3Object.getObjectContent()).getLines() 
    for (line <- myData) { 
     val data = line.split(",") 
     myMap.put(data(0), data(1).toDouble) 
    } 

    println(" my map : " + myMap.toString()) 
1

阅读csv文件与sc.textFile("s3://myBucket/myFile.csv")。这会给你一个RDD [字符串]。获取到地图

val myHashMap = data.collect 
        .map(line => { 
         val substrings = line.split(" ") 
         (substrings(0), substrings(1).toDouble)}) 
        .toMap 

您可以使用sc.broadcast广播您的地图,所以,这是你的所有工作节点上一应俱全。

(请注意,你当然也可以使用Databricks“火花CSV”包中的CSV文件读取,如果你喜欢。)

+0

我的效用函数需要myHashMap。所以我的代码是这样的:output = input.map {t => myUtiltyFunction(myHashMap,t)}是否可以避免每次都将myHashMap传递给myUtiltiyFunction?有没有办法使用广播myHashMap并让myUtitlityFunction直接知道它?非常感谢! – Edamame

+0

另外,我不想使用sc.textFile(“s3://myBucket/myFile.csv”),因为我想让代码通用,即使没有spark上下文。谢谢。 – Edamame

+0

你意识到如果你让效用函数直接读取地图,并且使用如你所描述的'output = input.map {t => myUtiltyFunction(...)}'这样的效用函数,那么地图将被读取并且为您的输入rdd的每一行创建。我真的不认为你想要那样。另一方面,如果您广播变量(使用'sc.broadcast'),则您只能在驱动程序上读取并创建一次地图,然后所有工作人员都可以直接访问它。你为什么不想将地图传递给效用函数?这对我来说似乎很奇怪。 –