Spark:自定义Transformer

Transformer

当我们从数据源读取了一些数据之后, 我们经常需要对这些数据进行一些预处理(类型转换,增删改等等). Spark的mllib提供了一些数据转换的类和函数,但是有时候并不能满足我们的需求,所以我们需要自给自足。当然,我们可以自己构建这些类和函数,如果可以利用Spark提供的一些接口,当然就事半功倍了。其中一个方法就是extends Transformer。关于TransformerPipeline 的介绍请戳官方文档.

一个最简单的非abstract的继承Transformer的类定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class TypeTransformer extends Transformer{
override val uid: String = Identifiable.randomUID("typeTransformer")
override def transformSchema(schema: StructType): StructType = {
// 检查输入和输出是否符合要求, 比如数据类型.
// 返回transform之后的schema
}
override def transform(dataset: Dataset[_]): DataFrame = {
// 将输入的dataset(一般来说是DataFrame类型的)转换成新的dataset并返回
}
override def copy(extra: ParamMap): Transformer = defaultCopy(extra) // 暂时不知道有什么用
}

然后这个TypeTransformer就可以应用到Pipeline里面啦. 这就是我们的类要继承自Transformer的原因了, 因为Pipeline中的stage要么是Transformer,要么是Estimator. 而后者一般用于机器学习方法, 前者就是功能如其名了.

为了让我们这个TypeTransformer更像Spark中别的Transformer, 我们还需要添加一些方法, 常用的就是:

1
2
3
4
5
6
7
// 定义需要的Param
final val inputCol= new Param[String](this, "inputCol", "The input column")
final val outputCol = new Param[String](this, "outputCol", "The output column")
// 设置需要transform的column的名字
def setInputCol(value: String): this.type = set(inputCol, value)
//设置transformed的column的名字
def setOutputCol(value: String): this.type = set(outputCol, value)

接下来就要使用这个TypeTransformer了:

1
2
3
4
// 创建实例并设置好参数
val typeTransformer = new TypeTransformer().setInputCol(v1).setOutputCol(v2)
// 应用到数据上
typeTransformer.fit(someDataFrame).transform(someDataFrame) // 得到新的dataFrame

这就完了??
好像这个TypeTransformer一次只能处理一列啊, 没错, 就是这样!
好蠢啊○| ̄|_
当然了, 我们可以添加一个方法接收多个inputCol嘛, 不过为了简单介绍一下Pipeline的用法, 我们可以把n个TypeTransformer应用到Pipeline中,每个TypeTransformer的输出都是下一个的输入. 代码如下:

1
2
3
val ypeTransformers: Array[PipelineStage] = Array(typeTransformer1, typeTransformer2, typeTransformer3)
val typeTransformPipeline = new Pipeline().setStages(typeTransformers)
val result = typeTransformPipeline.fit(dataFrame).transform(dataFrame)

结果就是3个typeTransformer的顺序调用后的结果.