时间语义

Flink中的时间有三种:

  • 事件时间 Event Time. 事件真实发生的时间.
  • 摄入时间 Ingestion time. 事件接入到Flink系统的时间
  • 处理时间 Processing Time. 事件到到当前算子的时间
    举一个夸张点的例子: 有一条记录, 它与11:00:00这个时间点产生. 我们的Flink系统在12:00:00这个时间点接入并进入第一个算子. 在Flink系统中又有很多个算子, 到达最后一个算子的时间为13:00:00.
    那么在这种情况下: 事件时间是11:00:00. 这个是不会变的. 对于第一个算子而言, 这时的摄入时间和处理时间都为12:00:00. 对于最后一个算子而言, 这时这条时间的摄入时间为12:00:00. 处理时间为13:00:00

设置时间语义
我们需要在Job中设置执行时采用哪种时间语义.

1
2
3
4
5
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

Processing Time & Ingestion Time

这两个时间都由Flink系统自己生成.
Processing Time是由每个算子自己生成, 实现起来非常简单, 延迟也是最小的. 但是由于每个时间都是获取的当前算子的时钟, 时钟可能不一致, 并且由于集群中不同机器的执行性能不同, 每个算子也有一定的耗时, 对于第N个算子来说的相同Processing Time, 可能到第N+1个算子上就会有改变. 因此Processing Time在时间窗口下的计算会有不确定性.
Ingestion Time是指事件到底Flink Source的时间. 一个事件在整个处理过程中都使用这个时间. 但是Ingestion Time也还是无法解决事件乱序问题.

这两个时间语义如果对事件进行重新消费, 也不能保证幂等性.

Event Time

事件时间是这个事件真实产生的时间, 发生时伴随其他信息一起写入到时间中.
但是由于在网络中的传输或其他问题, 可能导致事件到底Flink系统时发生乱序、迟到等现象.

真实情况中的数据大概如上图所示, 我们可以知道在Flink中进行处理的时间必然是大于等于事件发生的时间, 也就是事件都应该在这条红色虚线以下.
对于红色虚线上的点, 如上图的红色事件, 在12:10收到了12:20的事件, 这是一条未来的事件, 必须要对这条事件进行处理, 比如忽略或者对事件时间进行修改等, 不然会造成后续计算上的错误.
而对于蓝色的事件, 在12:10收到了11:50的事件, 这个事件是历史事件, 如果使用Flink作为批处理系统或者重置Offset后重刷历史, 这个都属于正常事件.
再来看一下事件时间发生在12:10的一系列事件, 它可以在12:10之后的任一时间到达Flink系统

Watermark

当使用Event Time来进行处理时, 通过上图可知某个时间点的数据会在未来的任意时间到达, 我们需要设置一个界限从而避免无限制的等待, 也就是需要知道我们接入的数据需要何时去触发计算.
watermark是一条特殊的记录, 从代码中可以看到它继承自StreamElement

1
2
3
public final class Watermark extends StreamElement {
...
}

如何生成Watermark:

在Flink中, 可以直接在Source算子上生成Watermark, 也可以在其他算子上生成. 推荐是在Source算子上直接生成, 因为这样可以更加准确.
在Source算子中可以调用SourceContext中的方法直接生成Watermark.

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface SourceFunction<T> extends Function, Serializable {  

void run(SourceContext<T> ctx) throws Exception;

interface SourceContext<T> {

void collect(T element);

void collectWithTimestamp(T element, long timestamp);

void emitWatermark(Watermark mark);
}
}

亦可在其他算子上调用assignTimestampsAndWatermarks(watermarkStrategy)生成, watermarkStrategy中接口中包含了很多的默认方法, 其中只有一个方法需要实现即
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
WaermarkGenerator的代码如下

1
2
3
4
5
6
7
public interface WatermarkGenerator<T> {  
// 每条事件调用一次
void onEvent(T event, long eventTimestamp, WatermarkOutput output);

// 间隔ExecutionConfig setAutoWatermarkInterval(long interval)调用一次该方法
void onPeriodicEmit(WatermarkOutput output);
}

