2016-01-12 13 views
1

正如我们都知道当本地文本文件被复制到HDFS时,文件被分割成固定大小的128 MB。例如,当我将一个256 MB的文本文件复制到HDFS中时,将会有2个包含“分割”文件的块(256/128)。Hadoop HDFS文件分成哪些Java文件块

有人可以告诉我中的哪个java/jar文件Hadoop 2.7.1源代码具有将文件拆分成块以及哪些java/jar文件将块写入数据节点的目录的功能。

帮我看看这段代码。

我只找到了他们在FileInputFormat.java中找到的块进行逻辑输入拆分,而这不是我所需要的。我需要分割物理文件的java文件。

回答

1

该代码用于将数据写入到的DataNodes存在于2个文件:

  • DFSOutputStream.java(包:org.apache.hadoop.hdfs

    由客户端写入的数据被分成数据包(通常为64K大小)。当数据包准备就绪时,数据将被排入数据队列中,数据队列由DataStreamer拾取。

  • DataStreamer(包:org.apache.hadoop.hdfs

    它拿起在数据队列中的分组,并将它们在管道发送到数据节点(典型地有3个数据节点,因为复制因子在数据流水线, 3)。

    它检索一个新的块ID并开始将数据流式传输到数据节点。当一个数据块被写入时,它关闭当前块并获得用于写入下一组数据包的新块。

    的代码,其中,将一个新块得到的,是如下:

    // get new block from namenode. 
    if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { 
        if(LOG.isDebugEnabled()) { 
        LOG.debug("Allocating new block"); 
        } 
        setPipeline(nextBlockOutputStream()); 
        initDataStreaming(); 
    } 
    

    的代码,其中,所述当前块被关闭时,低于:

    // Is this block full? 
    if (one.isLastPacketInBlock()) { 
        // wait for the close packet has been acked 
        synchronized (dataQueue) { 
        while (!shouldStop() && ackQueue.size() != 0) { 
         dataQueue.wait(1000);// wait for acks to arrive from datanodes 
        } 
        } 
        if (shouldStop()) { 
        continue; 
        } 
    
        endBlock(); 
    } 
    

    endBlock()方法中,再次舞台设置为:

    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; 
    

    这意味着,将创建一个新的管道用于写入下一组pa一个新的块。

编辑:如何检测到块的结束?

由于DataStreamer不断追加数据到一个块,它会更新写入的字节数。

/** 
    * increase bytes of current block by len. 
    * 
    * @param len how many bytes to increase to current block 
    */ 
void incBytesCurBlock(long len) { 
    this.bytesCurBlock += len; 
} 

它也保持检查,如果写入的字节数等于块大小:如果达到块大小

// If packet is full, enqueue it for transmission 
// 
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || 
    getStreamer().getBytesCurBlock() == blockSize) { 
    enqueueCurrentPacketFull(); 
} 

在上面的语句中,以下条件检查:

getStreamer().getBytesCurBlock() == blockSize) 

如果遇到块边界,则调用endBlock()方法:

/** 
* if encountering a block boundary, send an empty packet to 
* indicate the end of block and reset bytesCurBlock. 
* 
* @throws IOException 
*/ 
protected void endBlock() throws IOException { 
    if (getStreamer().getBytesCurBlock() == blockSize) { 
     setCurrentPacketToEmpty(); 
     enqueueCurrentPacket(); 
     getStreamer().setBytesCurBlock(0); 
     lastFlushOffset = 0; 
    } 
} 

这将确保当前块被关闭,并从Name Node获得用于写入数据的新块。

块的大小是由dfs.blocksize参数hdfs-site.xml文件决定(它被设置为128 MB在我的群集= 134217728):

<property> 
    <name>dfs.blocksize</name> 
    <value>134217728</value> 
    <description>The default block size for new files, in bytes. 
     You can use the following suffix (case insensitive): k(kilo), 
     m(mega), g(giga), t(tera), p(peta), e(exa) to specify the 
     size (such as 128k, 512m, 1g, etc.), Or provide complete size 
     in bytes (such as 134217728 for 128 MB). 
    </description> 
</property> 
+0

真的很好的答案,但是如果if(one.isLastPacketInBlock()){}获取最大块大小的信息,if语句如何?代码的哪一部分指示要分割为128 MB的文件? – IFH

+0

@Iris,我已经更新了答案。请检查 –

+0

完美答案!只需确认,调用enqueueCurrentPacketFull()的if语句;在DFSOutputStream.java中,对吗? – IFH

0

这不是一个jar文件或java文件,它具有分割文件的功能。这是执行此任务的客户端守护程序。当你从本地加载文件时,客户端首先只读128MB,它通过询问namenode找到一个存储它的地方,并且它确保文件被正确地复制和复制。在这个阶段,客户端不会知道文件的实际大小,除非它将以相同的方式读取所有的块。

当您要存储文件时,您提到的FileInputFormat.java不会被hdfs使用。它在您想要在该文件上运行任何mapreduce任务时使用。它与文件的存储无关。

+0

感谢您的回答!但是肯定不应该在“客户端守护进程任务”中声明至少有一个if语句,该语句会不断从文件读取数据到块中,直到文件达到最大大小(128 MB) 。 – IFH