storm实战

What

storm是个实时的,分布式,高容错的计算系统

  • storm进程常驻内存
  • strom数据不经过磁盘,在内存中处理 <!-- more -->

架构

  • Nimbus
    • 资源调度
    • 任务分配
    • 接收jar包
  • Supervisor
    • 接收nimbus分配的任务
    • 启动、停止自己管理的worker进程(当前supervisor上worker数量由配置文件设定)
  • Worker
    • 运行具体处理运算组件的进程(每个Worker对应执行一个Topology的子集)
    • worker任务类型,即spout任务、bolt任务两种
    • 启动executor (executor即worker JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务)

编程模型

  • DAG
  • Spout数据源
  • Bolt数据流处理组件

数据分发策略 storm grouping

  • Shuffle grouping
    • 随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
    • 轮询,平均分配
  • Fields Grouping
    • 按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一 个task, 而不同的"user-id"则可能会被分配到不同的task。
  • None Grouping 不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果
  • All Grouping 广播发送,对于每一个tuple,所有的bolts都会收到
  • Global Grouping 全局分组,把tuple分配给task id最低的task 。
  • Direct Grouping 指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的 哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必 须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)
  • Local or shuffle grouping 本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发 送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致
  • customGouping 自定义,相当于mapreduce那里自己去实现一个partition一样。

计算模型

  • Topology– DAG有向无环图的实现
    • 对于Storm实时计算逻辑的封装
    • 即,由一系列通过数据流相互关联的Spout、Bolt所组成的拓扑结构
    • 生命周期:此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止 (区别于MapReduce当中的Job,MR当中的Job在计算执行完成就会终止)
  • Tuple – 元组 Stream中最小数据组成单元
  • Stream – 数据流
    • 从Spout中源源不断传递数据给Bolt、以及上一个Bolt传递数据给下一个Bolt,所形成的这些数据通道即叫做 Stream
    • Stream声明时需给其指定一个Id(默认为Default)
      • 实际开发场景中,多使用单一数据流,此时不需要单独指定StreamId
  • Spout – 数据源
    • 拓扑中数据流的来源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中
    • 一个Spout可以发送多个数据流(Stream)
      • 可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的 emit方法指定数据流Id(streamId)参数将数据发送出去
    • Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit 方法将数据生成元组(Tuple)发送给之后的Bolt计算
  • Bolt – 数据流处理组件
    • 拓扑中数据处理均有Bolt完成。对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往 需要多个Bolt分多个步骤完成
    • 一个Bolt可以发送多个数据流(Stream)
      • 可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的 emit方法指定数据流Id(streamId)参数将数据发送出去
    • Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑

任务提交流程

storm 并发机制

  • worker 进程
    • 一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)
    • 这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中 多台服务器上的进程所组成
    • 设置Worker进程数
      • Config.setNumWorkers(int workers)
  • executor 线程
    • Executor是由Worker进程中生成的一个线程
    • 每个Worker进程中会运行拓扑当中的一个或多个Executor线程
    • 一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些 Task任务都是对应着同一个组件(Spout、Bolt)
    • 设置Executor线程数
      • TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)
      • TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
      • 其中, parallelism_hint即为executor线程数
  • task
    • 实际执行数据处理的最小单元
    • 每个task即为一个Spout或者一个Bolt
    • 设置Task数量
      • ComponentConfigurationDeclarer.setNumTasks(Number val)
  • Rebalance – 再平衡
    • 即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量
    • 通过Storm UI
    • 通过Storm CLI
      • storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整 Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务

Storm 通信机制

  • Worker进程间的数据通信
    • Netty
  • Worker内部的数据通信
    • Disruptor

storm 容错机制

  • 集群节点宕机
    • Nimbus服务器
    • 非Nimbus服务器
  • 进程挂掉
    • Worker
      • 挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该 Worker重新分配到其他服务器上
    • Supervisor
      • 无状态(所有的状态信息都存放在Zookeeper中来管理)
      • 快速失败(每当遇到任何异常情况,都会自动毁灭)
    • Nimbus
      • 无状态(所有的状态信息都存放在Zookeeper中来管理)
      • 快速失败(每当遇到任何异常情况,都会自动毁灭)
  • Acker -- 消息完整性的实现机制
    • Storm的拓扑当中特殊的一些任务
    • 负责跟踪每个Spout发出的Tuple的DAG(有向无环图)

Why

Storm:纯流式处理

  • 专门为流式处理设计
  • 数据传输模式更为简单,很多地方也更为高效
  • 并不是不能做批处理,它也可以来做微批处理,来提高吞吐

Where

http://storm.apache.org

How

完全分布式部署

tar zxvf apache-storm-0.10.0.tar.gz

  • 创建日志目录 mkdir logs
  • 在node1 上执行 bin/storm nimbus > logs/nimbus.out 2>&1 & bin/storm ui > ./logs/ui.out 2>&1 &
  • 在node2上执行 bin/storm supervisor > logs/supervisor.out 2>&1 &
  • 在node3 上执行 bin/storm supervisor > logs/supervisor.out 2>&1 &

命令:

bin/strom supervisor > logs/supervisor.out 2 >&1 & bin/strom --help bin/strom rebalance wc -w 5 -n 3 -e splitbolt=6

Juforg wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!