Spark Launcher Java API提交Spark算法_sparklauncher setmaster-程序员宅基地

技术标签: spark  spark launcher  Spark  

在介绍之前,我先附上spark 官方文档地址:

http://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/package-summary.html

个人源码github地址:

https://github.com/yyijun/framework/tree/master/framework-spark

1.主要提交参数说明

 spark-submit \ 
    --master yarn \ 
    --deploy-mode cluster \ 
    --driver-memory 4g \
    --driver-cores 4 \
    --num-executors 20 \
    --executor-cores 4 \
    --executor-memory  10g \
    --class com.yyj.train.spark.launcher.TestSparkLauncher \ 
    --conf spark.yarn.jars=hdfs://hadoop01.xxx.xxx.com:8020/trainsparklauncher/jars/*.jar \ 
    --jars $(ls lib/*.jar| tr '\n' ',') \ 
    lib/ train-spark-1.0.0.jar

--conf spark.yarn.jars:提交算法到yarn集群时算法依赖spark安装包lib目录下的jar包,如果不指定,则每次启动任务都会先上传相关依赖包,耗时严重;

--jars:算法依赖的相关包,spark standalone模式、yarn模式都有用,多个依赖包用逗号”,”分隔;

2.Idea提交算法到yarn集群

2.1.入口参数配置

    val spark = SparkSession
      .builder
      .appName("TestSparkLauncher")
      .master("yarn")
      .config("deploy.mode", "cluster")
      .config("spark.yarn.jars", "hdfs://hadoop01.xxx.xxx.com:8020/trainsparklauncher/jars/*.jar")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

2.2.pom.xml配置

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_2.11</artifactId>
      <version>2.1.0</version>
</dependency>

3.提交准备

1、从大数据平台下载hadoop相关的xml配置文件:
    core-site.xml:必须;
    hdfs-site.xml:必须;
    hive-site.xml:提交的算法里面用到spark on hive时需要此文件;
    yarn-site.xml:提交算法到yarn时必须要此文件;

2、准备自己的算法包,这里对应替换为自己的算法包:
    train-spark-1.0.0.jar和train-common-1.0.0.jar

3、上传spark安装目录下jars目录下相关的jar包到hdfs:hadoop fs –put –f /opt/cloudera/parcels/SPARK2/lib/spark2/jars /hdfs目录

测试提交算法

package com.yyj.framework.spark.launcher;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by yangyijun on 2019/5/20.
 * 提交spark算法入口类
 */
public class SparkLauncherMain {

    public static void main(String[] args) {
        System.out.println("starting...");
        String confPath = "/Users/yyj/workspace/alg/src/main/resources";
        System.out.println("confPath=" + confPath);

        //开始构建提交spark时依赖的jars
        String rootPath = "/Users/yyj/workspace/alg/lib/";
        File file = new File(rootPath);
        StringBuilder sb = new StringBuilder();
        String[] files = file.list();
        for (String s : files) {
            if (s.endsWith(".jar")) {
                sb.append("hdfs://hadoop01.xxx.xxx.com:8020/user/alg/jars/");
                sb.append(s);
                sb.append(",");
            }
        }
        String jars = sb.toString();
        jars = jars.substring(0, jars.length() - 1);

        Map<String, String> conf = new HashMap<>();
        conf.put(SparkConfig.DEBUG, "false");
        conf.put(SparkConfig.APP_RESOURCE, "hdfs://hadoop01.xxx.xxx.com:8020/user/alg/jars/alg-gs-offline-1.0.0.jar");
        conf.put(SparkConfig.MAIN_CLASS, "com.yyj.alg.gs.offline.StartGraphSearchTest");
        conf.put(SparkConfig.MASTER, "yarn");
        //如果是提交到spark的standalone集群则采用下面的master
        //conf.put(SparkConfig.MASTER, "spark://hadoop01.xxx.xxx.com:7077");
        conf.put(SparkConfig.APP_NAME, "offline-graph-search");
        conf.put(SparkConfig.DEPLOY_MODE, "client");
        conf.put(SparkConfig.JARS, jars);
        conf.put(SparkConfig.HADOOP_CONF_DIR, confPath);
        conf.put(SparkConfig.YARN_CONF_DIR, confPath);
        conf.put(SparkConfig.SPARK_HOME, "/Users/yyj/spark2");
        conf.put(SparkConfig.DRIVER_MEMORY, "2g");
        conf.put(SparkConfig.EXECUTOR_CORES, "2");
        conf.put(SparkConfig.EXECUTOR_MEMORY, "2g");
        conf.put(SparkConfig.SPARK_YARN_JARS, "hdfs://hadoop01.xxx.xxx.com:8020/user/alg/jars/*.jar");
        conf.put(SparkConfig.APP_ARGS, "params");
        SparkActionLauncher launcher = new SparkActionLauncher(conf);
        boolean result = launcher.waitForCompletion();
        System.out.println("============result=" + result);
    }
}

构造SparkLauncher对象,配置Spark提交算法相关参数及说明

 private SparkLauncher createSparkLauncher() {
        logger.info("actionConfig:\n" + JSON.toJSONString(conf, true));
        this.debug = Boolean.parseBoolean(conf.get(SparkConfig.DEBUG));
        Map<String, String> env = new HashMap<>();
        //配置hadoop的xml文件本地路径
        env.put(SparkConfig.HADOOP_CONF_DIR, conf.get(SparkConfig.HADOOP_CONF_DIR));
        //配置yarn的xml文件本地路径
        env.put(SparkConfig.YARN_CONF_DIR, conf.get(SparkConfig.HADOOP_CONF_DIR));
        SparkLauncher launcher = new SparkLauncher(env);
        //设置算法入口类所在的jar包本地路径
        launcher.setAppResource(conf.get(SparkConfig.APP_RESOURCE));
        //设置算法入口类保证包名称及类名,例:com.yyj.train.spark.launcher.TestSparkLauncher
        launcher.setMainClass(conf.get(SparkConfig.MAIN_CLASS));
        //设置集群的master地址:yarn/spark standalone的master地址,例:spark://hadoop01.xxx.xxx.com:7077
        launcher.setMaster(conf.get(SparkConfig.MASTER));
        //设置部署模式:cluster(集群模式)/client(客户端模式)
        launcher.setDeployMode(conf.get(SparkConfig.DEPLOY_MODE));
        //设置算法依赖的包的本地路径,多个jar包用逗号","隔开,如果是spark on yarn只需要把核心算法包放这里即可,
        // spark相关的依赖包可以预先上传到hdfs并通过 spark.yarn.jars参数指定;
        // 如果是spark standalone则需要把所有依赖的jar全部放在这里
        launcher.addJar(conf.get(SparkConfig.JARS));
        //设置应用的名称
        launcher.setAppName(conf.get(SparkConfig.APP_NAME));
        //设置spark客户端安装包的home目录,提交算法时需要借助bin目录下的spark-submit脚本
        launcher.setSparkHome(conf.get(SparkConfig.SPARK_HOME));
        //driver的内存设置
        launcher.addSparkArg(SparkConfig.DRIVER_MEMORY, conf.getOrDefault(SparkConfig.DRIVER_MEMORY, "4g"));
        //driver的CPU核数设置
        launcher.addSparkArg(SparkConfig.DRIVER_CORES, conf.getOrDefault(SparkConfig.DRIVER_CORES, "2"));
        //启动executor个数
        launcher.addSparkArg(SparkConfig.NUM_EXECUTOR, conf.getOrDefault(SparkConfig.NUM_EXECUTOR, "30"));
        //每个executor的CPU核数
        launcher.addSparkArg(SparkConfig.EXECUTOR_CORES, conf.getOrDefault(SparkConfig.EXECUTOR_CORES, "4"));
        //每个executor的内存大小
        launcher.addSparkArg(SparkConfig.EXECUTOR_MEMORY, conf.getOrDefault(SparkConfig.EXECUTOR_MEMORY, "4g"));
        String sparkYarnJars = conf.get(SparkConfig.SPARK_YARN_JARS);
        if (StringUtils.isNotBlank(sparkYarnJars)) {
            //如果是yarn的cluster模式需要通过此参数指定算法所有依赖包在hdfs上的路径
            launcher.setConf(SparkConfig.SPARK_YARN_JARS, conf.get(SparkConfig.SPARK_YARN_JARS));
        }
        //设置算法入口参数
        launcher.addAppArgs(new String[]{conf.get(SparkConfig.APP_ARGS)});
        return launcher;
    }

准spark安装包,用于提交spark算法的客户端,因为提交算法的时候需要用到Spark的home目录下的bin/spark-submit脚本

重命名conf目录下的spark-env.sh脚本,否则会包如下的错误。原因是spark-env.sh里面配置了大数据平台上的路径,而在提交算法的客户端机器没有对应路径

debug模式提交或者非debug模式

 /**
     * Submit spark application to hadoop cluster and wait for completion.
     *
     * @return
     */
    public boolean waitForCompletion() {
        boolean success = false;
        try {
            SparkLauncher launcher = this.createSparkLauncher();
            if (debug) {
                Process process = launcher.launch();
                // Get Spark driver log
                new Thread(new ISRRunnable(process.getErrorStream())).start();
                new Thread(new ISRRunnable(process.getInputStream())).start();
                int exitCode = process.waitFor();
                System.out.println(exitCode);
                success = exitCode == 0 ? true : false;
            } else {
                appMonitor = launcher.setVerbose(true).startApplication();
                success = applicationMonitor();
            }
        } catch (Exception e) {
            logger.error(e);
        }
        return success;
    }

非debug模式提交时,控制台获取处理结果信息

    ///
    // private functions
    ///
    private boolean applicationMonitor() {
        appMonitor.addListener(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle handle) {
                logger.info("****************************");
                logger.info("State Changed [state={0}]", handle.getState());
                logger.info("AppId={0}", handle.getAppId());
            }

            @Override
            public void infoChanged(SparkAppHandle handle) {
            }
        });
        while (!isCompleted(appMonitor.getState())) {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        boolean success = appMonitor.getState() == SparkAppHandle.State.FINISHED;
        return success;
    }

    private boolean isCompleted(SparkAppHandle.State state) {
        switch (state) {
            case FINISHED:
                return true;
            case FAILED:
                return true;
            case KILLED:
                return true;
            case LOST:
                return true;
        }
        return false;
    }

