从 apache Spark 读取/写入 dynamo 数据库 [关闭]

2024-04-24

我想知道是否有任何 Java 库支持从 apache Spark(Mesos) 读取/写入 dynamo db (AWS),我知道根据本文有一些库支持 EMR Sparkhttps://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/ https://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/。请指教。

谢谢 普拉迪普


您可以使用以下命令从 DynamoDB 表中读取项目或将项目写入其中阿帕奇火花 and emr-dynamodb-连接器图书馆。要读取数据,您可以使用javaSparkContext.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class);以及将数据写入 DynamoDB:javaPairRDD.saveAsHadoopDataset(jobConf);。以下是一个示例(适用于 EMR 和非 EMR 环境):

public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
            .setAppName("DynamoDBApplication")
            .setMaster("local[4]")
            .registerKryoClasses(new Class<?>[]{
                    Class.forName("org.apache.hadoop.io.Text"),
                    Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable")
            });

    JavaSparkContext sc = new JavaSparkContext(conf);

    JobConf jobConf = getDynamoDbJobConf(sc, "TableNameForRead", "TableNameForWrite");

    // read all items from DynamoDB table with name TableNameForRead
    JavaPairRDD<Text, DynamoDBItemWritable> javaPairRdd = sc.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class);
    System.out.println("count: " + javaPairRdd.count());

    // process data in any way, below is just a simple example
    JavaRDD<Map<String, AttributeValue>> javaRDD = javaPairRdd.map(t -> {
        DynamoDBItemWritable item = t._2();
        Map<String, AttributeValue> attrs = item.getItem();
        String hashKey = attrs.get("key").getS();
        Long result = Long.valueOf(attrs.get("resultAttribute").getN());
        System.out.println(String.format("hashKey=%s, result=%d", hashKey, result));
        return attrs;
    });
    System.out.println("count: " + javaRDD.count());

    // update JavaPairRdd in order to store it to DynamoDB, below is just a simple example with updating hashKey
    JavaPairRDD<Text, DynamoDBItemWritable> updatedJavaPairRDD = javaPairRdd.mapToPair(t -> {
        DynamoDBItemWritable item = t._2();
        Map<String, AttributeValue> attrs = item.getItem();
        String hashKey = attrs.get("key").getS();
        String updatedHashKey = hashKey + "_new";
        attrs.get("key").setS(updatedHashKey);
        return new Tuple2<>(t._1(), item);
    });

    // write items to DynamoDB table with name TableNameForWrite
    updatedJavaPairRDD.saveAsHadoopDataset(jobConf);

    sc.stop();
}


private static JobConf getDynamoDbJobConf(JavaSparkContext sc, String tableNameForRead, String tableNameForWrite) {
    final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
    jobConf.set("dynamodb.servicename", "dynamodb");

    jobConf.set("dynamodb.input.tableName", tableNameForRead);
    jobConf.set("dynamodb.output.tableName", tableNameForWrite);

    jobConf.set("dynamodb.awsAccessKeyId", "YOUR_AWS_ACCESS_KEY");
    jobConf.set("dynamodb.awsSecretAccessKey", "YOUR_AWS_SECRET_KEY");
    jobConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
    jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");

    return jobConf;
}

要运行此代码,您需要以下 Maven 依赖项:

<dependencies>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.10</artifactId>
        <version>2.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-emr</artifactId>
        <version>1.11.113</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-dynamodb</artifactId>
        <version>1.11.113</version>
    </dependency>

    <!-- https://github.com/awslabs/emr-dynamodb-connector -->
    <dependency>
        <groupId>com.amazon.emr</groupId>
        <artifactId>emr-dynamodb-hadoop</artifactId>
        <version>4.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.8.0</version>
    </dependency>

</dependencies>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 apache Spark 读取/写入 dynamo 数据库 [关闭] 的相关文章