通过代码可以看出, 这两个方法都可以实现watermark的生成. 但是watermark如果太多也不是一件很好的事情, 很用可能造成下游算子压力过大. 影响整体性能.

看一个Flink内部的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {  

/** The maximum timestamp encountered so far. */
private long maxTimestamp;

/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;

public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
...
}

@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}

该实现中, 定义了一个超时时间, 在onEvent中并没有生成Watermark, 只是进行了内部的计算保存了一个当前时刻的最大事件时间, 而在onPeriodicEmit方法中才真正的生成Watermark
从该实现中, 我们可以看到Flink会定时得到一个Watermark, 这个Watermark的时间是当前最大事件时间减去一个容忍时间.

如果我们设置maxOutOfOrderness为10分钟, 在橙色事件(eventTime为12:10)到来时, 就会生成一个12:00的Watermark, 后续再接收到的紫色事件(12:00)则被认为是迟到事件, 不会参与到后续的计算中. 同样在黄色事件(eventTime为12:20)的事件到来时会生成一个12:10的Watermark, 后续再收到的小于12:10的事件都会被认为是迟到事件

当我们设置的窗口为滚动窗口, 时间大小为10分钟, 容忍时间为10分钟, 时间语义为事件时间时
当黄色事件(eventTime=12:20)的事件到达时会生成一个12:10的Watermark, 这时会触发(12:00-12:10)窗口的计算(因为窗口大小为10分钟), 即计算图中黄色框中部分
当蓝色事件(eventTime=12:30)的事件到达时会生成一个12:20的Watermark, 这时会计算图中蓝色框中的数据, 这部分的数据事件时间都是12:10~12:20.

WindowAssigner

WindowAssigner的作用是对数据进行窗口的划分
来看下这个抽象类的方法:

1
2
3
4
5
6
7
8
public abstract Collection<W> assignWindows(  
T element, long timestamp, WindowAssignerContext context);

public abstract boolean isEventTime();

public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

主要的方法为assignWindow, 对传入的element划分到一个或多个窗口内.
看几个主要的实现类:

  • TumblingProcessingTimeWindows

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public Collection<TimeWindow> assignWindows(  
    Object element, long timestamp, WindowAssignerContext context) {
    final long now = context.getCurrentProcessingTime();
    if (staggerOffset == null) {
    staggerOffset =
    windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
    }
    long start =
    TimeWindow.getWindowStartWithOffset(
    now, (globalOffset + staggerOffset) % size, size);
    return Collections.singletonList(new TimeWindow(start, start + size));
    }

    获取当前的处理时间, 然后计算出当前窗口的开始时间start, 返回一个时间窗口

  • SlidingProcessingTimeWindows

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public Collection<TimeWindow> assignWindows(  
    Object element, long timestamp, WindowAssignerContext context) {
    timestamp = context.getCurrentProcessingTime();
    List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
    long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
    for (long start = lastStart; start > timestamp - size; start -= slide) {
    windows.add(new TimeWindow(start, start + size));
    }
    return windows;
    }

    获取当前的处理时间, 然后根据size和slide计算出一个元素会处于多少个窗口, 然后计算并设置每个窗口的起止时间.

Flink中内置的一些窗口类型:

Flink里面的时间默认从1970年1月1日0点0分开始计算,可以手动指定offset

滑动窗口(Sliding Windows)

1
2
3
4
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

-- 手动指定offset
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))

每5秒计算一次,每次计算的窗口大小为10秒

滚动窗口(Tumbling Windows)

滚动窗口是一种特殊的滑动窗口,步长跟窗口大小一致

1
.window(TumblingEventTimeWindows.of(Time.seconds(5)))

每5秒计算一次,每次窗口大小为5秒

会话窗口(Session Windows)

进行keyBy之后,这组数据如果超过一定时长后没有新的数据产生则会触发窗口计算,这个窗口内的时间长度无法确定,数据数量也无法确定

1
2
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(1)))

一个user如果超过1分钟没有数据则触发计算

Trigger

Trigger的作用是来计算窗口内的元素是否需要被计算.
首先来看一下Trigger类的几个主要方法:

1
2
3
4
5
6
7
8
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)  
throws Exception;

public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception;

