ProcessFunction系列函数

在继续介绍Flink时间和窗口相关操作之前,我们需要先了解一下ProcessFunction系列函数。它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。之前提到的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的Processing Time或者是Watermark时间戳,调用起来简单但功能相对受限。如果想获取数据流中Watermark的时间戳,或者使用定时器,需要使用ProcessFunction系列函数。Flink SQL是基于这些函数实现的,一些需要高度个性化的业务场景也需要使用这些函数。

目前,这个系列函数主要包括KeyedProcessFunctionProcessFunctionCoProcessFunctionKeyedCoProcessFunctionProcessJoinFunctionProcessWindowFunction等多种函数,这些函数各有侧重,但核心功能比较相似,主要包括两点:

  • 状态:我们可以在这些函数中访问和更新Keyed State 。

  • 定时器(Timer):像定闹钟一样设置定时器,我们可以在时间维度上设计更复杂的业务逻辑。

状态的介绍可以参考第六章的内容,本节将重点介绍ProcessFunction系列函数时间功能上的相关特性。

Timer的使用方法

说到时间相关的操作,就不能避开定时器(Timer)。我们可以把Timer理解成一个闹钟,使用前先在Timer中注册一个未来的时间,当这个时间到达,闹钟会“响起”,程序会执行一个回调函数,回调函数中执行一定的业务逻辑。这里以KeyedProcessFunction为例,来介绍Timer的注册和使用。

ProcessFunction有两个重要的方法:processElement()onTimer(),其中processElement函数在源码中的Java签名如下:

// 处理数据流中的一条元素
public abstract void processElement(I value, Context ctx, Collector<O> out)

processElement()方法处理数据流中的一条类型为I的元素,并通过Collector<O>输出出来。Context是它区别于FlatMapFunction等普通函数的特色,开发者可以通过Context来获取时间戳,访问TimerService,设置Timer。

ProcessFunction类中另外一个接口是onTimer()方法:

// 时间到达后的回调函数
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

这是一个回调函数,当到了“闹钟”时间,Flink会调用onTimer(),并执行一些业务逻辑。这里也有一个参数OnTimerContext,它实际上是继承了上面的那个Context,与Context几乎相同。

使用Timer的方法主要逻辑为:

  1. processElement()方法中通过Context注册一个未来的时间戳t。这个时间戳的语义可以是Processing Time,也可以是Event Time,根据业务需求来选择。
  2. onTimer()方法中实现一些逻辑,到达t时刻,onTimer()方法被自动调用。

Context中,我们可以获取一个TimerService,这是一个访问时间戳和Timer的接口。我们可以通过Context.timerService.registerProcessingTimeTimer()Context.timerService.registerEventTimeTimer()这两个方法来注册Timer,只需要传入一个时间戳即可。我们可以通过Context.timerService.deleteProcessingTimeTimerContext.timerService.deleteEventTimeTimer来删除之前注册的Timer。此外,还可以从中获取当前的时间戳:Context.timerService.currentProcessingTimeContext.timerService.currentWatermark。这些方法中,名字带有“ProcessingTime”的方法表示该方法基于Processing Time语义;名字带有“EventTime”或“Watermark”的方法表示该方法基于Event Time语义。

我们只能在KeyedStream上注册Timer。每个Key下可以使用不同的时间戳注册不同的Timer,但是每个Key的每个时间戳只能注册一个Timer。如果想在一个DataStream上应用Timer,可以将所有数据映射到一个伪造的Key上,但这样所有数据会流入一个算子子任务。

我们再次以股票交易场景来解释如何使用Timer。一次股票交易包括:股票代号、时间戳、股票价格、成交量。我们现在想看一支股票未来是否一直连续上涨,如果一直上涨,则发送出一个提示。如果新数据比上次数据价格更高且目前没有注册Timer,则注册一个未来的Timer,如果在这期间价格降低则把刚才注册的Timer删除,如果在这期间价格没有降低,Timer时间到达后触发onTimer(),发送一个提示。下面的代码中,intervalMills表示一个毫秒精度的时间段,如果这个时间段内一支股票价格一直上涨,则会输出文字提示。

