测试
本页简要讨论如何在IDE或本地环境中测试Flink应用程序。
单元测试
通常,可以假设Flink在用户定义之外产生正确的结果Function。因此,建议Function尽可能使用单元测试来测试包含主业务逻辑的类。
例如,如果实现以下内容ReduceFunction:
public class SumReduce implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
        return value1 + value2;
    }
}
class SumReduce extends ReduceFunction[Long] {
    override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
        value1 + value2
    }
}
通过传递合适的参数并验证输出,可以很容易地使用您喜欢的框架对其进行单元测试:
public class SumReduceTest {
    @Test
    public void testSum() throws Exception {
        // instantiate your function
        SumReduce sumReduce = new SumReduce();
        // call the methods that you have implemented
        assertEquals(42L, sumReduce.reduce(40L, 2L));
    }
}
class SumReduceTest extends FlatSpec with Matchers {
    "SumReduce" should "add values" in {
        // instantiate your function
        val sumReduce: SumReduce = new SumReduce()
        // call the methods that you have implemented
        sumReduce.reduce(40L, 2L) should be (42L)
    }
}
集成测试
为了端到端测试Flink流管道,您还可以编写针对本地Flink迷你集群执行的集成测试。
为此,添加测试依赖项flink-test-utils:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_2.11</artifactId>
  <version>1.7-SNAPSHOT</version>
</dependency>
例如,如果要测试以下内容MapFunction:
public class MultiplyByTwo implements MapFunction<Long, Long> {
    @Override
    public Long map(Long value) throws Exception {
        return value * 2;
    }
}
class MultiplyByTwo extends MapFunction[Long, Long] {
    override def map(value: Long): Long = {
        value * 2
    }
}
您可以编写以下集成测试:
public class ExampleIntegrationTest extends AbstractTestBase {
    @Test
    public void testMultiply() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure your test environment
        env.setParallelism(1);
        // values are collected in a static variable
        CollectSink.values.clear();
        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new MultiplyByTwo())
                .addSink(new CollectSink());
        // execute
        env.execute();
        // verify your results
        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
    }
    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {
        // must be static
        public static final List<Long> values = new ArrayList<>();
        @Override
        public synchronized void invoke(Long value) throws Exception {
            values.add(value);
        }
    }
}
class ExampleIntegrationTest extends AbstractTestBase {
    @Test
    def testMultiply(): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // configure your test environment
        env.setParallelism(1)
        // values are collected in a static variable
        CollectSink.values.clear()
        // create a stream of custom elements and apply transformations
        env
            .fromElements(1L, 21L, 22L)
            .map(new MultiplyByTwo())
            .addSink(new CollectSink())
        // execute
        env.execute()
        // verify your results
        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
    }
}    
// create a testing sink class CollectSink extends SinkFunction[Long] {
    override def invoke(value: java.lang.Long): Unit = {
        synchronized {
            values.add(value)
        }
    }
}
object CollectSink {
    // must be static
    val values: List[Long] = new ArrayList()
}
CollectSink此处使用静态变量in ,因为Flink在将所有 算子分布到集群之前将其序列化。通过静态变量与本地Flink迷你集群实例化的算子进行通信是解决此问题的一种方法。或者,您可以使用测试接收器将数据写入临时目录中的文件。您还可以实现自己的自定义源以发出水印。
测试检查点和状态处理
测试状态处理的一种方法是在集成测试中启用检查点。
您可以通过StreamExecutionEnvironment在测试中配置来完成此 算子操作:
env.enableCheckpointing(500);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
env.enableCheckpointing(500)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
例如,向Flink应用程序添加一个身份映射器 算子,该 算子将每次抛出一次异常1000ms。但是,由于动作之间存在时间依赖关系,因此编写此类测试可能会非常棘手。
另一种方法是写使用Flink内部测试效用一个单元测试AbstractStreamOperatorTestHarness从flink-streaming-java模块。
对于如何做到这一点,请看看在一个例子org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest中也flink-streaming-java模块。
请注意,AbstractStreamOperatorTestHarness目前它不是公共API的一部分,可能会有所变化。