Hive Source Code Reading -- SQL Parsing and Semantic Analysis -- Driver

2023-05-16

The five classes discussed earlier all lead, by different paths, to the CliDriver class, which receives the user's command-line input, prepares it for execution, and returns the execution results.

The class doing the real work underneath is Driver: it compiles and optimizes the received command into MapReduce jobs (or RDDs) and actually drives the cluster to run them.

processLocalCmd contains the line ret = qp.run(cmd).getResponseCode();. The run in this line is not CliDriver's run but Driver's run.
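For context, here is a condensed sketch of how CliDriver hands the statement off (paraphrased from CliDriver.processCmd; exact signatures vary across Hive versions):

	// CommandProcessorFactory returns a Driver for plain SQL statements,
	// so qp.run(cmd) below dispatches into org.apache.hadoop.hive.ql.Driver.run.
	CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
	if (proc instanceof Driver) {
	    Driver qp = (Driver) proc;
	    int ret = qp.run(cmd).getResponseCode();
	}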

org.apache.hadoop.hive.ql.Driver.run

org.apache.hadoop.hive.ql.Driver
----------

	public CommandProcessorResponse run(String command) throws CommandNeedRetryException {
        return this.run(command, false);
    }

    public CommandProcessorResponse run() throws CommandNeedRetryException {
        return this.run((String)null, true);
    }

    public CommandProcessorResponse run(String command, boolean alreadyCompiled) throws CommandNeedRetryException {
        // This call to runInternal is where the SQL is actually executed; everything below only validates the result.
        // Right or wrong, execute first; the checks come afterwards.
        CommandProcessorResponse cpr = this.runInternal(command, alreadyCompiled);
        if (cpr.getResponseCode() == 0) {
            return cpr;
        } else {
            SessionState ss = SessionState.get();
            if (ss == null) {
                return cpr;
            } else {
                MetaDataFormatter mdf = MetaDataFormatUtils.getFormatter(ss.getConf());
                if (!(mdf instanceof JsonMetaDataFormatter)) {
                    return cpr;
                } else {
                    try {
                        if (this.downstreamError == null) {
                            mdf.error(ss.out, this.errorMessage, cpr.getResponseCode(), this.SQLState);
                            return cpr;
                        }

                        ErrorMsg canonicalErr = ErrorMsg.getErrorMsg(cpr.getResponseCode());
                        if (canonicalErr != null && canonicalErr != ErrorMsg.GENERIC_ERROR) {
                            mdf.error(ss.out, this.errorMessage, cpr.getResponseCode(), this.SQLState, (String)null);
                            return cpr;
                        }

                        if (this.downstreamError instanceof HiveException) {
                            HiveException rc = (HiveException)this.downstreamError;
                            mdf.error(ss.out, this.errorMessage, rc.getCanonicalErrorMsg().getErrorCode(), this.SQLState, rc.getCanonicalErrorMsg() == ErrorMsg.GENERIC_ERROR ? StringUtils.stringifyException(rc) : null);
                        } else {
                            ErrorMsg canonicalMsg = ErrorMsg.getErrorMsg(this.downstreamError.getMessage());
                            mdf.error(ss.out, this.errorMessage, canonicalMsg.getErrorCode(), this.SQLState, StringUtils.stringifyException(this.downstreamError));
                        }
                    } catch (HiveException var8) {
                        console.printError("Unable to JSON-encode the error", StringUtils.stringifyException(var8));
                    }

                    return cpr;
                }
            }
        }
    }
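Seen from the caller's side, run behaves like this. A minimal usage sketch (hypothetical; it assumes a started SessionState and hive-exec on the classpath, and omits handling of the checked CommandNeedRetryException):

	HiveConf conf = new HiveConf();
	SessionState.start(conf);  // run relies on the thread-local SessionState for error formatting
	Driver driver = new Driver(conf);
	CommandProcessorResponse resp = driver.run("SELECT * FROM src LIMIT 10");
	if (resp.getResponseCode() != 0) {
	    // A non-zero code means the error path above populated errorMessage and SQLState
	    System.err.println(resp.getErrorMessage() + " (SQLState " + resp.getSQLState() + ")");
	}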

run is only the entry point of Driver and sits at the ql stage; several more stages (compileInternal, compile, and execute, traced below) still follow, so we look only at the key parts.

org.apache.hadoop.hive.ql.Driver.runInternal

Next, look at the runInternal method:

org.apache.hadoop.hive.ql.Driver
----------
   private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled) throws CommandNeedRetryException {
			......
            PerfLogger perfLogger = null;
            int ret;
            if (!alreadyCompiled) {
                // Call compileInternal to compile the SQL into a QueryPlan
                ret = this.compileInternal(command, true);
                perfLogger = SessionState.getPerfLogger();
                if (ret != 0) {
                    CommandProcessorResponse var8 = this.createProcessorResponse(ret);
                    return var8;
                }
            } else {
                perfLogger = SessionState.getPerfLogger();
                this.plan.setQueryStartTime(perfLogger.getStartTime("Driver.run"));
            }

          	......
                // Call execute to run every task in the QueryPlan
                ret = this.execute(true);
                if (ret != 0) {
                    var10 = this.rollback(this.createProcessorResponse(ret));
                    return var10;
                }
			......
    }
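The alreadyCompiled branch exists because compilation and execution can be driven as two separate phases, which is how HiveServer2 uses the Driver. A hedged sketch of that pattern, building on the run() overloads quoted earlier (compileAndRespond is the compile-only entry point; details and exception handling vary by version and are omitted):

	Driver driver = new Driver(conf);
	// Phase 1: compile only; on success the QueryPlan is cached inside the Driver.
	CommandProcessorResponse compiled = driver.compileAndRespond("SELECT count(*) FROM src");
	if (compiled.getResponseCode() == 0) {
	    // Phase 2: the no-arg run() calls run(null, true), skipping compileInternal
	    // and going straight to execute on the cached plan.
	    driver.run();
	}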

org.apache.hadoop.hive.ql.Driver.compileInternal

Continue with the compileInternal method:

org.apache.hadoop.hive.ql.Driver
----------

	private int compileInternal(String command, boolean deferClose) {
        ReentrantLock compileLock = this.tryAcquireCompileLock(this.isParallelEnabled, command);
        if (compileLock == null) {
            return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
        } else {
            int ret;
            try {
            	// Driver's run method ultimately performs the compile() step, where the compiler does syntactic parsing and semantic analysis.
            	// compile() is Driver's most critical method; this single method is nearly 300 lines long.
                ret = this.compile(command, true, deferClose);
            } finally {
                compileLock.unlock();
            }

            if (ret != 0) {
                try {
                    this.releaseLocksAndCommitOrRollback(false, (HiveTxnManager)null);
                } catch (LockException var8) {
                    LOG.warn("Exception in releasing locks. " + StringUtils.stringifyException(var8));
                }
            }

            PerfLogger perfLogger = SessionState.getPerfLogger();
            this.queryDisplay.setPerfLogStarts(Phase.COMPILATION, perfLogger.getStartTimes());
            this.queryDisplay.setPerfLogEnds(Phase.COMPILATION, perfLogger.getEndTimes());
            return ret;
        }
    }
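The null return from tryAcquireCompileLock is what becomes COMPILE_LOCK_TIMED_OUT, which implies a timed tryLock underneath. A simplified, illustrative sketch of that idea (globalCompileLock and timeoutSecs are placeholder names, not the real fields):

	// With parallel compilation enabled, each Driver locks only itself;
	// otherwise every Driver contends on one global lock, serializing compile().
	ReentrantLock lock = isParallelEnabled ? this.compileLock : globalCompileLock;
	try {
	    if (!lock.tryLock(timeoutSecs, TimeUnit.SECONDS)) {
	        return null;  // the caller maps this to COMPILE_LOCK_TIMED_OUT
	    }
	} catch (InterruptedException ie) {
	    Thread.currentThread().interrupt();
	    return null;
	}
	return lock;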

org.apache.hadoop.hive.ql.Driver.compile

org.apache.hadoop.hive.ql.Driver
----------

public int compile(String command, boolean resetTaskIds, boolean deferClose) {
	  ......
	  // Parse the SQL text into an ASTNode
      ParseDriver pd = new ParseDriver();
      ASTNode tree = pd.parse(command, ctx);
      tree = ParseUtils.findRootNonNullToken(tree);
	  ......
	  // Pick the appropriate BaseSemanticAnalyzer for this ASTNode
      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
	  ......
	  // Convert the ASTNode into Tasks, including any optimization; this is the most involved step
      sem.analyze(tree, ctx);
	  ......
  
      // Record any ACID-compliant FileSinkOperators we saw, so the transaction ID can be added to acidSinks
      acidSinks = sem.getAcidFileSinks();

      LOG.info("Semantic Analysis Completed");

      // Validate the plan
      sem.validate();
      acidInQuery = sem.hasAcidInQuery();
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);

      if (isInterrupted()) {
        return handleInterruption("after analyzing query.");
      }

      // Create the QueryPlan by passing the BaseSemanticAnalyzer into the QueryPlan constructor
      schema = getSchema(sem, conf);
      plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema);
	  ......
}
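To make the first step concrete, ParseDriver can be exercised on its own. A minimal sketch, assuming hive-exec is on the classpath (some Hive versions require passing a Context to parse, as compile does above; handling of the checked ParseException is omitted):

	ParseDriver pd = new ParseDriver();
	ASTNode tree = pd.parse("SELECT id FROM t WHERE id > 1");
	// dump() pretty-prints the tree: TOK_QUERY / TOK_FROM / TOK_TABREF / TOK_INSERT / TOK_SELECT / TOK_WHERE ...
	System.out.println(tree.dump());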

Driver is mainly responsible for SQL parsing and semantic analysis: it calls parse to turn the statement into an ASTNode, calls analyze to turn the ASTNode into Tasks, and then builds a QueryPlan from those Tasks.

The resulting QueryPlan is kept inside the Driver and then handed to execute to run.
