请选择 进入手机版 | 继续访问电脑版
 找回密码
 立即注册

QQ登录

只需要一步,快速开始


作者 | 滕昱、黄飞情、周煜敏 编辑 | 陈思 AI 前线导读:Pravega 与 Flink 的设计理念类似,都以流为基础实现流批统一的接口便于应用使用。Pravega 团队也希望与 Flink 一起打造从底层存储到上层计算的统一大数据流水线架构。在开发层面,Pravega 与 Flink 也有着深度的合作,Pravega Flink Connector 的开发,特别是在 Flink 的端到端的仅一次语义实现的过程中,都得到了 Apache Flink PMC 成员的通力协作和大力支持。本文将从 API 的简介及使用入手,重点介绍 Pravega+Flink 的流计算编程。
更多优质内容请关注微信公众号“AI 前线”(ID:ai-front) 简 介 Pravega 是根据 Apache 2.0 许可证开源的流存储引擎,为连续流数据提供统一的 Stream 抽象。Pravega 提供了持久化、强一致性以及高性能低延迟的数据存储。同样在实时大数据领域,Apache Flink 是由 Apache 软件基金会开发的开源分布式处理引擎,用于对无界和有界数据流进行有状态的计算。Flink 提供高吞吐量、低延迟的流数据计算,以及对事件发生时间处理和状态管理的支持。Flink 的应用程序在发生机器故障时具有容错能力,并且支持 exactly-once 语义。Pravega 与 Flink 的设计理念类似,都以流为基础实现流批统一的接口便于应用使用。Pravega 团队也希望与 Flink 一起打造从底层存储到上层计算的统一大数据流水线架构。Pravega 从诞生之初就积极参加 Flink 社区的活动,自从 2017 年起在每一次的 Flink Forward 大会上都有 相关内容 的分享,包括在 2018 年 12 月第一次在中国举办的 Flink Forward 同样也有 Pravega 中国团队的 参与。在开发层面,Pravega 与 Flink 也有着深度的合作,Pravega Flink Connector 的开发,特别是在 Flink 的端到端的仅一次语义实现的过程中,都得到了 Apache Flink PMC 成员的通力协作和大力支持。 Pravega Flink Connectors Pravega Flink Connectors 实现了 Flink 的接口,提供了对 Pravega Stream 的读取和写入,并且结合 Flink 的 Checkpoint 机制提供了端到端的 exactly-once 处理语义(详情可见 上一篇文章)。Flink 对数据有两种读和写处理办法,对应的,每一种 API 都需要定义 Flink 程序的数据源 (Source) 和数据汇 (Sink)。Pravega Flink Connector 打通存储与计算之间的通道,Pravega 就可以作为统一的流存储和消息总线,用户可以使用统一的 Flink API 在 Pravega Stream 上进行批或者流计算,构建一个完整的实时数据仓库。Flink 在数据源上自底向上有着 4 层抽象的 API,对于 Pravega 而言需要提供三种不同的 API 来满足不同的使用需求:
DataStream APIDataSet APITable API

