# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
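Note that the trailing /* in LAUNCH_CLASSPATH is a JVM classpath wildcard: it is expanded by the java launcher itself, not by the shell (the quoting deliberately suppresses shell globbing, so every jar in the directory ends up on the classpath). A minimal sketch to inspect what a launched JVM actually received; ClasspathProbe is a hypothetical name:

object ClasspathProbe {
  // Compile, then run with: scala -cp "/opt/spark/jars/*:." ClasspathProbe
  // Each classpath entry the JVM sees is printed on its own line.
  def main(args: Array[String]): Unit =
    sys.props("java.class.path").split(java.io.File.pathSeparator).foreach(println)
}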
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }

}
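The mechanism here is plain Java reflection: look up a static main(Array[String]) on the user's class and invoke it with a null receiver. A minimal, self-contained sketch of the same mechanism (DemoApp and ReflectiveLaunch are hypothetical names):

import java.lang.reflect.Modifier

// Stand-in for a user-supplied main class; a top-level Scala object compiles
// to a class with a static main(String[]) forwarder.
object DemoApp {
  def main(args: Array[String]): Unit =
    println("DemoApp got: " + args.mkString(", "))
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    val klass = Class.forName("DemoApp")
    // Same lookup JavaMainApplication performs: a main(Array[String]) method.
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
    // Static methods are invoked with a null receiver; the whole Array[String]
    // is passed as the single argument.
    mainMethod.invoke(null, Array("hello", "world"))
  }
}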
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)
    conf.remove(ARCHIVES)

    new Client(new ClientArguments(args), conf, null).run()
  }

}
def submitApplication(): Unit = {
  ResourceRequestHelper.validateResources(sparkConf)

  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    this.appId = newAppResponse.getApplicationId()

    // The app staging dir based on the STAGING_DIR configuration if configured
    // otherwise based on the users home directory.
    // scalastyle:off FileSystemGet
    val appStagingBaseDir = sparkConf.get(STAGING_DIR)
      .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
    stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
    // scalastyle:on FileSystemGet

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext()
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    // Submit the application
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
  } catch {
    case e: Throwable =>
      if (stagingDirPath != null) {
        cleanupStagingDir()
      }
      throw e
  }
}
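For orientation, the first half of this handshake can be reproduced against a bare YarnClient. The sketch below (MinimalYarnSubmit is a hypothetical name) only goes as far as obtaining an application id; container and resource setup, which Client does in createContainerLaunchContext and createApplicationSubmissionContext, is elided, and a reachable ResourceManager is assumed:

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object MinimalYarnSubmit {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    try {
      // Ask the ResourceManager for a new application id.
      val app = yarnClient.createApplication()
      val appId = app.getNewApplicationResponse().getApplicationId()
      println(s"Got application id: $appId")
      // A real submission would now fill in app.getApplicationSubmissionContext()
      // (launch command, resources, queue) and call yarnClient.submitApplication(...).
    } finally {
      yarnClient.stop()
    }
  }
}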
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  val sparkConf = new SparkConf()
  if (amArgs.propertiesFile != null) {
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }
  // Both cases create a new SparkConf object which reads these configs from system properties.
  sparkConf.getAll.foreach { case (k, v) =>
    sys.props(k) = v
  }
  val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
  val ugi = sparkConf.get(PRINCIPAL) match {
    case Some(principal) if master.isClusterMode =>
      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
      SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
      val newUGI = UserGroupInformation.getCurrentUser()

      if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) {
        Utils.withContextClassLoader(master.userClassLoader) {
          val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null)
          credentialManager.obtainDelegationTokens(originalCreds)
        }
      }
      newUGI.addCredentials(originalCreds)
      newUGI
    case _ =>
      SparkHadoopUtil.get.createSparkUser()
  }
  ugi.doAs(new PrivilegedExceptionAction[Unit]() {
    override def run(): Unit = System.exit(master.run())
  })
}
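The doAs pattern used here is standard Hadoop security plumbing: run a block of code as a specific Hadoop user so that filesystem and RPC calls carry that identity. A minimal sketch (DoAsSketch and the user name are hypothetical):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object DoAsSketch {
  def main(args: Array[String]): Unit = {
    // createRemoteUser builds an unauthenticated UGI; the AM instead logs in
    // from a keytab in the secure cluster-mode branch shown above.
    val ugi = UserGroupInformation.createRemoteUser("demo-user")
    val who = ugi.doAs(new PrivilegedExceptionAction[String]() {
      override def run(): String = UserGroupInformation.getCurrentUser().getUserName()
    })
    println(s"Ran as: $who")
  }
}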
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
throw new IllegalStateException("User did not initialize spark context!")
}
resumeDriver()
userClassThread.join()
} catch {
case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
logError(
s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
"Please check earlier log output for errors. Failing the application.")
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
} finally {
resumeDriver()
}
}
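The waiting side of this handshake is easy to isolate: the AM gives the user thread a bounded window to publish a SparkContext, and a timeout fails the application with EXIT_SC_NOT_INITED. A standalone sketch using a plain Promise in place of sparkContextPromise (WaitForContextSketch is a hypothetical name):

import scala.concurrent.{Await, Promise, TimeoutException}
import scala.concurrent.duration._

object WaitForContextSketch {
  def main(args: Array[String]): Unit = {
    val sparkContextPromise = Promise[Option[String]]()  // never completed here
    val totalWaitTime = 2.seconds
    try {
      Await.result(sparkContextPromise.future, totalWaitTime) match {
        case Some(sc) => println(s"Got context: $sc")
        case None =>
          throw new IllegalStateException("User did not initialize spark context!")
      }
    } catch {
      case _: TimeoutException =>
        println(s"SparkContext did not initialize after waiting for ${totalWaitTime.toMillis} ms.")
    }
  }
}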
private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")

  var userArgs = args.userArgs
  if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
    // When running pyspark, the app is run using PythonRunner. The second argument is the list
    // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
    userArgs = Seq(args.primaryPyFile, "") ++ userArgs
  }
  if (args.primaryRFile != null &&
      (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
    // TODO(davies): add R dependencies here
  }

  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  val userThread = new Thread {
    override def run(): Unit = {
      try {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        case e: InvocationTargetException =>
          e.getCause match {
            case _: InterruptedException =>
              // Reporter thread can interrupt to stop user class
            case SparkUserAppException(exitCode) =>
              val msg = s"User application exited with status $exitCode"
              logError(msg)
              finish(FinalApplicationStatus.FAILED, exitCode, msg)
            case cause: Throwable =>
              logError("User class threw exception: ", cause)
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                "User class threw exception: " + StringUtils.stringifyException(cause))
          }
          sparkContextPromise.tryFailure(e.getCause())
      } finally {
        // Notify the thread waiting for the SparkContext, in case the application did not
        // instantiate one. This will do nothing when the user code instantiates a SparkContext
        // (with the correct master), or when the user code throws an exception (due to the
        // tryFailure above).
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}
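One detail worth isolating: Method.invoke never lets the target's exception escape directly, which is why the catch block above matches InvocationTargetException and then inspects getCause for the user application's real failure. A minimal sketch (UnwrapSketch and FailingApp are hypothetical names):

import java.lang.reflect.InvocationTargetException

object UnwrapSketch {
  object FailingApp {
    def main(args: Array[String]): Unit =
      throw new RuntimeException("boom from user code")
  }

  def main(args: Array[String]): Unit = {
    val mainMethod = FailingApp.getClass.getMethod("main", classOf[Array[String]])
    try {
      // Reflective invocation wraps whatever the target throws.
      mainMethod.invoke(FailingApp, Array.empty[String])
    } catch {
      case e: InvocationTargetException =>
        println(s"User class threw exception: ${e.getCause}")
    }
  }
}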
private ProcessBuilder createBuilder() throws IOException {
  List<String> cmd = new ArrayList<>();
  cmd.add(findSparkSubmit());
  cmd.addAll(builder.buildSparkSubmitArgs());

  // Since the child process is a batch script, let's quote things so that special characters are
  // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
  // weird.
  if (isWindows()) {
    List<String> winCmd = new ArrayList<>();
    for (String arg : cmd) {
      winCmd.add(quoteForBatchScript(arg));
    }
    cmd = winCmd;
  }

  ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
  for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
    pb.environment().put(e.getKey(), e.getValue());
  }

  if (workingDir != null) {
    pb.directory(workingDir);
  }

  // Only one of redirectError and redirectError(...) can be specified.
  // Similarly, if redirectToLog is specified, no other redirections should be specified.
  checkState(!redirectErrorStream || errorStream == null,
    "Cannot specify both redirectError() and redirectError(...) ");
  checkState(getLoggerName() == null ||
    ((!redirectErrorStream && errorStream == null) || outputStream == null),
    "Cannot use redirectToLog() in conjunction with other redirection methods.");

  if (redirectErrorStream) {
    pb.redirectErrorStream(true);
  }
  if (errorStream != null) {
    pb.redirectError(errorStream);
  }
  if (outputStream != null) {
    pb.redirectOutput(outputStream);
  }

  return pb;
}
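The recipe is the standard java.lang.ProcessBuilder one: assemble argv, adjust the child environment, configure redirection, then launch. A minimal sketch with echo standing in for spark-submit (LaunchSketch and DEMO_ENV are hypothetical names):

object LaunchSketch {
  def main(args: Array[String]): Unit = {
    val cmd = java.util.Arrays.asList("echo", "hello from child")
    val pb = new ProcessBuilder(cmd)
    pb.environment().put("DEMO_ENV", "1") // extra env vars, like childEnv above
    pb.redirectErrorStream(true)          // fold stderr into stdout

    val proc = pb.start()
    val out = scala.io.Source.fromInputStream(proc.getInputStream).mkString
    val exit = proc.waitFor()
    println(s"exit=$exit output=$out")
  }
}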