Flink命令行界面
本文于2068天之前发表,文中内容可能已经过时。
Flink提供命令行界面(CLI)来运行打包为JAR文件的程序,并控制它们的执行。CLI是任何Flink设置的一部分,可在本地单节点设置和分布式设置中使用。它位于<flink-home>/bin/flink
默认情况下,并连接到从同一安装目录启动的正在运行的Flink主服务器(JobManager)。
使用命令行界面的先决条件是Flink主机(JobManager)已启动(通过 <flink-home>/bin/start-cluster.sh
)或YARN环境可用。
例子
运行没有参数的示例程序:
1
./bin/flink run ./examples/batch/WordCount.jar
使用输入和结果文件的参数运行示例程序:
1
2./bin/flink run ./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out运行带有并行性的示例程序16以及输入和结果文件的参数:
1
2./bin/flink run -p 16 ./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out运行禁用flink log输出的示例程序:
1
./bin/flink run -q ./examples/batch/WordCount.jar
以分离模式运行示例程序:
1
./bin/flink run -d ./examples/batch/WordCount.jar
在特定的JobManager上运行示例程序:
1
2
3./bin/flink run -m myJMHost:8081 \
./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out以特定类作为入口点运行示例程序:
1
2
3./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out使用具有2个TaskManagers 的每作业YARN群集运行示例程序:
1
2
3./bin/flink run -m yarn-cluster -yn 2 \
./examples/batch/WordCount.jar \
--input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out以Word为单位显示WordCount示例程序的优化执行计划:
1
2./bin/flink info ./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out列出计划和正在运行的作业(包括其JobID):
1
./bin/flink list
列出预定作业(包括其作业ID):
1
./bin/flink list -s
列出正在运行的作业(包括其作业ID):
1
./bin/flink list -r
列出所有现有工作(包括其作业ID):
1
./bin/flink list -a
列出在Flink YARN会话中运行Flink作业:
1
./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
取消工作:
1
./bin/flink cancel <jobID>
使用保存点取消作业:
1
./bin/flink cancel -s [targetDirectory] <jobID>
停止工作(仅限流处理工作):
1
./bin/flink stop <jobID>
修改正在运行的作业(仅限流式处理作业):. / bin/flink modify -p
注意:取消和停止(流处理)作业的区别如下:
在取消呼叫中,作业中的算子立即接收cancel()
方法调用以尽快取消它们。如果算子在取消呼叫后没有停止,Flink将开始定期中断线程,直到它停止。
“停止”呼叫是一种更优雅的方式来停止正在运行的流处理作业。Stop仅适用于使用实现StoppableFunction
接口的源的作业。当用户请求停止作业时,所有源都将接收stop()
方法调用。该工作将继续运行,直到所有资源正常关闭。这允许作业完成处理所有飞行数据。
保存点
保存点通过命令行客户端控制:
触发保存点
1 | ./bin/flink savepoint <jobId> [savepointDirectory] |
这将触发具有ID的作业的保存点jobId
,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。
此外,您可以选择指定目标文件系统目录以存储保存点。该目录需要可由JobManager访问。
如果未指定目标目录,则需要配置默认目录。否则,触发保存点将失败。
使用YARN触发保存点
1 | ./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId> |
这将触发具有ID jobId
和YARN应用程序ID 的作业的保存点yarnAppId
,并返回创建的保存点的路径。
其他所有内容与上面触发保存点部分中描述的相同。
使用保存点取消
您可以自动触发保存点并取消作业。
1 | ./bin/flink cancel -s [savepointDirectory] <jobID> |
如果未配置保存点目录,则需要为Flink安装配置默认保存点目录。
只有保存点成功,才会取消该作业。
恢复保存点
1 | ./bin/flink run -s <savepointPath> ... |
run命令有一个保存点标志来提交作业,该作业从保存点恢复其状态。savepoint trigger命令返回保存点路径。
默认情况下,我们尝试将所有保存点状态与正在提交的作业进行匹配。如果要允许跳过无法使用新作业恢复的保存点状态,可以设置allowNonRestoredState
标志。如果在触发保存点并且仍想使用保存点时从程序中删除了作为程序一部分的 算子,则需要允许此 算子操作。
1 | ./bin/flink run -s <savepointPath> -n ... |
如果您的程序删除了属于保存点的 算子,这将非常有用。
配置保存点
1 | ./bin/flink savepoint -d <savepointPath> |
在给定路径处理保存点。savepoint trigger命令返回保存点路径。
如果使用自定义状态实例(例如自定义还原状态或RocksDB状态),则必须指定触发保存点的程序JAR的路径,以便使用用户代码类加载器处理保存点:
1 | ./bin/flink savepoint -d <savepointPath> -j <jarFile> |
否则,你会遇到一个ClassNotFoundException
。
用法
命令行语法如下:
1 | ./flink <ACTION> [OPTIONS] [ARGUMENTS] |