While learning Spark, the materials I have come across describe two main ways to submit a Spark job (at least that I know of):
The first way:
Submit the job from the command line with the spark-submit tool that ships with Spark. The official website and most reference materials submit jobs this way. An example command:
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
The meaning of the parameters is not explained here; please refer to the official documentation.
The second way:
Submit the job programmatically through the Java API. This approach needs no command line: you can simply click Run in IDEA on the main class that contains the job. Spark provides an API whose single entry point is SparkLauncher. It is very convenient (imagine a task that has to be run repeatedly by someone who cannot write a Linux shell script; my answer is to submit the job through the Java API, which can also be integrated with Spring so that the application runs inside Tomcat). The official example is here: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
According to the official example, there are two ways to submit through the Java API:
The first is to call startApplication() on a SparkLauncher instance. However, even when everything is configured correctly, a plain call tends to fail: startApplication() uses the LauncherServer to start a process that interacts with the cluster, and this appears to happen asynchronously, so the main thread may finish before that process has even come up, which makes the run fail. The fix is either to put the main thread to sleep for a while after calling new SparkLauncher().startApplication(), or to use the example below:
package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class LanuncherAppV {
    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap<String, String> env = new HashMap<>();
        // These two environment variables must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        // Optional:
        //env.put("YARN_CONF_DIR","");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Even after calling setJavaHome(), the "JAVA_HOME is not set" error persisted,
        // which is why JAVA_HOME is passed through the environment map instead.
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                    // Listen for state changes; isFinal() returns true once the application
                    // has ended (for whatever reason), and false before that.
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });
        System.out.println("The task is executing, please wait ....");
        // Block the main thread until the application reaches a final state.
        countDownLatch.await();
        System.out.println("The task is finished!");
    }
}
Note: if the deploy mode is cluster, anything the job writes to standard output will not be visible, so write the results to HDFS instead; in client mode the output is visible.
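As mentioned above, the other workaround is simply to keep the main thread alive for a while after startApplication() returns. Below is a minimal sketch of that idea: instead of registering a listener, it polls the handle's state in a sleep loop until isFinal() returns true. The class name LauncherAppPolling is just a placeholder I made up, and the paths and settings are reused from the example above:

package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherAppPolling {
    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap<String, String> env = new HashMap<>();
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");

        // Start the application without a listener and poll the handle instead.
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setVerbose(true)
                .startApplication();

        // Keep the main thread alive until the application reaches a final state;
        // without this, the JVM may exit before the launcher process has done its work.
        while (!handle.getState().isFinal()) {
            System.out.println("state: " + handle.getState());
            Thread.sleep(3000);
        }
        System.out.println("final state: " + handle.getState());
    }
}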
The second way is to obtain a Process from SparkLauncher.launch(), and then call process.waitFor() to wait for it to finish. With this approach you have to manage the output of the running process yourself, which is more cumbersome; the upside is that everything is under your control, and the output you capture is just as detailed as what you get when submitting from the command line. The implementation looks like this:
package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherApp {

    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap<String, String> env = new HashMap<>();
        // These two environment variables must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        //env.put("YARN_CONF_DIR","");

        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true);

        Process process = handle.launch();

        // Read stdout and stderr of the spark-submit process on separate threads.
        InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
        Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
        inputThread.start();

        InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
        Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
        errorThread.start();

        System.out.println("Waiting for finish...");
        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}
The custom InputStreamReaderRunnable class used above drains the child process's stdout and stderr on separate threads (if these streams are not consumed, their buffers can fill up and the submit process may block); it is implemented as follows:
package com.learn.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;

    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
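Finally, if you would rather not manage the output threads yourself, recent Spark releases also provide redirectOutput/redirectError methods on SparkLauncher that send the child process output to files (check the javadoc of your Spark version to confirm they are available). A rough sketch under that assumption, with a made-up class name LauncherAppRedirect and hypothetical output paths:

package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;

public class LauncherAppRedirect {
    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap<String, String> env = new HashMap<>();
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");

        Process process = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                // Send stdout/stderr of the spark-submit child process to files
                // instead of reading them on our own threads.
                .redirectOutput(new File("/tmp/spark-submit.out"))
                .redirectError(new File("/tmp/spark-submit.err"))
                .setVerbose(true)
                .launch();

        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code: " + exitCode);
    }
}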
