如何加速spark df.write jdbc到postgres数据库?

2024-05-18

我是 Spark 新手,正在尝试使用 df.write 加速将数据帧的内容(可以有 200k 到 2M 行)附加到 postgres 数据库:

df.write.format('jdbc').options(
      url=psql_url_spark,
      driver=spark_env['PSQL_DRIVER'],
      dbtable="{schema}.{table}".format(schema=schema, table=table),
      user=spark_env['PSQL_USER'],
      password=spark_env['PSQL_PASS'],
      batchsize=2000000,
      queryTimeout=690
      ).mode(mode).save()

我尝试增加批量大小,但这并没有帮助,因为完成这项任务仍然需要大约 4 小时。我还在下面添加了来自 aws emr 的一些快照,显示了有关作业运行方式的更多详细信息。将数据帧保存到 postgres 表的任务仅分配给一个执行器(我觉得很奇怪),加速这一过程是否需要在执行器之间划分此任务?

另外,我读过Spark 的性能调优文档 https://spark.apache.org/docs/2.2.1/sql-programming-guide.html#performance-tuning但增加batchsize, and queryTimeout似乎并没有提高性能。 (我尝试打电话df.cache()在我之前的脚本中df.write,但脚本的运行时间仍然是 4 小时)

此外,我的 aws emr 硬件设置和spark-submit are:

主节点(1):m4.xlarge

核心节点(2):m5.xlarge

spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...

Spark 是一个分布式数据处理引擎,因此当您处理数据或将其保存在文件系统上时,它会使用其所有执行器来执行任务。 Spark JDBC 速度很慢,因为当您建立 JDBC 连接时,执行器之一会建立到目标数据库的链接,从而导致速度缓慢和失败。

要解决此问题并加快数据写入数据库的速度,您需要使用以下方法之一:

方法一:

在这种方法中,您需要使用 postgres复制命令实用程序以加快写操作的速度。这需要你有psycopg2EMR 集群上的库。

COPY 实用程序的文档是here https://www.postgresql.org/docs/current/sql-copy.html

如果您想了解基准差异以及为什么复制速度更快,请访问here https://stackoverflow.com/questions/46715354/how-does-copy-work-and-why-is-it-so-much-faster-than-insert!

Postgres 还建议使用 COPY 命令进行批量插入。现在如何批量插入 Spark 数据框。 现在,为了实现更快的写入速度,首先将 Spark 数据帧以 csv 格式保存到 EMR 文件系统,并重新分区输出,以便没有文件包含超过 100k 行。

#Repartition your dataframe dynamically based on number of rows in df
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data)

现在使用 python 读取文件并对每个文件执行复制命令。

import psycopg2    
#iterate over your files here and generate file object you can also get files list using os module
file = open('path/to/save/data/part-00000_0.csv')
file1 = open('path/to/save/data/part-00000_1.csv')

#define a function
def execute_copy(fileName):
    con = psycopg2.connect(database=dbname,user=user,password=password,host=host,port=port)
    cursor = con.cursor()
    cursor.copy_from(fileName, 'table_name', sep=",")
    con.commit()
    con.close()

为了获得额外的速度提升,由于您使用的是 EMR 集群,您可以利用 python 多处理来一次复制多个文件。

from multiprocessing import Pool, cpu_count
with Pool(cpu_count()) as p:
        print(p.map(execute_copy, [file,file1]))

这是推荐的方法,因为由于连接限制,spark JDBC 无法调整以获得更高的写入速度。

方法2:由于您已经在使用 AWS EMR 集群,因此您始终可以利用 hadoop 功能来更快地执行表写入。 因此,在这里我们将使用 sqoop export 将数据从 emrfs 导出到 postgres 数据库。

#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16

#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16

为什么是sqoop? 因为sqoop根据指定的mapper数量打开与数据库的多个连接。因此,如果您将 -m 指定为 8,则将有 8 个并发连接流,这些连接流会将数据写入 postgres。

另外,有关使用 sqoop 的更多信息,请参阅此AWS Blog https://aws.amazon.com/blogs/big-data/use-sqoop-to-transfer-data-from-amazon-emr-to-amazon-rds/, SQOOP注意事项 https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-sqoop-considerations.html and SQOOP文档 https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_literal_sqoop_export_literal.

如果您可以用代码破解您的方法,那么方法 1 肯定会给您带来您所寻求的性能提升,如果您对 SQOOP 等 hadoop 组件感到满意,那么请使用第二种方法。

