Spark实战

What ?

Apache Spark 一站式处理方案

  • sql
  • 离线计算
  • 实时流
  • 机器学习
  • 图计算

<!-- more -->

  • One stack rule them all

    • stream processiong 流式计算
    • ad hoc queries 热查询
    • batch processing 批处理
  • spark的shuffle 当超过一定量之后也会 落地到磁盘 产生 disk io

  • 4种运行模式

    • local eclipse中用
    • standalone 自己的集群
      • client
      • cluster
    • mesos
    • yarn
      • client
      • cluster
  • RDD是基础

    • resilient distributed dataset

    • 弹性分布式数据集

    • 五大特性

      • a list of partitions RDD是由一系列的partition组成的。
      • a function for computing each split函数是作用在每一个partition(split)上的。
      • list of dependencies on othre RDDsRDD之间有一系列的依赖关系。
      • optionally, a partitioner for key-value RDDs分区器是作用在K,V格式的RDD上。
      • optionally, a list of preferred locations to compute each split onRDD提供一系列最佳的计算位置。

  • 主从

    • mapreduce: RM NM
    • HDFS : NN DM
    • JOB TASK
    • spark :master worker

容错

  • 重算 根据 lineage
  • persist() 持久化
  • checkpoint 存磁盘

依赖

  • 窄依赖 narrow
    • 当个rdd转变为一个rdd 不实时处理 lazy 处理 等
    • map
    • union
  • 宽依赖 wide
    • groupby
    • join

术语

  • application --> jobs --> stages--> tasks

  • cluster-->WNs --> exeactors -->threads

  • cluster manager 在集群上获取资源的外部服务 (standalone,Mesos,Yarn)

master
	standalone管理资源的主节点
cluster manager
	在集群上获取外部资源的服务
worker node
	standalone  运行于从节点,管理从节点资源
application
	基于spark的程序,包含了driver和executor
worker
	executor
		默认分配内存1G
dirver
	Driver 负责应用程序资源的申请
	任务的分发。
	连接worker
executor
	worker节点为application启动的一个进程
	负责执行任务,
	每个application都有独立的executor
task
	被送到executor上的工作单元
job
	task 的并行计算,相当于action
stage
	job被拆分后的单元
pipline

算子

  • transfomation

    • repartition
    • coalsce
  • action

    • reduce

shuffle

  1. HashShuffle
  • 普通机制
  1. 每一个maptask将不同结果写到不同的buffer中,每个 buffer 的大小为 32K。buffer 起到数据缓存的作用。
  2. 每个buffer文件最后对应一个磁盘小文件。
  3. reduce task 来拉取对应的磁盘小文件。

产生的磁盘小文件的个数: M(map task 的个数)*R(reduce task 的个数)

  • 合并机制
  1. SortShuffle 磁盘文件个数 : 2 * M
  • 普通机制
  • bypass机制
    • bypass运行机制的触发条件如下: shuffle reduce task 的 数 量 小 于 spark.shuffle.sort.bypassMergeThreshold 的参数值。这个 值默认是 200。
    • 产生的磁盘小文件为:2*M(map task 的个数)
  1. 文件寻址

    1. 当maptask执行完成后,会将task的执行情况和磁盘小文件的 地址封装到 MpStatus 对象中,通过 MapOutputTrackerWorker 对象向 Driver 中的 MapOutputTrackerMaster 汇报。
    2. 在所有的maptask执行完毕后,Driver中就掌握了所有的磁盘 小文件的地址。
    3. 在reducetask执行之前,会通过Excutor中 MapOutPutTrackerWorker 向 Driver 端的 MapOutputTrackerMaster 获取磁盘小文件的地址。
    4. 获取到磁盘小文件的地址后,会通过BlockManager中的 ConnectionManager 连接数据所在节点上的 ConnectionManager,然后通过 BlockTransferService 进行数 据的传输。
    5. BlockTransferService 默认启动 5 个 task 去节点拉取数据。默 认情况下,5 个 task 拉取数据量不能超过 48M。
  2. 内存管理

  • 静态内存管理 1.6之前

application->job->stage->task(pipline)

  • 统一内存管理 1.6之后

分布式计算容易出现的问题

  • 数据倾斜
    • groupbykey 大部分数据集中在某个key上

Why ?

  • hadoop的共享数据慢 spark的共享数据快
  • 快的原因:基于内存还有DAG
  • 支持语言api
    • Scala
    • python
    • java

Where?

http://spark.apache.org

How?

  • Spark代码流程

    • 创建SparkConf对象
      • 可以设置 Application name。
      • 可以设置运行模式及资源需求。
    • 创建SparkContext对象
    • 基于Spark的上下文创建一个RDD,对RDD进行处理。
      • transformation转换 RDD > RDD
      • 缓存RDD,避免多次读取
    • 应用程序中要有Action类算子来触发Transformation类算子执行。
      • action执行 RDD > 结果
    • 关闭Spark上下文对象SparkContext。
  • 选择缓存策略

    1. Cache() MEMORY_ONLY (默认)
    2. MEMORY_AND_DISK
    3. OFF_HEAP
    4. DISK_ONLY
    5. NONE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void main (String[] args){
SparkCon conf = new SparkConf().setAppName("WordCound").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRdd(String) lines = sc.textFile("CHANGES.txt");
//
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){
@Override
public Iterable<String> call(String line) throws Exception{
return Arrays.asList(line.split(" "));
}
});

