风暴兼容性Beta
Flink流与Apache Storm接口兼容,因此允许重用为Storm实现的代码。
您可以:
Topology
在Flink 执行整个Storm 。- 在Flink流处理节目中使用Storm
Spout
/Bolt
作为源/算子。
本文档介绍了如何在Flink中使用现有的Storm代码。
项目配置
支持Storm包含在flink-storm
Maven模块中。代码驻留在org.apache.flink.storm
包中。
pom.xml
如果要在Flink中执行Storm代码,请将以下依赖项添加到您的。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-storm_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
请注意:不要添加storm-core
为依赖项。它已包含在内flink-storm
。
请注意:flink-storm
不是提供的二进制Flink发行版的一部分。因此,您需要flink-storm
在提交给Flink的JobManager的程序jar(也称为uber-jar或fat-jar)中包含类(及其依赖项)。见字计数风暴中flink-storm-examples/pom.xml
的一个例子,如何正确地打包罐。
如果你想避免大尤伯杯罐子,你可以手动复制storm-core-0.9.4.jar
,json-simple-1.1.jar
并flink-storm-1.7-SNAPSHOT.jar
进入Flink的lib/
每个群集节点的文件夹(之前在启动群集)。对于这种情况,仅将您自己的Spout和Bolt类(及其内部依赖项)包含在程序jar中就足够了。
执行Storm拓扑
Flink提供与Storm兼容的API(org.apache.flink.storm.api
),它可以替代以下类:
StormSubmitter
取而代之FlinkSubmitter
NimbusClient
并Client
替换为FlinkClient
LocalCluster
取而代之FlinkLocalCluster
为了向Flink提交Storm拓扑,只需使用组装拓扑的Storm 客户端代码中的Flink替换来替换使用过的Storm类。实际的运行时代码,即Spouts和Bolts,可以不加修改地使用。如果拓扑在远程集群执行时,参数nimbus.host
和nimbus.thrift.port
被用作jobmanger.rpc.address
和jobmanger.rpc.port
分别。如果未指定参数,则取值flink-conf.yaml
。
TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
// actual topology assembling code and used Spouts/Bolts can be used as-is
builder.setSpout("source", new FileSpout(inputFilePath));
builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
Config conf = new Config();
if(runLocal) { // submit to test cluster
// replaces: LocalCluster cluster = new LocalCluster();
FlinkLocalCluster cluster = new FlinkLocalCluster();
cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
} else { // submit to remote cluster
// optional
// conf.put(Config.NIMBUS_HOST, "remoteHost");
// conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
// replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
}
在Flink流程序中嵌入Storm算子
作为替代方案,Spouts和Bolts可以嵌入到常规流处理节目中。Storm兼容层为每个提供了一个打包类,即SpoutWrapper
和BoltWrapper
(org.apache.flink.storm.wrappers
)。
每默认情况下,打包转换风暴输出元组Flink的元组类型(即,Tuple0
以Tuple25
根据风暴元组的字段数)。对于单场输出元组,也可以转换为字段的数据类型(例如,String
代替Tuple1<String>
)。
由于Flink无法推断Storm 算子的输出字段类型,因此需要手动指定输出类型。为了获得正确的TypeInformation
对象,TypeExtractor
可以使用Flink 。
嵌入Spouts
要将Spout用作Flink源,请使用StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)
。Spout对象被传递给它的构造函数SpoutWrapper<OUT>
,作为第一个参数addSource(...)
。泛型类型声明OUT
指定源输出流的类型。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// stream has `raw` type (single field output streams only)
DataStream<String> rawInput = env.addSource(
new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
TypeExtractor.getForClass(String.class)); // output type
// process data stream
[...]
如果Spout发出有限数量的元组,SpoutWrapper
可以通过numberOfInvocations
在其构造函数中设置参数来配置为自动终止。这允许Flink程序在处理完所有数据后自动关闭。默认情况下,程序将一直运行,直到手动取消。
嵌入螺栓
要使用Bolt作为Flink 算子,请使用DataStream.transform(String, TypeInformation, OneInputStreamOperator)
。Bolt对象被传递给它的构造函数BoltWrapper<IN,OUT>
,作为最后一个参数transform(...)
。泛型类型声明IN
并分别OUT
指定 算子的输入和输出流的类型。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile(localFilePath);
DataStream<Tuple2<String, Integer>> counts = text.transform(
"tokenizer", // operator name
TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator
// do further processing
[...]
嵌入式螺栓的命名属性访问
螺栓可以通过名称访问输入元组字段(另外通过索引访问)。要在嵌入式螺栓中使用此函数,您需要具有a
对于POJO输入类型,Flink通过反射访问字段。对于这种情况,Flink期望相应的公共成员变量或公共getter方法。例如,如果Bolt通过名称sentence
(例如String s = input.getStringByField("sentence");
)访问字段,则输入POJO类必须具有成员变量public String sentence;
或方法public String getSentence() { ... };
(注意驼峰式命名)。
对于Tuple
输入类型,需要使用Storm的Fields
类指定输入模式。对于这种情况,构造函数BoltWrapper
需要另外一个参数:new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))
。输入类型是Tuple1<String>
和Fields("sentence")
指定input.getStringByField("sentence")
相当于input.getString(0)
。
有关示例,请参阅BoltTokenizerWordCountPojo和BoltTokenizerWordCountWithNames。
配置喷口和螺栓
在Storm中,Spouts和Bolts可以配置一个全局分布的Map
对象,该对象被赋予submitTopology(...)
方法LocalCluster
或StormSubmitter
。这Map
是由拓扑旁边的用户提供的,并作为参数转发给呼叫Spout.open(...)
和Bolt.prepare(...)
。如果在Flink中使用FlinkTopologyBuilder
等执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。
对于嵌入式使用,必须使用Flink的配置机制。可以在StreamExecutionEnvironment
via中设置全局配置.getConfig().setGlobalJobParameters(...)
。Flink的常规Configuration
课程可用于配置Spouts和Bolts。但是,Configuration
不像Storm那样支持任意Keys数据类型(只String
允许Keys)。因此,Flink还提供StormConfig
了可以像raw一样使用的类,Map
以提供与Storm的完全兼容性。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StormConfig config = new StormConfig();
// set config values
[...]
// set global Storm configuration
env.getConfig().setGlobalJobParameters(config);
// assemble program with embedded Spouts and/or Bolts
[...]
多输出流
Flink还可以处理Spout和Bolts的多个输出流的声明。如果在Flink中使用FlinkTopologyBuilder
等执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。
对于嵌入式使用,输出流将是数据类型SplitStreamType<T>
,必须使用DataStream.split(...)
和拆分SplitStream.select(...)
。Flink提供预定义输出选择StormStreamSelector<T>
为.split(...)
已经。此外,SplitStreamTuple<T>
可以使用除去打包类型SplitStreamMapper<T>
。
[...]
// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
DataStream<SplitStreamType<SomeType>> multiStream = ...
SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
// do further processing on s1 and s2
[...]
有关完整示例,请参阅SpoutSplitExample.java。
Flink Extensions
有限的喷口
在Flink中,流处理源可以是有限的,即发出有限数量的记录并在发出最后一条记录后停止。但是,Spouts通常会发出无限的流。两种方法之间的桥接是FiniteSpout
除了IRichSpout
包含reachedEnd()
方法之外的接口,其中用户可以指定停止条件。用户可以通过实现此接口而不是(或另外)来创建有限Spout IRichSpout
,并reachedEnd()
另外实现该方法。与SpoutWrapper
配置为发出有限数量的元组的FiniteSpout
接口相比,接口允许实现更复杂的终止标准。
尽管有限的Spout不需要将Spouts嵌入到Flink流程序中或向Flink提交整个Storm拓扑,但有些情况下它们可能会派上用场:
- 实现原生Spout的行为与有限Flink源相同,只需要很少的修改
- 用户想要只处理一段时间; 之后,Spout可以自动停止
- 将文件读入流中
- 用于测试目的
有限Spout的示例,仅发出10秒的记录:
public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
[...] // implement open(), nextTuple(), ...
private long starttime = System.currentTimeMillis();
public boolean reachedEnd() {
return System.currentTimeMillis() - starttime > 10000l;
}
}
Storm兼容性示例
您可以在Maven模块中找到更多示例flink-storm-examples
。有关不同版本的WordCount,请参阅README.md。要运行示例,您需要组装正确的jar文件。 flink-storm-examples-1.7-SNAPSHOT.jar
是no / not作业执行有效的jar文件(这仅仅是一个标准的Maven神器)。
有嵌入式喷口和螺栓,即例如罐WordCount-SpoutSource.jar
和WordCount-BoltTokenizer.jar
分别。比较pom.xml
看两个罐子是如何构建的。此外,整个Storm拓扑(WordCount-StormTopology.jar
)有一个例子。
您可以通过运行这些示例中的每一个bin/flink run <jarname>.jar
。每个jar的清单文件中都包含正确的入口点类。