用户自定义函数

我们在Transformations部分中介绍了常用的一些操作,可以发现,使用Flink的算子必须进行自定义,自定义时可以使用Lambda表达式,也可以继承并重写函数类。本节将从源码和案例两方面对用户自定义函数进行一个总结和梳理。

函数类

对于mapflatMapreduce等方法,我们可以实现MapFunctionFlatMapFunctionReduceFunctioninterface接口。这些函数类签名中都有泛型参数,用来定义该函数的输入或输出的数据类型。我们要继承这些类,并重写里面的自定义函数。以flatMap对应的FlatMapFunction为例,它在源码中的定义为:

package org.apache.flink.api.common.functions;

@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
  	void flatMap(T value, Collector<O> out) throws Exception;
}

这是一个函数式接口类,它继承了Flink的Function函数式接口。我们在第二章中提到函数式接口,这正是只有一个抽象函数方法的接口类,其目的是为了方便应用Java Lambda表达式。此外,它还继承了Serializable,以便进行序列化,这是因为这些函数在运行过程中要发送到各个实例上,发送前后要进行序列化和反序列化。需要注意的是,使用这些函数时,一定要保证函数内的所有内容都可以被序列化。如果有一些不能被序列化的内容,或者使用接下来介绍的RichFunction函数类,或者重写Java的序列化和反序列化方法。

进一步观察FlatMapFunction发现,这个函数类有两个泛型T和O,T是输入,O是输出。在继承这个接口类时,要设置好对应的输入和输出数据类型,否则会报错。我们最终其实是要重写虚函数flatMap,函数的两个参数也与输入输出的泛型类型对应。参数valueflatMap的输入,数据类型是T,参数outflatMap的输出,它是一个Collector,从Collector命名可以看出它起着收集的作用,最终输出成一个数据流,我们需要将类型为O的数据写入out

下面的例子继承FlatMapFunction,并实现flatMap,只对长度大于limit的字符串切词:

// 使用FlatMapFunction实现过滤逻辑,只对字符串长度大于 limit 的内容进行词频统计
public static class WordSplitFlatMap implements FlatMapFunction<String, String> {

    private Integer limit;

    public WordSplitFlatMap(Integer limit) {
      	this.limit = limit;
    }

    @Override
    public void flatMap(String input, Collector<String> collector) throws Exception {
      	if (input.length() > limit) {
        		for (String word: input.split(" "))
        			collector.collect(word);
      }
    }
}

DataStream<String> dataStream = senv.fromElements("Hello World", "Hello this is Flink");
DataStream<String> functionStream = dataStream.flatMap(new WordSplitFlatMap(10));

Lambda表达式

当不需要处理非常复杂的业务逻辑时,使用Lambda表达式可能是更好的选择,Lambda表达式能让代码更简洁紧凑。Java和Scala都可以支持Lambda表达式。

Scala的Lambda表达式

我们先看对Lambda表达式支持最好的Scala。对于flatMap,Flink的Scala源码有三种定义,我们先看一下第一种的定义:

def flatMap[O: TypeInformation](fun: (T, Collector[O]) => Unit): DataStream[O] = {...}

flatMap输入是泛型T,输出是泛型O,接收一个名为fun的Lambda表达式,fun形如(T, Collector[O] => {...})。Lambda表达式要将数据写到Collector[O]中。

我们继续以切词为例,程序可以写成下面的样子,flatMap中的内容是一个Lambda表达式。其中的foreach(out.collect)本质上也是一个Lambda表达式。从这个例子可以看出,Scala的无所不在的函数式编程思想。

val lambda = dataStream.flatMap{
  (value: String, out: Collector[String]) => {
    if (value.size > 10) {
      value.split(" ").foreach(out.collect)
    }
  }
}

然后我们看一下源码中Scala的第二种定义:

def flatMap[O: TypeInformation](fun: T => TraversableOnce[O]): DataStream[O] = {...}

与之前的不同,这里的Lambda表达式输入是泛型T,输出是一个TraversableOnce[O],TraversableOnce表示这是一个O组成的列表。与之前使用Collector收集输出不同,这里直接输出一个列表,Flink帮我们将列表做了展平。使用TraversableOnce也导致我们无论如何都要返回一个列表,即使是一个空列表,否则无法匹配函数的定义。总结下来,这种场景的Lambda表达式输入是一个T,无论如何输出都是一个O的列表,即使是一个空列表。

// 只对字符串数量大于15的句子进行处理
val longSentenceWords = dataStream.flatMap {
  input => {
    if (input.size > 15) {
      // 输出是 TraversableOnce 因此返回必须是一个列表
      // 这里将Array[String]转成了Seq[String]
      input.split(" ").toSeq
    } else {
      // 为空时必须返回空列表,否则返回值无法与TraversableOnce匹配!
      Seq.empty
    }
  }
}

