在传统的 Java 程序中,main 函数是整个 Project 的唯一执行起点,在 spring(boot) 等框架下,更是可以把很多全局生效的注解写到 main 方法所在的类上,很多 scheduler executor 也可以放到 main 中。

然而,因为分布式 Flink 集群与本地的 standalone flink 环境有很大差距。以及Flink 程序本质是并行和分布式的,执行过程中,一个数据流会像 kafka 一样分区,流数据的处理也会包含多个子任务,子任务之间是相互独立的,并且可能在不同的线程甚至不同的机器、容器中执行。所以实际上我们在 main 里的方法定义,和环境中从 source iterator 中读取的每一份数据所在的执行环境,完全是两个不同的 Runtime (?)。

为什么 standalone 模式的任务表现与 yarn 提交的任务表现不一致?

首先要理解一个 App Master (AM)的概念,AM 包含 Dispatcher,Resource Manager 和 Job Manager ,Dispatcher 负责接收提交的任务,并创建一个 Job Manager,而 Resource Manager 负责分配资源。

当用户提交作业的时候,提交脚本会首先启动一个 Client 进程
负责作业的编译与提交。它首先将用户编写的代码编译为一个 JobGraph,在这个过
程,它还会进行一些检查或优化等工作,例如判断哪些 Operator 可以 Chain 到同一
个 Task 中。然后,Client 将产生的 JobGraph 提交到集群中执行。此时有两种情
况,一种是类似于 Standalone 这种 Session 模式,AM 会预先启动,此时 Client
直接与 Dispatcher 建立连接并提交作业即可。另一种是 Per-Job 模式,AM 不会
预先启动,此时 Client 将首先向资源管理系统( 如 Yarn、K8S)申请资源来启动
AM,然后再向 AM 中的 Dispatcher 提交作业。

这也就是我觉得在 Session 模式下,大家共享了同一个预先启动的 App Master 和 Task Executor,在执行时的模式也是单进程多线程的形态,所以导致了我可以在算子中,读取到在 main 函数中声明的变量或实例;但在 per-job 模式下,就不可以这么做。

初衷是将数据流中,有共性并且可以延迟处理的内容通过缓存承载,然后在定时任务中聚合后再做执行。

第一版 Guava + scheduled thread pool

傻乎乎的以为 flink 任务只是分发的形式,所以采用了 Google 的 Guava 内存缓存,在 source 的每一份 stream 中,然后通过定义在 main 方法中的 executor.scheduleAtFixedRate 做定时任务,妄图拿到所有算子中存下的缓存:

ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10);
executor.scheduleAtFixedRate(new LazyOperationScheduler(), 30, 30, TimeUnit.SECONDS);

在 IDE 中 work,在本地的 standalone 模式节点中也 work,但是通过 yarn 提交到 hadoop 中的集群版本 flink 后,果然这部分代码根本没有执行。

我开始明白,stream handler 中的 memory 和 main 中的 memory 根本是两份不同的东西(确实不是同一份,但是这里还是想错了)。

第二版 Redis

既然不能共享执行环境的内存(缓存),那就用外部的缓存吧。

最终表现和第一版基本一致,在 hanlder 中写 Redis 当然没有问题,但是 web server 形式的定时任务,根本无法在 Flink 中正确执行,甚至连 log 都不会打出。

第三版 fake continuous source + time window

终于意识到,Flink 算子的执行,严格依赖数据流,没有流的存在,任何东西都不会持续执行。

于是设计了一个假的生成数据流推进,然后通过 time window 做定时的聚合,在聚合结果中,做想做的定时任务。

虚假的数据流生成方法:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import static java.lang.Thread.sleep;

public class RandomLongSource extends RichParallelSourceFunction<Long> {

private volatile boolean cancelled = false;
private Random random;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
random = new Random();
}

@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (!cancelled) {
Long nextLong = random.nextLong();
synchronized (ctx.getCheckpointLock()) {
ctx.collect(nextLong);
}
sleep(30000);
}
}

@Override
public void cancel() {
cancelled = true;
}
}

数据流存储(这里什么也不做,只是把数据缓存下来,只为这份 DataStream 在后续做时间窗口操作):

DataStream<Long> randomLongs = env.addSource(new RandomLongSource())
.map((MapFunction<Long, Long>) s -> s);

然后就是最终实现定时任务的地方,对上面存储的数据流做时间窗口操作,这里是每30秒一次:

randomLongs.timeWindowAll(Time.seconds(30))
.apply(new AllWindowFunction<Long, Object, TimeWindow>() {
@Override
public void apply(TimeWindow timeWindow, Iterable<Long> iterable, Collector<Object> collector) throws Exception {
long start = System.currentTimeMillis();
logger.info("Start periodic tasks...");
new LazyOperationScheduler().run();
logger.info("periodic task finished! take time: " + (System.currentTimeMillis() - start));
}
});

apply 是一个必须重写的方法,在这里我们可以拿到 iterator 形式的 DataStream,里面是符合窗口条件的数据。但是这里我们其实完全没有去碰这些无用的数据 ,我们只是为了在每30秒的这个时间节点,做自己想做的事,这里沿用之前的 Runnable ,因为每个数据流的处理都是独立的,也不存在节省资源、复用等。

这样一个融合在 Flink 中,依靠多 source (其中有一个 source 是我们模拟的)实现的定时任务就完成了,在集群 flink 中表现比较理想。

当然,如果 source 更复杂,或者定时任务更独立一些,我想我会把它提出去当另一个 job 去做了。

(慢慢感受 flink 的精妙设计