Spark 是一个分布式数据处理引擎,因此当您处理数据或将其保存在文件系统上时,它会使用其所有执行器来执行任务。
Spark JDBC 速度很慢,因为当您建立 JDBC 连接时,执行器之一会建立到目标数据库的链接,从而导致速度缓慢和失败。
要解决此问题并加快数据写入数据库的速度,您需要使用以下方法之一:
方法一:
在这种方法中,您需要使用 postgres复制命令实用程序以加快写操作的速度。这需要你有psycopg2EMR 集群上的库。
COPY 实用程序的文档是here https://www.postgresql.org/docs/current/sql-copy.html
如果您想了解基准差异以及为什么复制速度更快,请访问here https://stackoverflow.com/questions/46715354/how-does-copy-work-and-why-is-it-so-much-faster-than-insert!
Postgres 还建议使用 COPY 命令进行批量插入。现在如何批量插入 Spark 数据框。
现在,为了实现更快的写入速度,首先将 Spark 数据帧以 csv 格式保存到 EMR 文件系统,并重新分区输出,以便没有文件包含超过 100k 行。
#Repartition your dataframe dynamically based on number of rows in df
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data)
现在使用 python 读取文件并对每个文件执行复制命令。
import psycopg2
#iterate over your files here and generate file object you can also get files list using os module
file = open('path/to/save/data/part-00000_0.csv')
file1 = open('path/to/save/data/part-00000_1.csv')
#define a function
def execute_copy(fileName):
con = psycopg2.connect(database=dbname,user=user,password=password,host=host,port=port)
cursor = con.cursor()
cursor.copy_from(fileName, 'table_name', sep=",")
con.commit()
con.close()
为了获得额外的速度提升,由于您使用的是 EMR 集群,您可以利用 python 多处理来一次复制多个文件。
from multiprocessing import Pool, cpu_count
with Pool(cpu_count()) as p:
print(p.map(execute_copy, [file,file1]))
这是推荐的方法,因为由于连接限制,spark JDBC 无法调整以获得更高的写入速度。
方法2:由于您已经在使用 AWS EMR 集群,因此您始终可以利用 hadoop 功能来更快地执行表写入。
因此,在这里我们将使用 sqoop export 将数据从 emrfs 导出到 postgres 数据库。
#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16
#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16
为什么是sqoop?
因为sqoop根据指定的mapper数量打开与数据库的多个连接。因此,如果您将 -m 指定为 8,则将有 8 个并发连接流,这些连接流会将数据写入 postgres。
另外,有关使用 sqoop 的更多信息,请参阅此AWS Blog https://aws.amazon.com/blogs/big-data/use-sqoop-to-transfer-data-from-amazon-emr-to-amazon-rds/, SQOOP注意事项 https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-sqoop-considerations.html and SQOOP文档 https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_literal_sqoop_export_literal.
如果您可以用代码破解您的方法,那么方法 1 肯定会给您带来您所寻求的性能提升,如果您对 SQOOP 等 hadoop 组件感到满意,那么请使用第二种方法。
希望能帮助到你!