在使用Lambda表达式时,我们应该逐渐学会使用IntelliJ Idea的类型检查和匹配功能。比如在本例中,如果返回值不是一个TraversableOnce,那么IntelliJ Idea会将该行标红,告知我们输入或输出的类型不匹配。

此外,还有第三种只针对Scala的Lambda表达式使用方法。Flink为了保持Java和Scala API的一致性,一些Scala独有的特性没有被放入标准的API,而是集成到了一个扩展包中。这种API支持类型匹配的偏函数(Partial Function),结合case关键字,能够在语义上更好地描述数据类型:

val data: DataStream[(String, Long, Double)] = ...
data.flatMapWith {
  case (symbol, timestamp, price) => // ...
}

使用这种API时,需要添加引用:

import org.apache.flink.streaming.api.scala.extensions._

这种方式给输入定义了变量名和类型,方便阅读者阅读代码,同时也保留了函数式编程的简洁。Spark的大多数算子默认都支持此功能,Flink没有默认支持此功能,而是将这个功能放到了扩展包里,对于Spark用户来说,迁移到Flink时需要注意这个区别。此外mapWithfilterWithkeyingByreduceWith也分别是其他算子相对应的接口。

使用flatMapWith,之前的切词可以实现为:

val flatMapWith = dataStream.flatMapWith {
  case (sentence: String) => {
    if (sentence.size > 15) {
      sentence.split(" ").toSeq
    } else {
      Seq.empty
    }
  }
}

Java

再来看看Java,因为一些遗留问题,它的Lambda表达式使用起来有一些区别。

第二章中提到,Java有类型擦除问题,void flatMap(IN value, Collector<OUT> out)编译成了void flatMap(IN value, Collector out),擦除了泛型信息,Flink无法自动获取返回类型,如果不做其他操作,会抛出异常。

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
    Otherwise the type has to be specified explicitly using type information.

这种情况下,根据报错提示,或者使用一个类实现FlatMapFunction(包括匿名类),或者添加类型信息。这个类型信息,正是数据类型和序列化章节中所介绍的数据类型。

DataStream<String> words = dataStream.flatMap (
    (String input, Collector<String> collector) -> {
        for (String word : input.split(" ")) {
          	collector.collect(word);
        }
    })
  	// 提供类型信息以解决类型擦除问题
  	.returns(Types.STRING);

通过这里对Scala和Java的对比不难发现,Scala更灵活,Java更严谨,各有优势。

Rich函数类

在上面两种自定义方法的基础上,Flink还提供了RichFunction函数类。从名称上来看,这种函数类在普通的函数类上增加了Rich前缀,比如RichMapFunctionRichFlatMapFunctionRichReduceFunction等等。比起普通的函数类,Rich函数类增加了:

  • open()方法:Flink在算子调用前会执行这个方法,可以用来进行一些初始化工作。
  • close()方法:Flink在算子最后一次调用结束后执行这个方法,可以用来释放一些资源。
  • getRuntimeContext()方法:获取运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子运行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据。

我们可以看一下源码中的函数签名:

public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT>

它既实现了FlatMapFunction接口类,又继承了AbstractRichFunction。其中AbstractRichFunction是一个抽象类,有一个成员变量RuntimeContext,有openclosegetRuntimeContext等方法。

我们尝试继承并实现RichFlatMapFunction`,并使用一个累加器。首先简单介绍累加器的概念:在单机环境下,我们可以用一个for循环做累加统计,但是在分布式计算环境下,计算是分布在多台节点上的,每个节点处理一部分数据,因此单纯循环无法满足计算,累加器是大数据框架帮我们实现的一种机制,允许我们在多节点上进行累加统计。

// 实现RichFlatMapFunction类
// 添加了累加器 Accumulator
public static class WordSplitRichFlatMap extends RichFlatMapFunction<String, String> {

    private int limit;

    // 创建一个累加器
    private IntCounter numOfLines = new IntCounter(0);

    public WordSplitRichFlatMap(Integer limit) {
      	this.limit = limit;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 在RuntimeContext中注册累加器
        getRuntimeContext().addAccumulator("num-of-lines", this.numOfLines);
    }

    @Override
    public void flatMap(String input, Collector<String> collector) throws Exception {

        // 运行过程中调用累加器
        this.numOfLines.add(1);

        if(input.length() > limit) {
            for (String word: input.split(" "))
            collector.collect(word);
        }
    }
}

在主逻辑中获取作业执行的结果,得到累加器中的值。

// 获取作业执行结果
JobExecutionResult jobExecutionResult = senv.execute("basic flatMap transformation");
// 执行结束后 获取累加器的结果
Integer lines = jobExecutionResult.getAccumulatorResult("num-of-lines");
System.out.println("num of lines: " + lines);

累加器是RichFunction函数类提供的众多功能之一,RichFunction函数类最具特色的功能是第六章将要介绍的有状态计算。