// 三个泛型分别为 Key、输入、输出
public static class IncreaseAlertFunction
    extends KeyedProcessFunction<String, StockPrice, String> {

    private long intervalMills;
    // 状态句柄
    private ValueState<Double> lastPrice;
    private ValueState<Long> currentTimer;

    public IncreaseAlertFunction(long intervalMills) throws Exception {
      	this.intervalMills = intervalMills;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 从RuntimeContext中获取状态
        lastPrice = getRuntimeContext().getState(
          new ValueStateDescriptor<Double>("lastPrice", Types.DOUBLE()));
        currentTimer = getRuntimeContext().getState(
          new ValueStateDescriptor<Long>("timer", Types.LONG()));
    }

    @Override
    public void processElement(StockPrice stock, Context context, Collector<String> out) throws Exception {

        // 状态第一次使用时,未做初始化,返回null
        if (null == lastPrice.value()) {
          	// 第一次使用lastPrice,不做任何处理
        } else {
            double prevPrice = lastPrice.value();
            long curTimerTimestamp;
            if (null == currentTimer.value()) {
              	curTimerTimestamp = 0;
            } else {
              	curTimerTimestamp = currentTimer.value();
            }
            if (stock.price < prevPrice) {
                // 如果新流入的股票价格降低,删除Timer,否则该Timer一直保留
                context.timerService().deleteEventTimeTimer(curTimerTimestamp);
                currentTimer.clear();
            } else if (stock.price >= prevPrice && curTimerTimestamp == 0) {
                // 如果新流入的股票价格升高
                // curTimerTimestamp为0表示currentTimer状态中是空的,还没有对应的Timer
                // 新Timer = 当前时间 + interval
                long timerTs = context.timestamp() + intervalMills;

                context.timerService().registerEventTimeTimer(timerTs);
                // 更新currentTimer状态,后续数据会读取currentTimer,做相关判断
                currentTimer.update(timerTs);
            }
        }
        // 更新lastPrice
        lastPrice.update(stock.price);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) throws Exception {
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        out.collect(formatter.format(ts) + ", symbol: " + ctx.getCurrentKey() +
                    " monotonically increased for " + intervalMills + " millisecond.");
        // 清空currentTimer状态
        currentTimer.clear();
    }
}

在主逻辑里,通过下面的process()算子调用KeyedProcessFunction

DataStream<StockPrice> inputStream = ...

DataStream<String> warnings = inputStream
                .keyBy(stock -> stock.symbol)
                // 调用process函数
                .process(new IncreaseAlertFunction(3000));

Checkpoint时,Timer也会随其他状态数据一起保存起来。如果使用Processing Time语义设置一些Timer,重启时这个时间戳已经过期,那些回调函数会立刻被调用执行。

侧输出

ProcessFunction的另一大特色功能是可以将一部分数据发送到另外一个流中,而且输出到的两个流数据类型可以不一样。这个功能被称为为侧输出(Side Output)。我们通过OutputTag<T>来标记另外一个数据流:

OutputTag<StockPrice> highVolumeOutput = 
  new OutputTag<StockPrice>("high-volume-trade"){};

ProcessFunction中,我么可以使用Context.output方法将某类数据过滤出来。OutputTag是这个方法的第一个参数,用来表示输出到哪个数据流。

public static class SideOutputFunction 
  extends KeyedProcessFunction<String, StockPrice, String> {
    @Override
    public void processElement(StockPrice stock, Context context, Collector<String> out) throws Exception {
        if (stock.volume > 100) {
          	context.output(highVolumeOutput, stock);
        } else {
          	out.collect("normal tick data");
        }
    }
}

在主逻辑中,通过下面的方法先调用ProcessFunction,再获取侧输出:

DataStream<StockPrice> inputStream = ...

SingleOutputStreamOperator<String> mainStream = inputStream
    .keyBy(stock -> stock.symbol)
    // 调用process函数,包含侧输出逻辑
    .process(new SideOutputFunction());

