用户定义的源和接收器
A TableSource
提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问。TableSource在TableEnvironment中注册后,可以通过 Table API或SQL查询访问它。
A TableSink
将表发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Parquet或ORC)。
A TableFactory
允许将与外部系统的连接声明与实际实现分开。表工厂从规范化的基于字符串的属性创建表源和接收器的已配置实例。可以使用SQL客户端Descriptor
或通过YAML配置文件以编程方式生成属性。
有关如何注册TableSource以及如何通过TableSink发出表的详细信息,请查看常见概念和API页面。有关如何使用工厂的示例,请参阅内置源,接收器和格式页面。
定义TableSource
A TableSource
是一个通用接口,它使 Table API和SQL查询可以访问存储在外部系统中的数据。它提供了表的模式以及使用表的模式映射到行的记录。根据TableSource
是在流式查询还是批量查询中使用,记录将生成为DataSet
或DataStream
。
如果TableSource
在流式查询中使用a,则必须实现该StreamTableSource
接口,如果在批处理查询中使用它,则必须实现该BatchTableSource
接口。A TableSource
还可以实现两个接口,并用于流式和批量查询。
StreamTableSource
并BatchTableSource
扩展TableSource
定义以下方法的基接口:
TableSource<T> {
public TableSchema getTableSchema();
public TypeInformation<T> getReturnType();
public String explainSource();
}
TableSource[T] {
def getTableSchema: TableSchema
def getReturnType: TypeInformation[T]
def explainSource: String
}
getTableSchema()
:返回表的架构,即表的字段的名称和类型。字段类型使用Flink定义TypeInformation
(请参阅 Table API类型和SQL类型)。getReturnType()
:返回DataStream
(StreamTableSource
)或DataSet
(BatchTableSource
)的物理类型以及由此生成的记录TableSource
。explainSource()
:返回描述TableSource
。的String 。此方法是可选的,仅用于显示目的。
所述TableSource
界面分离从物理类型的返回的逻辑表模式DataStream
或DataSet
。因此,表schema(getTableSchema()
)的所有字段都必须映射到具有相应类型的物理返回类型(getReturnType()
)的字段。默认情况下,此映射基于字段名称完成。例如,TableSource
定义具有两个字段的表模式的a [name: String, size: Integer]
要求TypeInformation
具有至少两个被调用的字段name
和size
类型String
和的字段Integer
。这可能是一个PojoTypeInfo
或RowTypeInfo
有两个名为领域name
,并size
与匹配的类型。
但是,某些类型(如Tuple或CaseClass类型)确实支持自定义字段名称。如果a TableSource
返回a DataStream
或DataSet
具有固定字段名称的类型,则它可以实现DefinedFieldMapping
接口以将字段名称从表模式映射到物理返回类型的字段名称。
定义BatchTableSource
的BatchTableSource
接口扩展了TableSource
接口,并定义一个额外的方法:
BatchTableSource<T> extends TableSource<T> {
public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
}
BatchTableSource[T] extends TableSource[T] {
def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
getDataSet(execEnv)
:返回DataSet
带有表数据的a 。类型DataSet
必须与TableSource.getReturnType()
方法定义的返回类型相同。在DataSet
通过使用常规创建罐数据源的数据集的API。通常,aBatchTableSource
通过打包InputFormat
或批处理连接器来实现。
定义StreamTableSource
的StreamTableSource
接口扩展了TableSource
接口,并定义一个额外的方法:
StreamTableSource<T> extends TableSource<T> {
public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
StreamTableSource[T] extends TableSource[T] {
def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
getDataStream(execEnv)
:返回DataStream
带有表数据的a 。类型DataStream
必须与TableSource.getReturnType()
方法定义的返回类型相同。在DataStream
通过使用常规罐创建数据源数据流的API。通常,aStreamTableSource
通过打包SourceFunction
或流连接器来实现。
使用时间属性定义TableSource
流 Table API和SQL查询的基于时间的 算子操作(例如窗口化聚合或连接)需要明确指定的时间属性。
A TableSource
将时间属性定义Types.SQL_TIMESTAMP
为其表模式中的类型字段。与模式中的所有常规字段相比,时间属性不能与表源的返回类型中的物理字段匹配。相反,a TableSource
通过实现某个接口来定义时间属性。
定义处理时间属性
处理时间属性通常用于流式查询。处理时间属性返回访问它的算子的当前挂钟时间。A TableSource
通过实现DefinedProctimeAttribute
接口来定义处理时间属性。界面如下:
DefinedProctimeAttribute {
public String getProctimeAttribute();
}
DefinedProctimeAttribute {
def getProctimeAttribute: String
}
getProctimeAttribute()
:返回处理时间属性的名称。必须Types.SQL_TIMESTAMP
在表模式中定义指定的属性类型,并且可以在基于时间的 算子操作中使用该属性。一个DefinedProctimeAttribute
表源可以通过返回没有定义的处理时间属性null
。
注意两者StreamTableSource
并BatchTableSource
可以实现DefinedProctimeAttribute
并定义的处理时间属性。在BatchTableSource
处理时间字段的情况下,在表扫描期间使用当前时间戳初始化字段。
定义行时属性
Rowtime属性是类型的属性,TIMESTAMP
并在流和批处理查询中以统一的方式处理。
SQL_TIMESTAMP
可以通过指定将类型的表模式字段声明为rowtime属性
- 该字段的名称,
- a
TimestampExtractor
计算属性的实际值(通常来自一个或多个其他字段),和 - a
WatermarkStrategy
指定如何为rowtime属性生成水印。
A TableSource
通过实现DefinedRowtimeAttributes
接口来定义行时属性。界面如下:
DefinedRowtimeAttribute {
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
}
DefinedRowtimeAttributes {
def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
getRowtimeAttributeDescriptors()
:返回列表RowtimeAttributeDescriptor
。ARowtimeAttributeDescriptor
描述了具有以下属性的rowtime属性:attributeName
:表架构中rowtime属性的名称。必须使用类型定义该字段Types.SQL_TIMESTAMP
。timestampExtractor
:时间戳提取器从具有返回类型的记录中提取时间戳。例如,它可以将Long字段转换为时间戳或解析字符串编码的时间戳。Flink附带了一组TimestampExtractor
针对常见用例的内置实现。还可以提供自定义实现。watermarkStrategy
:水印策略定义如何为rowtime属性生成水印。Flink附带了一组WatermarkStrategy
针对常见用例的内置实现。还可以提供自定义实现。
注意虽然该getRowtimeAttributeDescriptors()
方法返回描述符列表,但目前仅支持单个rowtime属性。我们计划在将来删除此限制,并支持具有多个rowtime属性的表。
注意两者,StreamTableSource
并且BatchTableSource
,可以实现DefinedRowtimeAttributes
并定义rowtime属性。在任何一种情况下,都使用提取行时字段TimestampExtractor
。因此,TableSource
实现StreamTableSource
和BatchTableSource
定义行时属性的实现为流和批处理查询提供完全相同的数据。
提供时间戳提取器
Flink提供TimestampExtractor
了常见用例的实现。
TimestampExtractor
目前提供以下实现:
ExistingField(fieldName)
:提取一个rowtime属性的从现有的值LONG
,SQL_TIMESTAMP
或时间戳记格式化STRING
字段。这种字符串的一个例子是'2018-05-28 12:34:56.000'。StreamRecordTimestamp()
:从时间戳中提取rowtime属性的值DataStream
StreamRecord
。请注意,这TimestampExtractor
不适用于批处理表源。
TimestampExtractor
可以通过实现相应的接口来定义自定义。
提供水印策略
Flink提供WatermarkStrategy
了常见用例的实现。
WatermarkStrategy
目前提供以下实现:
AscendingTimestamps
:用于提升时间戳的水印策略。带有无序时间戳的记录将被视为迟到。BoundedOutOfOrderTimestamps(delay)
:时间戳的水印策略,指定的延迟最多是无序的。PreserveWatermarks()
:一种策略,指示应从底层保存水印DataStream
。
WatermarkStrategy
可以通过实现相应的接口来定义自定义。
使用Projection Push-Down定义TableSource
A TableSource
通过实现ProjectableTableSource
接口支持Projection下推。该接口定义了一个方法:
ProjectableTableSource<T> {
public TableSource<T> projectFields(int[] fields);
}
ProjectableTableSource[T] {
def projectFields(fields: Array[Int]): TableSource[T]
}
projectFields(fields)
:返回一个副本的的TableSource
与调整身体返回类型。该fields
参数提供必须由提供的字段的索引TableSource
。索引与TypeInformation
物理返回类型有关,而与逻辑表模式无关。复制TableSource
必须调整其返回类型和返回的DataStream
或DataSet
。在TableSchema
复制的TableSource
不能改变,即它必须跟原来一样TableSource
。如果TableSource
实现DefinedFieldMapping
接口,则必须将字段映射调整为新的返回类型。
在ProjectableTableSource
增加支持项目平场。如果TableSource
使用嵌套模式定义表,则可以实现NestedFieldsProjectableTableSource
将Projection扩展到嵌套字段。的NestedFieldsProjectableTableSource
定义如下:
NestedFieldsProjectableTableSource<T> {
public TableSource<T> projectNestedFields(int[] fields, String[][] nestedFields);
}
NestedFieldsProjectableTableSource[T] {
def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
}
projectNestedField(fields, nestedFields)
:返回一个副本的的TableSource
与调整身体返回类型。可以删除或重新排序物理返回类型的字段,但不得更改其类型。该方法的合同与该方法基本相同ProjectableTableSource.projectFields()
。此外,该nestedFields
参数包含fields
列表中每个字段索引,该列表指向查询访问的所有嵌套字段的路径。所有其他嵌套字段不需要在由TableSource
。生成的记录中读取,解析和设置。重要信息不得更改Projection字段的类型,但可以将未使用的字段设置为空或默认值。
使用过滤器下推定义TableSource
该FilterableTableSource
界面增加了对过滤器下推的支持TableSource
。一个TableSource
扩展这个接口能够过滤记录,从而使返回DataStream
或者DataSet
返回较少的记录。
界面如下:
FilterableTableSource<T> {
public TableSource<T> applyPredicate(List<Expression> predicates);
public boolean isFilterPushedDown();
}
FilterableTableSource[T] {
def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
def isFilterPushedDown: Boolean
}
applyPredicate(predicates)
:返回一个副本的TableSource
添加了谓词。该predicates
参数是一个可变的联合谓词列表,它被“提供”给TableSource
。在TableSource
受理用户从列表中删除它来评估一个谓语。列表中剩余的谓词将由后续过滤器 算子进行评估。isFilterPushedDown()
:如果applyPredicate()
之前调用该方法,则返回true 。因此,isFilterPushedDown()
必须对TableSource
从applyPredicate()
调用返回的所有实例返回true 。
定义TableSink
A TableSink
指定如何向Table
外部系统或位置发射。该接口是通用的,因此它可以支持不同的存储位置和格式。批处理表和流表有不同的表接收器。
一般界面如下所示:
TableSink<T> {
public TypeInformation<T> getOutputType();
public String[] getFieldNames();
public TypeInformation[] getFieldTypes();
public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
}
TableSink[T] {
def getOutputType: TypeInformation<T>
def getFieldNames: Array[String]
def getFieldTypes: Array[TypeInformation]
def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T]
}
TableSink#configure
调用该方法以传递表的模式(字段名称和类型)以发送到TableSink
。该方法必须返回TableSink的新实例,该实例配置为发出提供的表模式。
BatchTableSink
定义外部TableSink
以发出批处理表。
界面如下:
BatchTableSink<T> extends TableSink<T> {
public void emitDataSet(DataSet<T> dataSet);
}
BatchTableSink[T] extends TableSink[T] {
def emitDataSet(dataSet: DataSet[T]): Unit
}
AppendStreamTableSink
定义外部TableSink
以发出仅具有插入更改的流表。
界面如下:
AppendStreamTableSink<T> extends TableSink<T> {
public void emitDataStream(DataStream<T> dataStream);
}
AppendStreamTableSink[T] extends TableSink[T] {
def emitDataStream(dataStream: DataStream<T>): Unit
}
如果还通过更新或删除更改来修改表,TableException
则将抛出a。
RetractStreamTableSink
定义外部TableSink
以发出包含插入,更新和删除更改的流表。
界面如下:
RetractStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {
public TypeInformation<T> getRecordType();
public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
def getRecordType: TypeInformation[T]
def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}
该表将被转换为累积和撤销消息流,这些消息被编码为Java Tuple2
。第一个字段是一个布尔标志,用于指示消息类型(true
表示插入,false
表示删除)。第二个字段保存所请求类型的记录T
。
UpsertStreamTableSink
定义外部TableSink
以发出包含插入,更新和删除更改的流表。
界面如下:
UpsertStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {
public void setKeyFields(String[] keys);
public void setIsAppendOnly(boolean isAppendOnly);
public TypeInformation<T> getRecordType();
public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
def setKeyFields(keys: Array[String]): Unit
def setIsAppendOnly(isAppendOnly: Boolean): Unit
def getRecordType: TypeInformation[T]
def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}
该表必须具有唯一的键字段(原子或复合)或仅附加。如果表没有唯一键并且不是仅附加,TableException
则将抛出a。表的唯一键由该UpsertStreamTableSink#setKeyFields()
方法配置。
该表将被转换为upsert和delete消息流,这些消息被编码为Java Tuple2
。第一个字段是一个布尔标志,用于指示消息类型。第二个字段保存所请求类型的记录T
。
具有true布尔字段的消息是已配置Keys的upsert消息。带有错误标志的消息是已配置Keys的删除消息。如果表是仅附加的,则所有消息都将具有true标志,并且必须解释为插入。
定义TableFactory
A TableFactory
允许从基于字符串的属性创建不同的表相关实例。调用所有可用工厂以匹配给定的属性集和相应的工厂类。
工厂利用Java服务提供者接口(SPI)进行发现。这意味着每个依赖项和JAR文件都应该org.apache.flink.table.factories.TableFactory
在META_INF/services
资源目录中包含一个文件,该文件列出了它提供的所有可用表工厂。
每个表工厂都需要实现以下接口:
package org.apache.flink.table.factories;
interface TableFactory {
Map<String, String> requiredContext();
List<String> supportedProperties();
}
package org.apache.flink.table.factories
trait TableFactory {
def requiredContext(): util.Map[String, String]
def supportedProperties(): util.List[String]
}
requiredContext()
:指定已为此工厂实现的上下文。如果满足指定的属性和值集,框架保证仅匹配此工厂。典型属性可能是connector.type
,format.type
或update-mode
。诸如connector.property-version
和的属性键format.property-version
保存用于将来的向后兼容性情况。supportedProperties
:此工厂可以处理的属性键列表。此方法将用于验证。如果传递了该工厂无法处理的属性,则会抛出异常。该列表不得包含上下文指定的键。
为了创建特定实例,工厂类可以实现以下提供的一个或多个接口org.apache.flink.table.factories
:
BatchTableSourceFactory
:创建批处理表源。BatchTableSinkFactory
:创建批处理表接收器。StreamTableSoureFactory
:创建流表源。StreamTableSinkFactory
:创建流表接收器。DeserializationSchemaFactory
:创建反序列化架构格式。SerializationSchemaFactory
:创建序列化架构格式。
工厂的发现发生在多个阶段:
- 发现所有可用的工厂。
- 按工厂类别过滤(例如
StreamTableSourceFactory
)。 - 通过匹配上下文过滤。
- 按支持的属性过滤。
- 验证一个工厂是否匹配,否则抛出一个
AmbiguousTableFactoryException
或NoMatchingTableFactoryException
。
以下示例显示如何为参数化提供带有附加connector.debug
属性标志的自定义流式源。
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
class MySystemTableSourceFactory extends StreamTableSourceFactory<Row> {
@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put("update-mode", "append");
context.put("connector.type", "my-system");
return context;
}
@Override
public List<String> supportedProperties() {
List<String> list = new ArrayList<>();
list.add("connector.debug");
return list;
}
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));
# additional validation of the passed properties can also happen here
return new MySystemAppendTableSource(isDebug);
}
}
import java.util
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row
class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {
override def requiredContext(): util.Map[String, String] = {
val context = new util.HashMap[String, String]()
context.put("update-mode", "append")
context.put("connector.type", "my-system")
context
}
override def supportedProperties(): util.List[String] = {
val properties = new util.ArrayList[String]()
properties.add("connector.debug")
properties
}
override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug"))
# additional validation of the passed properties can also happen here
new MySystemAppendTableSource(isDebug)
}
}
在SQL客户端中使用TableFactory
在SQL客户端环境文件中,先前显示的工厂可以声明为:
tables:
- name: MySystemTable
type: source
update-mode: append
connector:
type: my-system
debug: true
YAML文件被转换为扁平字符串属性,并使用描述与外部系统的连接的属性调用表工厂:
update-mode=append
connector.type=my-system
connector.debug=true
注意属性,例如tables.#.name
或是tables.#.type
SQL客户端细节,不会传递给任何工厂。该type
属性决定,取决于运行环境,无论是BatchTableSourceFactory
/ StreamTableSourceFactory
(对source
),一个BatchTableSinkFactory
/ StreamTableSinkFactory
(对于sink
),或两者(对both
)需要发现。
在Table&SQL API中使用TableFactory
对于具有解释性Scaladoc / Javadoc的类型安全的编程方法,Table&SQL API提供org.apache.flink.table.descriptors
了转换为基于字符串的属性的描述符。请参阅源,接收器和格式的内置描述符作为参考。
MySystem
我们示例中的连接器可以扩展ConnectorDescriptor
,如下所示:
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.DescriptorProperties;
/**
* Connector to MySystem with debug mode.
*/
public class MySystemConnector extends ConnectorDescriptor {
public final boolean isDebug;
public MySystemConnector(boolean isDebug) {
super("my-system", 1, false);
this.isDebug = isDebug;
}
@Override
public void addConnectorProperties(DescriptorProperties properties) {
properties.putString("connector.debug", Boolean.toString(isDebug));
}
}
import org.apache.flink.table.descriptors.ConnectorDescriptor
import org.apache.flink.table.descriptors.DescriptorProperties
/**
* Connector to MySystem with debug mode.
*/
class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, formatNeeded = false) {
override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
properties.putString("connector.debug", isDebug.toString)
}
}
然后可以在API中使用描述符,如下所示:
StreamTableEnvironment tableEnv = // ...
tableEnv
.connect(new MySystemConnector(true))
.inAppendMode()
.registerTableSource("MySystemTable");
val tableEnv: StreamTableEnvironment = // ...
tableEnv
.connect(new MySystemConnector(isDebug = true))
.inAppendMode()
.registerTableSource("MySystemTable")