//
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String,String,Integer>(){
@Overridepublic Tuple2<String,Integer> call(String word) throws Exception{
return new Tuple2<String,Integer>(word,1);
}
});

JavaPairRDD<Stirng, Integer> results = pairs.reduceByKey(new Function2<Integer,Integer,Integer>(){
@Override
public Integer call (Integer v1, Integer v2)throws Exception{
return v1 + v2;
}
})

results.sortByKey().results.foreach(new VoidFunction<Tuple2<String,Integer>>(){
@Override
public void all(Tuple2<Sting,Integer tuple) throws Exception{
System.out.println(tuple._1 + "" + tuple._2);
}
})
sc.close;
}

任务调度

  1. spark 提交应程序的机器
  2. shell中 spark submit 脚本提交程序
  3. 启动一个driver 进程
  4. spark submit driver actor
  5. spark context 对象
  6. 构造DAGschedule taskschedule
  7. DAGscheduler
  8. taskschedule task set 打散
  9. taskschedule 向master注册application
  10. master接到请求,启动多个exccutor
  11. worker接受分配
  12. DAGScheduler 切分job
  13. stage taskset
  14. taskschedule 把taskset中task提交到executer上执行

local 单机模式

本地直接运行调试

安装完全分布式 standalone

  1. 下载 http://spark.apache.org/downloads.html tar zxvf spark-1.6.3-bin-hadoop2.6.tgz

  2. 修改配置添加从节点

    1
    2
    3
    cd spark-1.6.3-bin-hadoop2.6
    cp slaves.template slaves
    vi slaves

    去掉localhost,添加从节点 名称 vi spark-env.sh 下面这3个值更加自己情况修改

    1
    2
    3
    SPARK_MASTER_IP=sj-node1
    SPARK_MASTER_PORT=7077
    SPARK_WORKER_MEMORY=1g

  3. 发送到从节点

    1
    2
    3
    scp -r spark-1.6.3-bin-hadoop2.6 root@sj-node2:`pwd`
    scp -r spark-1.6.3-bin-hadoop2.6 root@sj-node3:`pwd`
    `

  4. 启动集群 /opt/soft/spark-1.6.0-bin-hadoop2.4/sbin/start-all.sh

  5. 打开Spark WEBUI 界面查看检查 默认8080端口可能与别的应用冲突,可以修改vim /opt/soft/spark-1.6.0-bin-hadoop2.4/sbin/start-master.sh 也可以修改环境变量

  6. 运行example 求 PI

standalone任务提交方式

  1. client
    1. client模式提交任务后,会在客户端启动Driver进程。
    2. Driver会向Master申请启动Application启动的资源。
    3. 资源申请成功,Driver端将task发送到worker端执行。
    4. worker将task执行结果返回到Driver端。

1
2
3
./spark-submit
--master spark://node1:7077 --deploy-mode client
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

  1. cluster

    1. cluster模式提交应用程序后,会向Master请求启动Driver.
    2. Master接受请求,随机在集群一台节点启动Driver进程。
    3. Driver启动后为当前的应用程序申请资源。
    4. Driver端发送task到worker节点上执行。
    5. worker将执行情况和执行结果返回给Driver端

    1
    2
    3
    ./spark-submit
    --master spark://node1:7077 --deploy-mode cluster
    --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

安装yarn运行模式

vi /opt/soft/spark-1.6.0-bin-hadoop2.4/conf/spark-env.sh 添加hadoop的路径即可

yarn任务提交方式

  1. client

    1. 客户端提交一个Application,在客户端启动一个Driver进程。
    2. 应用程序启动后会向RS(ResourceManager)发送请求,启动 AM(ApplicationMaster)的资源。
    3. RS收到请求,随机选择一台NM(NodeManager)启动AM。这 里的 NM 相当于 Standalone 中的 Worker 节点。
    4. AM启动后,会向RS请求一批container资源,用于启动Executor.
    5. RS会找到一批NM返回给AM,用于启动Executor。
    6. AM会向NM发送命令启动Executor。
    7. Executor启动后,会反向注册给Driver,Driver发送task到 Executor,执行情况和结果返回给 Driver 端。
      1
      2
      3
      4
      ./spark-submit
      --master yarn–client
      --class
      org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
  2. cluster

    1. 客户机提交Application应用程序,发送请求到 RS(ResourceManager),请求启动 AM(ApplicationMaster)。
    2. RS收到请求后随机在一台NM(NodeManager)上启动AM(相 当于 Driver 端)。
    3. AM启动,AM发送请求到RS,请求一批container用于启动 Excutor。
    4. RS返回一批NM节点给AM。
    5. AM连接到NM,发送请求到NM启动Excutor。
    6. Excutor反向注册到AM所在的节点的Driver。Driver发送task 到 Excutor。

    1
    2
    3
    4
    5
    ./spark-submit
    --master yarn-cluster
    --class
    org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
    `

