groupBy + agg 聚合
groupBy 将数据按指定列分组,agg 在每组上执行聚合函数。支持一次性计算多个聚合指标。
from pyspark.sql import functions as F
# 基础聚合:按品类统计订单
df_orders.groupBy("category") \
.agg(
F.count("order_id").alias("order_cnt"), # 订单数
F.countDistinct("user_id").alias("buyer_cnt"), # 独立买家数
F.sum("amount").alias("total_revenue"), # 总收入
F.avg("amount").alias("avg_order_value"), # 客单价
F.max("amount").alias("max_order"), # 最大订单
F.min("amount").alias("min_order"), # 最小订单
F.stddev("amount").alias("amount_stddev"), # 标准差
F.percentile_approx("amount", 0.5).alias("median"), # 中位数
) \
.orderBy(F.desc("total_revenue")) \
.show()
Window 函数:分区内的有序计算
Window 函数(窗口函数)在不合并行的情况下,基于"窗口"(一组相关行)计算值。这是处理排名、累计和、移动平均等分析任务的核心工具。
from pyspark.sql.window import Window
# 定义窗口:按用户分区,按时间排序
user_window = Window.partitionBy("user_id").orderBy("order_date")
# 定义滑动窗口(最近 7 天)
rolling_7d = Window.partitionBy("user_id") \
.orderBy(F.unix_timestamp("order_date")) \
.rangeBetween(-7 * 86400, Window.currentRow)
df_orders.withColumn("row_num", F.row_number().over(user_window)) \
.withColumn("rank", F.rank().over(user_window)) \ # 有并列排名跳号
.withColumn("dense_rank", F.dense_rank().over(user_window)) \ # 有并列排名不跳号
.withColumn("cum_sum", F.sum("amount").over(user_window)) \ # 累计和
.withColumn("prev_amount", F.lag("amount", 1).over(user_window)) \ # 上一行
.withColumn("next_amount", F.lead("amount", 1).over(user_window)) \ # 下一行
.withColumn("rolling_7d_sum", F.sum("amount").over(rolling_7d)) # 7天滑动窗口
Join 类型详解
Spark 支持所有标准 SQL Join 类型,以及大数据场景特有的 Semi Join 和 Anti Join。
| Join 类型 | 说明 | 典型场景 |
|---|---|---|
| inner | 两表匹配的行(默认) | 关联订单和用户信息 |
| left / left_outer | 左表全部 + 右表匹配行 | 保留所有用户,关联可能不存在的订单 |
| right / right_outer | 右表全部 + 左表匹配行 | 同上,方向相反 |
| full / full_outer | 两表全部,不匹配处填 null | 数据对账、合并两个来源的数据 |
| cross | 笛卡尔积,所有行的组合 | 生成特征交叉(小表) |
| left_semi | 左表中能在右表找到匹配的行(不包含右表列) | 筛选"在黑名单中"的用户 |
| left_anti | 左表中找不到匹配的行 | 筛选"不在白名单中"的用户 |
# 标准 left join
df_result = df_orders.join(
df_users,
on=df_orders.user_id == df_users.id,
how="left"
)
# Anti join:找出没有下单的用户
no_order_users = df_users.join(
df_orders.select("user_id").distinct(),
df_users.id == df_orders.user_id,
how="left_anti"
)
# 广播 Join(小表):避免 Shuffle,显著提升性能
df_orders.join(
F.broadcast(df_categories), # 将小表广播到所有 Executor
on="category_id",
how="left"
)
explode:展开数组列
当一列是数组类型(如用户的标签列表),用 explode 将其展开为多行,便于聚合分析。
# 示例数据:user_id=1, tags=["vip","mobile","new"]
df_with_tags = df.withColumn("tag", F.explode(F.col("tags")))
# 展开后:user_id=1,tag="vip" / user_id=1,tag="mobile" / user_id=1,tag="new"
# explode_outer:即使数组为空也保留该行(null 填充)
df.withColumn("tag", F.explode_outer(F.col("tags")))
# 统计每个标签的用户数
df_with_tags.groupBy("tag").agg(F.countDistinct("user_id").alias("cnt")).show()
pivot:行转列
pivot 将某列的不同值转换为多列,是报表分析中的常见操作。
# 将每月的销售额从行格式转换为宽格式(每月一列)
monthly_pivot = df_orders \
.groupBy("category") \
.pivot("month", ["2024-01", "2024-02", "2024-03"]) \ # 显式指定值(推荐,避免额外扫描)
.agg(F.sum("amount"))
monthly_pivot.show()
# category | 2024-01 | 2024-02 | 2024-03
# ---------|---------|---------|--------
# 电子产品 | 123456 | 135000 | 141200
# 服装 | 56789 | 61200 | 59800
实战:电商订单多维聚合分析
# 综合实战:计算各品类各月的 GMV、订单量、UV、客单价、环比增长
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Step 1: 基础聚合
monthly = df_orders.groupBy("category", "month") \
.agg(
F.sum("amount").alias("gmv"),
F.count("order_id").alias("orders"),
F.countDistinct("user_id").alias("uv"),
) \
.withColumn("aov", F.col("gmv") / F.col("orders")) # 客单价
# Step 2: 计算 GMV 环比增长
cat_month_window = Window.partitionBy("category").orderBy("month")
monthly = monthly \
.withColumn("prev_gmv", F.lag("gmv", 1).over(cat_month_window)) \
.withColumn("gmv_growth",
(F.col("gmv") - F.col("prev_gmv")) / F.col("prev_gmv")
)
# Step 3: 写出结果
monthly.write.mode("overwrite").parquet("output/monthly_category_stats")
pivot 性能提示:不显式指定 pivot 的值列表时,Spark 需要先执行一次额外的 Job 来收集所有唯一值,然后才能执行转换。对大数据集,始终显式指定值列表以节省一趟全扫描。