可以从处理结果中获取到app ID,用于杀掉yarn任务时使用

4.任务详情

//访问URL:
http://<rm http address:port>/ws/v1/cluster/apps/{appID}

//例子
http://localhost:8088/ws/v1/cluster/apps/application15617064805542301

访问详情地址,返回数据格式如下:

 

"id": "application15617064805542301",--任务ID

"user": "haizhi",--提交任务的用户名称

"name": "TestSparkLauncher",--应用名称

"queue": "root.users.haizhi",--提交队列

"state": "FINISHED",--任务状态

"finalStatus": "SUCCEEDED",--最终状态

"progress": 100,--任务进度

"trackingUI": "History",

"trackingUrl": "http://hadoop01.xx.xxx.com:18088/proxy/application15617064805542301/A",

"diagnostics":"",--任务出错时的主要错误信息

"clusterId": 1561706480554,

"applicationType": "SPARK",--任务类型

"startedTime":  1562808570464,--任务开始时间,单位毫秒

"finishedTime": 1562808621348,--任务结束时间,单位毫秒

"elapsedTime": 50884,--任务耗时,毫秒

"amContainerLogs": "http://hadoop01.xx.xxx.com:8042/node/containerlogs/container15617064805542301_01_000001/haizhi",--任务详细日志

"amHostHttpAddress": "hadoop01.xx.xxx.com:8042",

