Flink快速入门 2024年08月3日
0 收藏 0 点赞 941 浏览 3705 个字
摘要 :

         构建一个典型的 Flink 流式程序需要以下几步:  设置执行环境; 从数据源中读取一条或多条流; 通过一系列流式转换来实现应用逻辑; 选择性地将结果输出到一个……

         构建一个典型的 Flink 流式程序需要以下几步:

  1.  设置执行环境
  2. 数据源中读取一条或多条流;
  3. 通过一系列流式转换来实现应用逻辑
  4. 选择性地将结果输出到一个或多个数据汇中;
  5.  执行程序。

         Flink 程序都是通过延迟计算Lazily Execute)的方式执行。也就是说,那些创建数据源和转换操作的 API 调用不会立即触发任何实际的数据处理。相反,这些 API 调用只是在执行环境中创建一个执行计划。该计划包括从环境创建的流式数据源以及应用于这些数据源之上的一系列转换。只有在调用execute() 时,系统才会触发程序的执行

         流应用程序通常将其结果发送到一些外部系统External System),如 Apache Kafka、文件系统或数据库。Flink 提供了一组流式数据汇,可用于将数据写入不同的系统。也可以实现自己的流式数据汇。还有一些应用程序不发出结果,而是通过 Flink 的可查询状态(Queryable State)功能在内部保存结果。

         接下来让我们一起实现 Flink 版本的 WordCount。首先创建项目。

一、创建项目

        创建一个 Maven 项目,Flink 1.15.2 版本要求 Java 11Scala 2.12.x

八、快速入门

1.1 添加依赖

       完整 pom.xml 文件如下

八、快速入门

1.2 配置日志

       pom.xml 已添加 Log4j 日志依赖,接下来将自定义的 log4j2.properties 文件添加至项目的resources 文件夹即可。也可以将 Flink 安装包中 conf 目录下的 log4j*.properties 文件修改为log4j2.properties 再根据项目需求自行修改。

八、快速入门

1.3 准备数据

       在项目根路径下创建 data 目录, data 目录下再创建 wordcount 目录, wordcount 目录下创建以下两文件。

八、快速入门

 

二、DataSet API

2.1 Java

八、快速入门

2.2 Scala

八、快速入门

三、DataStream API

3.1 安装 NetCat

       NetCat(简称 nc)是一款强大的命令行网络工具,用来在两台机器之间建立 TCP/UDP 连接,并通过标准的输入输出进行数据的读写。

       官网:https://eternallybored.org/misc/netcat/

       下载地址:https://eternallybored.org/misc/netcat/netcat-win32-1.12.zip

八、快速入门

       下载后解压并将其配置为系统环境变量。

八、快速入门

       Linux 安装与使用 nc 方式如下:

八、快速入门

  •  l :开启监听模式,表示 nc 将处于监听模式。通常代表着开启一个服务,等待客户端来链接指定的端口
  • p <通信端口> :设置本地主机使用的通信端口
  • k <通信端口> :强制 nc 待命链接,使用 lp 选项,当客户端从服务端断开连接后,服务端也会停止监听。但通过 lk 选项,可以强制服务器保持连接并继续监听端口,简单的理解就是客户端断开连接后服务端不断开。

3.2 Java

八、快速入门

3.3 Scala

八、快速入门

四、代码简化

4.1  Scala

八、快速入门

4.2 Java

       对于 Java 版本的代码而言,就稍微有点麻烦了,我们一步步来。刚才编写的 Java 代码基本使用的都是匿名内部类的方式来实现的,通过查看源码发现这些接口都属于函数式接口(只有一个抽象方法的接口),那么可以使用 Lambda 表达式对代码进行优化,优化后代码如下:

八、快速入门

       以上代码编译阶段正常,但运行时会报以下错误。

八、快速入门

       大致意思是:Lambda 无法实现自动类型推断(因为泛型擦除),因为缺少泛型类型参数而导致无法运行。在许多情况下,当涉及 Java 泛型时,Lambda 方法不能为自动类型提取提供足够的信息。一个简单的解决方法是使用一个(匿名)类来实现对应的接口。否则,必须使用类型信息显式指定类型。Scala 就没有这个问题,所以 Scala 真香!

       这里的类型信息指的是:TypeInformation

五、TypeInformation

       Flink 程序所处理的流中的事件一般是对象类型,操作符接收对象输出对象。所以 Flink 的内部机制需要能够处理事件的类型。并且在网络中传输数据,或者将数据写入到状态后端、检查点和保存点中,都需要我们对数据进行序列化和反序列化。所以,为了更加高效的存储和便于类型检查,Flink 自己搞了一套类型系统,叫做 TypeInformation

       Flink 为何要别出心裁,自己搞一套类型系统呢?原因有二:

  • 一、是做类型检查,Flink 支持比较灵活的基于 field join group,需要先检查这个 field 是否可以作为 Key,或这个 field 是否可以做 join group
  • 二、是利于存储,这样就能针对不同的数据类型产生特定的序列化器,反序列化器和比较操作符。Flink 对不同类型做了不同的存储性能优化。

