Hadoop - Job Submission Process (Source Code, to be continued)

2023-10-30

Submitting the job

hadoop jar word-count.jar /user/1.txt,/user/2.txt,/user/3.txt /user/output

Key code in the hadoop shell script

#core commands  
  *)
    # the core commands
    if [ "$COMMAND" = "fs" ] ; then
      CLASS=org.apache.hadoop.fs.FsShell
    elif [ "$COMMAND" = "version" ] ; then
      CLASS=org.apache.hadoop.util.VersionInfo
    elif [ "$COMMAND" = "jar" ] ; then
      CLASS=org.apache.hadoop.util.RunJar
      if [[ -n "${YARN_OPTS}" ]] || [[ -n "${YARN_CLIENT_OPTS}" ]]; then
        echo "WARNING: Use \"yarn jar\" to launch YARN applications." 1>&2
      fi
    elif [ "$COMMAND" = "key" ] ; then
      CLASS=org.apache.hadoop.crypto.key.KeyShell
    ......
    ......
    fi
    ......
    ......
    exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
    ;;
esac
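
For the submission above, the exec line therefore launches a plain JVM with RunJar as the entry class, roughly as follows (the exact heap size and HADOOP_OPTS depend on the local hadoop-env settings):

java $JAVA_HEAP_MAX $HADOOP_OPTS org.apache.hadoop.util.RunJar word-count.jar /user/1.txt,/user/2.txt,/user/3.txt /user/output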

org.apache.hadoop.util.RunJar.main

public static void main(String[] args) throws Throwable {
    new RunJar().run(args);
}

public void run(String[] args) throws Throwable {
    String usage = "RunJar jarFile [mainClass] args...";

    if (args.length < 1) {
        System.err.println(usage);
        System.exit(-1);
    }

    int firstArg = 0;
    String fileName = args[firstArg++];
    File file = new File(fileName); // in our example, word-count.jar
    if (!file.exists() || !file.isFile()) {
        System.err.println("JAR does not exist or is not a normal file: " +
                file.getCanonicalPath());
        System.exit(-1);
    }
    String mainClassName = null;
	
    JarFile jarFile;
    try {
        jarFile = new JarFile(fileName);
    } catch(IOException io) {
        throw new IOException("Error opening job jar: " + fileName)
                .initCause(io);
    }
    // read the main class name from the jar manifest
    Manifest manifest = jarFile.getManifest();
    if (manifest != null) {
        mainClassName = manifest.getMainAttributes().getValue("Main-Class");
    }
    jarFile.close();

    if (mainClassName == null) {
        if (args.length < 2) {
            System.err.println(usage);
            System.exit(-1);
        }
        mainClassName = args[firstArg++];
    }
    mainClassName = mainClassName.replaceAll("/", ".");
    // locate the temp directory (java.io.tmpdir) and make sure it exists
    File tmpDir = new File(System.getProperty("java.io.tmpdir"));
    ensureDirectory(tmpDir);
    // create a working directory under it
    final File workDir;
    try {
        workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
    } catch (IOException ioe) {
        // If user has insufficient perms to write to tmpDir, default
        // "Permission denied" message doesn't specify a filename.
        System.err.println("Error creating temp dir in java.io.tmpdir "
                + tmpDir + " due to " + ioe.getMessage());
        System.exit(-1);
        return;
    }

    if (!workDir.delete()) {
        System.err.println("Delete failed for " + workDir);
        System.exit(-1);
    }
    ensureDirectory(workDir);
    // shutdown hook: delete the working directory when the JVM exits
    ShutdownHookManager.get().addShutdownHook(
            new Runnable() {
                @Override
                public void run() {
                    FileUtil.fullyDelete(workDir);
                }
            }, SHUTDOWN_HOOK_PRIORITY);

    // unpack the jar into the working directory
    unJar(file, workDir);
	
    ClassLoader loader = createClassLoader(file, workDir);

    Thread.currentThread().setContextClassLoader(loader);
    Class<?> mainClass = Class.forName(mainClassName, true, loader);
    Method main = mainClass.getMethod("main", new Class[] {
            Array.newInstance(String.class, 0).getClass()
    });
    String[] newArgs = Arrays.asList(args)
            .subList(firstArg, args.length).toArray(new String[0]);
    try {
        // invoke the main method of the word-count main class
        main.invoke(null, new Object[] { newArgs });
    } catch (InvocationTargetException e) {
        throw e.getTargetException();
    }
}
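
createClassLoader is not shown above. As a simplified sketch (not the actual RunJar code), what it plus the reflective call boil down to is a URLClassLoader over the jar and the unpacked classes/ and lib/ directories, followed by invoking main(String[]) on the resolved class:

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class MiniRunJar {
    public static void main(String[] args) throws Exception {
        File jar = new File(args[0]);      // e.g. word-count.jar
        File workDir = new File(args[1]);  // directory the jar was unpacked into
        URL[] urls = new URL[] {
            jar.toURI().toURL(),
            new File(workDir, "classes/").toURI().toURL(),
            new File(workDir, "lib/").toURI().toURL() // the real RunJar adds each jar under lib/ individually
        };
        ClassLoader loader = new URLClassLoader(urls, MiniRunJar.class.getClassLoader());
        Thread.currentThread().setContextClassLoader(loader);
        // resolve the fully qualified main class through the new loader and call its main(String[])
        Class<?> mainClass = Class.forName(args[2], true, loader);
        Method main = mainClass.getMethod("main", String[].class);
        String[] rest = new String[args.length - 3];
        System.arraycopy(args, 3, rest, 0, rest.length);
        main.invoke(null, (Object) rest);
    }
}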

The user's main method (the word-count driver)

public static void main(String[] args) {
    try {
        // initialization, including loading the configuration files
        Job job = Job.getInstance(new Configuration());
        
        job.setJarByClass(Application.class);
        // mapreduce.job.map.class
        job.setMapperClass(WordCountMapper.class);
        // mapreduce.map.output.key.class
        job.setMapOutputKeyClass(Text.class);
        // mapreduce.map.output.value.class
        job.setMapOutputValueClass(LongWritable.class);

        // mapreduce.job.reduce.class
        job.setReducerClass(WordCountReducer.class);
        // mapreduce.job.output.key.class
        job.setOutputKeyClass(Text.class);
        // mapreduce.job.output.value.class
        job.setOutputValueClass(LongWritable.class);
		
        // add and validate the input paths (this involves fs operations)
        FileInputFormat.addInputPaths(job, args[0]);
        // set the output path: mapreduce.output.fileoutputformat.outputdir
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // submit the job and wait for it to complete
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    catch (Exception e)  {
        e.printStackTrace();
    }
}
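
The driver references WordCountMapper and WordCountReducer, which are user classes not shown in this post. A minimal sketch of what they could look like (the class names come from the driver, the bodies are an assumption based on a typical word count):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // emit (word, 1) for every token in the input line
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, one);
            }
        }
    }
}

class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum all counts for the same word
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        context.write(key, new LongWritable(sum));
    }
}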

Job.java

public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
    if (state == JobState.DEFINE) {
        // submit the job to the cluster; this only submits, it does not wait for the result
        submit();
    }
    if (verbose) { // whether to print progress while the job runs
        // poll status and progress information through ClientProtocol and print it in real time
        // until the job succeeds, fails, or is killed
        monitorAndPrintJob();
    } else {
        // interval at which the client polls isComplete(), default 5000 ms
        int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
        while (!isComplete()) {
            try {
                Thread.sleep(completionPollIntervalMillis);
            } catch (InterruptedException ie) {
            }
        }
    }
    return isSuccessful();
}
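
Both polling intervals used here are client-side configuration. Assuming the standard Hadoop 2.x property names, they can be tuned like this:

Configuration conf = new Configuration();
// polling interval used by waitForCompletion(false) when checking isComplete(), default 5000 ms
conf.setInt("mapreduce.client.completion.pollinterval", 1000);
// progress report interval used by monitorAndPrintJob() when verbose == true, default 1000 ms
conf.setInt("mapreduce.client.progressmonitor.pollinterval", 2000);
Job job = Job.getInstance(conf);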

// submit the job to the cluster only; does not wait for the result
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    // check the state; throws if the Job is not in the DEFINE state
    ensureState(JobState.DEFINE);
    // check and set the configuration for the old vs. new API
    setUseNewAPI();
    // connect to the cluster (the JobTracker in MR1; the ResourceManager when running on YARN)
    connect();
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            // submit the job
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}
// About the old vs. new mapper/reducer APIs:
// "new" means org.apache.hadoop.mapreduce.Mapper / org.apache.hadoop.mapreduce.Reducer
// "old" means org.apache.hadoop.mapred.Mapper / org.apache.hadoop.mapred.Reducer
private void setUseNewAPI() throws IOException {
    // number of reduce tasks from the configuration (mapreduce.job.reduces), 1 if unset
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";  // if set, the job uses the old mapper API
    String oldReduceClass = "mapred.reducer.class"; // if set, the job uses the old reducer API
    // set mapred.mapper.new-api if it is not already set: use the new mapper API
    // exactly when no old-API mapper class is configured
    conf.setBooleanIfUnset("mapred.mapper.new-api", conf.get(oldMapperClass) == null);
    // read mapred.mapper.new-api, returns true/false
    if (conf.getUseNewMapper()) { 
        String mode = "new map API";
        ensureNotSet("mapred.input.format.class", mode);
        ensureNotSet(oldMapperClass, mode);
        if (numReduces != 0) {
            ensureNotSet("mapred.partitioner.class", mode);
        } else {
            ensureNotSet("mapred.output.format.class", mode);
        }
    } else {
        String mode = "map compatability";
        // mapreduce.job.inputformat.class
        ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
        // mapreduce.job.map.class
        ensureNotSet(MAP_CLASS_ATTR, mode);
        if (numReduces != 0) {
        	// mapreduce.job.partitioner.class
            ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
        } else {
        	// mapreduce.job.outputformat.class
            ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        }
    }
    if (numReduces != 0) {
        conf.setBooleanIfUnset("mapred.reducer.new-api", conf.get(oldReduceClass) == null);
        if (conf.getUseNewReducer()) {
            String mode = "new reduce API";
            ensureNotSet("mapred.output.format.class", mode);
            ensureNotSet(oldReduceClass, mode);
        } else {
            String mode = "reduce compatability";
            ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
            ensureNotSet(REDUCE_CLASS_ATTR, mode);
        }
    }
}
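
The two flags only record which of the two Mapper/Reducer contracts the job uses. For reference, a minimal illustration of the two mapper signatures (hypothetical identity mappers, not part of the submission code):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// new API: org.apache.hadoop.mapreduce.Mapper is a class and map() receives a Context
class NewApiIdentityMapper
        extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

// old API: org.apache.hadoop.mapred.Mapper is an interface and map() receives an OutputCollector and Reporter
class OldApiIdentityMapper extends MapReduceBase
        implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
        output.collect(key, value);
    }
}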

private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
        // obtain the Cluster as the current user; doAs involves javax.security.auth access control
        cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
                // this mainly initializes ClientProtocol, the protocol the client uses to talk to
                // the cluster (the JobTracker in MR1, the ResourceManager on YARN)
                // ClientProtocol is used to submit the job and to query job status
                return new Cluster(getConfiguration());
            }
        });
    }
}
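
In Hadoop 2.x, which ClientProtocol implementation the Cluster constructor ends up with is decided by the configured framework name:

Configuration conf = new Configuration();
// "yarn"  -> YARNRunner, which talks to the ResourceManager
// "local" -> LocalJobRunner, which runs the job in-process
conf.set("mapreduce.framework.name", "yarn");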

org.apache.hadoop.mapreduce.JobSubmitter

JobStatus submitJobInternal(Job job, Cluster cluster)  throws ClassNotFoundException, InterruptedException, IOException {

    // validate the output specification; for FileOutputFormat this checks that the output path does not already exist
    checkSpecs(job);
	
    Configuration conf = job.getConfiguration();
    // add the MR framework archive URI (if configured) to mapreduce.job.cache.archives
    addMRFrameworkToDistributedCache(conf);
    // get or create the MR staging directory and make sure its permissions are 0700 (fix them if not)
    // staging directory = [yarn.app.mapreduce.am.staging-dir]/[user]/.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    
    // record the submitting host's IP address and hostname
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // obtain a new ID for this job
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME, UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers",  
      	"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");
      // obtain delegation tokens for the NameNodes involved
      TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf);
      // collect secrets/tokens and store them in the job credentials
      populateTokenCache(conf, job.getCredentials());
      // generate the shuffle secret key
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }
      // upload the job jar and its dependencies (files, libjars, archives) to submitJobDir on the fs
      copyAndConfigureFiles(job, submitJobDir);
      // path of the job configuration file: submitJobDir/job.xml
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // compute input splits; each split is processed by one map task. A split is not the same
      // thing as a block; the split logic lives in the InputFormat
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);
      Limits.reset(conf);

      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
}
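
writeSplits delegates to the configured InputFormat. For FileInputFormat-based jobs the per-file split size is essentially the block size clamped between the configured minimum and maximum; a simplified sketch of that computation (property names as in Hadoop 2.x, not the full getSplits() logic):

// given the job Configuration and the block size of the file being split
long computeSplitSize(Configuration conf, long blockSize) {
    long minSize = conf.getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
    long maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", Long.MAX_VALUE);
    return Math.max(minSize, Math.min(maxSize, blockSize));
}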

private void checkSpecs(Job job) throws ClassNotFoundException,  InterruptedException, IOException {
    JobConf jConf = (JobConf)job.getConfiguration();
    // is the new API in use? (map-only jobs check the mapper flag, otherwise the reducer flag)
    if (jConf.getNumReduceTasks() == 0 ? jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
        // instantiate the new-API OutputFormat
        org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
            ReflectionUtils.newInstance(job.getOutputFormatClass(), job.getConfiguration());
        // run the output check
        output.checkOutputSpecs(job);
    } else {
        // old API: run the check
        jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
    }
}
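
For the common FileOutputFormat case, the check amounts to verifying that an output path is set and does not already exist. Roughly, given a JobContext job (a simplified sketch of its documented behavior, not the exact source):

Path outDir = FileOutputFormat.getOutputPath(job);
if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set.");
}
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
    // the familiar "output directory already exists" failure at submit time
    throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
}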

// add the MapReduce framework archive to the distributed cache
private static void addMRFrameworkToDistributedCache(Configuration conf) throws IOException {
    // read the mapreduce.application.framework.path value
    String framework = conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, "");
    if (!framework.isEmpty()) {
        URI uri;
        try {
            uri = new URI(framework);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Unable to parse '" + framework
                    + "' as a URI, check the setting for "
                    + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e);
        }
        // handle the symlink name (the URI fragment)
        String linkedName = uri.getFragment();
        FileSystem fs = FileSystem.get(conf);
        Path frameworkPath = fs.makeQualified(new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()));
        FileContext fc = FileContext.getFileContext(frameworkPath.toUri(), conf);
        frameworkPath = fc.resolvePath(frameworkPath);
        uri = frameworkPath.toUri();
        try {
            uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, linkedName);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
        // add the framework archive URI to mapreduce.job.cache.archives
        DistributedCache.addCacheArchive(uri, conf);
    }
}
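
mapreduce.application.framework.path usually points at an archive on HDFS with an optional #fragment, which becomes the symlink name handled above. A configuration sketch (the HDFS path here is only an example):

Configuration conf = new Configuration();
// archive to localize; the "#mrframework" fragment is the symlink name preserved by the fragment handling above
conf.set("mapreduce.application.framework.path",
        "hdfs:///mapred/framework/hadoop-mapreduce.tar.gz#mrframework");
// the task classpath must then reference that symlink, e.g. via mapreduce.application.classpath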