"memorySeconds": 198648,--任务分配到的内存数,单位MB

"vcoreSeconds": 145,--任务分配到的CPU核数

"logAggregationStatus": "SUCCEEDED"

 

5.rest API杀掉任务请求格式:

  • 请求URL:http://<rm http address:port>/ws/v1/cluster/apps/{appid}/state

  • 请求方式:put

  • 请求参数: { "state": "KILLED" }

例:

请求URL:http://192.168.1.3:18088/ws/v1/cluster/apps/application15617064805542302/state
请求方式:put
请求参数: { "state": "KILLED" }

 

 

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/yangyijun1990/article/details/106600114

智能推荐

leetcode 172. 阶乘后的零-程序员宅基地

文章浏览阅读63次。题目给定一个整数 n,返回 n! 结果尾数中零的数量。解题思路每个0都是由2 * 5得来的,相当于要求n!分解成质因子后2 * 5的数目,由于n中2的数目肯定是要大于5的数目,所以我们只需要求出n!中5的数目。C++代码class Solution {public: int trailingZeroes(int n) { ...

Day15-【Java SE进阶】IO流(一):File、IO流概述、File文件对象的创建、字节输入输出流FileInputStream FileoutputStream、释放资源。_outputstream释放-程序员宅基地

文章浏览阅读992次,点赞27次,收藏15次。UTF-8是Unicode字符集的一种编码方案,采取可变长编码方案,共分四个长度区:1个字节,2个字节,3个字节,4个字节。文件字节输入流:每次读取多个字节到字节数组中去,返回读取的字节数量,读取完毕会返回-1。注意1:字符编码时使用的字符集,和解码时使用的字符集必须一致,否则会出现乱码。定义一个与文件一样大的字节数组,一次性读取完文件的全部字节。UTF-8字符集:汉字占3个字节,英文、数字占1个字节。GBK字符集:汉字占2个字节,英文、数字占1个字节。GBK规定:汉字的第一个字节的第一位必须是1。_outputstream释放

jeecgboot重新登录_jeecg 登录自动退出-程序员宅基地

文章浏览阅读1.8k次,点赞3次,收藏3次。解决jeecgboot每次登录进去都会弹出请重新登录问题,在utils文件下找到request.js文件注释这段代码即可_jeecg 登录自动退出

数据中心供配电系统负荷计算实例分析-程序员宅基地

文章浏览阅读3.4k次。我国目前普遍采用需要系数法和二项式系数法确定用电设备的负荷,其中需要系数法是国际上普遍采用的确定计算负荷的方法,最为简便;而二项式系数法在确定设备台数较少且各台设备容量差..._数据中心用电负荷统计变压器

HTML5期末大作业:网页制作代码 网站设计——人电影网站(5页) HTML+CSS+JavaScript 学生DW网页设计作业成品 dreamweaver作业静态HTML网页设计模板_网页设计成品百度网盘-程序员宅基地

文章浏览阅读7k次,点赞4次,收藏46次。HTML5期末大作业:网页制作代码 网站设计——人电影网站(5页) HTML+CSS+JavaScript 学生DW网页设计作业成品 dreamweaver作业静态HTML网页设计模板常见网页设计作业题材有 个人、 美食、 公司、 学校、 旅游、 电商、 宠物、 电器、 茶叶、 家居、 酒店、 舞蹈、 动漫、 明星、 服装、 体育、 化妆品、 物流、 环保、 书籍、 婚纱、 军事、 游戏、 节日、 戒烟、 电影、 摄影、 文化、 家乡、 鲜花、 礼品、 汽车、 其他 等网页设计题目, A+水平作业_网页设计成品百度网盘

【Jailhouse 文章】Look Mum, no VM Exits_jailhouse sr-iov-程序员宅基地

文章浏览阅读392次。jailhouse 文章翻译,Look Mum, no VM Exits!_jailhouse sr-iov

随便推点

chatgpt赋能python:Python怎么删除文件中的某一行_python 删除文件特定几行-程序员宅基地

文章浏览阅读751次。本文由chatgpt生成,文章没有在chatgpt生成的基础上进行任何的修改。以上只是chatgpt能力的冰山一角。作为通用的Aigc大模型,只是展现它原本的实力。对于颠覆工作方式的ChatGPT,应该选择拥抱而不是抗拒,未来属于“会用”AI的人。AI职场汇报智能办公文案写作效率提升教程 专注于AI+职场+办公方向。下图是课程的整体大纲下图是AI职场汇报智能办公文案写作效率提升教程中用到的ai工具。_python 删除文件特定几行

Java过滤特殊字符的正则表达式_java正则表达式过滤特殊字符-程序员宅基地

文章浏览阅读2.1k次。【代码】Java过滤特殊字符的正则表达式。_java正则表达式过滤特殊字符

CSS中设置背景的7个属性及简写background注意点_background设置背景图片-程序员宅基地

文章浏览阅读5.7k次,点赞4次,收藏17次。css中背景的设置至关重要,也是一个难点,因为属性众多,对应的属性值也比较多,这里详细的列举了背景相关的7个属性及对应的属性值,并附上演示代码,后期要用的话,可以随时查看,那我们坐稳开车了······1: background-color 设置背景颜色2:background-image来设置背景图片- 语法:background-image:url(相对路径);-可以同时为一个元素指定背景颜色和背景图片,这样背景颜色将会作为背景图片的底色,一般情况下设置背景..._background设置背景图片

Win10 安装系统跳过创建用户,直接启用 Administrator_windows10msoobe进程-程序员宅基地

文章浏览阅读2.6k次,点赞2次,收藏8次。Win10 安装系统跳过创建用户,直接启用 Administrator_windows10msoobe进程

PyCharm2021安装教程-程序员宅基地

文章浏览阅读10w+次,点赞653次,收藏3k次。Windows安装pycharm教程新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入下载安装PyCharm1、进入官网PyCharm的下载地址:http://www.jetbrains.com/pycharm/downl_pycharm2021

《跨境电商——速卖通搜索排名规则解析与SEO技术》一一1.1 初识速卖通的搜索引擎...-程序员宅基地

文章浏览阅读835次。本节书摘来自异步社区出版社《跨境电商——速卖通搜索排名规则解析与SEO技术》一书中的第1章,第1.1节,作者: 冯晓宁,更多章节内容可以访问云栖社区“异步社区”公众号查看。1.1 初识速卖通的搜索引擎1.1.1 初识速卖通搜索作为速卖通卖家都应该知道,速卖通经常被视为“国际版的淘宝”。那么请想一下,普通消费者在淘宝网上购买商品的时候,他的行为应该..._跨境电商 速卖通搜索排名规则解析与seo技术 pdf

推荐文章

热门文章

相关标签