在介绍这三种 API 之前,我们首先先了解一下这些 API 所共有的参数。
共有配置 1. PravegaConfig 类 Pravega Flink Connectors 提供了一个配置对象PravegaConfig,用于为 Flink 配置 Pravega 的上下文。PravegaConfig会自动从环境变量、配置文件属性和程序运行时参数进行配置。PravegaConfig 信息来源如下: [td]设置[/td][td]环境变量 / 系统属性 / 程序参数[/td][td]缺省值[/td]Controller URIPRAVEGA_CONTROLLER_URI / pravega.controller.uri / --controllertcp://localhost:9090Default ScopePRAVEGA_SCOPE / pravega.scope / --scope-Credentials--Hostname Validation-true创建 PravegaConfig创建PravegaConfig实例的推荐方法是利用 Flink 的ParameterToolParameterTool params = ParameterTool.fromArgs(args);
PravegaConfig config = PravegaConfig.fromParams(params);
如果您的应用程序不使用 Flink 提供的ParameterTool类,也可以使用fromDefaults创建PravegaConfig:PravegaConfig config = PravegaConfig.fromDefaults();
此外,PravegaConfig也提供了 builder->PravegaConfig config = PravegaConfig.fromDefaults()
.withControllerURI("tcp://...")
.withDefaultScope("SCOPE-NAME")
.withCredentials(credentials)
.withHostnameValidation(false);
使用 PravegaConfig随连接器库提供的所有各种源和接收器类都具有 builder->PravegaConfig。通过withPravegaConfig将PravegaConfig对象传递给相应的构建器。例如:PravegaConfig config = ...;
FlinkPravegaReader pravegaSource = FlinkPravegaReader.builder()
.forStream(...)
.withPravegaConfig(config)
.build();
值得注意的是,source 或 sink 的 stream 可以使用完整的 stream 名称(使用/分隔 scope 和 stream,例如my-scope/my-stream),也可以在设置DefaultScope的情况下使用不完整的 stream 名称(例如my-stream)。 2. 序列化 / 反序列化 序列化 / 反序列化是指 Flink 程序中的数据元素与 Pravega Stream 中存储的二进制消息相互转换的过程。Flink 定义了数据序列化 / 反序列化的标准接口,核心接口是:
org.apache.flink.api.common.serialization.SerializationSchemaorg.apache.flink.api.common.serialization.DeserializationSchemaFlink 内置的序列化器包括:
org.apache.flink.api.common.serialization.SimpleStringSchemaorg.apache.flink.api.common.serialization.TypeInformationSerializationSchemaPravega Connector 可以使用 Flink 的序列化接口。例如,要将每个事件读取为 UTF-8 字符串:DeserializationSchema schema = new SimpleStringSchema();
FlinkPravegaReader reader = new FlinkPravegaReader(..., schema);
DataStream stream = env.addSource(reader);
与其他应用程序的互操作性更常见的情况是,Pravega 的数据由其它客户端程序注入,使用 Flink 处理。此类应用程序使用的 Pravega 客户端库定义了用于处理事件数据的 io.pravega.client.stream.Serializer 接口。Pravega 提供了内置的适配器,实现了序列化方法的转化,使得 Flink 程序能够正常读写 Pravega 中的数据
io.pravega.connectors.flink.serialization.PravegaSerializationSchemaio.pravega.connectors.flink.serialization.PravegaDeserializationSchema下面是一个示例,将实现了 Pravega Serializer 接口的内置 Java POJO 类序列化器JavaSerializer传递给适配器的构造函数,最终数据转化为DataStream进行进一步处理:import io.pravega.client.stream.impl.JavaSerializer;
...
DeserializationSchema adapter = new PravegaDeserializationSchema(
MyEvent.class, new JavaSerializer());
FlinkPravegaReader reader = new FlinkPravegaReader(..., adapter);
DataStream stream = env.addSource(reader);
Pravega 序列化程序必须实现java.io.Serializable才能在 Flink 程序中使用。 3. Stream Cuts StreamCut表示 Pravega 流中的特定位置,可以从与 Pravega 客户端的各种 API 交互中获得,它包含一组 segment 和 offset 的键值对。偏移量始终指向事件边界,因此没有指向不完整事件的 offset。Pravega 中的 Checkpoint 底层也由这一 API 实现。读客户端可以接受StreamCut作为给定流的开始和 / 或结束位置。由于数据在不断产生以及不断地下沉至第二级存储,stream 的开始和结束位置都会发生变化,因此 Pravega 使用StreamCut.UNBOUNDED表示 Stream 中的不断变化的位置。这样的设计有助于使得 Flink 使用一套统一的 API 读取用户自定义的有边界和无边界的数据流。 DataStream API DataStream API 是 Flink 最常用的 API,主要负责数据流的处理。数据流通过 Source 来创建,结果通过 Sink 返回,中间可以进行丰富的有状态的 transformation 操作。Pravega Flink Connectors 扩展了 Flink 的RichParallelSourceFunction和RichSinkFunction,以 DataStream API 实现了 Pravega 的数据读写。 FlinkPravegaReader 使用io.pravega.connectors.flink.FlinkPravegaReader的实例作为 Flink 程序的数据源 (Source)。FlinkPravegaReader读取 Pravega 的一个或多个 Stream,抽象为 Flink 的DataStream。使用 StreamExecutionEnvironment::addSource 方法将 Pravega Stream 作为 DataStream 打开。代码示例:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the Pravega configuration
PravegaConfig config = PravegaConfig.fromParams(params);
// Define the event deserializer
DeserializationSchema deserializer = ...
// Define the data stream
FlinkPravegaReader pravegaSource = FlinkPravegaReader.builder()
.forStream(...)
.withPravegaConfig(config)
.withDeserializationSchema(deserializer)
.build();
DataStream stream = env.addSource(pravegaSource);
Pravega Flink Connectors 提供 builder->FlinkPravegaReader的实例,通常需要指定PravegaConfig,读取的 Stream 名称以及反序列化方法。如果需要并行读取多个 Stream,可以反复调用forStream。Stream 也可以跨 scope 被 Flink 所读取,只需指定完整的 stream 名称scope/stream 即可。更多参数可参阅 此表。forStream提供了一个重载方法,可以接受StreamCut类型的参数以处理历史流数据。FlinkPravegaReader 支持并行化,可使用setParallelism方法配置要执行的并行实例的数量。每个实例消耗一个或多个 segment。读客户端与 Flink checkpoints 和 savepoints 兼容,其中会包含从正确位置恢复所需的所有信息,读客户端支持倒回到 Pravega Checkpoint 位置从故障中恢复。Checkpoint 的工作过程分为两步:
FlinkPravegaReader在初始化期间注册 ReaderCheckpointHook,Job manager 中的 master hook 处理程序启动 triggerCheckpoint request 到ReaderCheckpointHook。ReaderCheckpointHook处理程序通知 Pravega 检查当前读客户端状态。这是一个非阻塞调用,一旦 Pravega 读者完成了检查点,就会返回。Pravega 将发送 CheckPoint 事件作为数据流流的一部分,并且在接收事件时,FlinkPravegaReader将启动triggerCheckpoint请求以有效地让 Flink 继续并完成检查点过程。FlinkPravegaReader默认开启性能指标 (Metrics) 监控,可使用.enableMetrics(false)选项禁用。用户可以实时地观察 Flink 消费 Pravega 数据的运行状态,性能指标包括: [td]名称[/td][td]说明[/td]readerGroupNameReader 组名称scopeReader 组的范围名称streams作为 Reader 组一部分的流的完全限定名称 (i.e., scope/stream)onlineReaders当前在线 / 可用的 ReadersegmentPositionsStreamCut信息,指示 Reader 到目前为止所阅读的位置。unreadBytes尚未读取的总字节数 FlinkPravegaWriter 使用io.pravega.connectors.flink.FlinkPravegaWriter的实例作为 Flink 程序里面的数据汇 (Sink)。使用 DataStream::addSink 方法将写客户端的实例添加到 Flink 程序中。代码示例:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the Pravega configuration
PravegaConfig config = PravegaConfig.fromParams(params);
// Define the event serializer
SerializationSchema serializer = ...
// Define the event router for selecting the Routing Key
PravegaEventRouter router = ...
// Define the sink function
FlinkPravegaWriter pravegaSink = FlinkPravegaWriter.builder()
.forStream(...)
.withPravegaConfig(config)
.withSerializationSchema(serializer)
.withEventRouter(router)
.withWriterMode(EXACTLY_ONCE)
.build();
DataStream stream = ...
stream.addSink(pravegaSink);
FlinkPravegaWriter同样利用 builder API 构造,通常需要指定PravegaConfig,写入的 Stream 名称,写入的模式,路由函数以及序列化方法。完整参数列表可参阅 此表。写入 Pravega stream 的每个事件都有一个关联的路由键。路由键是事件分 segment 的依据以及有序性保证的基础,详细信息可参阅之前 Pravega 系列文章。用户需要提供io.pravega.connectors.flink.PravegaEventRouter接口的实现,例如,为了保证特定于传感器 id 的写入顺序,您可以提供如下所示的路由实现。private static class SensorEventRouter implements PravegaEventRouter {
@Override
public String getRoutingKey(SensorEvent event) {
return event.getId();
}
}
用户可以进一步使用FlinkPravegaUtils::writeToPravegaInEventTimeOrder 方法将给定的 DataStream 按照事件时间顺序写入 Pravega 流,该方法将自动对事件按事件时间进行排序 (基于每个键)。FlinkPravegaWriter根据性能以及持久化保证的权衡,支持三种写模式:
[ol]最多一次(BEST_EFFORT):任何写入失败都将被忽略, 因此可能会出现数据丢失。最少一次(ATLEAST_ONCE):所有事件都在 Pravega 持续存在。由于重试或在失败和后续恢复的情况下, 可能会发生重复的事件。仅一次(EXACTLY_ONCE):集成 Flink Checkpoint 功能使用事务性写入,在 Pravega 中保留所有事件且仅写入一次。[/ol]默认情况下, 启用ATLEAST_ONCE选项。 DataSet API DataSet API 是 Flink 进行批处理程序中使用的 API。Pravega Stream 作为数据源和数据汇。Pravega Flink Connectors 扩展了 Flink 的RichInputFormat和RichOutputFormat,实现了 DataSet API 的 Pravega 读写。参数与 DataStream API 类似。 FlinkPravegaInputFormat 使用io.pravega.connectors.flink.FlinkPravegaInputFormat的实例用作 Flink 批处理程序中的数据源。输入格式将流的事件作为 DataSet(Flink Batch API 的基本抽象)读取。此输入格式并行处理 stream segments,而不遵循路由键顺序。使用ExecutionEnvironment::createInput方法将 Pravega Stream 作为 DataSet 打开。代码示例:// Define the Pravega configuration
PravegaConfig config = PravegaConfig.fromParams(params);
// Define the event deserializer
DeserializationSchema deserializer = ...
// Define the input format based on a Pravega stream
FlinkPravegaInputFormat inputFormat = FlinkPravegaInputFormat.builder()
.forStream(...)
.withPravegaConfig(config)
.withDeserializationSchema(deserializer)
.build();
DataSource dataSet = env.createInput(inputFormat, TypeInformation.of(EventType.class)
.setParallelism(2);
FlinkPravegaOutputFormat 使用io.pravega.connectors.flink.FlinkPravegaOutputFormat的实例用作 Flink 批处理程序中的数据汇。使用DataSet::output方法将写客户端的实例添加到 Flink 程序中。代码示例:// Define the Pravega configuration
PravegaConfig config = PravegaConfig.fromParams(params);
// Define the event serializer
SerializationSchema serializer = ...
// Define the event router for selecting the Routing Key
PravegaEventRouter router = ...
// Define the input format based on a Pravega Stream
FlinkPravegaOutputFormat outputFormat = FlinkPravegaOutputFormat.builder()
.forStream(...)
.withPravegaConfig(config)
.withSerializationSchema(serializer)
.withEventRouter(router)
.build();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Collection inputData = Arrays.asList(...);
env.fromCollection(inputData)
.output(outputFormat);
env.execute("...");
Table API Table API 是一种关系型 API,用户可以使用简单的 SQL 操作数据,降低了大数据分析的门槛。更重要的是,Table API 统一了 Flink 里批和流不同的 API。在同一套 API 下,批处理的查询返回有限数据集,而流处理的查询则会持续返回流式结果。因此,这也是 Flink 社区贡献的热点和 Flink 未来的发展重点。Pravega Flink Connectors 提供了FlinkPravegaTableSource和FlinkPravegaTableSink使用 Flink Table API 读写 Pravega 数据,以便用户使用 SQL 语句操作数据。用户既可以在代码中使用 Pravega Descriptor 指定数据源,也可以通过声明式的 YAML 配置文件启用 SQL client. FlinkPravegaTableSource/Sink Pravega Stream 可以用作 Flink Table 程序中的 Table 源。Flink Table API 面向 Flink 的 TableSchema 类,它们描述了 table 字段。然后使用FlinkPravegaTableSource/Sink的具体子类将流数据解析为符合 table 模式的 Row 对象进行相应的读写。FlinkPravegaTableSource 通过 TableEnvironment::registerTableSource连接,Sink 的创建过程与 Source 类似,FlinkPravegaTableSink 通过 table.writeToSink(sink)写入。代码示例:以下示例使用 Table API 从 Pravega Stream 读取 JSON 格式的用户网站访问事件:// define table schema definition
Schema schema = new Schema()
.field("user", Types.STRING())
.field("uri", Types.STRING())
.field("accessTime", Types.SQL_TIMESTAMP()).rowtime(
new Rowtime().timestampsFromField("accessTime")
.watermarksPeriodicBounded(30000L));
// define pravega reader configurations using Pravega descriptor
Pravega pravega = new Pravega();
pravega.tableSourceReaderBuilder()
.withReaderGroupScope(stream.getScope())
.forStream(stream)
.withPravegaConfig(pravegaConfig);
// Streaming Source
StreamExecutionEnvironment execEnvRead = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnvRead);
StreamTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true).deriveSchema())
.withSchema(schema)
.inAppendMode();
final Map propertiesMap = DescriptorProperties.toJavaMap(desc);
final TableSource source = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
.createStreamTableSource(propertiesMap);
tableEnv.registerTableSource("MyTableRow", source);
String sqlQuery = "SELECT user, count(uri) from MyTableRow GROUP BY user";
Table result = tableEnv.sqlQuery(sqlQuery);
...
利用TableEnvironment::connect连接 Pravega 的相应 stream,并通过withFormat方法指定数据格式,withSchema方法指定表结构。之后通过相应的工厂模式,以及传入的装饰器的 Map 来构造FlinkPravegaTableSource的具体子类。FlinkPravegaTableSource/Sink支持 Flink 流和批处理环境。数据读取在流处理环境中使用FlinkPravegaReader/Writer实现;在批处理环境中使用FlinkPravegaInputFormat/OutputFormat实现。 快速入门 程序中使用 在编写 Flink 代码之前,需要确保 Flink 集群能够访问到 Pravega 集群,并且将 Pravega Connector 的依赖添加到项目中,pom.xml 如下:
io.pravega
pravega-connectors-flink_2.11
0.3.2
用户应该根据所运行的 Pravega 的版本使用对应的 Pravega Connector 版本。在运行 / 部署应用程序时,Pravega Flink Connector 不属于 Flink 的核心运行时,因此用户需要保证其代码必须是应用程序代码 artifacts (JAR 文件)的一部分。 使用 SQL 客户端 Flink SQL Client 是在 Flink 1.6 中引入的,旨在提供一种简单的方法来编写,调试和提交 Table API 程序到 Flink 集群,而无需编写 Java 或 Scala 代码。SQL Client CLI 允许在命令行上检索和可视化运行的分布式应用程序的实时结果。Pravega Stream 支持通过 Flink 的 SQL 客户端使用标准 SQL 命令访问。为此,必须下载以下文件(可使用 maven)并复制到 Flink 集群 library 路径:$FLINK_HOME/lib
Pravega connector jarFlink JSON jar(以 json 格式序列化 / 反序列化数据)Flink Avro jar(以 avro 格式序列化 / 反序列化数据)之后准备如下的 SQL 客户端 YAML 格式的配置文件,并确保任何相关的 Pravega Stream 已经创建。详细格式标准参见 文档。tables:
- name: sample
type: both
update-mode: append
# declare the external system to connect to
connector:
type: pravega
version: "1"
metrics: true
connection-config:
controller-uri: "tcp://localhost:9090"
default-scope: wVamQsOSaCxvYiHQVhRl
reader:
stream-info:
- stream: streamX
writer:
stream: streamX
mode: atleast_once
txn-lease-renewal-interval: 10000
routingkey-field-name: category
format:
type: json
fail-on-missing-field: true
derive-schema: true
schema:
- name: category
type: VARCHAR
- name: value
type: INT
functions: []
execution:
# 'batch' or 'streaming' execution
type: streaming
# allow 'event-time' or only 'processing-time' in sources
time-characteristic: event-time
# interval in ms for emitting periodic watermarks
periodic-watermarks-interval: 200
# 'changelog' or 'table' presentation of results
result-mode: table
# parallelism of the program
parallelism: 1
# maximum parallelism
max-parallelism: 128
# minimum idle state retention in ms
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
deployment:
# general cluster communication timeout in ms
response-timeout: 5000
# (optional) address from cluster to gateway
gateway-address: ""
# (optional) port from cluster to gateway
gateway-port: 0
之后使用命令$FLINK-HOME/bin/sql-client.sh embedded -d  以嵌入式模式运行 SQL client shell,若能成功运行SELECT 'Hello World',即可以运行 SQL 命令与 Pravega 进行交互。 Pravega 系列文章计划 Pravega 根据 Apache 2.0 许可证开源,0.5 版本即将发布。我们欢迎对流式存储感兴趣的大咖们加入 Pravega 社区,与 Pravega 共同成长。本篇文章为 Pravega 系列的最后一篇, 希望这系列文章能让你对流和流式实时计算有一个初步的了解,也欢迎感兴趣的朋友后续继续交流。下面是我们这个系列的文章标题,以备参考:
[ol]实时流处理 (Streaming) 统一批处理 (Batch) 的最后一块拼图:Pravega开源 Pravega 架构解析:如何通过分层解决流存储的三大挑战?Pravega 应用实战:为什么云原生特性对流存储至关重要“ToB” 产品必备特性: Pravega 的动态弹性伸缩取代 ZooKeeper!高并发下的分布式一致性开源组件 StateSynchronizer分布式一致性解决方案 - 状态同步器 (StateSynchronizer) API 示例流处理系统正确性基石:ExactlyOnce 的设计和实现Flink 流计算编程 --Pravega+Flink[/ol]作者简介 滕昱:就职于 DellEMC 非结构化数据存储部门 (Unstructured Data Storage) 团队并担任软件开发总监。2007 年加入 DellEMC 以后一直专注于分布式存储领域。参加并领导了中国研发团队参与两代 DellEMC 对象存储产品的研发工作并取得商业上成功。从 2017 年开始,兼任 Streaming 存储和实时计算系统的设计开发与领导工作。黄飞情,现就职于 DellEMC,10 年 + 存储、分布式虚拟化、云计算开发以及架构设计经验,现从事流存储和实时计算系统的设计与开发工作;周煜敏,复旦大学计算机专业研究生,从本科起就参与 DellEMC 分布式对象存储的实习工作。现参与 Flink 相关领域研发工作。参考链接:
[ol]https://www.pravega.iohttp://pravega.io/connectors/flink/docs/latest/https://github.com/pravega/flink-connectorshttps://github.com/pravega/pravega-samples/tree/master/flink-connector-examples[/ol]

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
发表评论
您需要登录后才可以回帖 登录 | 立即注册