Spark 2.2.0 - 如何将 DataFrame 写入/读取 DynamoDB

2023-12-30

我希望我的 Spark 应用程序从 DynamoDB 读取表,执行操作,然后将结果写入 DynamoDB。

将表读入 DataFrame

现在,我可以将表从 DynamoDB 读入 SparkhadoopRDD并将其转换为 DataFrame。但是,我必须使用正则表达式来提取值AttributeValue。有更好/更优雅的方式吗?在 AWS API 中找不到任何内容。

package main.scala.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.util.matching.Regex
import java.util.HashMap

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable

object Tester {

  // {S: 298905396168806365,} 
  def extractValue : (String => String) = (aws:String) => {
    val pat_value = "\\s(.*),".r

    val matcher = pat_value.findFirstMatchIn(aws)
                matcher match {
                case Some(number) => number.group(1).toString
                case None => ""
        }
  }


   def main(args: Array[String]) {
    val spark = SparkSession.builder().getOrCreate()
    val sparkContext = spark.sparkContext

      import spark.implicits._

      // UDF to extract Value from AttributeValue 
      val col_extractValue = udf(extractValue)

  // Configure connection to DynamoDB
  var jobConf_add = new JobConf(sparkContext.hadoopConfiguration)
      jobConf_add.set("dynamodb.input.tableName", "MyTable")
      jobConf_add.set("dynamodb.output.tableName", "MyTable")
      jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
      jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")


      // org.apache.spark.rdd.RDD[(org.apache.hadoop.io.Text, org.apache.hadoop.dynamodb.DynamoDBItemWritable)]
      var hadooprdd_add = sparkContext.hadoopRDD(jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

      // Convert HadoopRDD to RDD
      val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
      case (text, dbwritable) => (dbwritable.getItem().get("PIN").toString(), dbwritable.getItem().get("Address").toString())
      }

      // Convert RDD to DataFrame and extract Values from AttributeValue
      val df_add = rdd_add.toDF()
                  .withColumn("PIN", col_extractValue($"_1"))
                  .withColumn("Address", col_extractValue($"_2"))
                  .select("PIN","Address")
   }
}

将 DataFrame 写入 DynamoDB

stackoverflow 和其他地方的许多答案仅指向博客文章 https://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/emr-dynamodb-hadoop github https://github.com/awslabs/emr-dynamodb-connector。这些资源都没有实际演示如何写入 DynamoDB。

我尝试转换 https://stackoverflow.com/questions/43248940/how-to-convert-dataframe-in-spark-to-hadooprdd my DataFrame to RDD[Row]不成功。

df_add.rdd.saveAsHadoopDataset(jobConf_add)

将此 DataFrame 写入 DynamoDB 的步骤是什么? (如果你告诉我如何控制的话,奖励积分overwrite vs putItem ;)

Note: df_add具有相同的架构MyTable在 DynamoDB 中。

EDIT: 我正在遵循以下建议这个答案 https://stackoverflow.com/questions/35733968/simple-rdd-write-to-dynamodb-in-spark指向这篇文章使用 Spark SQL 进行 ETL https://aws.amazon.com/blogs/big-data/using-spark-sql-for-etl/:

// Format table to DynamoDB format
  val output_rdd =  df_add.as[(String,String)].rdd.map(a => {
    var ddbMap = new HashMap[String, AttributeValue]()

    // Field PIN
    var PINValue = new AttributeValue() // New AttributeValue
    PINValue.setS(a._1)                 // Set value of Attribute as String. First element of tuple
    ddbMap.put("PIN", PINValue)         // Add to HashMap

    // Field Address
    var AddValue = new AttributeValue() // New AttributeValue
    AddValue.setS(a._2)                 // Set value of Attribute as String
    ddbMap.put("Address", AddValue)     // Add to HashMap

    var item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
  })             

  output_rdd.saveAsHadoopDataset(jobConf_add) 

然而,现在我得到了java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text尽管遵循文档...您有什么建议吗?

EDIT 2: 仔细阅读这篇文章使用 Spark SQL 进行 ETL https://aws.amazon.com/blogs/big-data/using-spark-sql-for-etl/:

获得 DataFrame 后,执行转换以获得与 DynamoDB 自定义输出格式知道如何编写的类型相匹配的 RDD。自定义输出格式需要一个包含 Text 和DynamoDBItemWritable types.

考虑到这一点,下面的代码正是 AWS 博客文章所建议的,除了我强制转换output_df否则作为 rddsaveAsHadoopDataset不起作用。现在,我得到了Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience。我已是穷途末路了!

      // Format table to DynamoDB format
  val output_df =  df_add.map(a => {
    var ddbMap = new HashMap[String, AttributeValue]()

    // Field PIN
    var PINValue = new AttributeValue() // New AttributeValue
    PINValue.setS(a.get(0).toString())                 // Set value of Attribute as String
    ddbMap.put("PIN", PINValue)         // Add to HashMap

    // Field Address
    var AddValue = new AttributeValue() // New AttributeValue
    AddValue.setS(a.get(1).toString())                 // Set value of Attribute as String
    ddbMap.put("Address", AddValue)     // Add to HashMap

    var item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
  })             

  output_df.rdd.saveAsHadoopDataset(jobConf_add)   

我跟踪了“Using Spark SQL for ETL”链接,发现了相同的“非法循环引用”异常。 该异常的解决方案非常简单(但我花了 2 天时间才弄清楚),如下所示。关键点是在数据帧的 RDD 上使用映射函数,而不是数据帧本身。

val ddbConf = new JobConf(spark.sparkContext.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "<myTableName>")
ddbConf.set("dynamodb.throughput.write.percent", "1.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")


val df_ddb =  spark.read.option("header","true").parquet("<myInputFile>")
val schema_ddb = df_ddb.dtypes

var ddbInsertFormattedRDD = df_ddb.rdd.map(a => {
    val ddbMap = new HashMap[String, AttributeValue]()

    for (i <- 0 to schema_ddb.length - 1) {
        val value = a.get(i)
        if (value != null) {
            val att = new AttributeValue()
            att.setS(value.toString)
            ddbMap.put(schema_ddb(i)._1, att)
        }
    }

    val item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
}
)

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 2.2.0 - 如何将 DataFrame 写入/读取 DynamoDB 的相关文章

  • Spark/Yarn:HDFS 上不存在文件

    我在 AWS 上设置了 Hadoop Yarn 集群 有 1 个主服务器和 3 个从服务器 我已经验证我有 3 个活动节点在端口 50070 和 8088 上运行 我在客户端部署模式下测试了 Spark 作业 一切正常 当我尝试使用 Spa
  • 如何使用 apply/unapply 方法重现案例类行为?

    我尝试用普通类和伴生对象替换案例类 但突然出现类型错误 编译良好的代码 综合示例 trait Elem A B def C other Elem C A Elem C B other match case Chain head tail g
  • 为什么 Databricks Connect Test 无法在 Mac 上运行?

    我已经阅读了配置文档databricks connect但运行时仍然出现以下错误databricks connect test 来自终端的错误 java lang NoSuchMethodError org apache spark int
  • 逆变方法参数类型

    wiki 逆变方法参数类型 https en wikipedia org wiki Covariance and contravariance 28computer science 29 Contravariant method argum
  • 宏:knownDirectSubclasses 被嵌套类型破坏?

    我有一个宏 它枚举密封特征的直接子类型 import scala reflect macros Context import language experimental macros object Checker def apply A U
  • Scala Array.apply 有何魔力

    来自 scala 2 10 4 的 array scala Array定义为 final class Array T length Int extends java io Serializable with java lang Clonea
  • 最小重复子串

    我正在看 Perl代码高尔夫页面 http www perlmonks org node id 82878 不要问为什么 并遇到了这个 第 3 洞 最小重复图案 编写一个子例程 它接受一个字符串 该字符串可能包含 重复模式 并返回最小的重复
  • Spark SQL如何读取压缩的csv文件?

    我尝试过使用apispark read csv读取带有扩展名的压缩 csv 文件bz or gzip 有效 但在源代码中我没有找到任何可以声明的选项参数codec type 即使在这个link https github com databr
  • 使用原始类型模拟案例类

    考虑以下类型结构 trait HasId T def id T case class Entity id Long extends HasId Long 比方说 我们想在一些测试中模拟实体类 val entityMock mock Enti
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • 如何在 Scala 中打印任何内容的列表?

    目前我有一个打印整数的方法 def printList args List Int Unit args foreach println 我如何修改它 使其足够灵活 可以打印任何内容的列表 您不需要专用的方法 所需的功能已经在集合类中 pri
  • 为什么《Scala 中的函数式编程》一书的“无异常处理错误”一章中没有提到“scala.util.Try”?

    在 Scala 中的函数式编程 一书中的 无异常处理错误 一章中 作者给出 从函数体抛出异常的问题 Use Option如果我们不关心实际的异常 Use Either如果我们关心实际的异常 But scala util Try没有提到 从我
  • 在 Spark 2.1.0 中启用 _metadata 文件

    Spark 2 1 0 中保存空 Parquet 文件似乎已损坏 因为无法再次读入它们 由于模式推断错误 我发现从 Spark 2 0 开始 写入 parquet 文件时默认禁用写入 metadata 文件 但我找不到重新启用此功能的配置设
  • Scala:如何在超类上实现克隆方法,并在子类中使用它?

    我可能会以错误的方式处理这个问题 但我想要一个像这样的对象 class MyDataStructure def myClone val clone new MyDataStructure do stuff to make clone the
  • fetchsize和batchsize对Spark的影响

    我想通过以下方式控制 RDB 的读写速度Spark直接 但标题已经透露的相关参数似乎不起作用 我可以得出这样的结论吗fetchsize and batchsize我的测试方法不起作用 或者它们确实会影响阅读和写作方面 因为测量结果基于规模是
  • 如何在Spark结构化流中指定批处理间隔?

    我正在使用 Spark 结构化流并遇到问题 在 StreamingContext DStreams 中 我们可以定义批处理间隔 如下所示 from pyspark streaming import StreamingContext ssc
  • Java 中的“Lambdifying”scala 函数

    使用Java和Apache Spark 已用Scala重写 面对旧的API方法 org apache spark rdd JdbcRDD构造函数 其参数为 AbstractFunction1 abstract class AbstractF
  • Scala 解析器组合器的运算符优先级

    我正在研究需要考虑运算符优先级的解析逻辑 我的需求并不太复杂 首先 我需要乘法和除法比加法和减法具有更高的优先级 例如 1 2 3 应视为 1 2 3 这是一个简单的例子 但你明白了 我需要将更多自定义标记添加到优先级逻辑中 我可以根据此处
  • 在 Spark 中计算逻辑回归系数的标准误差

    我知道这个问题之前已经被问过here https stackoverflow com questions 37816701 calculating standard error of estimate wald chi square sta

