Background
I have been planning to restart some DataNodes recently. In the past, restarting them has caused running business jobs to fail, so I wanted to understand at what point a DataNode counts as fully started, and whether there is a way to detect that a DataNode is ready.
The source code analyzed in this article is from Hadoop CDH 5.4.0.
Source Code Analysis
Start from the main method in DataNode.java:
public static void main(String args[]) {
if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
System.exit(0);
}
secureMain(args, null);
}
Moving into the secureMain method, it calls createDataNode:
public static DataNode createDataNode(String args[], Configuration conf,
SecureResources resources) throws IOException {
// Instantiate the DataNode
DataNode dn = instantiateDataNode(args, conf, resources);
if (dn != null) {
// Start the BPOfferServices, dataXceiverServer, ipcServer, etc.
dn.runDatanodeDaemon();
}
return dn;
}
public static DataNode instantiateDataNode(String args [], Configuration conf,
SecureResources resources) throws IOException {
if (conf == null)
conf = new HdfsConfiguration();
if (args != null) {
// parse generic hadoop options
GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
args = hParser.getRemainingArgs();
}
if (!parseArguments(args, conf)) {
printUsage(System.err);
return null;
}
// Get the actual storage locations (from the dfs.datanode.data.dir configuration;
// a small parsing sketch follows makeInstance below)
Collection<StorageLocation> dataLocations = getStorageLocations(conf);
UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
return makeInstance(dataLocations, conf, resources);
}
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
Configuration conf, SecureResources resources) throws IOException {
LocalFileSystem localFS = FileSystem.getLocal(conf);
FsPermission permission = new FsPermission(
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
// DataNode disk checker: verifies directory permissions and that directories can be created
DataNodeDiskChecker dataNodeDiskChecker =
new DataNodeDiskChecker(permission);
// Find the locations that can be read and written normally
List<StorageLocation> locations =
checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
DefaultMetricsSystem.initialize("DataNode");
assert locations.size() > 0 : "number of data directories should be > 0";
return new DataNode(conf, locations, resources);
}
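A quick side note on the dfs.datanode.data.dir configuration mentioned above: it is a comma-separated list of local directories, and, as far as I know, in this Hadoop generation each entry may also carry a storage-type prefix such as [SSD]. The following is only a rough sketch of splitting such a value with made-up paths and a made-up class name; it is not the actual DataNode.getStorageLocations/StorageLocation code.
import java.util.ArrayList;
import java.util.List;

// Rough illustration of splitting a dfs.datanode.data.dir style value.
// The example paths are made up; this is not DataNode.getStorageLocations.
public class DataDirSketch {
  public static void main(String[] args) {
    String dataDirs = "[DISK]/data/1/dfs/dn,/data/2/dfs/dn,[SSD]/data/3/dfs/dn";
    List<String[]> locations = new ArrayList<>();
    for (String entry : dataDirs.split(",")) {
      entry = entry.trim();
      String storageType = "DISK";          // default when no prefix is given
      if (entry.startsWith("[")) {
        int end = entry.indexOf(']');
        storageType = entry.substring(1, end);
        entry = entry.substring(end + 1);
      }
      locations.add(new String[] { storageType, entry });
    }
    for (String[] loc : locations) {
      System.out.println(loc[0] + " -> " + loc[1]);
    }
  }
}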
Moving into the DataNode constructor, the main thing to look at is the startDataNode(conf, dataDirs, resources) method:
void startDataNode(Configuration conf,
List<StorageLocation> dataDirs,
SecureResources resources
) throws IOException {
// settings global for all BPs in the Data Node
this.secureResources = resources;
synchronized (this) {
this.dataDirs = dataDirs;
}
this.conf = conf;
this.dnConf = new DNConf(conf);
checkSecureConfig(dnConf, conf, resources);
this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
if (dnConf.maxLockedMemory > 0) {
if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
throw new RuntimeException(String.format(
"Cannot start datanode because the configured max locked memory" +
" size (%s) is greater than zero and native code is not available.",
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
}
if (Path.WINDOWS) {
NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
} else {
long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
if (dnConf.maxLockedMemory > ulimit) {
throw new RuntimeException(String.format(
"Cannot start datanode because the configured max locked memory" +
" size (%s) of %d bytes is more than the datanode's available" +
" RLIMIT_MEMLOCK ulimit of %d bytes.",
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
dnConf.maxLockedMemory,
ulimit));
}
}
}
LOG.info("Starting DataNode with maxLockedMemory = " +
dnConf.maxLockedMemory);
storage = new DataStorage();
// global DN settings
registerMXBean();
// Initialize the dataXceiverServer and related objects
initDataXceiver(conf);
// Start the HTTP info server (WebHDFS)
startInfoServer(conf);
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
// BlockPoolTokenSecretManager is required to create ipc server.
this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
// Login is done by now. Set the DN user name.
dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
LOG.info("dnUserName = " + dnUserName);
LOG.info("supergroup = " + supergroup);
// Initialize the IPC server
initIpcServer(conf);
metrics = DataNodeMetrics.create(conf, getDisplayName());
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
// Important: construct the BlockPoolManager
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(conf);
// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}
blockPoolManager.refreshNamenodes internally calls the doRefreshNamenodes() method:
private void doRefreshNamenodes(
Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
assert Thread.holdsLock(refreshNamenodesLock);
Set<String> toRefresh = Sets.newLinkedHashSet();
Set<String> toAdd = Sets.newLinkedHashSet();
Set<String> toRemove;
synchronized (this) {
// Step 1. For each of the new nameservices, figure out whether
// it's an update of the set of NNs for an existing NS,
// or an entirely new nameservice.
for (String nameserviceId : addrMap.keySet()) {
if (bpByNameserviceId.containsKey(nameserviceId)) {
toRefresh.add(nameserviceId);
} else {
toAdd.add(nameserviceId);
}
}
// Step 2. Any nameservices we currently have but are no longer present
// need to be removed.
toRemove = Sets.newHashSet(Sets.difference(
bpByNameserviceId.keySet(), addrMap.keySet()));
assert toRefresh.size() + toAdd.size() ==
addrMap.size() :
"toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
" toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
" toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
// Since this is a (re)start, all nameservices end up in toAdd
// Step 3. Start new nameservices
if (!toAdd.isEmpty()) {
LOG.info("Starting BPOfferServices for nameservices: " +
Joiner.on(",").useForNull("<default>").join(toAdd));
// This iterates over nameservices; with HDFS federation there can be more than one
for (String nsToAdd : toAdd) {
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToAdd).values());
// Create the BPOfferService
BPOfferService bpos = createBPOS(addrs);
// Put it into bpByNameserviceId; on the next invocation of this method it will go into toRefresh instead
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
}
// Start every BPOfferService in offerServices
startAll();
}
// ... Step 4 (stopping nameservices that were removed) and Step 5 (refreshing
// nameservices whose NN list changed) are omitted here
}
Take a look at the createBPOS method; the resulting BPOfferService can contain multiple BPServiceActors, stored in the BPOfferService.bpServices field:
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
return new BPOfferService(nnAddrs, dn);
}
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
this.dn = dn;
// With an active and a standby NameNode, two BPServiceActors are created
for (InetSocketAddress addr : nnAddrs) {
this.bpServices.add(new BPServiceActor(addr, this));
}
}
The startAll() call above starts every BPOfferService, which in turn calls BPServiceActor.start() for each actor. The class comment at the top of BPServiceActor describes its role: (1) pre-registration handshake with the NameNode, (2) registration with the NameNode, (3) periodically sending heartbeats to the NameNode, and (4) handling commands received from the NameNode. Next, look at this class's run() method:
public void run() {
LOG.info(this + " starting to offer service");
try {
while (true) {
// init stuff
try {
// Register with the NameNode and initialize the block pool
// setup storage
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
LOG.error("Initialization failed for " + this + " "
+ ioe.getLocalizedMessage());
sleepAndLogInterrupts(5000, "initializing");
} else {
runningState = RunningState.FAILED;
LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
return;
}
}
}
runningState = RunningState.RUNNING;
while (shouldRun()) {
try {
// Update which actor is the active one, send block reports to the NameNode, etc.
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService for " + this, ex);
sleepAndLogInterrupts(5000, "offering service");
}
}
runningState = RunningState.EXITED;
} catch (Throwable ex) {
LOG.warn("Unexpected exception in block pool " + this, ex);
runningState = RunningState.FAILED;
} finally {
LOG.warn("Ending block pool service for: " + this);
cleanUp();
}
}
Now look at the connectToNNAndHandshake() method, which is where the block pool gets initialized:
private void connectToNNAndHandshake() throws IOException {
// get NN proxy
bpNamenode = dn.connectToNN(nnAddr);
// Fetch version and namespace info from the NameNode for validation
// First phase of the handshake with NN - get the namespace
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(nsInfo);
// Register with the NameNode; this also sets the last block report time to
// (now - (blockReportInterval - delay)), which by default means the first
// report is due right away (see the short illustration after this method)
// Second phase of the handshake with the NN.
register();
}
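To illustrate the scheduling arithmetic in that comment, here is a small sketch with a made-up class name, assuming the defaults I believe apply to this version (dfs.blockreport.intervalMsec = 21600000 ms, i.e. six hours, and dfs.blockreport.initialDelay = 0); it is not the actual BPServiceActor scheduling code.
// Illustration of why the first block report is due "immediately" by default.
public class BlockReportScheduleSketch {
  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long blockReportIntervalMs = 21_600_000L; // assumed dfs.blockreport.intervalMsec default (6h)
    long initialDelayMs = 0L;                 // assumed dfs.blockreport.initialDelay default

    // lastBlockReport is pushed back by (interval - delay), so with delay = 0
    // the elapsed time already equals the full interval.
    long lastBlockReport = now - (blockReportIntervalMs - initialDelayMs);
    boolean due = (now - lastBlockReport) >= blockReportIntervalMs;
    System.out.println("first block report due right away? " + due); // prints true
  }
}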
Inside verifyAndSetNamespaceInfo(), the main thing to look at is dn.initBlockPool(this):
void initBlockPool(BPOfferService bpos) throws IOException {
NamespaceInfo nsInfo = bpos.getNamespaceInfo();
if (nsInfo == null) {
throw new IOException("NamespaceInfo not found: Block pool " + bpos
+ " should have retrieved namespace info before initBlockPool.");
}
setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
// Associate the block pool ID with the BPOfferService, stored in bpByBlockPoolId
// Register the new block pool with the BP manager.
blockPoolManager.addBlockPool(bpos);
// Initialize the data field by creating the FsDatasetImpl
// In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc.
initStorage(nsInfo);
// Exclude failed disks
// Exclude failed disks before initializing the block pools to avoid startup
// failures.
checkDiskError();
// Start the DirectoryScanner, which runs periodically to reconcile differences
// between the in-memory replica objects and the files actually on disk
initDirectoryScanner(conf);
// Add the block pool
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
}
Within initBlockPool, the call of interest is data.addBlockPool:
public void addBlockPool(String bpid, Configuration conf)
throws IOException {
LOG.info("Adding block pool " + bpid);
synchronized(this) {
// Create BlockPoolSlice objects and add them to the bpSlices map
volumes.addBlockPool(bpid, conf);
// Initialize the ReplicaMap ("Maintains the replica map")
volumeMap.initBlockPool(bpid);
}
// Build the replica map from all volumes
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
}
volumes.addBlockPool creates the BlockPoolSlice objects. According to its description, a BlockPoolSlice is the portion of a block pool that is stored on one volume; it mainly keeps track of a few directories plus usage figures such as the space consumed. Note that computing the disk usage is very expensive, so the value is cached and refreshed every 600 seconds, and when the DataNode shuts down the current value is written out to a file (a simplified sketch of this caching idea follows the code excerpt below).
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
long totalStartTime = Time.monotonicNow();
final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>());
List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes.get()) {
Thread t = new Thread() {
public void run() {
try (FsVolumeReference ref = v.obtainReference()) {
FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
" on volume " + v + "...");
long startTime = Time.monotonicNow();
// Create the BlockPoolSlice
v.addBlockPool(bpid, conf);
long timeTaken = Time.monotonicNow() - startTime;
FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
" on " + v + ": " + timeTaken + "ms");
} catch (ClosedChannelException e) {
// ignore.
} catch (IOException ioe) {
FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
". Will throw later.", ioe);
exceptions.add(ioe);
}
}
};
// ... remainder omitted: each thread is added to blockPoolAddingThreads and
// started, then the method joins them all and rethrows any collected IOExceptions
}
}
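To make the disk-usage caching mentioned before this excerpt a bit more concrete, here is a deliberately simplified sketch of the pattern (the class name, field names, and "dfsUsed" file name are made up; this is not the actual BlockPoolSlice code): an expensive directory walk is refreshed in a background thread on a fixed interval, callers read the cached value, and the last value is written to a file on shutdown.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

// Simplified, hypothetical illustration of the dfsUsed caching idea described
// in the text; the real BlockPoolSlice code is considerably more involved.
public class CachedDiskUsage {
  private final Path cacheFile;                      // persisted on shutdown
  private final AtomicLong cachedBytes = new AtomicLong(0L);
  private final Thread refresher;

  public CachedDiskUsage(Path bpDir, long refreshIntervalMs) {
    this.cacheFile = bpDir.resolve("dfsUsed");       // file name is illustrative
    this.refresher = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        cachedBytes.set(computeUsed(bpDir));         // the expensive full walk
        try {
          Thread.sleep(refreshIntervalMs);           // e.g. 600 * 1000 ms
        } catch (InterruptedException e) {
          return;
        }
      }
    }, "disk-usage-refresher");
    refresher.setDaemon(true);
    refresher.start();
  }

  // Callers read the cached value instead of paying for a directory walk.
  public long getDfsUsed() {
    return cachedBytes.get();
  }

  // On shutdown, write the last value out so the next start can reuse it.
  public void shutdown() throws IOException {
    refresher.interrupt();
    Files.write(cacheFile,
        Long.toString(cachedBytes.get()).getBytes(StandardCharsets.UTF_8));
  }

  private static long computeUsed(Path dir) {
    try (Stream<Path> files = Files.walk(dir)) {
      return files.filter(Files::isRegularFile)
          .mapToLong(p -> {
            try {
              return Files.size(p);
            } catch (IOException e) {
              return 0L;
            }
          })
          .sum();
    } catch (IOException e) {
      return 0L;
    }
  }
}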
volumes.getAllVolumesMap() ends up calling getVolumeMap() on each BlockPoolSlice. The version analyzed here has no cache at this point, and because the directories have to be traversed this step is quite time-consuming; a cache was added later by the HDFS-7928 patch.
void getVolumeMap(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap)
throws IOException {
// Recover lazy persist replicas, they will be added to the volumeMap
// when we scan the finalized directory.
if (lazypersistDir.exists()) {
int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
FsDatasetImpl.LOG.info(
"Recovered " + numRecovered + " replicas from " + lazypersistDir);
}
// Walk all the subdirectories and build the replica map
// add finalized replicas
addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
// add rbw replicas
addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
}
Going back up the call chain: after bpos.verifyAndSetNamespaceInfo(nsInfo) completes, register() runs and registers the DataNode with the NameNode. Once registration is done, the flow continues into offerService(), which sends heartbeats, block reports, and so on.
Conclusion
During DataNode startup several threads are involved; the HTTP and RPC ports are opened early, but at that point the node still cannot serve requests. In this version, most of the startup time goes into building the volumeMap, which requires traversing the directories on each data disk.
Once the volumeMap has been initialized, the DataNode registers with the NameNode; on success the log prints "successfully registered with NN" (two such lines, one for the active and one for the standby NameNode).
After that, the DataNode reports its blocks to the NameNode; when the report has been sent, the log prints "Successfully sent block report" (again two lines).
Only after the block reports have completed can the DataNode be considered fully started.
If the restart is short enough that the NameNode has not yet marked the DataNode as a dead node, the NameNode still holds the DataNode's block map, so in that case the DataNode can also be considered ready once the disk scan that builds the volumeMap has finished.
Other Notes
There is a lot of code here and, given the limits of my own understanding, there may be mistakes; I will update this post if I find any. I also did not find a ready-made programmatic way to determine whether a DataNode has finished starting, short of modifying the DataNode code to add a flag.
Update (2017-07-18): the key to deciding whether a DataNode has finished starting is whether both block reports have completed (one sent to the active NameNode and one to the standby). So you can query the DataNode JMX endpoint at http://xxx:50075/jmx and check that "BlockReportsNumOps" is greater than or equal to 2.
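For completeness, here is a minimal sketch of such a check (the class name is made up; it simply fetches the JMX page over HTTP and pulls out the BlockReportsNumOps counter with a naive string search, so a real probe would want a proper JSON parser):
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Naive readiness probe against the DataNode JMX servlet described above.
// Replace the host with your DataNode; 50075 is the default DataNode HTTP port.
public class DataNodeReadyCheck {
  public static void main(String[] args) throws Exception {
    String host = args.length > 0 ? args[0] : "localhost";
    URL url = new URL("http://" + host + ":50075/jmx");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setConnectTimeout(5000);
    conn.setReadTimeout(5000);

    StringBuilder body = new StringBuilder();
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        body.append(line).append('\n');
      }
    }

    // Crude extraction of "BlockReportsNumOps" : N from the JSON text.
    long numOps = 0;
    int idx = body.indexOf("\"BlockReportsNumOps\"");
    if (idx >= 0) {
      int colon = body.indexOf(":", idx);
      int end = colon + 1;
      while (end < body.length()
          && (Character.isDigit(body.charAt(end)) || body.charAt(end) == ' ')) {
        end++;
      }
      numOps = Long.parseLong(body.substring(colon + 1, end).trim());
    }

    // With an active + standby NameNode we expect at least two block reports.
    System.out.println("BlockReportsNumOps=" + numOps + " ready=" + (numOps >= 2));
  }
}
The same check can of course be done from a shell by fetching the /jmx URL and grepping the response for BlockReportsNumOps.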