如何使用pymysql的use_result方法解决内存溢出问题

问题现象与背景

在使用Python的pymysql库与MySQL数据库交互时,use_result()方法常被用于处理大型查询结果集。与默认的store_result()不同,use_result()采用服务器端游标方式,理论上应该能减少客户端内存消耗。但开发者经常遇到意外的内存溢出(OOM)问题,特别是在处理百万级记录时。

根本原因分析

经过深入测试和分析,发现主要问题源自三个关键因素:

  1. 未及时fetch:虽然use_result是流式获取,但长时间不调用fetch方法会导致服务器维持结果集
  2. Python对象累积:即使单条记录很小,大量Python对象的创建也会消耗内存
  3. 连接未关闭:异常情况下连接泄漏会持续占用服务端资源

解决方案

1. 使用生成器模式

def streaming_query(conn, sql):
    with conn.cursor(pymysql.cursors.SSCursor) as cursor:
        cursor.execute(sql)
        while True:
            row = cursor.fetchone()
            if row is None:
                break
            yield row

2. 设置合适的fetch_size

通过cursor.arraysize属性控制每次网络往返获取的记录数:

cursor = conn.cursor(pymysql.cursors.SSCursor)
cursor.arraysize = 1000  # 每次获取1000条

3. 强制内存回收

在处理大量数据时显式调用GC:

import gc
for row in streaming_query(conn, big_query):
    process(row)
    if count % 10000 == 0:
        gc.collect()

性能优化建议

  • 使用LIMITOFFSET分页处理超大数据集
  • 考虑使用UNBUFFERED查询模式
  • 监控服务端的net_write_timeout参数
  • 使用连接池管理数据库连接

底层原理说明

MySQL协议在流式传输模式下,服务端会保持结果集直到客户端显式消费完成或连接关闭。pymysql的use_result()实际上创建了SSDictCursorSSCursor,这两种游标都实现了MySQLCursorStream接口,采用逐行传输机制。

值得注意的是,虽然网络传输是流式的,但Python解释器仍会为每行数据创建完整的PyObject。对于包含BLOB/TEXT字段的记录,这可能导致单行对象就消耗较大内存。

替代方案对比

方案内存占用网络往返适用场景
store_result1次小结果集
use_resultN次大结果集
分页查询M次可分页场景

监控与调试技巧

使用以下方法监控内存使用情况:

import tracemalloc
tracemalloc.start()
# ...执行查询...
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

同时建议在MySQL服务端监控SHOW PROCESSLIST,观察查询状态是否为Sending data