DataStream<StockPrice> sideOutputStream = mainStream.getSideOutput(highVolumeOutput);

其中,SingleOutputStreamOperatorDataStream的一种,它只有一种输出。下面是它在Flink源码中的定义:

public class SingleOutputStreamOperator<T> extends DataStream<T> {
  	...
}

这个例子中,KeyedProcessFunction的输出类型是String,而SideOutput的输出类型是StockPrice,两者可以不同。

在两个流上使用ProcessFunction

我们在DataStream API部分曾提到使用connect()将两个数据流的合并,如果想从更细的粒度在两个数据流进行一些操作,可以使用CoProcessFunctionKeyedCoProcessFunction。这两个函数都有processElement1()processElement2()方法,分别对第一个数据流和第二个数据流的每个元素进行处理。第一个数据流类型、第二个数据流类型和经过函数处理后的输出类型可以互不相同。尽管数据来自两个不同的流,但是他们可以共享同样的状态,所以可以参考下面的逻辑来实现两个数据流上的Join:

  • 创建一到多个状态,两个数据流都能访问到这些状态,这里以状态a为例。
  • processElement1()方法处理第一个数据流,更新状态a。
  • processElement2()方法处理第二个数据流,根据状态a中的数据,生成相应的输出。

我们这次将股票价格结合媒体评价两个数据流一起讨论,假设对于某支股票有一个媒体评价数据流,媒体评价数据流包含了对该支股票的正负评价。两支数据流一起流入KeyedCoProcessFunctionprocessElement2()方法处理流入的媒体数据,将媒体评价更新到状态mediaState上,processElement1()方法处理流入的股票交易数据,获取mediaState状态,生成到新的数据流。两个方法分别处理两个数据流,共享一个状态,通过状态来通信。

在主逻辑中,我们将两个数据流connect(),然后按照股票代号进行keyBy(),进而使用process()

// 读入股票数据流
DataStream<StockPrice> stockStream = ...

// 读入媒体评价数据流
DataStream<Media> mediaStream = ...

DataStream<StockPrice> joinStream = stockStream.connect(mediaStream)
    .keyBy("symbol", "symbol")
    // 调用process函数
    .process(new JoinStockMediaProcessFunction());

KeyedCoProcessFunction的具体实现:

/**
  * 四个泛型:Key,第一个流类型,第二个流类型,输出。
  */
public static class JoinStockMediaProcessFunction extends KeyedCoProcessFunction<String, StockPrice, Media, StockPrice> {
    // mediaState
    private ValueState<String> mediaState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 从RuntimeContext中获取状态
        mediaState = getRuntimeContext().getState(
          	new ValueStateDescriptor<String>("mediaStatusState", Types.STRING));
    }

    @Override
    public void processElement1(StockPrice stock, Context context, Collector<StockPrice> collector) throws Exception {
        String mediaStatus = mediaState.value();
        if (null != mediaStatus) {
            stock.mediaStatus = mediaStatus;
            collector.collect(stock);
        }
    }

    @Override
    public void processElement2(Media media, Context context, Collector<StockPrice> collector) throws Exception {
        // 第二个流更新mediaState
        mediaState.update(media.status);
    }
}

这个例子比较简单,没有使用Timer,实际的业务场景中状态一般用到Timer将过期的状态清除。两个数据流的中间数据放在状态中,为避免状态的无限增长,需要使用Timer清除过期数据。

很多互联网APP的机器学习样本拼接都可能依赖这个函数来实现:服务端的机器学习特征是实时生成的,用户在APP上的行为是交互后产生的,两者属于两个不同的数据流,用户行为是机器学习所需要标注的正负样本,因此可以按照这个逻辑来将两个数据流拼接起来,通过拼接更快得到下一轮机器学习的样本数据。

使用Event Time时,两个数据流必须都设置好Watermark,只设置一个流的Event Time和Watermark,无法在CoProcessFunctionKeyedCoProcessFunction中使用Timer功能,因为process算子无法确定自己应该以怎样的时间来处理数据。