1. Simple Aggregations

1.1 Data Preparation

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()
val empDF = spark.read.json("/usr/file/json/emp.json")
empDF.createOrReplaceTempView("emp")
empDF.show()
```
Note: emp.json can be downloaded from the resources directory of this repository.
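To sanity-check the data before aggregating, you can print the inferred schema. The column names (empno, ename, job, mgr, hiredate, sal, comm, deptno) match the Emp case class used in section 3; the exact types Spark infers depend on the JSON file itself.

```scala
// Print the schema Spark inferred for the employee dataset
empDF.printSchema()
```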
1.2 count

```scala
empDF.select(count("ename")).show()
```
1.3 countDistinct

```scala
empDF.select(countDistinct("deptno")).show()
```
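For reference, both counts can also be written as SQL against the emp temporary view registered in section 1.1, in the same equivalent-SQL style used in section 2:

```scala
// Equivalent SQL for count and countDistinct
spark.sql("SELECT COUNT(ename) FROM emp").show()
spark.sql("SELECT COUNT(DISTINCT deptno) FROM emp").show()
```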
1.4 approx_count_distinct

When working with large datasets, you often only care about an approximate count rather than the exact value. In that case you can use the approx_count_distinct function, whose second argument specifies the maximum allowed estimation error.
```scala
empDF.select(approx_count_distinct("ename", 0.1)).show()
```
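If the error argument is omitted, Spark falls back to its default maximum estimation error (a relative standard deviation of 0.05 in current releases):

```scala
// Same query using Spark's default estimation error
empDF.select(approx_count_distinct("ename")).show()
```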
1.5 first & last

Return the first or last value of the specified column in the DataFrame.
```scala
empDF.select(first("ename"), last("job")).show()
```
1.6 min & max

Return the minimum or maximum value of the specified column.
```scala
empDF.select(min("sal"), max("sal")).show()
```
1.7 sum & sumDistinct

Sum all values of a column, or sum only its distinct values.
```scala
empDF.select(sum("sal")).show()
empDF.select(sumDistinct("sal")).show()
```
1.8 avg

The built-in function for computing an average.
```scala
empDF.select(avg("sal")).show()
```
1.9 Mathematical Functions

Spark SQL also supports a variety of statistical aggregate functions for common calculations. Some frequently used examples:
```scala
// Variance and standard deviation (population and sample)
empDF.select(var_pop("sal"), var_samp("sal"),
  stddev_pop("sal"), stddev_samp("sal")).show()

// Skewness and kurtosis
empDF.select(skewness("sal"), kurtosis("sal")).show()

// Correlation and covariance (sample and population)
empDF.select(corr("empno", "sal"), covar_samp("empno", "sal"), covar_pop("empno", "sal")).show()
```
1.10 Aggregating Values into Collections

```scala
scala> empDF.agg(collect_set("job"), collect_list("ename")).show()
```

Output:

```
+--------------------+--------------------+
|    collect_set(job)| collect_list(ename)|
+--------------------+--------------------+
|[MANAGER, SALESMA...|[SMITH, ALLEN, WA...|
+--------------------+--------------------+
```
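collect_set and collect_list can also be combined with the grouped aggregations covered in the next section. A small sketch (the alias names are only for illustration):

```scala
// Collect the distinct jobs and all employee names within each department
empDF.groupBy("deptno")
  .agg(collect_set("job").alias("jobs"), collect_list("ename").alias("enames"))
  .show()
```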
2. Grouped Aggregations

2.1 Simple Grouping

```scala
empDF.groupBy("deptno", "job").count().show()
// Equivalent SQL
spark.sql("SELECT deptno, job, count(*) FROM emp GROUP BY deptno, job").show()
```

Output:

```
+------+---------+-----+
|deptno|      job|count|
+------+---------+-----+
|    10|PRESIDENT|    1|
|    30|    CLERK|    1|
|    10|  MANAGER|    1|
|    30|  MANAGER|    1|
|    20|    CLERK|    2|
|    30| SALESMAN|    4|
|    20|  ANALYST|    2|
|    10|    CLERK|    1|
|    20|  MANAGER|    1|
+------+---------+-----+
```
2.2 Grouped Aggregation

```scala
empDF.groupBy("deptno").agg(count("ename").alias("人数"), sum("sal").alias("总工资")).show()
// Equivalent form using column -> aggregate-name pairs
empDF.groupBy("deptno").agg("ename" -> "count", "sal" -> "sum").show()
// Equivalent SQL
spark.sql("SELECT deptno, count(ename), sum(sal) FROM emp GROUP BY deptno").show()
```

Output:

```
+------+----+------+
|deptno|人数|总工资|
+------+----+------+
|    10|   3|8750.0|
|    30|   6|9400.0|
|    20|   5|9375.0|
+------+----+------+
```
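Grouped results can also be filtered on the aggregated values, which is the DataFrame equivalent of SQL's HAVING clause. A minimal sketch; the alias names cnt and avg_sal are only for illustration:

```scala
// Keep only departments with more than three employees
empDF.groupBy("deptno")
  .agg(count("ename").alias("cnt"), avg("sal").alias("avg_sal"))
  .where(col("cnt") > 3)
  .show()
```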
3. Custom Aggregate Functions

Spark provides two ways to write a custom aggregate function:

- a typed custom aggregate function, mainly used with Datasets;
- an untyped custom aggregate function, mainly used with DataFrames.

Both approaches are used below to implement an aggregate function that computes an average, taking the average employee salary as the example.
3.1 Typed Custom Aggregate Function

```scala
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}

// Input type: one employee record
case class Emp(ename: String, comm: scala.Option[Double], deptno: Long, empno: Long,
               hiredate: String, job: String, mgr: scala.Option[Long], sal: Double)

// Intermediate (buffer) type: running sum and count
case class SumAndCount(var sum: Double, var count: Long)

// Aggregator[IN, BUF, OUT]: input type, buffer type, output type
object MyAverage extends Aggregator[Emp, SumAndCount, Double] {

  // Initial value of the buffer
  override def zero: SumAndCount = SumAndCount(0, 0)

  // Fold one input record into the buffer
  override def reduce(avg: SumAndCount, emp: Emp): SumAndCount = {
    avg.sum += emp.sal
    avg.count += 1
    avg
  }

  // Merge two buffers produced on different partitions
  override def merge(avg1: SumAndCount, avg2: SumAndCount): SumAndCount = {
    avg1.sum += avg2.sum
    avg1.count += avg2.count
    avg1
  }

  // Compute the final result from the buffer
  override def finish(reduction: SumAndCount): Double = reduction.sum / reduction.count

  // Encoder for the intermediate type
  override def bufferEncoder: Encoder[SumAndCount] = Encoders.product

  // Encoder for the output type
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object SparkSqlApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Spark-SQL").master("local[2]").getOrCreate()
    import spark.implicits._

    val ds = spark.read.json("file/emp.json").as[Emp]

    // Custom aggregate function
    val myAvg = ds.select(MyAverage.toColumn.name("average_sal")).first()
    // Built-in aggregate function, for comparison
    val avg = ds.select(functions.avg(ds.col("sal"))).first().get(0)

    println("Custom average function : " + myAvg)
    println("Built-in average function : " + avg)
  }
}
```
A custom aggregate function requires implementing quite a few methods; the execution flow and the role of each one (zero, reduce, merge, finish) are illustrated in the accompanying diagram. What deserves a separate note is the encoding of the intermediate type and the output type. The pattern here is fairly fixed and comes down to two cases:

- for a custom type such as a case class or a tuple, use the Encoders.product method;
- for a primitive type, use the method with the corresponding name, such as scalaByte, scalaFloat, scalaShort, and so on. For example:
```scala
override def bufferEncoder: Encoder[SumAndCount] = Encoders.product
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
```
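To make the two encoder cases concrete, here is a minimal, hypothetical variant of the averaging aggregator (TupleAverage is not part of the original example) that uses a plain tuple as its buffer: a tuple is also a Product, so Encoders.product still applies, while the Double output uses Encoders.scalaDouble.

```scala
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// Same average, but with a plain (sum, count) tuple as the intermediate type
object TupleAverage extends Aggregator[Double, (Double, Long), Double] {
  override def zero: (Double, Long) = (0.0, 0L)
  override def reduce(b: (Double, Long), sal: Double): (Double, Long) = (b._1 + sal, b._2 + 1)
  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = (b1._1 + b2._1, b1._2 + b2._2)
  override def finish(r: (Double, Long)): Double = r._1 / r._2
  override def bufferEncoder: Encoder[(Double, Long)] = Encoders.product[(Double, Long)]
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```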
3.2 Untyped Custom Aggregate Function

Once the typed version is understood, the untyped definition follows essentially the same pattern. The code is as follows:
```scala
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

object MyAverage extends UserDefinedAggregateFunction {

  // Data type of the input column
  def inputSchema: StructType = StructType(StructField("MyInputColumn", LongType) :: Nil)

  // Data types of the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("MyCount", LongType) :: Nil)
  }

  // Data type of the returned value
  def dataType: DataType = DoubleType

  // Whether the function always returns the same output for the same input
  def deterministic: Boolean = true

  // Initialize the buffer
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  // Fold one input row into the buffer
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  // Merge two buffers produced on different partitions
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Compute the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

object SparkSqlApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Spark-SQL").master("local[2]").getOrCreate()
    // Register the function so it can be used in SQL
    spark.udf.register("myAverage", MyAverage)

    val df = spark.read.json("file/emp.json")
    df.createOrReplaceTempView("emp")

    val myAvg = spark.sql("SELECT myAverage(sal) AS avg_sal FROM emp").first()
    val avg = spark.sql("SELECT avg(sal) AS avg_sal FROM emp").first()

    println("Custom average function : " + myAvg)
    println("Built-in average function : " + avg)
  }
}
```
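Note that UserDefinedAggregateFunction is deprecated as of Spark 3.0. On newer releases, the recommended approach is to write a typed Aggregator (as in section 3.1, but with the column value as the input type) and register it through functions.udaf. A minimal sketch, assuming Spark 3.0 or later:

```scala
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, functions}

case class SumAndCount(var sum: Double, var count: Long)

// Input type is the column value itself (Double), not a whole row
object MyAverage extends Aggregator[Double, SumAndCount, Double] {
  override def zero: SumAndCount = SumAndCount(0, 0)
  override def reduce(b: SumAndCount, sal: Double): SumAndCount = { b.sum += sal; b.count += 1; b }
  override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = { b1.sum += b2.sum; b1.count += b2.count; b1 }
  override def finish(r: SumAndCount): Double = r.sum / r.count
  override def bufferEncoder: Encoder[SumAndCount] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Register and query exactly as before:
// spark.udf.register("myAverage", functions.udaf(MyAverage))
// spark.sql("SELECT myAverage(sal) AS avg_sal FROM emp").show()
```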
References
Matei Zaharia, Bill Chambers. Spark: The Definitive Guide[M]. 2018-02