Chapter 03

转换与聚合

groupBy 聚合、Window 窗口函数、六种 Join 策略、explode 与 pivot,构建复杂数据分析管道

groupBy + agg 聚合

groupBy 将数据按指定列分组,agg 在每组上执行聚合函数。支持一次性计算多个聚合指标。

from pyspark.sql import functions as F

# 基础聚合:按品类统计订单
df_orders.groupBy("category") \
    .agg(
        F.count("order_id").alias("order_cnt"),        # 订单数
        F.countDistinct("user_id").alias("buyer_cnt"),  # 独立买家数
        F.sum("amount").alias("total_revenue"),         # 总收入
        F.avg("amount").alias("avg_order_value"),       # 客单价
        F.max("amount").alias("max_order"),             # 最大订单
        F.min("amount").alias("min_order"),             # 最小订单
        F.stddev("amount").alias("amount_stddev"),      # 标准差
        F.percentile_approx("amount", 0.5).alias("median"),  # 中位数
    ) \
    .orderBy(F.desc("total_revenue")) \
    .show()

Window 函数:分区内的有序计算

Window 函数(窗口函数)在不合并行的情况下,基于"窗口"(一组相关行)计算值。这是处理排名、累计和、移动平均等分析任务的核心工具。

from pyspark.sql.window import Window

# 定义窗口:按用户分区,按时间排序
user_window = Window.partitionBy("user_id").orderBy("order_date")

# 定义滑动窗口(最近 7 天)
rolling_7d = Window.partitionBy("user_id") \
    .orderBy(F.unix_timestamp("order_date")) \
    .rangeBetween(-7 * 86400, Window.currentRow)

df_orders.withColumn("row_num",    F.row_number().over(user_window)) \
         .withColumn("rank",       F.rank().over(user_window)) \       # 有并列排名跳号
         .withColumn("dense_rank", F.dense_rank().over(user_window)) \  # 有并列排名不跳号
         .withColumn("cum_sum",    F.sum("amount").over(user_window)) \  # 累计和
         .withColumn("prev_amount", F.lag("amount", 1).over(user_window)) \  # 上一行
         .withColumn("next_amount", F.lead("amount", 1).over(user_window)) \ # 下一行
         .withColumn("rolling_7d_sum", F.sum("amount").over(rolling_7d))    # 7天滑动窗口

Join 类型详解

Spark 支持所有标准 SQL Join 类型,以及大数据场景特有的 Semi Join 和 Anti Join。

Join 类型说明典型场景
inner两表匹配的行(默认)关联订单和用户信息
left / left_outer左表全部 + 右表匹配行保留所有用户,关联可能不存在的订单
right / right_outer右表全部 + 左表匹配行同上,方向相反
full / full_outer两表全部,不匹配处填 null数据对账、合并两个来源的数据
cross笛卡尔积,所有行的组合生成特征交叉(小表)
left_semi左表中能在右表找到匹配的行(不包含右表列)筛选"在黑名单中"的用户
left_anti左表中找不到匹配的行筛选"不在白名单中"的用户
# 标准 left join
df_result = df_orders.join(
    df_users,
    on=df_orders.user_id == df_users.id,
    how="left"
)

# Anti join:找出没有下单的用户
no_order_users = df_users.join(
    df_orders.select("user_id").distinct(),
    df_users.id == df_orders.user_id,
    how="left_anti"
)

# 广播 Join(小表):避免 Shuffle,显著提升性能
df_orders.join(
    F.broadcast(df_categories),  # 将小表广播到所有 Executor
    on="category_id",
    how="left"
)

explode:展开数组列

当一列是数组类型(如用户的标签列表),用 explode 将其展开为多行,便于聚合分析。

# 示例数据:user_id=1, tags=["vip","mobile","new"]
df_with_tags = df.withColumn("tag", F.explode(F.col("tags")))
# 展开后:user_id=1,tag="vip" / user_id=1,tag="mobile" / user_id=1,tag="new"

# explode_outer:即使数组为空也保留该行(null 填充)
df.withColumn("tag", F.explode_outer(F.col("tags")))

# 统计每个标签的用户数
df_with_tags.groupBy("tag").agg(F.countDistinct("user_id").alias("cnt")).show()

pivot:行转列

pivot 将某列的不同值转换为多列,是报表分析中的常见操作。

# 将每月的销售额从行格式转换为宽格式(每月一列)
monthly_pivot = df_orders \
    .groupBy("category") \
    .pivot("month", ["2024-01", "2024-02", "2024-03"]) \  # 显式指定值(推荐,避免额外扫描)
    .agg(F.sum("amount"))

monthly_pivot.show()
# category | 2024-01 | 2024-02 | 2024-03
# ---------|---------|---------|--------
# 电子产品  |  123456 |  135000 | 141200
# 服装      |   56789 |   61200 |  59800

实战:电商订单多维聚合分析

# 综合实战:计算各品类各月的 GMV、订单量、UV、客单价、环比增长
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Step 1: 基础聚合
monthly = df_orders.groupBy("category", "month") \
    .agg(
        F.sum("amount").alias("gmv"),
        F.count("order_id").alias("orders"),
        F.countDistinct("user_id").alias("uv"),
    ) \
    .withColumn("aov", F.col("gmv") / F.col("orders"))  # 客单价

# Step 2: 计算 GMV 环比增长
cat_month_window = Window.partitionBy("category").orderBy("month")
monthly = monthly \
    .withColumn("prev_gmv", F.lag("gmv", 1).over(cat_month_window)) \
    .withColumn("gmv_growth",
        (F.col("gmv") - F.col("prev_gmv")) / F.col("prev_gmv")
    )

# Step 3: 写出结果
monthly.write.mode("overwrite").parquet("output/monthly_category_stats")

pivot 性能提示:不显式指定 pivot 的值列表时,Spark 需要先执行一次额外的 Job 来收集所有唯一值,然后才能执行转换。对大数据集,始终显式指定值列表以节省一趟全扫描。