Hello,flink
简介
Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算
是一个大数据的引擎,有状态的数据计算。
关键是数据流
为什么选择Flink
-
流数据更真实的反映了我们的生活方式
真实的情况是,数据是源源不断的,比如订单、弹幕、日志、用户行为、聊天信息等。
但是如果要是来一条处理一条,对机器的性能要求会比较高,比较好的方法是让它攒一波再处理。
-
传统的数据架构师基于有限数据集的
spark streaming可以做这个事情,但是还是有一定的延迟(秒级)
现在我们要做的是做到毫秒级,并且低延迟、高吞吐,这就是flink要做的。
-
我们的目标
-
低延迟
-
高吞吐
-
结果的准确性和良好的容错性
分布式系统在传输的过程中有可能会乱序,导致数据出现一些问题。
容错性:有一个节点挂了之后,系统要能够会滚到一个比较近的状态
-
flink的应用
电商和市场营销
数据报表、广告投放、业务流程需要
物联网
传感器实时数据采集和显示、实时报警、交通运输业
电信业
基站流量调配
银行和金融业
实时结算和通知推送,实时检测异常行为
数据特别大的时候计算起来会很慢,用hive,sql语句甚至可能达到几个小时那么多
我们可以变通一下,只要来一个数据,我们就把数据算一下。
数据处理的结构
传统的数据处理架构
所有发过来的数据都是一个事件
优点
- 实时性很好
缺点
- 就是并发性太差了
数据越来越多的时候该怎么办?
分析处理
数据显暂时放在业务数据库里面,需要数据的时候先ETL数据清洗,然后再放在数据仓库里面,然后再响应
优点
- 不用做联表查询
缺点
- 实时性没有那么好
有状态的流式处理
直接把数据放在本地内存中,用本地内存的状态,代替了表,在高并发的时候可以做集群。
如果想要保持数据持久化,设置一个周期检查点(CheckPoint),定期缓存。
缺点
- 分布式情况下,因为网络的原因,数据的顺序就有了一些问题
第二代流处理
lambda架构
用两套系统(流处理、批处理),同时保证低延迟和结果准确
流处理保证速度,把处理结果放在结果表中,同时数据也做一个批处理,最后把batch和speed表结合,得到结果。
用户看到的结果是很快的有一个数据的结果(不准确),隔一段时间之后,再获取精确的结果
缺点
- 需要维护两套系统
- 两个表算的不一样,实际开发的话会很麻烦
流处理低延迟,但是结果不是很准确,批处理结果稳定,延迟比较少
第三代流处理
Flink
flink的特点
- 事件驱动
要有一个程序一直运行着,监听数据的到来。
-
基于流的世界观
在flink的世界观中,一切都是流组成的,离线数据是有界的流
实时数据是一个没有界限的流,这就是所谓的有界流和无界流
-
分层api
- 越往上越抽象,表达含义越简明,使用越方便
- 越底层越具体,表达能力越丰富,使用越灵活
SQL和Table API还在丰富和发展过程中,阿里的blink做的还挺强。data stream api层用的最多,批处理用的是data set api
如果data stream还不够用的话,可以用processFunction api来处理。
- 精确一次(exactly-once)的状态一致性保证
- 低延迟,每秒处理百万个事件,毫秒级延迟
- 与众多常用的存储系统的连接
- 高可用,动态沱镇,实现7*24小时全天候运行
flink vs spark streaming
流(stream)和微批(micro-batching)
数据模型
- spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
- flink基本数据模型是数据流,以及事件(Event)序列
运行时架构
- spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
- flink是标准的流执行模式,一个事件在一个节点处理完后可以直接法网下一个节点进行处理
简单操作
scale版 配置maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-learning-01</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 主要做编译-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<executions>
<execution>
<!-- 声明绑定到maven的compile阶段 -->
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 用来打包的 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<!-- 后缀,表示把依赖都打进去 -->
<descriptorRefs>
<descriptorRef>jar-width-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
java版依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-learning-01</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
</project>
批处理
// 批处理
public class WordCount {
public static void main(String[] args) throws Exception{
// 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
String inputPath = "/Users/cjp/bilibili-workspace/flink-learning-01/src/main/resources/hello.txt";
DataSource<String> inputDataSet = env.readTextFile(inputPath);
// 对数据集进行处理,按照空格分词展开,转换成(word,1)二元组进行统计
DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyflatMapper()).groupBy(0) // 按照第一个位置的word分组
.sum(1);// 将第二个位置上的数据求和
resultSet.print();
}
// 自定义类,实现flatMapFunction接口
public static class MyflatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// 按空格分词
String[] words = value.split(" ");
// 遍历所有word,包成二元组输出
for(String word:words) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
流处理
public class StreamWordCount {
public static void main(String[] args) throws Exception{
// 创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置执行线程
/*
前面的标记就是线程数
2> (javascript,1)
1> (hello,1)
2> (vue,1)
3> (golang,1)
1> (hello,2)
1> (hello,3)
1> (java,1)
1> (hello,4)
*/
env.setParallelism(3);
// 从文件中读取数据
// String inputPath = "/Users/cjp/bilibili-workspace/flink-learning-01/src/main/resources/hello.txt";
// DataStream<String> inputDataStream = env.readTextFile(inputPath);
// 用parameter tool工具从程序启动参数中提取配置项
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
// 从socket文本流读取数据
DataStream<String> inputDataStream = env.socketTextStream(host, port);
// 基于数据流进行转换计算
SingleOutputStreamOperator resultStream = inputDataStream.flatMap(new WordCount.MyflatMapper())
// 按照当前key的hash code进行重分区
.keyBy(0)
.sum(1);
resultStream.print();
// 执行任务
env.execute();
}
}
设置启动参数
控制台开启7777端口,模拟业务中不断的发送数据
➜ flink-learning-01 nc -lk 7777
hello
你好,我叫陈家鹏 哈哈
这是一个linux的端口 可以用来发送数据
控制台输出如下
1> (这是一个linux的端�端口,1)
3> (可以用来发送数据,1)
部署
Standalone模式
不依赖于其他的工具,可以自己单独部署
安装
解压缩flink-1.10.1-bin-scala_2.12.tgz(http://archive.apache.org/dist/flink/flink-1.10.1/flink-1.10.1-bin-scala_2.12.tgz),进入到conf目录中
Bin
├── bash-java-utils.jar
├── config.sh
├── find-flink-home.sh
├── flink # 最重要的命令,如果想要在集群上提交一个作业,停止,取消,都可以用flink命令
├── flink-console.sh
├── flink-daemon.sh
├── historyserver.sh
├── jobmanager.sh # 管理作业,比较核心
├── kubernetes-entry.sh
├── kubernetes-session.sh
├── mesos-appmaster-job.sh
├── mesos-appmaster.sh
├── mesos-taskmanager.sh
├── pyflink-gateway-server.sh
├── pyflink-shell.sh
├── sql-client.sh
├── standalone-job.sh
├── start-cluster.sh # 启动集群
├── start-scala-shell.sh
├── start-zookeeper-quorum.sh
├── stop-cluster.sh # 停止集群
├── stop-zookeeper-quorum.sh
├── taskmanager.sh
├── yarn-session.sh
└── zookeeper.sh
conf
.
├── flink-conf.yaml
├── log4j-cli.properties
├── log4j-console.properties
├── log4j-yarn-session.properties
├── log4j.properties
├── logback-console.xml
├── logback-yarn.xml
├── logback.xml
├── masters
├── slaves
├── sql-client-defaults.yaml
└── zoo.cfg
1)修改flink/conf/flink-conf.yaml文件
jobmanager.heap.size: 1024m # job manager的堆内存大小(因为jobmanager运行在jvm上面)
taskmanager.memory.process.size: 1728m # 当前taskmanager整个内存的内存的总大小
# taskmanager既有堆内存也有堆外内存,flink是有状态的流式计算,这些状态是放在内存中的
taskmanager.numberOfTaskSlots: 1# 不同的插槽,不同的slot上可以运行不同的线程
parallelism.default: 1 # 默认并行度,默认用一个线程执行
# numberOfTaskSlots和parallelism的区别,
# numberOfTaskSlots是表示最大的能力,并且是针对一个taskmanager
# parallelism是实际跑多少个,针对总体
2)cat masters
# 提交job的入口,jobmanager地址
localhost:8081
3)cat slaves
# taskmanager地址
localhost
4)运行
切到bin
./start-cluster.sh
available和配置文件中的taskmanager.numberOfTaskSlots: 1一致
命令提交job
1)提交
./bin/flink run -c com.atguigu.ec.StreamWordCount -p 3 /Users/cjp/bilibili-workspace/flink-learning-01/target/flink-learning-01-1.0-SNAPSHOT.jar --host localhost --port 7777
# c 启动类
# p 并行度
# 后面的就是启动参数
2)查看所有job
./bin/flink list
3)取消
./bin/flink cancel job_id
k8s和yarn部署
yarn
yarn有两种模式,分别为Session-Cluster和Per-Job-Cluster两种模式
- Session-Cluster
Session-Cluster模式需要先启动集群,然后提交作业,接着会想yarn申请一块空尽之后,资源永远保持不变,如果资源满了,下一个作业就无法提交,只能等到yarn中的启动一个作业执行完之后,释放了资源,下一个作业才会正常提交。
所有的作业共享Dispather和ResourceManager共享资源,适合规模小,执行时间短的作业
在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都会向这里提交,这个flink集群会常驻在yarn集群中,除非手工停止。
- Per-Job-Cluster
一个job会对应一个集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,知道作业执行完成,一个作业的失败与否,并不会影响下一个作业的正常提交和运行,独享Dispatcher和ResourceManager,按需接受资源申请,适合规模大长时间运行的作业
每次提交都会创建一个新的flink集群,任务之间相互独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失