2016-12-15 65 views
0

我有在C. 生成一个二进制文件中的文件具有26000个双打固定记录长度和包含1067条记录与没有分隔符。我需要在spark中读取它,并获得double值。 我也有一个Python代码,它可以获取双精度值并在Spark-shell中使用来自Java的Files.readAllBytes我也获得了这些值,所以基于Python输出我期望第一个记录的前1000个双精度值值-3509.580466022612。 从火花外壳摘自:读取二进制文件产生错误的输出

import java.nio.ByteBuffer 
import java.nio.ByteOrder 
val doubleByteSize = 8 
val recordLength = 1000 * 26 * doubleByteSize 
val bytesRdd = sc.binaryRecords("file:///myBinaryFile.val", recordLength) 
val arrayOfRecords = bytesRdd.collect 
val firstRecord = arrayOfRecords(0) 
// group 8 bytes together to transform them to doubles 
val listOfDoubles = firstRecord.grouped(doubleByteSize).toList 
// I get 1000 times the same double but isn't -3509.580466022612 it's 1.1848107264484659E181 
val result = listOfDoubles.map(arrayOfBytes => ByteBuffer.wrap(arrayOfBytes).getDouble) 
// try with little endian and it is wrong again -6.045003065652023E-27 
val result2 = listOfDoubles.map(arrayOfBytes => ByteBuffer.wrap(arrayOfBytes).order(ByteOrder.LITTLE_ENDIAN).getDouble) 

记录的数量看起来是正确的(arrayOfRecords.length = 1068),每条记录的字节数对我来说很好(firstRecord.length = 208000)和第一双打1000包含相同的值,如预期的那样,但双倍值(1.1848107264484659E181)不是预期值(-3509.580466022612)。我试图将其更改为little endian,但数字仍然是错误的(-6.045003065652023E-27)。 Python代码:

def str_all(data): 
ret_str = "" 
for d in data: 
    ret_str+= " " + str(d) 
return ret_str 

def main(): 

num_sim = 1000 
num_ts = 26 

record_index = 0 

deal_data = array('d') 
with open("/myBinaryFile.val","rb") as data_file: 
    data_file.seek(SIZEOFDOUBLE*record_index*num_sim*num_ts) 
    deal_data.fromfile(data_file,num_sim*num_ts) 
ts_index = 0 
deal_range = slice(ts_index*num_sim,(ts_index+1)*num_sim) 
# it prints 1000 times -3509.580466022612 
print(str_all(deal_data[deal_range])) 

简单的Java代码来读取二进制(从火花壳)得到的预期值:

val byteArray = Files.readAllBytes(Paths.get("/mybinaryFile.val")) 
// gets the correct value -3509.580466022612 
ByteBuffer.wrap(byteArray).order(ByteOrder.LITTLE_ENDIAN).getDouble 

任何人有怎么在这里的任何想法?

在此先感谢。

星火版本1.6.0,使用Scala的版本2.10.5(Java的热点(TM)64位服务器VM,爪哇1.7.0_67) Python版本2.6

+0

请显示python代码。 –

+0

另外,从Java-IO版本开始(即不要使用Spark来读取文件)以确认其余的处理是否符合您的期望。 –

+0

我的2美分:如果你有一个工作的Python代码片段来读取二进制数据,然后切换到PySpark,您的数据加载到数据帧的熊猫,转换成数据帧星火即'mySpkDF = sqlContext.createDataFrame(myPndDF)',保存为镶木地板;然后切换回Scala并读取Parquet数据。 –

回答

0

问题不在于与二进制数据有关本身,当我在做:

val arrayOfRecords = bytesRdd.collect 
val firstRecord = arrayOfRecords(0) 

我没有得到的阵列排列,改变的是解决问题:

val firstRecord = bytesRdd.first 

貌似收集不保留订购。感谢原型保罗为您的时间和帮助。