在传统的 Java 程序中,main 函数是整个 Project 的唯一执行起点,在 spring(boot) 等框架下,更是可以把很多全局生效的注解写到 main 方法所在的类上,很多 scheduler executor 也可以放到 main 中。

然而,因为分布式 Flink 集群与本地的 standalone flink 环境有很大差距。以及Flink 程序本质是并行和分布式的,执行过程中,一个数据流会像 kafka 一样分区,流数据的处理也会包含多个子任务,子任务之间是相互独立的,并且可能在不同的线程甚至不同的机器、容器中执行。所以实际上我们在 main 里的方法定义,和环境中从 source iterator 中读取的每一份数据所在的执行环境,完全是两个不同的 Runtime (?)。

为什么 standalone 模式的任务表现与 yarn 提交的任务表现不一致?

首先要理解一个 App Master (AM)的概念,AM 包含 Dispatcher,Resource Manager 和 Job Manager ,Dispatcher 负责接收提交的任务,并创建一个 Job Manager,而 Resource Manager 负责分配资源。

当用户提交作业的时候,提交脚本会首先启动一个 Client 进程
负责作业的编译与提交。它首先将用户编写的代码编译为一个 JobGraph,在这个过
程,它还会进行一些检查或优化等工作,例如判断哪些 Operator 可以 Chain 到同一
个 Task 中。然后,Client 将产生的 JobGraph 提交到集群中执行。此时有两种情
况,一种是类似于 Standalone 这种 Session 模式,AM 会预先启动,此时 Client
直接与 Dispatcher 建立连接并提交作业即可。另一种是 Per-Job 模式,AM 不会
预先启动,此时 Client 将首先向资源管理系统( 如 Yarn、K8S)申请资源来启动
AM,然后再向 AM 中的 Dispatcher 提交作业。

这也就是我觉得在 Session 模式下,大家共享了同一个预先启动的 App Master 和 Task Executor,所以导致了我可以在算子中,读取到在 main 函数中声明的变量或实例;但在 per-job 模式下,大家的一切都是独享的,就不可以这么做。

初衷是将数据流中,有共性并且可以延迟处理的内容通过缓存承载,然后在定时任务中聚合后再做执行。

第一版 Guava + scheduled thread pool

傻乎乎的以为 flink 任务只是分发的形式,所以采用了 Google 的 Guava 内存缓存,在 source 的每一份 stream 中,然后通过定义在 main 方法中的 executor.scheduleAtFixedRate 做定时任务,妄图拿到所有算子中存下的缓存:

ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10);
executor.scheduleAtFixedRate(new LazyOperationScheduler(), 30, 30, TimeUnit.SECONDS);

在 IDE 中 work,在本地的 standalone 模式节点中也 work,但是通过 yarn 提交到 hadoop 中的集群版本 flink 后,果然这部分代码根本没有执行。

我开始明白,stream handler 中的 memory 和 main 中的 memory 根本是两份不同的东西(确实不是同一份,但是这里还是想错了)。

第二版 Redis

既然不能共享执行环境的内存(缓存),那就用外部的缓存吧。

最终表现和第一版基本一致,在 hanlder 中写 Redis 当然没有问题,但是 web server 形式的定时任务,根本无法在 Flink 中正确执行,甚至连 log 都不会打出。

第三版 fake continuous source + time window

终于意识到,Flink 算子的执行,严格依赖数据流,没有流的存在,任何东西都不会持续执行。

于是设计了一个假的生成数据流推进,然后通过 time window 做定时的聚合,在聚合结果中,做想做的定时任务。

虚假的数据流生成方法:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import static java.lang.Thread.sleep;

public class RandomLongSource extends RichParallelSourceFunction<Long> {

private volatile boolean cancelled = false;
private Random random;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
random = new Random();
}

@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (!cancelled) {
Long nextLong = random.nextLong();
synchronized (ctx.getCheckpointLock()) {
ctx.collect(nextLong);
}
sleep(30000);
}
}

@Override
public void cancel() {
cancelled = true;
}
}

数据流存储(这里什么也不做,只是把数据缓存下来,只为这份 DataStream 在后续做时间窗口操作):

DataStream<Long> randomLongs = env.addSource(new RandomLongSource())
.map((MapFunction<Long, Long>) s -> s);

然后就是最终实现定时任务的地方,对上面存储的数据流做时间窗口操作,这里是每30秒一次:

randomLongs.timeWindowAll(Time.seconds(30))
.apply(new AllWindowFunction<Long, Object, TimeWindow>() {
@Override
public void apply(TimeWindow timeWindow, Iterable<Long> iterable, Collector<Object> collector) throws Exception {
long start = System.currentTimeMillis();
logger.info("Start periodic tasks...");
new LazyOperationScheduler().run();
logger.info("periodic task finished! take time: " + (System.currentTimeMillis() - start));
}
});

apply 是一个必须重写的方法,在这里我们可以拿到 iterator 形式的 DataStream,里面是符合窗口条件的数据。但是这里我们其实完全没有去碰这些无用的数据 ,我们只是为了在每30秒的这个时间节点,做自己想做的事,这里沿用之前的 Runnable ,因为每个数据流的处理都是独立的,也不存在节省资源、复用等。

这样一个融合在 Flink 中,依靠多 source (其中有一个 source 是我们模拟的)实现的定时任务就完成了,在集群 flink 中表现比较理想。

当然,如果 source 更复杂,或者定时任务更独立一些,我想我会把它提出去当另一个 job 去做了。

JDBC 无法连接的问题

本地 flink cluster 没问题,分布式 flink 偶尔复现的问题。

相关的 issue:

最初我发现是因为使用的 flink 版本太旧,以至于不支持自动注册 Driver,在 Task Manager 中 SPI 没有自动 load 或者失效了,于是改成了手动 Class.forname
后来发现直接在 sink 的 open method 中创建连接池,在 close method 中关闭销毁就好了。

冲突还比较多,集中在 google common 和 http client 还有 guava 等依赖里。

正常的 protobuf-java + aliyun-log-log4j-appender, ERROR: java.lang.NoClassDefFoundError: org/apache/http/conn/scheme/SchemeSocketFactory

这个问题是因为 httpclient 版本过低造成的。参考:https://stackoverflow.com/questions/11151519/java-lang-noclassdeffounderror-org-apache-http-conn-scheme-schemesocketfactory

把 protobuf-java 去掉,并且 exclude aliyun-log-log4j-appender 中的 httpclient,把 phoenix-core 中的 httpclient 也忽略掉。手动添加一个高版本的 httpclient 。

(在反复安装依赖的过程中,发现 Malformed reply from SOCKS server 的问题,把本地的 proxy/vpn 关掉)

因为对全流程 trace 和实时日志统计要求比较高,所以 flink 自带 ui 和停掉任务后汇聚各个节点的日志再看,对我来说是无法接收的。而修改环境变量是会影响所有的 Flink 任务。所以这里选择了自定义 Flink conf。

复制一份新的 flink conf,注意文件的软链接问题,在 flink conf 中的 log4j.properties 中做修改。在启动之前, export FLINK_CONF_DIR 为自定义的 conf,在声明的环境变量下提交任务。