Chapter 04

Spark SQL

用熟悉的 SQL 语法操控 PB 级数据,Hive Metastore 集成、复杂类型查询、200+ 内置函数全解析

spark.sql():执行 SQL

Spark SQL 允许直接编写 SQL 查询,返回 DataFrame。SQL 与 DataFrame API 在底层走同一个 Catalyst 优化器,性能没有差异。选择哪种写法完全取决于团队习惯和可读性。

# SQL 和 DataFrame API 完全等价,底层优化相同
# DataFrame API 写法:
result_df = df_orders.filter(F.col("amount") > 100) \
    .groupBy("category") \
    .agg(F.sum("amount").alias("total"))

# Spark SQL 写法(先注册视图):
df_orders.createOrReplaceTempView("orders")
result_sql = spark.sql("""
    SELECT category, SUM(amount) AS total
    FROM orders
    WHERE amount > 100
    GROUP BY category
    ORDER BY total DESC
""")

# 两者结果完全相同
result_df.show()
result_sql.show()

视图管理

# 全局临时视图(跨 Session)
df_orders.createOrReplaceGlobalTempView("orders_global")
spark.sql("SELECT * FROM global_temp.orders_global LIMIT 5")

# 持久化为 Hive 表
df_orders.write.mode("overwrite").saveAsTable("dw.orders")
spark.sql("SELECT * FROM dw.orders LIMIT 5")  # 下次启动 Spark 仍可访问

Catalog API

Catalog 是 Spark 的元数据管理接口,可以列出数据库、表、列,以及检查/刷新表缓存。

# 列出所有数据库
spark.catalog.listDatabases()

# 切换数据库
spark.catalog.setCurrentDatabase("dw")

# 列出当前数据库的所有表
spark.catalog.listTables()

# 列出表的列信息
spark.catalog.listColumns("orders")

# 检查表是否已缓存
spark.catalog.isCached("orders")

# 刷新表(更新底层文件变更后调用)
spark.catalog.refreshTable("dw.orders")

复杂类型操作:struct / array / map

struct 嵌套结构体

# 访问嵌套字段:用 . 符号或 getField()
df.select("address.city")
df.select(F.col("address").getField("city"))
df.select(F.col("address.city"), F.col("address.zipcode"))

# SQL 中访问 struct 字段
spark.sql("SELECT user.name, user.address.city FROM users")

array 数组操作

# array 函数和常用操作
df.withColumn("tag_count", F.size("tags")) \       # 数组长度
  .withColumn("has_vip", F.array_contains("tags", "vip")) \  # 是否包含
  .withColumn("first_tag", F.element_at("tags", 1)) \   # 第1个元素(从1开始)
  .withColumn("sorted_tags", F.array_sort("tags")) \   # 排序
  .withColumn("unique_tags", F.array_distinct("tags"))  # 去重

# 两个数组的并集 / 交集
F.array_union(F.col("tags1"), F.col("tags2"))
F.array_intersect(F.col("tags1"), F.col("tags2"))

map 字典操作

# map 类型操作
df.select(
    F.map_keys("properties"),           # 获取所有 key
    F.map_values("properties"),         # 获取所有 value
    F.col("properties")["color"],      # 按 key 取值
    F.element_at(F.col("properties"), "color"),  # 等价写法
    F.map_contains_key(F.col("properties"), "color"),  # Spark 3.3+
)

内置函数库(200+)

Spark 内置了 200+ 个 SQL 函数,覆盖字符串、日期时间、数学、数组、JSON、加密等各类场景:

分类常用函数
字符串concat, concat_ws, substring, trim, ltrim, rtrim, upper, lower, regexp_extract, regexp_replace, split, length, lpad, rpad
日期时间current_date, current_timestamp, date_add, date_sub, datediff, to_date, to_timestamp, date_format, year, month, dayofweek, unix_timestamp, from_unixtime
数学abs, round, ceil, floor, sqrt, log, pow, rand, greatest, least, percentile_approx
条件when/otherwise, coalesce, nullif, ifnull, isnull, isnan, nvl
JSONget_json_object, json_tuple, from_json, to_json, schema_of_json
聚合collect_list, collect_set, first, last, approx_count_distinct, corr, covar_pop
# 典型用法示例
df.withColumn("full_name", F.concat_ws(" ", "first_name", "last_name")) \
  .withColumn("age_bucket",
      F.when(F.col("age") < 18, "少年")
       .when(F.col("age") < 30, "青年")
       .when(F.col("age") < 50, "中年")
       .otherwise("老年")
  ) \
  .withColumn("city", F.get_json_object("user_json", "$.address.city"))

Hive 集成

在启用 HiveSupport 的 SparkSession 中,Spark 使用 Hive Metastore 存储表元数据,可以读写 Hive 表,兼容 HiveQL 语法。

spark = SparkSession.builder \
    .appName("HiveIntegration") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://metastore-host:9083") \
    .enableHiveSupport() \
    .getOrCreate()

# 直接查询 Hive 表
spark.sql("SHOW DATABASES")
spark.sql("USE data_warehouse")
spark.sql("SHOW TABLES")

# 读取 Hive 分区表
df = spark.sql("SELECT * FROM dw.orders WHERE dt = '2024-01-01'")

# 将结果写回 Hive 表(动态分区写入)
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
result.write.mode("overwrite") \
    .partitionBy("dt") \
    .insertInto("dw.orders_daily")

实战:TPC-H 基准数据集 SQL 分析

TPC-H 是数据库行业标准基准测试,包含订单、客户、供应商等 8 张表,22 个复杂查询。以 Q3(运输优先查询)为例:

# TPC-H Q3:查找未交付、收入最高的前10个订单(按市场细分过滤)
spark.sql("""
    SELECT
        l.l_orderkey,
        SUM(l.l_extendedprice * (1 - l.l_discount)) AS revenue,
        o.o_orderdate,
        o.o_shippriority
    FROM customer c
    JOIN orders o   ON c.c_custkey = o.o_custkey
    JOIN lineitem l ON l.l_orderkey = o.o_orderkey
    WHERE
        c.c_mktsegment = 'BUILDING'
        AND o.o_orderdate < DATE '1995-03-15'
        AND l.l_shipdate  > DATE '1995-03-15'
    GROUP BY l.l_orderkey, o.o_orderdate, o.o_shippriority
    ORDER BY revenue DESC, o.o_orderdate
    LIMIT 10
""").show()

Catalyst 优化器:Spark SQL 的查询优化器,会自动对 SQL 进行谓词下推(Predicate Pushdown)、列裁剪(Column Pruning)、常量折叠(Constant Folding)等优化,通常比手写的 RDD 代码更高效。