hadoop job -kill 调用的是CLI.java里面的job.killJob(); 这里会分几种情况,如果是能查询到状态是RUNNING的话,是直接向AppMaster发送kill请求的。
YARNRunner.java
@Override
public void killJob(JobID arg0) throws IOException, InterruptedException {
/* check if the status is not running, if not send kill to RM */
JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();
// get status from RM and return
if (status == null) {
killUnFinishedApplication(appId);
return;
}
if (status.getState() != JobStatus.State.RUNNING) {
killApplication(appId);
return;
}
try {
/* send a kill to the AM */
clientCache.getClient(arg0).killJob(arg0);
long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis;
while ((currentTimeMillis < timeKillIssued + 10000L)
&& !isJobInTerminalState(status)) {
try {
Thread.sleep(1000L);
} catch (InterruptedException ie) {
/** interrupted, just break */
break;
}
currentTimeMillis = System.currentTimeMillis();
status = clientCache.getClient(arg0).getJobStatus(arg0);
if (status == null) {
killUnFinishedApplication(appId);
return;
}
}
} catch(IOException io) {
LOG.debug("Error when checking for application status", io);
}
if (status != null && !isJobInTerminalState(status)) {
killApplication(appId);
}
}
MRClientService.java
@SuppressWarnings("unchecked")
@Override
public KillJobResponse killJob(KillJobRequest request)
throws IOException {
JobId jobId = request.getJobId();
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill job " + jobId + " received from " + callerUGI
+ " at " + Server.getRemoteAddress();
LOG.info(message);
verifyAndGetJob(jobId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new JobDiagnosticsUpdateEvent(jobId, message));
appContext.getEventHandler().handle(
new JobEvent(jobId, JobEventType.JOB_KILL));
KillJobResponse response =
recordFactory.newRecordInstance(KillJobResponse.class);
return response;
}
yarn application -kill 使用的是ApplicationCLI.java,是向RM发送kill请求
/**
* Kills the application with the application id as appId
*
* @param applicationId
* @throws YarnException
* @throws IOException
*/
private void killApplication(String applicationId) throws YarnException,
IOException {
ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
ApplicationReport appReport = null;
try {
appReport = client.getApplicationReport(appId);
} catch (ApplicationNotFoundException e) {
sysout.println("Application with id '" + applicationId +
"' doesn't exist in RM.");
throw e;
}
if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
|| appReport.getYarnApplicationState() == YarnApplicationState.KILLED
|| appReport.getYarnApplicationState() == YarnApplicationState.FAILED) {
sysout.println("Application " + applicationId + " has already finished ");
} else {
sysout.println("Killing application " + applicationId);
client.killApplication(appId);
}
}
YarnClientImpl.java
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {
KillApplicationRequest request =
Records.newRecord(KillApplicationRequest.class);
request.setApplicationId(applicationId);
try {
int pollCount = 0;
long startTime = System.currentTimeMillis();
while (true) {
KillApplicationResponse response =
rmClient.forceKillApplication(request);
if (response.getIsKillCompleted()) {
LOG.info("Killed application " + applicationId);
break;
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= this.asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be killed.");
}
if (++pollCount % 10 == 0) {
LOG.info("Waiting for application " + applicationId + " to be killed.");
}
Thread.sleep(asyncApiPollIntervalMillis);
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for application " + applicationId
+ " to be killed.");
}
}