DFSClient技术内幕 (DFSClient介绍以及其初始化) – 推酷

以下是本人研究源代码成果, 此文僅献给我和我的小伙伴们,不足之处,欢迎斧正————————————————-致谢道格等人!

注: hadoop版本0.20.2,有童鞋表示看代码头晕,所以本文采用纯文字描述,哥还特意为你们把字体调调颜色噢 ^ o ^

大家都知道,hadoop是最优秀的大数据处理框架之一,而本文研究的DFSClient是 hadoop内部实现中,较为核心的部分

DFSClient,顾名思义,分布式文件系统的客户端

hadoop的客户端包括:shell命令,java接口,pig 等。。

DFSClient在分布式文件系统中扮演的角色:

现实场景如下:我的数据文件分散存储在集群中的很多台机器上,现在我需要获取它,以使用shell命令操作为例

我输入获取数据的指令后,DFSClient通过RPC机制和NameNode(集群中的主机器)通信并获得文件的元数据信息(数据的存放位置,校验和,等等一些关键的信息),然后DFSClient再与DataNode通信(集群中的其它机器)通过数据I/O流来获取到我要的文件信息,

它在分布式文件系统内部实现中起到双方通信的作用,是用户与文件系统通信的桥梁,由此可见,掌握它,驾驭它,显得尤为迫切!

首先一起讨论下关于DFSClient的体系结构:

它位于org.apache.hadoop.hdfs包下:体系图如下

    

DFSClient

    |——-LeaseChecker implements Runnable

    |——-DNAddrPair

    |——-BlockReader extends FSInputChecker ( 

FSInputChecker

  

extends FSInputStream )

    |——-DFSInputStream

    |——-DFSDataInputStream 

extends DataInputStream  

implements Seekable(支持流中随机存储), PositionedReadable(定位读取)

    |——-DFSOutputStream extends FSOutputSummer implements Syncable  (FSOutputSummer extends OutputStream)

              |——–Packet 

              |——–DataStreamer extends Daemon (extends Thread)

              |——–ResponseProcessor extends Thread

LeaseChecker介绍:实现了Runnable接口

在HDFS中可能有多个客户端在同一时刻进行文件的写入操作,有时会出现多个客户端并发的写入一个文件的情况,所以采取一些措施来控制并发写入情况的发送,一般情况下会采用互斥锁的方法来进行控制,使得每一时刻只有一个获得锁的客户端才能执行,写入操作。但是互斥锁的机制在分布式系统中会有很多问题

问题一:每次执行写入时,客户端都需要向NameNode申请互斥锁,从而造成网络开销的增大

问题二:当某个客户端获得锁之后和NameNode失去了联系,此时会造成互斥锁无法释放,使得其他的客户端的操作会被终止

解决方案:HDFS使用Lease租约来解决互斥锁的问题

过程:当DFSClient需要对一个文件执行写入操作时,他首先需要向NameNode申请一个租约(有时间限制),在时间期限内客户端可以             对租约所管理的文件执行写入。一个文件只能被一个租约锁管理,所以只能有一个客户端对文件执行写入操作,在租约的有效时间             内,DFSClient客户端会一直持有写文件的权限,而不需要再向NameNode询问是否有写文件的权限。当客户端一直工作时,它会在          租约过期后向NameNode申请续约,入股在租约的有效期间内,客户端发生了异常,和NameNode失去了联系,当租约期满后,                NameNode会发现发生异常的客户端,此时NameNode会将新的租约赋给其它正常的客户端,当发生异常的客户端已经写入了一部             分数据时,HDFS为了分辨出这些无用的数据,会在客户端每次写入数据时增加版本号信息,异常的客户端的写入的数据的版本号            会很低,从而可以被安全删除掉。

LeaseChecker作用: 在DFSClient中有个LeaseChecker线程,该线程会周期性的检查租约是否过期,在租约快过期的时候会对租约进行续约,此外,在namenode包中有个LeaseManager租约管理器,该管理器会不断的检查它所管理的lease是否过期,如果lease已经过期,会将其删除

DNAddrPair介绍:封装了定位到的DataNode信息和DataNode所对应的IP信息

FSInputChecker 介绍:

抽象类FSInputChecker继承自FSInputStream,加入了HDFS所需要的校验功能,

hadoop会生成与原生文件所对应的校验和文件,并在读写文件的时候对文件进行校验,

以确保数据的准确性

BlockReader介绍:BlockReader 继承自 FSInputChecker  继承自  FSInputStream,校验功能是在readChecksumChunk方法中实现,而readChecksumChunk私有方法是被read1私有方法内部调用,而且所有的read方法的都是通过间接地调用read1方法来实现对数据进行读取并做校验和验证的

DFSInputStream介绍:继承自FSInputStream,该类会创建到DataNode的Socket连接,然后使用Socket来读取DataNode上的数据信息

DFSDataInputStream介绍:继承自DataInputStream,DFSDataInputStream的功能都依靠包装的DFSInputStream来完成