随机推荐

  • 哪些语言特性不能用 lambda 来定义?

    看起来 lambda 几乎可以用于任何事情 即使它看起来更复杂 但它确实有其局限性 lambda 未涵盖哪些用例 lambda 即函数 本身并不是很有趣 这是 JavaScript 中的一个函数 function id x return x
  • 如何在magento中获取特定页面的URL

    我想在 Magento 中获取页面的 URL 键 例如 我有一个名为 What s New 的 CMS 页面 其标识符 或 URL 键 为 whats new 因此它的正确 URL 是http mysite com whats new 目前
  • Gem 未安装,显示连接错误

    当我在创建新的 gemset 后尝试安装像捆绑器或 rake 这样的 gem 时 它没有安装 但如果我安装旧版本 它对我来说工作正常 这是安装gem时的错误日志 gem install rake Fetching rake 0 9 2 2
  • 如何使用 Nexus One 运行层次结构视图

    我正在尝试使用层次结构查看器在 Nexus One 上调试 Android 应用程序 我在桌面上启动层次结构视图 在 设备 下 我看到我的手机 但是当我单击 启动服务器 时 我看到 版本 2 协议 2 当我启动层次结构查看器时 终端上的 v
  • webpack 创建 CSS 组件范围

    我目前正在学习如何使用 webpack 并且在 CSS 文件方面遇到了一些困难 基本上 我使用 AngularJs 1 5 和 TypeScript 创建两个组件 它们每个都有一个模板 其中包含带有 button 类的 div 元素 每个组
  • onClick 使用 jQuery .animate 转到页面底部

    我有一个表 其中最后一列包含操作按钮 可在表下打开另一个部分 当该部分打开时 正文页面保留在按下操作列中的按钮的位置 我需要使用 jQuery 最好使用 animate 它将 html 页面滚动到表格下打开的部分 小提琴示例 http js
  • 如何在PHP中输出简单的ascii表?

    我有一些数据 例如 Array 0 gt Array a gt largeeeerrrrr b gt 0 c gt 47 d gt 0 1 gt Array a gt bla b gt 1 c gt 0 d gt 0 2 gt Array
  • 在 VB6 IDE 中工作时卸载 COM 控件

    我日常工作的一部分是维护和扩展遗留的 VB6 应用程序 通用引擎是用 C C 编写的 VB6 使用这些函数来提高性能 当谈到异步编程时 C 接口是不够的 我们依靠 COM 控件来向 VB6 触发事件 我的问题是 当我在 VB6 中注册该控件
  • 如何区分缺少的反序列化字段和空字段?

    我想用Serde https serde rs 将一些 JSON 解析为 HTTP PATCH 请求的一部分 由于 PATCH 请求不传递整个对象 仅传递要更新的相关数据 因此我需要能够区分未传递的值和显式设置为的值null 以及存在的值
  • 使用 Office 365 登录/凭据作为单点登录

    我可以使用 Office 365 登录 凭据作为单点登录吗 实际上 我需要使用 Office 365 登录详细信息登录我的 Web 应用程序 此外 我想在我的 Web 应用程序中使用以下 url 凭据进行单点登录 https portal
  • 通用相关类型可能寿命不够长

    采取以下示例 feature generic associated types allow incomplete features trait Produce type CustomError lt a gt fn produce lt a
  • 为什么 UINavigationBar 背景图像重复?

    图像尺寸为 640 X 44 适用于 iPad 肖像 由于某种原因 它显示为图案图像而不是拉伸的 iOS 6 尝试以下方法来拉伸图像 load the background image navbar png UIImage imageNav
  • Laravel 5.4,重命名用户表列

    所以今天我尝试修改我的 laravel 项目中的默认身份验证 首先 Composer 1 4 2 和 Laravel 5 4 27 也意味着所有依赖项 都是最新的 我用以下方法验证了这一点 composer self update comp
  • 专为 iOS7 设计的 Storyboard 在 4 英寸设备上的 iOS 6 上无法全屏显示

    我正在设计我的 iPhone 应用程序storyboard and auto layout 一切正常iOS 7在 4 英寸和 3 5 英寸设备上 On iOS 6 1该应用程序始终在 3 5 英寸设备上运行 即使在 4 英寸设备上运行 也会
  • kubernetes pod 内存 - java gc 日志

    在 kubernetes 仪表板上 有一个 pod 其中内存使用情况 字节 显示为904 38Mi 该 Pod 包含运行的 Java 应用程序 Xms512m Xmx1024m 以及 kubernetes 部署文件 gt requests
  • 动画 CALayer 背景颜色并更新模型值

    我想要制作动画backgroundColor更改我的 UIView 中的子层 在tintColorDidChange 我需要多次从图层的当前背景颜色到新的色调颜色进行动画处理 每次使用不同的色调颜色 因此背景颜色的模型值需要更新 我不能使用
  • 如何在兼容浏览器的 JavaScript 中撤消和重做事件?

    我有一个 T 恤定制设计软件工具 必须为可拖动的文本添加重做和撤消事件 http wordpress tshirtecommerce com design online product id 17 http wordpress tshirt
  • PDF 文本和坐标解析 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我目前正在使用 PDF Box 来解析 pdf 并试图弄清楚如何检索有关文本的数据 例如字体 粗体 大
  • 除非以管理员身份运行,否则 stack ghci 无法加载 pthread

    当我跑步时stack ghci在一个全新的项目上 它无法加载 stack new repro simple Downloading template simple to create project repro in repro 省略附加输
  • Spark 2.2.0 - 如何将 DataFrame 写入/读取 DynamoDB

    我希望我的 Spark 应用程序从 DynamoDB 读取表 执行操作 然后将结果写入 DynamoDB 将表读入 DataFrame 现在 我可以将表从 DynamoDB 读入 SparkhadoopRDD并将其转换为 DataFrame