spark.sql():执行 SQL
Spark SQL 允许直接编写 SQL 查询,返回 DataFrame。SQL 与 DataFrame API 在底层走同一个 Catalyst 优化器,性能没有差异。选择哪种写法完全取决于团队习惯和可读性。
# SQL 和 DataFrame API 完全等价,底层优化相同
# DataFrame API 写法:
result_df = df_orders.filter(F.col("amount") > 100) \
.groupBy("category") \
.agg(F.sum("amount").alias("total"))
# Spark SQL 写法(先注册视图):
df_orders.createOrReplaceTempView("orders")
result_sql = spark.sql("""
SELECT category, SUM(amount) AS total
FROM orders
WHERE amount > 100
GROUP BY category
ORDER BY total DESC
""")
# 两者结果完全相同
result_df.show()
result_sql.show()
视图管理
- createOrReplaceTempView 创建会话级别的临时视图,只在当前 SparkSession 中有效。进程退出后自动销毁。
-
createOrReplaceGlobalTempView
创建全局临时视图,在同一 SparkContext 的所有 Session 中可见。通过
global_temp.view_name访问。适合多用户共享的临时中间表。 - saveAsTable 将 DataFrame 持久化为 Hive/Spark Metastore 中的表(永久存储),可跨 Session 访问。
# 全局临时视图(跨 Session)
df_orders.createOrReplaceGlobalTempView("orders_global")
spark.sql("SELECT * FROM global_temp.orders_global LIMIT 5")
# 持久化为 Hive 表
df_orders.write.mode("overwrite").saveAsTable("dw.orders")
spark.sql("SELECT * FROM dw.orders LIMIT 5") # 下次启动 Spark 仍可访问
Catalog API
Catalog 是 Spark 的元数据管理接口,可以列出数据库、表、列,以及检查/刷新表缓存。
# 列出所有数据库
spark.catalog.listDatabases()
# 切换数据库
spark.catalog.setCurrentDatabase("dw")
# 列出当前数据库的所有表
spark.catalog.listTables()
# 列出表的列信息
spark.catalog.listColumns("orders")
# 检查表是否已缓存
spark.catalog.isCached("orders")
# 刷新表(更新底层文件变更后调用)
spark.catalog.refreshTable("dw.orders")
复杂类型操作:struct / array / map
struct 嵌套结构体
# 访问嵌套字段:用 . 符号或 getField()
df.select("address.city")
df.select(F.col("address").getField("city"))
df.select(F.col("address.city"), F.col("address.zipcode"))
# SQL 中访问 struct 字段
spark.sql("SELECT user.name, user.address.city FROM users")
array 数组操作
# array 函数和常用操作
df.withColumn("tag_count", F.size("tags")) \ # 数组长度
.withColumn("has_vip", F.array_contains("tags", "vip")) \ # 是否包含
.withColumn("first_tag", F.element_at("tags", 1)) \ # 第1个元素(从1开始)
.withColumn("sorted_tags", F.array_sort("tags")) \ # 排序
.withColumn("unique_tags", F.array_distinct("tags")) # 去重
# 两个数组的并集 / 交集
F.array_union(F.col("tags1"), F.col("tags2"))
F.array_intersect(F.col("tags1"), F.col("tags2"))
map 字典操作
# map 类型操作
df.select(
F.map_keys("properties"), # 获取所有 key
F.map_values("properties"), # 获取所有 value
F.col("properties")["color"], # 按 key 取值
F.element_at(F.col("properties"), "color"), # 等价写法
F.map_contains_key(F.col("properties"), "color"), # Spark 3.3+
)
内置函数库(200+)
Spark 内置了 200+ 个 SQL 函数,覆盖字符串、日期时间、数学、数组、JSON、加密等各类场景:
| 分类 | 常用函数 |
|---|---|
| 字符串 | concat, concat_ws, substring, trim, ltrim, rtrim, upper, lower, regexp_extract, regexp_replace, split, length, lpad, rpad |
| 日期时间 | current_date, current_timestamp, date_add, date_sub, datediff, to_date, to_timestamp, date_format, year, month, dayofweek, unix_timestamp, from_unixtime |
| 数学 | abs, round, ceil, floor, sqrt, log, pow, rand, greatest, least, percentile_approx |
| 条件 | when/otherwise, coalesce, nullif, ifnull, isnull, isnan, nvl |
| JSON | get_json_object, json_tuple, from_json, to_json, schema_of_json |
| 聚合 | collect_list, collect_set, first, last, approx_count_distinct, corr, covar_pop |
# 典型用法示例
df.withColumn("full_name", F.concat_ws(" ", "first_name", "last_name")) \
.withColumn("age_bucket",
F.when(F.col("age") < 18, "少年")
.when(F.col("age") < 30, "青年")
.when(F.col("age") < 50, "中年")
.otherwise("老年")
) \
.withColumn("city", F.get_json_object("user_json", "$.address.city"))
Hive 集成
在启用 HiveSupport 的 SparkSession 中,Spark 使用 Hive Metastore 存储表元数据,可以读写 Hive 表,兼容 HiveQL 语法。
spark = SparkSession.builder \
.appName("HiveIntegration") \
.config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
.config("hive.metastore.uris", "thrift://metastore-host:9083") \
.enableHiveSupport() \
.getOrCreate()
# 直接查询 Hive 表
spark.sql("SHOW DATABASES")
spark.sql("USE data_warehouse")
spark.sql("SHOW TABLES")
# 读取 Hive 分区表
df = spark.sql("SELECT * FROM dw.orders WHERE dt = '2024-01-01'")
# 将结果写回 Hive 表(动态分区写入)
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
result.write.mode("overwrite") \
.partitionBy("dt") \
.insertInto("dw.orders_daily")
实战:TPC-H 基准数据集 SQL 分析
TPC-H 是数据库行业标准基准测试,包含订单、客户、供应商等 8 张表,22 个复杂查询。以 Q3(运输优先查询)为例:
# TPC-H Q3:查找未交付、收入最高的前10个订单(按市场细分过滤)
spark.sql("""
SELECT
l.l_orderkey,
SUM(l.l_extendedprice * (1 - l.l_discount)) AS revenue,
o.o_orderdate,
o.o_shippriority
FROM customer c
JOIN orders o ON c.c_custkey = o.o_custkey
JOIN lineitem l ON l.l_orderkey = o.o_orderkey
WHERE
c.c_mktsegment = 'BUILDING'
AND o.o_orderdate < DATE '1995-03-15'
AND l.l_shipdate > DATE '1995-03-15'
GROUP BY l.l_orderkey, o.o_orderdate, o.o_shippriority
ORDER BY revenue DESC, o.o_orderdate
LIMIT 10
""").show()
Catalyst 优化器:Spark SQL 的查询优化器,会自动对 SQL 进行谓词下推(Predicate Pushdown)、列裁剪(Column Pruning)、常量折叠(Constant Folding)等优化,通常比手写的 RDD 代码更高效。