DFSOutputStream介绍:继承自DFSOutputStream

Packet介绍:

数据包,

DFSOutputStream的内部类,

DFSClient是通过一个个Packet来向DataNode写入数据的,

一个Packet由多个数据chunk组成,每个chunk对应着一个校验和,当写入足够的chunk之后,packet会被添加到dataQueue中

DataStreamer 介绍:

 

DataStreamer是真正写入数据的进程,

在发送Packet之前,它会首先从Namenode中获得一个blockid和Block的位置信息,

然后它会循环地从dataQueue中取得一个Packet,然后将该Packet真正写入到与DataNode所建立的socket中,

 当将属于一个Block的所有Packet都发送给DataNode,并且返回了与每个Packet所对应的响应信息之后,

DataStream 会关闭当前的数据Block

ResponseProcessor 介绍:响应处理器ResponseProcessor

 

至此,DFSClient讨论完毕

————————————————————————————————————————————————————————-

DFSClient构造器群:

  public DFSClient(Configuration conf) throws IOException {

    this(NameNode.getAddress(conf), conf);

  }

 

  public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf

      ) throws IOException {

     

this(nameNodeAddr, conf, null);

  }

  

   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,

                   FileSystem.Statistics stats)

    throws IOException {

   

 this(nameNodeAddr, null, conf, stats);

  } 

 

DFSClient为用户提供了简单,一致的标准访问接口下面,但其内部实现较为复杂,本人陪同大家一起一起去探索这个神奇的国度

我们需要构建一个DFSClient对象:

DFSClient提供了4种形式的构造器,构造方法的主要任务有两个:

a,读入配置项并初始化一些成员变量

b,建立和名字节点的IPC连接

详细过程分析:a过程被初始化对象如下:

1,配置对象configuration,

2,

收集文件系统统计信息的对象,

3,

socket连接的过期时间,

4,

写入的数据包的大小

5,

通过socket向dataNode写入数据的超期时间

6,

创建socket连接的工厂类

 

7, 用户组信息

8, 最大块获取失败次数

9,客户端的名称(

如果该任务是Map-reduces任务,则使用任务ID作为客户端名称)

9, 默认的块大小(64M),默认的块副本数

 

构造器代码如下:

  /** 

   * Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.

   * Exactly one of nameNodeAddr or rpcNamenode must be null.

   */

  DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,

      Configuration conf, FileSystem.Statistics stats)

    throws IOException {

    this.conf = conf;

    this.stats = stats;

    this.socketTimeout = conf.getInt(“dfs.socket.timeout”, 

                                     HdfsConstants.READ_TIMEOUT);

    this.datanodeWriteTimeout = conf.getInt(“dfs.datanode.socket.write.timeout”,

                                            HdfsConstants.WRITE_TIMEOUT);

    this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);

    // dfs.write.packet.size is an internal config variable

    this.writePacketSize = conf.getInt(“dfs.write.packet.size”, 64*1024);

    this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);

    

    try {

      this.ugi = UnixUserGroupInformation.login(conf, true);

    } catch (LoginException e) {

      throw (IOException)(new IOException().initCause(e));

    }

    String taskId = conf.get(“mapred.task.id”);

    if (taskId != null) {

    //如果是MapReduce任务,则客户端名称为任务Id号,否则取随机号

      this.clientName = “DFSClient_” + taskId; 

    } else {

      this.clientName = “DFSClient_” + r.nextInt();

    }

    defaultBlockSize = conf.getLong(“dfs.block.size”, DEFAULT_BLOCK_SIZE);

    defaultReplication = (short) conf.getInt(“dfs.replication”, 3);

    if (nameNodeAddr != null && rpcNamenode == null) {

      this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);

      //非常关键的一步 

         

通过RetryProxy的create方法来创建NameNode的RPC客户端ClientProtocol

   

 

      

this.namenode = 

createNamenode

(this.rpcNamenode);

    } else if (nameNodeAddr == null && rpcNamenode != null) {

      //This case is used for testing.

      this.namenode = this.rpcNamenode = rpcNamenode;

    } else {

      throw new IllegalArgumentException(

          “Expecting exactly one of nameNodeAddr and rpcNamenode being null: “

          + “nameNodeAddr=” + nameNodeAddr + “, rpcNamenode=” + rpcNamenode);

    }

  }

b过程 :

通过调用私有方法

createNamenode建立与名字节点的连接,方法内部通过RetryProxy的create方法来创建NameNode的RPC客户端ClientProtocol

至此,DFSClient初始化完成 

注:对于文件系统,本文的讨论中一直区分两种情况,namenode的远程方法不在本文讨论范围内

a,文件和目录相关事务(都使用远程接口客户端namenode,调用其同名远程方法)

b,数据块读写 

http://user.qzone.qq.com/578333569/infocenter#!app=2&via=QZ.HashRefresh&pos=1383552016

 

来源URL:http://www.tuicool.com/articles/y6NVVn