public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
throws Exception;

这三个方法的作用从名称就可以看出来, 分别是当一条数据到来时调用, 当Processing Time触发时调用, 当Event Time触发时调用.
先看一下返回结果, 返回结果都是TriggerResult这个类. 这个类是一个枚举类, 包含了以下几个值:

  • CONTINUE
  • FIRE
  • PURGE
  • FIRE_AND_PURGE
    这几个枚举的作用表明了后续的计算方式.
    如果是CONTINUE, 则表示不进行处理.
    如果是FIRE, 则表示需要进行计算
    如果是PURGE, 则表示需要清空当前窗口内的元素. 不会触发计算
    如果是FIRE_AND_PURGE, 则表示计算的同时也清空窗口内的元素.

Flink内置了以下几个内置的触发器:

  • CountTrigger
    当事件条数达到设定的阈值后触发
  • DeltaTrigger
    预先给定一个DeltaFunction和阈值, 每条事件到达后都会根据DeltaFunction进行计算, 如果计算结果超过阈值, 则触发计算
  • ProcessingTimeTrigger
    当处理时间超过窗口结束时间时触发
  • EventTimeTrigger
    当事件事件(Watermark)超过窗口结束时间时触发
  • ContinuousProcessingTimeTrigger
    给定一个时间间隔, 按照处理时间连续触发
  • ContinuousEventTimeTrigger
    给定一个时间间隔, 按照事件事件连续触发
  • PurgingTrigger
    包装其他的触发器, 使其触发之后, 清除窗口内的数据和状态

通过代码来看一下具体的实现:
先来看一下CountTrigger的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override  
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}

private final long maxCount;

private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

private CountTrigger(long maxCount) {
this.maxCount = maxCount;
}

@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}

可以看到在CountTrigger的onEventTimeonProcessingTime方法中都没有做任何逻辑处理, 直接返回CONTINUE.
onElement方法中, 做了一个计数器, 当条数超过阈值后, 首先将计数器进行清零, 然后触发计算.
再看一下ProcessingTimeTrigger的代码具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)  
throws Exception {
return TriggerResult.CONTINUE;
}

public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}

public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}

onEventTime方法中不做任何处理, 直接返回CONTINUE
onElement中向context中注册一个ProcessingTimeTimer, 触发的事件为当前window的最大时间
当context中注册的ProcessingTimeTimer到时后, 会调用onProcessingTime方法, 这个方法直接返回FIRE

自定义触发器

有时官方提供的这些触发器可能无法满足我们的需求, 我们可以自己来实现一些自定义的触发器, 从上面的几个源码中, 我们可以看到主要需要实现的三个方法.onElement, onProcessingTime,onEventTime.
假如我们需要实现一个如下的触发器: 每10s触发一次, 并且如果10s内的数据量超过100条,则进行触发, 触发后重新计时, 按照ProcessingTime处理
这个类似于上面两个CountTriggerProcessingTimeTrigger的结合. 可以写出如下的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class CustomTrigger<W extends Window> extends Trigger<Object, W> {  

// 触发的条数
private final long size;
// 触发的间隔时长
private final long interval;

private static final long serialVersionUID = 1L;
// 条数计数器
private final ReducingStateDescriptor<Long> countStateDesc = new ReducingStateDescriptor<>("count", new ReduceSum(), LongSerializer.INSTANCE);
// 时间计数器,保存下一次触发的时间
private final ReducingStateDescriptor<Long> timeStateDesc = new ReducingStateDescriptor<>("fire-interval", new ReduceMin(), LongSerializer.INSTANCE);

public CustomTrigger(long size, long interval) {
this.size = size;
this.interval = interval;
}

@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
// 获取当前的条数
ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
// 获取下次的触发时间
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
// 每条数据 counter + 1
count.add(1L);
if (count.get() >= size) {
// 满足条数的触发条件,先清零条数计数器
count.clear();
// 满足条数时也需要清除时间的触发器
ctx.deleteProcessingTimeTimer(fireTimestamp.get());
fireTimestamp.clear();
// fire 触发计算
return TriggerResult.FIRE;
}
// 如果条数没有达到阈值,并且下次触发时间为空,则注册下次的触发时间
timestamp = ctx.getCurrentProcessingTime();
if (fireTimestamp.get() == null) {
long nextFireTimestamp = timestamp + interval;
ctx.registerProcessingTimeTimer(nextFireTimestamp);
fireTimestamp.add(nextFireTimestamp);
}
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
// 获取当前的条数
ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
// 获取设置的触发时间
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
count.clear();
fireTimestamp.clear();
return TriggerResult.FIRE;
}

