该代码用于将数据写入到的DataNodes存在于2个文件:
DFSOutputStream.java
(包:org.apache.hadoop.hdfs
)
由客户端写入的数据被分成数据包(通常为64K大小)。当数据包准备就绪时,数据将被排入数据队列中,数据队列由DataStreamer
拾取。
DataStreamer
(包:org.apache.hadoop.hdfs
)
它拿起在数据队列中的分组,并将它们在管道发送到数据节点(典型地有3个数据节点,因为复制因子在数据流水线, 3)。
它检索一个新的块ID并开始将数据流式传输到数据节点。当一个数据块被写入时,它关闭当前块并获得用于写入下一组数据包的新块。
的代码,其中,将一个新块得到的,是如下:
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(LOG.isDebugEnabled()) {
LOG.debug("Allocating new block");
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
}
的代码,其中,所述当前块被关闭时,低于:
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (shouldStop()) {
continue;
}
endBlock();
}
在endBlock()
方法中,再次舞台设置为:
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
这意味着,将创建一个新的管道用于写入下一组pa一个新的块。
编辑:如何检测到块的结束?
由于DataStreamer
不断追加数据到一个块,它会更新写入的字节数。
/**
* increase bytes of current block by len.
*
* @param len how many bytes to increase to current block
*/
void incBytesCurBlock(long len) {
this.bytesCurBlock += len;
}
它也保持检查,如果写入的字节数等于块大小:如果达到块大小
// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
在上面的语句中,以下条件检查:
getStreamer().getBytesCurBlock() == blockSize)
如果遇到块边界,则调用endBlock()
方法:
/**
* if encountering a block boundary, send an empty packet to
* indicate the end of block and reset bytesCurBlock.
*
* @throws IOException
*/
protected void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) {
setCurrentPacketToEmpty();
enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
}
}
这将确保当前块被关闭,并从Name Node
获得用于写入数据的新块。
块的大小是由dfs.blocksize
参数hdfs-site.xml
文件决定(它被设置为128 MB在我的群集= 134217728):
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>The default block size for new files, in bytes.
You can use the following suffix (case insensitive): k(kilo),
m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
size (such as 128k, 512m, 1g, etc.), Or provide complete size
in bytes (such as 134217728 for 128 MB).
</description>
</property>
真的很好的答案,但是如果if(one.isLastPacketInBlock()){}获取最大块大小的信息,if语句如何?代码的哪一部分指示要分割为128 MB的文件? – IFH
@Iris,我已经更新了答案。请检查 –
完美答案!只需确认,调用enqueueCurrentPacketFull()的if语句;在DFSOutputStream.java中,对吗? – IFH