风暴兼容性Beta
Flink流与Apache Storm接口兼容,因此允许重用为Storm实现的代码。
您可以:
Topology在Flink 执行整个Storm 。- 在Flink流处理节目中使用Storm 
Spout/Bolt作为源/算子。 
本文档介绍了如何在Flink中使用现有的Storm代码。
项目配置
支持Storm包含在flink-stormMaven模块中。代码驻留在org.apache.flink.storm包中。
pom.xml如果要在Flink中执行Storm代码,请将以下依赖项添加到您的。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm_2.11</artifactId>
    <version>1.7-SNAPSHOT</version>
</dependency>
请注意:不要添加storm-core为依赖项。它已包含在内flink-storm。
请注意:flink-storm不是提供的二进制Flink发行版的一部分。因此,您需要flink-storm在提交给Flink的JobManager的程序jar(也称为uber-jar或fat-jar)中包含类(及其依赖项)。见字计数风暴中flink-storm-examples/pom.xml的一个例子,如何正确地打包罐。
如果你想避免大尤伯杯罐子,你可以手动复制storm-core-0.9.4.jar,json-simple-1.1.jar并flink-storm-1.7-SNAPSHOT.jar进入Flink的lib/每个群集节点的文件夹(之前在启动群集)。对于这种情况,仅将您自己的Spout和Bolt类(及其内部依赖项)包含在程序jar中就足够了。
执行Storm拓扑
Flink提供与Storm兼容的API(org.apache.flink.storm.api),它可以替代以下类:
StormSubmitter取而代之FlinkSubmitterNimbusClient并Client替换为FlinkClientLocalCluster取而代之FlinkLocalCluster
为了向Flink提交Storm拓扑,只需使用组装拓扑的Storm 客户端代码中的Flink替换来替换使用过的Storm类。实际的运行时代码,即Spouts和Bolts,可以不加修改地使用。如果拓扑在远程集群执行时,参数nimbus.host和nimbus.thrift.port被用作jobmanger.rpc.address和jobmanger.rpc.port分别。如果未指定参数,则取值flink-conf.yaml。
TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
// actual topology assembling code and used Spouts/Bolts can be used as-is
builder.setSpout("source", new FileSpout(inputFilePath));
builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
Config conf = new Config();
if(runLocal) { // submit to test cluster
    // replaces: LocalCluster cluster = new LocalCluster();
    FlinkLocalCluster cluster = new FlinkLocalCluster();
    cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
} else { // submit to remote cluster
    // optional
    // conf.put(Config.NIMBUS_HOST, "remoteHost");
    // conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
    // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
    FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
}
在Flink流程序中嵌入Storm算子
作为替代方案,Spouts和Bolts可以嵌入到常规流处理节目中。Storm兼容层为每个提供了一个打包类,即SpoutWrapper和BoltWrapper(org.apache.flink.storm.wrappers)。
每默认情况下,打包转换风暴输出元组Flink的元组类型(即,Tuple0以Tuple25根据风暴元组的字段数)。对于单场输出元组,也可以转换为字段的数据类型(例如,String代替Tuple1<String>)。
由于Flink无法推断Storm 算子的输出字段类型,因此需要手动指定输出类型。为了获得正确的TypeInformation对象,TypeExtractor可以使用Flink 。
嵌入Spouts
要将Spout用作Flink源,请使用StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)。Spout对象被传递给它的构造函数SpoutWrapper<OUT>,作为第一个参数addSource(...)。泛型类型声明OUT指定源输出流的类型。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// stream has `raw` type (single field output streams only)
DataStream<String> rawInput = env.addSource(
    new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
    TypeExtractor.getForClass(String.class)); // output type
