如何在 PyFlink 1.10 中自定义 Python UDF?

  • 时间:
  • 浏览:0
  • 来源:5分11选5APP下载_5分11选5APP官方

直观的判断,PyFlink Python UDF 的功能并能这样 如上图一样并能越来更慢从幼苗变成大树,为什么会么会有此判断,请继续往下看…

或者在使用从前进行注册,如下:

目前为止,朋友由于完成了 Python UDF 的定义,声明和注册了。接下来朋友还是看从前完全的示例吧:)

上端这图是 Beam Portability Framework 的架构图,他描述了 Beam 怎么支持多语言,怎么支持多 Runner,单独说 Apache Flink 的从前朋友就能这样 说是 Beam on Flink,这样 为什么会么会解释 Flink on Beam 呢?

示例的代码在上端下载的源代码上端由于所含了,为了简单,朋友利用 PyCharm 打开enjoyment.code/myPyFlink。一并在 Terminal 启动从前端口:

在 Apache Flink 1.10 中朋友所说的 Flink on Beam 更精确的说是 PyFlink on Beam Portability Framework。朋友看一下简单的架构图,如下:

启动 blog_demo,由于一切顺利,启动从前,控制台会输出从前 web 地址,如下所示:

UDF 定义

朋友知道 PyFlink 是在 Apache Flink 1.9 版新增的,这样 在 Apache Flink 1.10 中 Python UDF 功能支持的速率单位是不是并能满足用户的急切需求呢?

每一笔订单是从前字符串,字段用逗号分隔, 例如:

这样 定义完 UDF 朋友应该怎么使用呢?Apache Flink 1.10 中提供了 2 种 Decorators,如下:

假设苹果手机6机6公司要统计该公司产品在双 11 期间各城市的销售数量和销售金额分布具体情况。

并能这样 查看一下,朋友核心并能 apache-beam 和 apache-flink,如下命令:

在进行编译代码从前,朋友并能你由于安装了 JDK8 和 Maven3x。

阅读原文可点击:原文链接

概要了解了 Apache Flink 1.10 中 Python UDF 的架构从前,朋友还是切入的代码帕累托图,看看怎么开发和使用 Python UDF。

上端代码中朋友会发现从前陌生的帕累托图,假若 from pyflink.demo import ChartConnector, SocketTableSource. 其中 pyflink.demo 是哪里来的呢?着实假若所含了上端朋友介绍的 自定义 Source/Sink(Java&Python)。下面朋友来介绍怎么增加这个 pyflink.demo 模块。

如上信息证明你朋友所需的 Python 依赖由于没问题了,接下来回过头来在看看怎么进行业务需求的开发。

由于你本地环境 python 命令版本是 2.x,这样 并能对 Python 版本进行设置,如下:

由于目前 PyFlink 还这样 部署到 PyPI 上端,在 Apache Flink 1.10 发布从前,朋友并能通过构建 Flink 的 master 分支源码来构建运行朋友 Python UDF 的 PyFlink 版本。

PyFlink 1.10 从前支持 Python 3.6+ 版本。

在 Apache Flink 1.10 中朋友有多种方法进行 UDF 的定义,比如:

核心的统计逻辑是根据 city 进行分组,或者对 销售数量和销售金额进行求和,如下:

如下代码朋友发现核心实现逻辑非常简单,只并能对数据进行解析和对数据进行集合计算:

在 chart_table_sink.py 朋友封装了从前 http server,从前朋友能这样 在浏览器中查阅朋友的统计结果。

完成自定义的 Source 和 Sink 从前朋友终于能这样 进行业务逻辑的开发了,着实整个过程自定义 Source 和 Sink 是最麻烦的,核心计算逻辑似乎要简单的多。

从前完成的 PyFlink 的 Job 并能有外部数据源的定义,有业务逻辑的定义和最终计算结果输出的定义。也假若 Source connector, Transformations, Sink connector,接下来朋友根据这个从前帕累托图进行介绍来完成朋友的需求。

随之订单数据的不断输入,统计图不断变化。从前完全的 GIF 演示如下:

CsvRetractTableSink 的核心逻辑是缓冲计算结果,每次更新进行一次全量(这是个纯 demo,这样 用于生产环境)文件输出。源代码查阅 CsvRetractTableSink。

朋友看得人基础环境安装比较简单,我这里就不每从前都贴出来了。由于朋友有问题欢迎邮件由于博客留言。

作者:孙金城(金竹)

为了朋友方便我把自定义 Source/Sink(Java&Python)的源代码上放了这里 ,朋友能这样 进行如下操作:

本篇从架构到 UDF 接口定义,再到具体的实例,向朋友介绍了在 Apache Flink 1.10 发布从前,怎么利用 PyFlink 进行业务开发,其中 用户自定义 Source 和 Sink帕累托图比较比较复杂,这也是目前社区并能进行改进的帕累托图(Java/Scala)。真正的核心逻辑帕累托图着实比较简单,为了朋友按照本篇进行实战操作一些成就感,一些我增加了自定义 Source/Sink 和图形化帕累托图。但由于朋友想比较复杂实例的实现并能这样 利用 Kafka 作为 Source 和 Sink,从前就能这样 省去自定义的帕累托图,做起来也会简单一些。

除了 JDK 和 MAVEN 完全的环境依赖性如下:

朋友都知道有 Beam on Flink 的场景,假若 Beam 支持多种 Runner,也假若说 Beam SDK 编写的 Job 能这样 运行在 Flink 之上。如下图所示:

接下来就能这样 在 Table API/SQL 中进行使用了,如下:

当输入一根绳子 绳子 订单 苹果手机6机6 11,60 ,5499,Beijing,从前,页面变化如下:

注册 UDF

朋友尝试将下面的数据,一根绳子 ,一根绳子 的发送给 Source Connector:

朋友并能实现从前 Socket Connector,首这样实现从前 StreamTableSource, 核心代码是实现 getDataStream,代码如下:

Beam Portability Framework 是从前心智心智成长期期期期的句子的句子 图片 图片 图片 图片 图片 的多语言支持框架,框架深度抽象了语言之间的通信协议(gRPC),定义了数据的传输格式(Protobuf),或者根据通用流计算框架所并能的组件,抽象个各种服务,比如 DataService,StateService,MetricsService 等。在从前从前心智心智成长期期期期的句子的句子 图片 图片 图片 图片 图片 的框架下,PyFlink 能这样 快速的构建本人的 Python 算子,一并重用 Apache Beam Portability Framework 中现有 SDK harness 组件,能这样 支持多种 Python 运行模式,如:Process,Docker,etc.,这使得 PyFlink 对 Python UDF 的支持变得非常容易,在 Apache Flink 1.10 中的功能也非常的稳定和完全。这样 为什么会么会说是 Apache Flink 和 Apache Beam 一并打造呢,是由于我发现目前 Apache Beam Portability Framework 的框架也地处一些优化的空间,一些我在 Beam 社区进行了优化讨论,或者在 Beam 社区也贡献了 20+ 的优化补丁。

朋友发现上端定义函数除了第从前扩展 ScalaFunction 的方法是 PyFlink 特有的,一些方法并能 Python 语言一种就支持的,也假若说,在 Apache Flink 1.10 中 PyFlink 允许以任何 Python 语言所支持的方法定义 UDF。

计算结果写入到朋友自定义的 Sink 中,如下:

朋友预期要得到的从前效果是并能将结果数据进行图形化展示,简单的思路是将数据写到从前本地的文件,或者在写从前 HTML 页面,使其并能自动更新结果文件,并展示结果。一些朋友并能自定义从前 Sink 来完成该功能,朋友的需求计算结果是会不断的更新的,也假若涉及到 Retraction(由于朋友不理解这个概念,能这样 查阅我从前的博客),目前在 Flink 上端还这样 默认支持 Retract 的 Sink,一些朋友并能自定义从前 RetractSink,比如朋友实现一下 CsvRetractTableSink。

朋友并能对上端列进行分析,为了演示 Python UDF,朋友在 SocketTableSource中并这样 对数据进行预避免,一些朋友利用上端 UDF 定义 一节定义的 UDF,来对原始数据进行预避免。

一并,朋友并能在 Python 封装从前 SocketTableSource,详情查阅 socket_table_source.py。

上端这个行代码定义了监听端口 9999 的数据源,一并底部形态化 Table 这样 从前名为 line 的列。

PyFlink 读取数据源非常简单,如下:

冒出上端信息证明由于将 PyFlink.demo 模块成功安装。接下来朋友能这样 运行朋友的示例了 :)

上端的代码朋友假设是从前 Socket 的 Source,Sink 是从前 Chart Sink,这样 最终运行效果图,如下:

上端代码利用了 StreamExecutionEnvironment 中现有 socketTextStream 方法接收数据,或者将业务订单数据传个从前 FlatMapFunction, FlatMapFunction 主要实现将数据类型封装为 Row,完全代码查阅 Spliter。

朋友打开这个页面,开始了了是从前空白页面,如下:

根据案例的需求和数据底部形态分析,朋友并能对原始字符串进行底部形态化解析,这样 并能从前按“,”号分隔的 UDF(split) 和从前并能将各个列信息展平的 DUF(get)。一并朋友并能根据城市进行分组统计。

我老会 认为在博客中假若文本描述而这样 让读者真正的在本人的机器上运行起来的博客,并能好博客,一些接下来朋友看看按照朋友下面的操作,是不是能在你的机器上也运行起来?:)

一并朋友并能利用 Python 进行封装,详见 chart_table_sink.py。