随机推荐

  • 每当应用程序运行或不运行时显示通知

    在我的程序中 无论应用程序是否运行 都必须激活通知 我应该将通知方法放在 onCreate 中吗 我的通知就像闹钟一样 请稍微检查一下 public String getCurrentTime Calendar c Calendar get
  • 如何在最新的azure webjob 3.03中指定AzureWebJobsStorage

    我将旧的 azure webjob 代码更新为打包到 3 03 然后它就不起作用了 我设法修复了所有编译时错误 但在本地运行时 它会抛出以下错误 Microsoft Azure WebJobs Host Indexers FunctionI
  • 带有自定义 json 数据的 JsTree

    我在 json 中有这个结构 无法根据请求进行修改 Object url http www google com id 1 name Redirection Rule Object frequency 1 trigger 1 Object
  • 如何使用 Express 在 NodeJS 中的 GET 请求中发出 GET 请求

    基本上 我试图在回调 GET 方法中从 Facebook 获取访问令牌 下面是我的代码 getAccessToken根本没有被调用 正确的实施方法是什么 app get fbcallback function req res var cod
  • React - setState 不更新值

    我正在尝试使用 DidMount 中的 localStorage 值更新状态 但它没有更新 type Props type State id evaluation string class Evaluation extends Compon
  • 在c#中查找编译类的源文件

    我正在寻找一组已编译的 net 程序集中特定类的关联源文件 e g MyAsm Namespace Foo gt C Source foo cs MyAsm Namespace Bar gt C Source Code MoreCode C
  • 如何使用 Google Apps 脚本限制文件的复制/下载/打印访问

    有没有人找到一种方法来限制使用谷歌应用程序脚本复制 下载 打印电子表格的访问权限 背景信息 我创建了一个使用 setShareableByEditors false 限制编辑者共享权限的脚本 唯一的问题是编辑者仍然可以轻松地复制电子表格 然
  • Rails 4 link_to 更大的静态图像

    我的文件存储在app assets images subdirectory image png and app assets images subdirectory image full png In my app views home h
  • lseek() 的复杂度是 O(1) 吗?

    我知道我的问题在这里有答案 QFile 寻道性能 https stackoverflow com questions 6171403 qfile seek performance 但我对这个答案并不完全满意 即使在查看了以下实现之后gene
  • 是否可以将jsp预编译到eclipse中?

    标题很简单 我想知道是否有可能直接在eclipse中看到编译好的jsp 生成的servlet 无需部署到任何服务器上 如果您使用 JSP 我建议购买我的Eclipse http www myeclipseide com 因为它可以编译 JS
  • 显示表格单元格不一致。

    嘿 我想知道为什么会发生这种情况 http jsfiddle net dSVGF http jsfiddle net dSVGF 按钮尚未填充容器 锚确实如此 有什么本质上的不同 两个标签之间的风格 div class table a hr
  • 如何在 SQL 中替换 PIVOT 中的 Null 值

    我有以下代码 我试图用零替换使用枢轴时出现的 Null 我执行了以下操作 但它说 ISNULL 附近的语法不正确 我不确定我做错了什么 有什么建议请 select from tempfinaltable pivot ISNULL sum T
  • 无法更新 android studio 3.1:配置冲突:同步项目期间“armeabi-v7a,x86”

    这是我的构建 gradle 应用程序 文件 apply plugin com android application apply plugin io fabric apply plugin checkstyle def versions a
  • Angular2 ngNoForm 还可以进行角度形式验证

    我有一个遗留后端服务器 它将表单数据作为请求参数进行处理 我们将 angular2 放在前端 我想提交 angular2 表单 以便所有字段都作为请求参数 这样就不必更改旧后端 为此 我有
  • 捆绑安装不起作用

    我正在 Windows 上开发 Ruby on Rails 我们的本地网络出现问题 无法访问https www rubygems org https www rubygems org 好像被屏蔽了什么的 但我可以通过访问它http www
  • Ruby on Rails 3:Devise::LdapAdapter.get_ldap_param 未定义方法错误

    我在跑步 红宝石 1 9 3p0 轨道 3 1 1 设计1 4 9 Devise ldap authenticatable 0 4 10 我正在使用 Devise 通过 LDAP 服务器验证我的 Rails 应用程序 我使用用户名而不是电子
  • Is Type 和 Is Type(object, object) 抛出 TypeException

    我试图断言方法调用返回的对象属于以下类型List
  • EC2 t2.medium 可爆发信用“储蓄”计算

    我正在使用 T2 medium 实例 一天的三分之一的时间我都在做密集的统计计算 并计算出剩下的 2 3 的时间我将以每小时 24 小时的速度 赚取 学分 但这并没有发生 这是我这两天的使用情况 这是我的信用账户 直到昨天下午 6 点我已经
  • 在 Ruby 中模拟 int64 溢出

    我是一名资深程序员 但对 Ruby 还很陌生 我正在尝试移植一种名为 CheckRevision 的算法 用于在登录 Battle net 的在线游戏服务之前检查游戏文件的完整性 该算法使用给定的公式对文件进行 哈希 没有无聊的细节 而是不
  • 从 apache Spark 读取/写入 dynamo 数据库 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想知道是否有任何 Java 库支持从 apache Spark Mesos 读取 写入 dynamo