// process data stream
[...]
如果Spout发出有限数量的元组,SpoutWrapper可以通过numberOfInvocations在其构造函数中设置参数来配置为自动终止。这允许Flink程序在处理完所有数据后自动关闭。默认情况下,程序将一直运行,直到手动取消。
嵌入螺栓
要使用Bolt作为Flink 算子,请使用DataStream.transform(String, TypeInformation, OneInputStreamOperator)。Bolt对象被传递给它的构造函数BoltWrapper<IN,OUT>,作为最后一个参数transform(...)。泛型类型声明IN并分别OUT指定 算子的输入和输出流的类型。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile(localFilePath);
DataStream<Tuple2<String, Integer>> counts = text.transform(
    "tokenizer", // operator name
    TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
    new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator
// do further processing
[...]
嵌入式螺栓的命名属性访问
螺栓可以通过名称访问输入元组字段(另外通过索引访问)。要在嵌入式螺栓中使用此函数,您需要具有a
对于POJO输入类型,Flink通过反射访问字段。对于这种情况,Flink期望相应的公共成员变量或公共getter方法。例如,如果Bolt通过名称sentence(例如String s = input.getStringByField("sentence");)访问字段,则输入POJO类必须具有成员变量public String sentence;或方法public String getSentence() { ... };(注意驼峰式命名)。
对于Tuple输入类型,需要使用Storm的Fields类指定输入模式。对于这种情况,构造函数BoltWrapper需要另外一个参数:new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))。输入类型是Tuple1<String>和Fields("sentence")指定input.getStringByField("sentence")相当于input.getString(0)。
有关示例,请参阅BoltTokenizerWordCountPojo和BoltTokenizerWordCountWithNames。
配置喷口和螺栓
在Storm中,Spouts和Bolts可以配置一个全局分布的Map对象,该对象被赋予submitTopology(...)方法LocalCluster或StormSubmitter。这Map是由拓扑旁边的用户提供的,并作为参数转发给呼叫Spout.open(...)和Bolt.prepare(...)。如果在Flink中使用FlinkTopologyBuilder等执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。
对于嵌入式使用,必须使用Flink的配置机制。可以在StreamExecutionEnvironmentvia中设置全局配置.getConfig().setGlobalJobParameters(...)。Flink的常规Configuration课程可用于配置Spouts和Bolts。但是,Configuration不像Storm那样支持任意Keys数据类型(只String允许Keys)。因此,Flink还提供StormConfig了可以像raw一样使用的类,Map以提供与Storm的完全兼容性。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StormConfig config = new StormConfig();
// set config values
[...]
// set global Storm configuration
env.getConfig().setGlobalJobParameters(config);
// assemble program with embedded Spouts and/or Bolts
[...]
多输出流
Flink还可以处理Spout和Bolts的多个输出流的声明。如果在Flink中使用FlinkTopologyBuilder等执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。
对于嵌入式使用,输出流将是数据类型SplitStreamType<T>,必须使用DataStream.split(...)和拆分SplitStream.select(...)。Flink提供预定义输出选择StormStreamSelector<T>为.split(...)已经。此外,SplitStreamTuple<T>可以使用除去打包类型SplitStreamMapper<T>。
[...]
// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
DataStream<SplitStreamType<SomeType>> multiStream = ...
SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
// do further processing on s1 and s2
[...]
有关完整示例,请参阅SpoutSplitExample.java。
Flink Extensions
有限的喷口
在Flink中,流处理源可以是有限的,即发出有限数量的记录并在发出最后一条记录后停止。但是,Spouts通常会发出无限的流。两种方法之间的桥接是FiniteSpout除了IRichSpout包含reachedEnd()方法之外的接口,其中用户可以指定停止条件。用户可以通过实现此接口而不是(或另外)来创建有限Spout IRichSpout,并reachedEnd()另外实现该方法。与SpoutWrapper配置为发出有限数量的元组的FiniteSpout接口相比,接口允许实现更复杂的终止标准。
尽管有限的Spout不需要将Spouts嵌入到Flink流程序中或向Flink提交整个Storm拓扑,但有些情况下它们可能会派上用场:
- 实现原生Spout的行为与有限Flink源相同,只需要很少的修改
 - 用户想要只处理一段时间; 之后,Spout可以自动停止
 - 将文件读入流中
 - 用于测试目的
 
有限Spout的示例,仅发出10秒的记录:
public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
    [...] // implement open(), nextTuple(), ...
    private long starttime = System.currentTimeMillis();
    public boolean reachedEnd() {
        return System.currentTimeMillis() - starttime > 10000l;
    }
}
Storm兼容性示例
您可以在Maven模块中找到更多示例flink-storm-examples。有关不同版本的WordCount,请参阅README.md。要运行示例,您需要组装正确的jar文件。 flink-storm-examples-1.7-SNAPSHOT.jar是no / not作业执行有效的jar文件(这仅仅是一个标准的Maven神器)。
有嵌入式喷口和螺栓,即例如罐WordCount-SpoutSource.jar和WordCount-BoltTokenizer.jar分别。比较pom.xml看两个罐子是如何构建的。此外,整个Storm拓扑(WordCount-StormTopology.jar)有一个例子。
您可以通过运行这些示例中的每一个bin/flink run <jarname>.jar。每个jar的清单文件中都包含正确的入口点类。