广播变量

1
2
3
4
5
6
val conf = new SparkConf() conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello xasxt")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println} sc.stop()

累加器

1
2
3
4
val conf = new SparkConf() conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0) sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}} println(accumulator.value)
sc.stop()

SparkUI

  1. SparkUI界面介绍
  2. 配置historyServer
    1. 临时配置,对本次提交的应用程序起作用 shell ./spark-shell --master spark://node1:7077 --name myapp1 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://node1:9000/spark/test 停止程序,在 Web Ui 中 Completed Applications 对应的 ApplicationID 中能查看 history
    2. spark-default.conf配置文件中配置HistoryServer,对所有提交的 Application 都起作用
      • vi ../spark-1.6.0/conf/ spark-defaults.conf

        1
        2
        3
        4
        5
        6
        7
        8
        //开启记录事件日志的功能
        spark.eventLog.enabled true
        //设置事件日志存储的目录
        spark.eventLog.dir hdfs://node1:9000/spark/test
        //设置 HistoryServer 加载事件日志的位置
        spark.history.fs.logDirectory hdfs://node1:9000/spark/test
        //日志优化选项,压缩日志
        spark.eventLog.compress true

      • 启动 HistoryServer: ./start-history-server.sh 访问 HistoryServer:node4:18080,之后所有提交的应用程序运行状况都会被记录。

Master HA 高可用

zookeeper 有选举和存储功能,可以存储 Master 的元素据信息,使用 zookeeper 搭建的 Master 高可用,当 Master 挂掉时,备用的 Master 会自动切换,推荐使用这种方式搭建 Master 的 HA。

  1. 配置spark-env.sh,并发送到所有节点 vim /opt/soft/spark-1.6.0-bin-hadoop2.4/conf/spark-env.sh

    1
    export SPARK_DAEMON_JAVA_OPTS=" -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node3:2181,node4:2181,node5:2181 -Dspark.deploy.zookeeper.dir=/sparkmaster0821

  2. 找一台节点(非主 Master 节点)配置备用 Master,修改 spark-env.sh 配置节点上的 MasterIP SPARK_MASTER_IP=sj-node2

  3. 启动zookeeper集群

  4. 启动spark集群 sbin/start-all.sh

  5. 启动备用master sbin/start-daemon.sh

主备切换过程中不能提交 Application。主备切换过程中不影响已经在集群中运行的 Application。因为 Spark 是粗粒度资源调度。

优化

  1. 取top(N) 用手写排序,可以提高效率
  2. 资源调优
    • spark集群配置默认参数conf/spark-env.sh
    • 提交application时分配资源
    • 动态资源分配
  3. 并行度调优
    • hdfs的block数据就是并行度,调低block大小相当于增加并行度,也可以读的时候指定
    • sc.parallelize(xxx, numPartitions)
    • sc.makeRDD(xxx, numPartitions)
    • sc.parallelizePairs(xxx, numPartitions)
    • repartions/coalesce
    • redecByKey/groupByKey/join ---(xxx, numPartitions)
    • spark.default.parallelism net set
    • spark.sql.shuffle.partitions---200
    • 自定义分区器
    • 如果读取数据是在SparkStreaming中
      • Receiver: spark.streaming.blockInterval—200ms
      • Direct:读取的 topic 的分区数
  4. 代码调优
    1. 避免创建重复rdd
    2. 复用同一个RDD
    3. 对多次使用的RDD进行持久化
      • cache
      • persist
      • checkpoint
    4. 尽量避免使用shuffle类的算子
      • join 算子=广播变量+filter、广播变量+map、广播变量+flatMap
    5. map side 预聚合的shuffle操作
      • combine好处
        • 降低 shuffle write 写磁盘的数据量。
        • 降低 shuffle read 拉取数据量的大小
        • 降低reduce端聚合的次数。
      • 有 combiner 的 shuffle 类算子
        • reduceByKey
        • aggregateByKey
        • combineByKey
    6. 尽量使用高性能的算子
      • 使用 reduceByKey 替代 groupByKey   - 使用 mapPartition 替代 map
      • 使用 foreachPartition 替代 foreach   - filter 后使用 coalesce 减少分区数
      • 使用使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作
      • 使用 repartition 和 coalesce 算子操作分区
    7. 使用广播变量
    8. 使用Kryo优化序列化性能
    9. 优化数据结构
      • 尽量使用字符串替代对象,使用 原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型
    10. 使用高性能的库 fastutil
  5. 数据本地化
    • 级别
      • PROCESS_LOCAL
      • NODE_LOCAL
      • NO_PREF
      • RACK_LOCAL
      • ANY
    • Spark数据本地化调优
      • spark.locality.wait

参考

http://spark.apache.org/docs/1.6.0/configuration.html

Juforg wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!