希望能帮助到你!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何加速spark df.write jdbc到postgres数据库? 的相关文章

  • PostgreSQL Age() 函数:在不同月份登陆时出现不同/意外的结果

    今天 我在 PostgreSQL 9 6 中运行此查询时遇到了无法解释的结果 SELECT age 2018 06 30 2018 05 19 AS one age 2018 07 01 2018 05 20 AS two 两列的预期结果
  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • 数据库错误:值对于类型字符变化来说太长(100)

    我有一个 Django 网站 运行我们几年前在内部构建的迷你 CMS 它使用 postgresql 保存简单的标题和一段文本时 出现以下错误 value too long for type character varying 100 奇怪的
  • 为什么 Spark 没有使用本地计算机上的所有核心

    当我在 Spark Shell 中或作为作业运行一些 Apache Spark 示例时 我无法在单台计算机上实现完全的核心利用率 例如 var textColumn sc textFile home someuser largefile t
  • SPARK SQL - 当时的情况

    我是 SPARK SQL 的新手 SPARK SQL 中是否有相当于 CASE WHEN CONDITION THEN 0 ELSE 1 END 的内容 select case when 1 1 then 1 else 0 end from
  • 是否有可能在 postgres 中捕获外键违规

    我正在尝试将数据插入具有外键约束的表中 如果我插入的行中存在约束违规 我想丢弃该数据 问题是每次我违反约束时 postgres 都会返回一个错误 我是否可以在插入语句中添加一些语句 例如 ON FOREIGN KEY CONSTRAINT
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • 如何使用PostGIS将多边形数据转换为线段

    我在 PostgreSQL PostGIS 中有一个多边形数据表 现在我需要将此多边形数据转换为其相应的线段 谁能告诉我如何使用 PostGIS 查询进行转换 提前致谢 一般来说 将多边形转换为线可能并不简单 因为没有一对一的映射 http
  • 更改迁移中的自动​​增量值(PostgreSQL 和 SQLite3)

    我有一个托管在 Heroku 上的项目 想要更改表的自动增量起始值 我在本地使用 SQLite3 Heroku 使用 PostgreSQL 这是我在迁移中所拥有的 class CreateMytable lt ActiveRecord Mi
  • Spark Scala 将列从一个数据帧复制到另一个数据帧

    我有一个原始数据框的修改版本 我在其上进行了聚类 现在我想将预测列恢复为原始 DF 索引没问题 因此匹配 我该怎么做 使用这段代码我得到一个错误 println Predicted dfWithOutput show println Ori
  • 如何使用 typeorm 在 postgres 中保存 json 对象数组

    我正在尝试在 postgres 中保存 jsonb 类型的对象数组 Entity Column type jsonb array true nullable true testJson object 我在邮递员中发送的json testJs
  • 如何在Spark结构化流中指定批处理间隔?

    我正在使用 Spark 结构化流并遇到问题 在 StreamingContext DStreams 中 我们可以定义批处理间隔 如下所示 from pyspark streaming import StreamingContext ssc
  • postgreSql 中特定时间后表更新

    我已经在 postgres 中创建了表 现在我想在特定时间 例如 1 小时 后更新一行 我看到很多问题 例如 https dba stackexchange com questions 56424 column auto updated a
  • wal_keep_segments 为什么是最小值而不是最大值?

    根据docs http www postgresql org docs current static runtime config replication html wal keep segments integer 指定过去日志的最小数量
  • 无法“安装”plpython3u - postgresql

    我正在尝试在 postgresql 中使用 python 语言 像这样的事情 create or replace function test a integer returns integer as if a 2 0 return even
  • 如何使用 SparkR 1.6.0 写入 JDBC 源?

    使用 SparkR 1 6 0 我可以使用以下代码从 JDBC 源读取数据 jdbc url lt jdbc mysql localhost 3306 dashboard user
  • Spark KMeans 无法处理大数据吗?

    KMeans 有几个参数training http spark apache org docs latest api python pyspark mllib html highlight kmeans pyspark mllib clus
  • 比较两个 postgres 转储文件

    如何比较 postgres 转储文件 我有两个转储文件 dump1 和 dump2 我想比较这两个转储文件 任何帮助将不胜感激 谢谢 如果使用 Windows 则可以使用 Beyond Compare 如果使用 linux fedora 则
  • AWS EMR Spark Python 日志记录

    我正在 AWS EMR 上运行一个非常简单的 Spark 作业 但似乎无法从我的脚本中获取任何日志输出 我尝试过打印到 stderr from pyspark import SparkContext import sys if name m
  • Postgres 中的输出 Inserted.id 等效项

    我是 PostgreSQL 新手 正在尝试将 mssql 脚本转换为 Postgres 对于合并语句 我们可以使用冲突更新插入或不执行任何操作 但我使用下面的语句 不确定这是否是正确的方法 MSSQL代码 Declare tab2 New