5.1 数据类型

      Flink 中,数据类型的描述信息都是定义在 TypeInformation 中,比较常用的 TypeInformaticaBasicTypeInfoTupleTypeInfoCaseClassTypeInfoPojoTypeInfo 等。

八、快速入门

5.1.1 原生数据类型(BasicTypeInfo

      包括所有的 Java 基本类型和装箱类型以及 voidStringDateBigDecimal BigInteger

      如下代码所示,通过从给定的元素集中创建 DataStream 数据集。

八、快速入门

5.1.2 基本类型数组(BasicArrayTypeInfo)

      包括基本类型的数组和 String 对象的数组,如下

八、快速入门

5.1.3 Java Tuples 类型(TupleTypeInfo

      Tuple 类是强类型,是一种组合数据类型,由固定数量的元素组成。Flink 实现的 Java Tuple 最多可以有 25 个元素,根据元素数量的不同,Tuple 都被实现成了不同的类:Tuple1Tuple2,一直到Tuple25Tuple 的元素可以通过它们的 public 属性访问—— f0f1f2 等等。或者使用getField(int pos) 方法来访问,元素下标从 0 开始。

八、快速入门

5.1.4 Case Class 类型(CaseClassTypeInfo

     支持任意 CaseClass,字段上限是 22

八、快速入门

5.1.5 Pojos 类型(PojoTypeInfo

       Pojos 类可以完成复杂数据类型的定义,Flink 通过实现 PojoTypeInfo 来描述任意的 POJOs,包括Java Scala 类。

       但是 Pojos 得遵循以下要求:

  • 类必须是 public 修饰,且必须独立定义,不能是内部类;
  • 必须包含默认空构造器;
  • 所有的 Fields 必须是 public 或者有 public 修饰的 getter setter 方法;
  • 类的字段类型必须是 Flink 支持的。

八、快速入门

5.2 返回类型

       由于 JVM 运行时候会擦除类型(泛型类型),导致 Flink 无法准确的获取到数据类型。因此,在使用 Java API 的时候,我们需要手动指定类型。使用 Scala 的时候则无需指定。所以,在一些特定的情况下,例如匿名函数或者使用泛型的情况下,我们需要明确的提供数据的类型信息,来提高我们程序的性能。

       具体使用 SingleOutputStreamOperator returns 方法来指定算子的返回数据类型。

  • returns(Class<T> typeClass) :使用 Class 的方式指定返回数据类型;
  • returns(TypeHint<T> typeHint) :使用 TypeHint 的方式指定返回数据类型,通常使用TypeHint 来指定泛型类型;
  • returns(TypeInformation<T> typeInfo) :使用 TypeInformation 指定。

5.2.1 TypeInformation

       TypeInformation Flink 类型系统的核心,是生成序列化/反序列化工具和 Comparator 的工具类。同时它还是连接 Schema 和编程语言内部类型系统的桥梁。

       可以使用 of 方法创建 TypeInformation

  • of(Class<T> typeClass) :从 Class 创建;
  • of(TypeHint<T> typeHint) :从 TypeHint 创建。

5.2.2 TypeHint

       由于泛型类型在运行时会被 JVM 擦除,所以说我们无法使用 TypeInformation.of(Xxx.class)方式指定带有泛型的类型。
       为了可以支持泛型类型,
Flink 引入了 TypeHint 。例如我们需要获取 Tuple2<String, Long> 的类型信息,可以使用如下方式:

八、快速入门

5.2.3 Types

       Flink 中经常使用的类型已经预定义在了 Types 中。它们的 Serializer/Deserializer Comparator 已经定义好了。所以,对于经常使用的类型,既可以使用 TypeHint 指定也可以使用Types 指定。例如 Tuple2<String, Integer> 类型我们可以使用如下方式:

八、快速入门

5.3 代码实现

八、快速入门

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://yjxxt.com/378.html

上一篇: 七、编程模型
下一篇: 九、运行环境
相关推荐
2024-08-03

        Flink 应用程序需要做的第一件事是设置它的执行环境。执行环境是确定程序在本地机器上运行…

862
2024-08-03

         构建一个典型的 Flink 流式程序需要以下几步:  设置执行环境; 从数据源中读取一条或多…

941
2024-08-03

一、分层 API         Flink 自底向上在不同的抽象级别提供了多种 API,并且针对常见的使用场景开…

531
2024-08-03

一、软件栈 Storage 层(存储层):Flink 本身并没有提供分布式文件系统,因此 Flink 的分析大多…

916
2024-08-03

         当前大数据领域主流的流式计算框架有 Apache Storm、Spark Streaming、Apache Flink 三种…

681
2024-08-03

一、事件驱动型应用 1.1 事件驱动型应用 事件驱动型应用是一种软件设计模式,它的工作方式就像在等…

547
发表评论
暂无评论

还没有评论呢,快来抢沙发~

点击联系客服

在线时间:9:00-18:00

客服电话

15201841284

客服邮箱

shanxn@yjxxt.com

扫描二维码

扫码加微信咨询