@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
long timestamp = fireTimestamp.get();
ctx.deleteProcessingTimeTimer(timestamp);
fireTimestamp.clear();
count.clear();
}

@Override
public boolean canMerge() {
return true;
}

@Override
public void onMerge(W window,
OnMergeContext ctx) {
ctx.mergePartitionedState(timeStateDesc);
ctx.mergePartitionedState(countStateDesc);
}

}


class ReduceSum implements ReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}


class ReduceMin implements ReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return Math.min(value1, value2);
}
}

主要的实现的两个方法:onElement, onProcessingTime.
onElement方法中, 每一条记录进入条数计数器加一, 当超过阈值时清空两个状态变量, 同时取消下次的ProcessingTime触发时间. 如果没有达到阈值, 并且下次下次触发时间还未设置时, 计算得到下次触发时间注册到context中.
onProcessingTime方法中, 清空两个状态变量后进行触发.

这里的返回结果都是FIRE, 不会清空窗口内的元素, 如果需要清空可以修改为FIRE_AND_PURGE 或者使用PurgingTrigger类来进行封装.
看一下PurgingTrigger的代码

1
2
3
4
5
6
7
8
9
10
11
12
private Trigger<T, W> nestedTrigger;  

private PurgingTrigger(Trigger<T, W> nestedTrigger) {
this.nestedTrigger = nestedTrigger;
}

@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
throws Exception {
TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}

调用包装的其他触发器, 如果是结果FIRE则返回FIRE_AND_PURGE

WindowOperator

触发器Trigger只是返回了一个是否要触发计算的结果, 谁来调用了触发器以及进行后续的计算呢? 就是WindowOperator.
WindowedStream中, 会进行WindowOperator的构建.

1
2
3
4
5
6
7
8
9
10
11
12
public WindowOperator(  
WindowAssigner<? super IN, W> windowAssigner,
TypeSerializer<W> windowSerializer,
KeySelector<IN, K> keySelector,
TypeSerializer<K> keySerializer,
StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
InternalWindowFunction<ACC, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
long allowedLateness,
OutputTag<IN> lateDataOutputTag) {
...
}

在WindowOperator类中, 可以看到包含了计算所需要的信息, 窗口如何进行划分, key的选择器, 窗口的计算函数, 触发器等.
在这个类中, 也有三个方法:

1
2
3
4
5
public void processElement(StreamRecord<IN> element) throws Exception {}

public void onEventTime(InternalTimer<K, W> timer) throws Exception {}

public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {}

先来看一下processElement方法, 我们将源码简化一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public void processElement(StreamRecord<IN> element) throws Exception {  
// 先调用窗口划分器, 进行窗口的划分
final Collection<W> elementWindows =
windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);

// if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;

final K key = this.<K>getKeyedStateBackend().getCurrentKey();

if (windowAssigner instanceof MergingWindowAssigner) {
...
} else {
for (W window : elementWindows) {
// 先判断窗口是否已经过期, 如果过期则不进行后续的处理
if (isWindowLate(window)) {
continue;
}
...
// 调用触发器, 得到是否要计算的结果
TriggerResult triggerResult = triggerContext.onElement(element);

if (triggerResult.isFire()) {
// 获取窗口的元素
ACC contents = windowState.get();
if (contents == null) {
continue;
}
// 进行计算
emitWindowContents(window, contents);
}

if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
}
...
}
}

protected boolean isWindowLate(W window) {
return (windowAssigner.isEventTime()
&& (cleanupTime(window) <= internalTimerService.currentWatermark()));
}

private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
processContext.window = window;
// 调用函数进行计算
userFunction.process(
triggerContext.key, window, processContext, contents, timestampedCollector);
}