随机推荐

  • 将 python2.7 与 Emacs 24.3 和 python-mode.el 一起使用

    我是 Emacs 新手 我正在尝试设置我的 python 环境 到目前为止 我已经了解到在 python 缓冲区中使用 python mode el C c C c将当前缓冲区的内容加载到交互式 python shell 中 显然使用了什么
  • jquery 插件“uploadify”-从上传脚本返回响应的方法?

    我的标题代码 document ready function sampleFile uploadify uploader include uploadify uploadify swf script add list php scriptD
  • Android 中的列表视图分页

    我有一个列表视图 其中显示了 50 个元素 我决定对视图进行分页 以便视图的每个部分都有 10 个元素 然后单击 下一个 按钮以获取下一个 10 个元素 如何设置10个数据 我关注这篇文章http rakhi577 wordpress co
  • Java-如何将黑白图像加载到二进制中?

    我在 FSE 模式下使用 Java 和 swing 我想将完全黑白图像加载为二进制格式 最好是二维数组 并将其用于基于掩码的每像素碰撞检测 我什至不知道从哪里开始 过去一个小时我一直在研究 但没有找到任何相关的东西 只需将其读入Buffer
  • Nodejs 调试生产中的错误

    我有一个在生产环境中运行的 Nodejs 脚本 我不太可能 千分之一 遇到这样的错误 TypeError value is out of bounds at checkInt buffer js 1009 11 at Buffer writ
  • 是通过指针传递的吗?

    void func char buf buf 我应该称之为按指针传递还是仅按值传递 值是指针类型 在这种情况下 传入的原始指针会被更改吗 这是按值传递 void func char b b new char 4 int main char
  • QSerialPort 中的 readAll() 不包括最后发送的响应

    我正在使用 Qt 来控制串行设备 如果我向串行设备发送命令 我会执行类似的操作serial gt write command r n 我制作了一个按钮 它将纯文本小部件内的文本更改为串行端口的响应 为了获得串口的响应 我使用serial g
  • Cassandra CQL v3.0 和复合类型

    我正在浏览以下文档CQLv3 0 http www datastax com docs 1 1 references cql index 我们是否应该在更新中指定复合键并选择 a b 1 以防万一comparator or key vali
  • Quantmod 的简单功能不再起作用

    我明天要交论文 我收到了一条关于 quantmod 的非常奇怪的错误消息 这是我在过去几周使用这个包时从未遇到过的 我无法导入特定于道琼斯指数 DJI 的数据 我收到以下错误消息 getSymbols DJI src yahoo from
  • 列出 R 数据文件的内容而不加载

    我有时用print load myDataFile RData 当我加载数据文件时列出它的内容 有没有办法列出内容而不加载数据文件中包含的对象 我认为如果不加载对象就无法做到这一点 解决方案可能是使用包装器将 R 对象保存到save 该函数
  • 独立滚动矩阵的行

    我有一个矩阵 准确地说 是 2d numpy ndarray A np array 4 0 0 1 2 3 0 0 5 我想滚动每一行A根据另一个数组中的滚动值独立地 r np array 2 0 1 也就是说 我想这样做 print np
  • 为什么这行带有“await”的代码会触发微任务队列处理?

    以下引用是我理解微任务队列处理的主要参考 当 JS 堆栈清空时 就会处理微任务 承诺使用 杰克 阿奇博尔德 https twitter com jaffathecake status 954653170986311680 这对我来说没有意义
  • 将按钮文本放在一行上

    我的按钮文本在 safari 中显示在一行上 即使在初次单击后 但是在 google chrome 上 当您第一次到达该按 钮时 我的按钮将显示在一行上 但是当您浏览更多帖子并再次遇到 加载更多 按钮时 文本搞砸了 这只发生在谷歌浏览器上
  • 使用 NewtonSoft 在一行中生成 JSON 对象

    我正在使用 JSON 库牛顿软件 http nuget org packages newtonsoft json生成 JSON 字符串 JObject out JObject FromObject new typ photos return
  • Heroku Rails 应用程序级别不记录日志

    我在 Heroku 上有一个 Rails 应用程序 它没有在应用程序级别进行日志记录 当前版本 红宝石1 9 3 导轨3 1 3 在 config environment development rb 中有以下几行 config logge
  • 防止exe文件上传到网站

    有人可以告诉我如何防止 exe 文件在网站上上传 即使 exe 文件位于 zip 文件内 新文件夹中的 exe 文件 然后压缩并上传新文件夹 允许用户上传文件 如果是 ZIP 并通过解压存档并评估其内容来进行服务器端检查
  • JavaScript 将键添加到数组中的每个值

    我下面有这个数组 它由一个简单的数组组成 我想要完成的是放一把钥匙id在每个数组值前面以实现类似的效果 id a id b id c id d 有没有一种简单的方法可以做到这一点 任何帮助将不胜感激 谢谢 var test a b c d
  • 使用Python请求登录Google帐户

    在多个登录页面上 需要谷歌登录才能继续 我想用requestspython 中的库以便让我自己登录 通常这很容易使用requests库 但是我无法让它工作 我不确定这是否是由于 Google 做出的一些限制 也许我需要使用他们的 API 或
  • 将字符串解析为 argv/argc

    C 中是否有一种方法可以解析一段文本并获取 argv 和 argc 的值 就像文本已传递到命令行上的应用程序一样 这不必在 Windows 上工作 只需在 Linux 上工作 我也不关心参数的引用 我很惊讶没有人使用标准 POSIX 功能提
  • 如何加速spark df.write jdbc到postgres数据库?

    我是 Spark 新手 正在尝试使用 df write 加速将数据帧的内容 可以有 200k 到 2M 行 附加到 postgres 数据库 df write format jdbc options url psql url spark d