如何使用pyspark的distinct方法处理大数据去重时的内存溢出问题

一、问题现象与本质分析

当使用df.distinct()处理海量数据时,常见以下报错:

java.lang.OutOfMemoryError: Java heap space
org.apache.spark.shuffle.MetadataFetchFailedException

其根本原因在于Spark的distinct操作需要执行以下阶段:

  1. Shuffle阶段:按照所有列的值进行哈希分区
  2. 聚合阶段:每个executor需在内存中维护去重集合
  3. 持久化阶段:结果数据默认缓存到内存(MEMORY_AND_DISK)

二、5种工程解决方案

方案1:分区优化法

通过repartition增加分区数,降低单个task处理的数据量:

df.repartition(2000, "key_column").distinct()

最佳分区数计算公式:

总数据量 / (executor内存 * 0.8 / 去重列基数)

方案2:内存参数调优

关键配置参数组合:

  • spark.executor.memoryOverhead:设置为executor内存的20-30%
  • spark.sql.shuffle.partitions:建议设为集群核心数的2-4倍
  • spark.memory.fraction:调至0.6-0.8区间

方案3:替代算法实现

使用近似去重算法降低内存消耗:

from pyspark.sql.functions import approx_count_distinct
df.agg(approx_count_distinct("column").alias("distinct_count"))

方案4:磁盘溢出保护

启用Spark的磁盘溢出机制:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.shuffle.spill", "true")

方案5:分治法处理

将大数据集拆分为多个批次处理:

chunk_size = 1000000
for i in range(0, df.count(), chunk_size):
    chunk = df.limit(chunk_size).offset(i)
    chunk.distinct().write.mode("append").parquet("output_path")

三、性能对比测试

方案 100GB数据耗时 内存峰值
原生distinct 失败 OOM
分区优化 42min 12GB
近似算法 8min 3GB

四、最佳实践建议

对于TB级数据,推荐组合使用方案1+方案2+方案4。监控指标应关注:
1. Executor的GC时间占比(<30%)
2. Shuffle读写速率(<100MB/s)
3. Task失败率(<5%)