Spark中的rollup

在对数据进行小计或合计运算时,rollup和cube一样,算是常用的操作了。Spark的DataFrame提供了rollup函数支持此功能。

假设准备了如下数据:

trait SalesDataFrameFixture extends DataFrameFixture
with SparkSqlSupport {
  implicit class StringFuncs(str: String) {
    def toTimestamp = new Timestamp(Date.valueOf(str).getTime)
  }

  import sqlContext.implicits._

  val sales = Seq(
    (1, "Widget Co", 1000.00, 0.00, "广东省", "深圳市", "2014-02-01".toTimestamp),
    (2, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-11".toTimestamp),
    (3, "Acme Widgets", 1000.00, 500.00, "四川省", "绵阳市", "2014-02-12".toTimestamp),
    (4, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-13".toTimestamp),
    (5, "Widget Co", 1000.00, 0.00, "广东省", "广州市", "2015-01-01".toTimestamp),
    (6, "Acme Widgets", 1000.00, 500.00, "四川省", "泸州市", "2015-01-11".toTimestamp),
    (7, "Widgetry", 1000.00, 200.00, "四川省", "成都市", "2015-02-11".toTimestamp),
    (8, "Widgets R Us", 3000.00, 0.0, "四川省", "绵阳市", "2015-02-19".toTimestamp),
    (9, "Widgets R Us", 2000.00, 0.0, "广东省", "深圳市", "2015-02-20".toTimestamp),
    (10, "Ye Olde Widgete", 3000.00, 0.0, "广东省", "深圳市", "2015-02-28".toTimestamp),
    (11, "Ye Olde Widgete", 3000.00, 0.0, "广东省", "广州市", "2015-02-28".toTimestamp)
  )

  val saleDF = sqlContext.sparkContext.parallelize(sales, 4).toDF("id", "name", "sales", "discount", "province", "city", "saleDate")
}

注册临时表,并执行SQL语句:

saleDF.registerTempTable("sales")

val dataFrame = sqlContext.sql("select province,city,sales from sales")
dataFrame.show

执行的结果如下:

| province |city | sales |
|----------|-----|-------|
|     广东省| 深圳市|1000.0|
|     四川省| 成都市|1000.0|
|     四川省| 绵阳市|1000.0|
|     四川省| 成都市|1000.0|
|     广东省| 广州市|1000.0|
|     四川省| 泸州市|1000.0|
|     四川省| 成都市|1000.0|
|     四川省| 绵阳市|3000.0|
|     广东省| 深圳市|2000.0|
|     广东省| 深圳市|3000.0|
|     广东省| 广州市|3000.0|

对该DataFrame执行rollup:

val resultDF = dataFrame.rollup($"province", $"city").agg(Map("sales" -> "sum"))
resultDF.show

在这个例子中,rollup操作相当于对dataFrame中的province与city进行分组,并在此基础上针对sales进行求和运算,故而获得的结果为:

|province|city|sum(sales)|
|--------|----|----------|
|    null|null|   18000.0|
|     广东省|null|   10000.0|
|     广东省| 深圳市|    6000.0|
|     四川省|null|    8000.0|
|     四川省| 成都市|    3000.0|
|     四川省| 绵阳市|    4000.0|
|     广东省| 广州市|    4000.0|
|     四川省| 泸州市|    1000.0|

操作非常简单,然而遗憾地是并不符合我们产品的场景,因为我们需要根据某些元数据直接组装为Spark SQL的sql语句。在Spark的hiveContext中,支持这样的语法:

hiveContext.sql("select province, city, sum(sales) from sales group by province, city with rollup")

可惜,SQLContext并不支持这一功能。我在Spark User Mailing List中咨询了这个问题。Intel的Cheng Hao(Spark的一位非常活跃的contributer)告诉了我为何不支持的原因。因为在Spark SQL 1.x版本中,对SQL语法的解析采用了Scala的Parser机制。这种实现方式较弱,对语法的解析支持不够。Spark的Issue #5080尝试提供此功能,然而并没有被合并到Master中。Spark并不希望在1.x版本的SQLParser中添加新的关键字,它的计划是在Spark 2.0中用HQL Parser来替代目前较为简陋的SQL Parser。

如果希望在sql中使用rollup,那么有三个选择:

  • 使用HQLContext;
  • pull #5080的代码,自己建立一个Spark的分支;
  • 等待Spark 2.0版本发布。
2015-12-30 21:1775SparkScala