Storm 简介
上一章介绍的 Hadoop 工具能够对海量数据进行批量处理,采用分布式的并行计算架构,只需使用其提供的 MapReduce API 编写脚本即可。但随着人们对数据实时性的要求越来越高,如实时日志分析、实时推荐系统等,Hadoop 就无能为力了。
这时,Storm 诞生了。它的设计初衷就是提供一套分布式的实时计算框架,实现低延迟、高并发的海量数据处理,被誉为“Realtime Hadoop”。它提供了简单易用的 API 接口用于编写实时处理脚本;能够和现有各类消息系统整合;提供了 HA、容错、事务、RPC 等高级特性。
Storm 的官网是:storm-project.net,它的 Wiki 上有非常详尽的说明文档。
Storm 与 Clojure
Storm 的主要贡献者 Nathan Marz 和徐明明都是活跃的 Clojure 开发者,因此在 Storm 框架中也提供了原生的 Clojure DSL。本文就将介绍如何使用这套 DSL 来编写 Storm 处理脚本。
Storm 集群的安装配置这里不会讲述,具体请参考这篇文档。下文的脚本都运行在“本地模式”之下,因此即使不搭建集群也可以运行和调试。
Storm 脚本的组件

Storm 脚本的英文名称叫做“Storm Topology”,直译过来是“拓扑结构”。这个脚本由两大类组建构成,Spout 和 Bolt,分别可以有任意多个。他们之间以“数据流”的方式连接起来,因此整体看来就像一张拓扑网络,因此得名 Topology。
Spout
数据源节点,是整个脚本的入口。Storm 会不断调用该节点的 nextTuple() 方法来获取数据,分发给下游 Bolt 节点。nextTuple() 方法中可以用各种方式从外部获取数据,如逐行读取一个文件、从消息队列(ZeroMQ、Kafka)中获取消息等。一个 Storm 脚本可以包含多个 Spout 节点,从而将多个数据流汇聚到一起进行处理。
Bolt
数据处理节点,它是脚本的核心逻辑。它含有一个 execute() 方法,当接收到消息时,Storm 会调用这个函数,并将消息传递给它。我们可以在 execute() 中对消息进行过滤(只接收符合条件的数据),或者进行聚合(统计某个条件的数据出现的次数)等。处理完毕后,这个节点可以选择将处理后的消息继续传递下去,或是持久化到数据库中。
Bolt 同样是可以有多个的,且能够前后组合。Bolt C 可以同时收取 Bolt A 和 Bolt B 的数据,并将处理结果继续传递给 Bolt D。
此外, 一个 Bolt 可以产生多个实例 ,如某个 Bolt 包含复杂耗时的计算,那在运行时可以调高其并发数量(实例的个数),从而达到并行处理的目的。
Tuple
Tuple 是消息传输的基本单元,一条消息即一个 Tuple。可以将其看做是一个 HashMap 对象,它能够包含任何可序列化的数据内容。对于简单的数据类型,如整型、字符串、Map 等,Storm 提供了内置的序列化支持。而用户自定义的数据类型,可以通过指定序列化/反序列化函数来处理。
Stream Grouping
想象一个 Spout 连接了两个 Bolt(或一个 Bolt 的两个实例),那数据应该如何分发呢?你可以选择轮询(ShuffleGrouping),或是广播(GlobalGrouping)、亦或是按照某一个字段进行哈希分组(FieldGrouping),这些都称作为 Stream Grouping。
示例:WordCount
下面我们就来实现一个实时版的 WordCount 脚本,它由以下几个组件构成:
- sentence-spout:从已知的一段文字中随机选取一句话发送出来;
- split-bolt:将这句话按空格分割成单词;
- count-bolt:统计每个单词出现的次数,每五秒钟打印一次,并清零。
依赖项和配置文件
首先使用 lein new 新建一个项目,并修改 project.clj 文件:
1 | (defproject cia-storm "0.1.0-SNAPSHOT" |
其中 :profiles 表示定义不同的用户配置文件。Leiningen 有类似于 Maven 的配置文件体系(profile),每个配置文件中可以定义 project.clj 所支持的各种属性,执行时会进行合并。lein 命令默认调用 :dev、:user 等配置文件,可以使用 lein with-profiles prod run 来指定配置文件。具体可以参考这份文档。
这里将 [storm "0.8.2"] 依赖项定义在了 :dev 配置下,如果直接定义在外层的 :dependencies 下,那在使用 lein uberjar 进行打包时,会将 storm.jar 包含在最终的 Jar 包中,提交到 Storm 集群运行时就会报冲突。而 lein uberjar 默认会跳过 :dev 配置,所以才这样定义。
:aot 表示 Ahead Of Time,即预编译。我们在 Clojure 实战(3):使用 Noir 框架开发博客(下) 中提过 :gen-class 这个标识表示为当前 .clj 文件生成一个 .class 文件,从而能够作为 main 函数使用,因此也需要在 project.clj 中添加 :main 标识,指向这个 .clj 文件的命名空间。如果想为其它的命名空间也生成对应的 .class 文件,就需要用到 :aot 了。它的另一个用处是加速 Clojure 程序的启动速度。
sentence-spout
1 | (ns cia-storm.wordcount |
defspout 是定义在 backtype.storm.clojure 命名空间下的宏,可以点此查看源码。以下是各个部分的说明:
sentence-spout是该组件的名称。["sentence"]表示该组件输出一个字段,名称为“sentence”。[conf context collector]用于接收 Storm 框架传入的参数,如配置对象、上下文对象、下游消息收集器等。spout表示开始定义数据源组件需要用到的各类方法。它实质上是生成一个实现了 ISpout 接口的对象,从而能够被 Storm 框架调用。nextTuple是 ISpout 接口必须实现的方法之一,Storm 会不断调用这个方法,获取数据。这里使用Thread#sleep函数来控制调用的频率。emit-spout!是一个函数,用于向下游发送消息。
ISpout 还有 open、ack、fail 等函数,分别表示初始化、消息处理成功的回调、消息处理失败的回调。这里我们暂不深入讨论。
split-bolt
1 | (defbolt split-bolt ["word"] {:prepare true} |
defbolt 用于定义一个 Bolt 组件。整段代码的结构和 defspout 是比较相似的。bolt 宏会实现为一个 IBolt 对象,execute 是该接口的方法之一,其它还有 prepare 和 cleanup。execute 方法接收一个参数 tuple,用于接收上游消息。
ack! 是 execute 中必须调用的一个方法。Storm 会对每一个组件发送出来的消息进行追踪,上游组件发出的消息需要得到下游组件的“确认”(ACKnowlege),否则会一直堆积在内存中。对于 Spout 而言,如果消息得到确认,会触发 ISpout#ack 函数,否则会触发 ISpout#fail 函数,这时 Spout 可以选择重发或报错。
代码中比较怪异的是 {:prepare true}。defspout 和 defbolt 有两种定义方式,即 prepare 和非 prepare。两者的区别在于:
- 参数不同,prepare 方式下接收的参数是
[conf context collector],非 prepare 方式下,defspout接收的是[collector],defbolt是[tuple collector]。 - prepare 方式下需要调用
spout和bolt宏来编写组件代码,而非 prepare 方式则不需要——defspout会默认生成nextTuple()函数,defbolt默认生成execute(tuple)。 - 只有 prepare 方式下才能指定
ISpout#open、IBolt#prepare等函数,非 prepare 不能。 defspout默认使用 prepare 方式,defbolt默认使用非 prepare 方式。
因此,split-bolt 可以按如下方式重写:
1 | (defbolt split-bolt ["word"] |
prepare 方式可以用于在组件中保存状态,具体请看下面的计数 Bolt。
count-bolt
1 | (defbolt count-bolt [] {:prepare true} |
原子(Atom)
atom 是我们遇到的第一个可变量(Mutable Variable),其它的有 Ref、Agent 等。Atom 是“原子”的意思,我们很容易想到原子性操作,即同一时刻只有一个线程能够修改 Atom 的值,因此它是处理并发的一种方式。这里我们使用 Atom 来保存每个单词出现的数量。以下是 Atom 的常用操作:
1 | user=> (def cnt (atom 0)) |
需要注意的是,(swap! atom f arg ...) 中的 f 函数可能会被执行多次,因此要确保它没有副作用(side-effect,即不会产生其它状态的变化)。
再来解释一下 (partial merge-with +)。merge-with 函数是对 map 类型的一种操作,表示将一个或多个 map 合并起来。和 merge 不同的是,merge-with 多接收一个 f 函数(merge-with [f & maps]),当键名重复时,会用 f 函数去合并它们的值,而不是直接替代。
partial 可以简单理解为给函数设定默认值,如:
1 | user=> (defn add [a b] (+ a b)) |
这样一来,(swap! counts (partial merge-with +) {word 1}) 就可理解为:将 counts 这个 Atom 中的值(一个 map 类型)和 {word 1} 这个 map 进行合并,如果单词已存在,则递增 1。
线程(Thread)
为了输出统计值,我们为 count-bolt 增加 prepare 方法:
1 | ... |
这段代码的功能是:在 Bolt 开始处理消息之前启动一个线程,每隔 5 秒钟将 (atom counts) 中的单词出现次数打印出来,并对其进行清零操作。
这里我们直接使用了 Java 的 Thread 类型。读者可能会觉得好奇,Thread 类型的构造函数只接收实现 Runnable 接口的对象,Clojure 的匿名函数直接支持吗?我们做一个简单测试:
1 | user=> (defn greet [name] (println "Hi" name)) |
logging 命名空间对应的依赖是 [org.clojure/tools.logging "0.2.6"],需要将其添加到 project.clj 中,它是对 log4j 组件的包装。这里之所以没有使用 println 输出到标准输出,是为了将该脚本上传到 Storm 集群中运行时也能查看到日志输出。
定义和执行 Topology
各个组件已经定义完毕,下面让我们用它们组成一个 Topology:
1 | (defn mk-topology [] |
topology 同样是 Clojure DSL 定义的宏,它接收两个 map 作为参数,一个用于定义使用到的 Spout,一个则是 Bolt。该 map 的键是组件的名称,该名称用于确定各组件之间的关系。
spout-spec 和 bolt-spec 则定义了组件在 Topology 中更具体的参数。如 “split” 使用的是 split-bolt 这个组件,它的上游是 “sentence”,使用 shuffleGrouping 来对消息进行分配,:p 3 表示会启动 3 个 split-bolt 实例。
“count” 使用 count-bolt 组件,上游是 “split”,但聚合方式采用了 fieldGrouping,因此列出了执行哈希运算时使用的消息字段(word)。为何要使用 fieldGrouping?因为我们会开启两个 count-bolt,如果采用 shuffleGrouping,那单词“a”第一次出现的消息会发送给一个 count-bolt,第二次出现会发送给另一个 count-bolt,这样统计结果就会错乱。如果指定了 :p 1,即只开启一个 count-bolt 实例,就不会有这样的问题。
本地模式和 Cluster 模式
1 | (ns cia-storm.wordcount |
我们为 WordCount 生成一个类,它的 main 函数在没有命令行参数时会以本地模式执行 Topology,若传递了参数(即指定了脚本在 Cluster 运行时的名称),则提交至 Cluster。
这里直接使用了 Storm 的 Java 类,对参数有疑惑的可以参考 Javadoc。TOPOLOGY-WORKERS 是在 backtype.storm.config 命名空间中定义的,我们在前面的代码中 :use 过了。Storm 这个项目是用 Java 和 Clojure 混写的,所以查阅代码时还需仔细一些。
运行结果
首先我们直接用 lein 以本地模式运行该 Topology:
1 | $ lein run -m cia-storm.wordcount |
Cluster 模式需要搭建本地集群,可以参考这篇文档。下文使用的 storm 命令则需要配置 ~/.storm/storm.yaml 文件,具体请参考这篇文章。
1 | $ lein do clean, compile, uberjar |
小结
这一章我们简单介绍了 Storm 的设计初衷,它是如何通过分布式并行运算解决实时数据分析问题的。Storm 目前已经十分稳定,且仍处于活跃的开发状态。它的一些高级特性如 DRPC、Trident 等,还请感兴趣的读者自行研究。
本文使用的 WordCount 示例代码:https://github.com/jizhang/blog-demo/tree/master/cia-storm