HDFS 自定义实现函数将文件追加到末尾的问题

2023-05-16

HDFS 自定义实现函数将文件追加到末尾的问题:

在这里插入图片描述

一、实验环境:

  • Ubuntu16.04
  • Hadoop2.7.1 伪分布式(只有一个DN
  • Eclipse

二、解决方案

Java代码:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /**
     * 判断路径是否存在
     */
    public static boolean test(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /**
     * 复制文件到指定路径
     * 若路径已存在,则进行覆盖
     */
    public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        /* fs.copyFromLocalFile 第一个参数表示是否删除源文件,第二个参数表示是否覆盖 */
        fs.copyFromLocalFile(false, true, localPath, remotePath);
        fs.close();
    }

    /**
     * 追加文件内容
     */
    public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        /* 创建一个文件读入流 */
        FileInputStream in = new FileInputStream(localFilePath);
        /* 创建一个文件输出流,输出的内容将追加到文件末尾 */
        FSDataOutputStream out = fs.append(remotePath);
        /* 读写文件内容 */
        byte[] data = new byte[1024];
        int read = -1;
        while ( (read = in.read(data)) > 0 ) {
            out.write(data, 0, read);
        }
        out.close();
        in.close();
        fs.close();
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    	conf.set("fs.default.name","hdfs://localhost:9000");
        String localFilePath = "/home/hadoop/text.txt";    // 本地路径
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS路径
        String choice = "append";    // 若文件存在则追加到文件末尾
//      String choice = "overwrite";    // 若文件存在则覆盖

        try {
            /* 判断文件是否存在 */
            Boolean fileExists = false;
            if (HDFSApi.test(conf, remoteFilePath)) {
                fileExists = true;
                System.out.println(remoteFilePath + " 已存在.");
            } else {
                System.out.println(remoteFilePath + " 不存在.");
            }
            /* 进行处理 */
            if ( !fileExists) { // 文件不存在,则上传
                HDFSApi.copyFromLocalFile(conf, localFilePath, remoteFilePath);
                System.out.println(localFilePath + " 已上传至 " + remoteFilePath);
            } else if ( choice.equals("overwrite") ) {    // 选择覆盖
                HDFSApi.copyFromLocalFile(conf, localFilePath, remoteFilePath);
                System.out.println(localFilePath + " 已覆盖 " + remoteFilePath);
            } else if ( choice.equals("append") ) {   // 选择追加
                HDFSApi.appendToFile(conf, localFilePath, remoteFilePath);
                System.out.println(localFilePath + " 已追加至 " + remoteFilePath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

报错信息:Failed to replace a bad datanode the existing pipeline to no more good datanodes begin g available to try.

在这里插入图片描述

直观判定为文件在pineline传输中DN被认为是坏的数据节点,需要新的好的数据节点来确保文件在pineline中传输正常。

官网说明:hdfs-default.xml配置文件

如果写入管道中存在数据节点/网络故障,DFSClient 将尝试从管道中删除失败的数据节点,然后继续使用其余数据节点进行写入。因此,管道中的数据节点数会减少。该功能是向管道添加新的数据节点。这是用于启用/禁用该功能的站点范围的属性(dfs.client.block.write.replace-datanode-on-failure.policy)。当集群大小非常小时(例如 3 个节点或更少),集群管理员可能希望在默认配置文件中将策略设置为 NEVER 或禁用此功能。否则,用户可能会遇到异常高的管道故障率,因为无法找到新的数据节点进行替换。

而且,仅当 dfs.client.block.write.replace-datanode-on-failure.enable 的值为 true 时,才使用此属性。ALWAYS:删除现有数据节点时,始终添加新的数据节点。NEVER:从不添加新的数据节点。默认值:让 r 作为复制编号。设 n 为现有数据节点的数量。仅当 r 大于或等于 3 且 (1) floor(r/2) 大于或等于 n 时,才添加新的数据节点;或 (2) r 大于 n,并且块被hflushed/appended。

在这里插入图片描述

方法一:在Java代码main函数中加入以下两行代码:

conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER"); 
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");

方法二:在hdfs-site.xml中加入以下代码:

<property>
	<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
	<value>NEVER</value>
</property>

三、注意点

一般来说,如果集群中DN个数小于等于3 (本机器采用伪分布式模式,只有一个DN,但是为了测试方便,直接开启即可)都不建议开启

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

HDFS 自定义实现函数将文件追加到末尾的问题 的相关文章

  • Hadoop 2.x——如何配置辅助名称节点?

    我有一个旧的 Hadoop 安装 我希望将其更新到 Hadoop 2 旧的设置 我有一个 HADOOP HOME conf masters 文件 指定 辅助名称节点 浏览 Hadoop 2 文档 我找不到任何提及 masters 文件 或者
  • 是否可以将数据导入Hive表而不复制数据

    我将日志文件以文本形式存储在 HDFS 中 当我将日志文件加载到 Hive 表中时 所有文件都会被复制 我可以避免所有文本数据存储两次吗 编辑 我通过以下命令加载它 LOAD DATA INPATH user logs mylogfile
  • 解析数百万个小 XML 文件

    我有 1000 万个小 XML 文件 300KB 500KB 我在 Mapreduce 中使用 Mahaout 的 XML 输入格式来读取数据 并使用 SAX 解析器进行解析 但处理速度非常慢 使用输入文件的压缩 lzo 有助于提高性能吗
  • Hadoop - 重新启动数据节点和任务跟踪器

    我想关闭单个数据节点和任务跟踪器 以便我在mapred site xml中所做的一些新更改生效 例如mapred reduce child java opts等 我该怎么做 但是 我不想关闭整个集群 因为我有正在运行的活动作业 另外 如何确
  • HDFS如何计算可用块?

    假设块大小为 128MB 则集群有 10GB 因此大约 80 个可用块 假设我创建了 10 个小文件 这些文件总共占用磁盘上 128MB 块文件 校验和 复制 和 10 个 HDFS 块 如果我想向HDFS添加另一个小文件 那么HDFS使用
  • 如何将位于 HDFS 上的类型安全配置文件添加到 Spark-Submit(集群模式)?

    我有一个 Spark Spark 1 5 2 应用程序 它将数据从 Kafka 流式传输到 HDFS 我的应用程序包含两个 Typesafe 配置文件来配置某些内容 例如 Kafka 主题等 现在我想在集群中使用spark submit 集
  • 为什么map任务总是运行在单节点上

    我有一个具有 4 个节点的完全分布式 Hadoop 集群 当我将作业提交给 Jobtracker 时 Jobtracker 认为 12 个映射任务对我的工作来说很酷 但奇怪的事情发生了 这 12 个映射任务始终在单个节点上运行 而不是在整个
  • Hadoop 顺序数据访问

    根据 Hadoop 权威指南 HDFS 是一个文件系统 设计用于存储非常大的文件 流式或顺序数据访问模式 什么是流式或顺序数据访问 它如何减少磁盘的寻道时间 这并不是 Hadoop 特有的 顺序访问模式是指按顺序读取数据 通常是从开始到结束
  • HBase如何实现对HDFS的随机访问?

    鉴于HBase是一个数据库 其文件存储在HDFS中 那么它如何实现对HDFS中单个数据的随机访问呢 这是通过什么方法实现的呢 From Apache HBase 参考指南 http hbase apache org book archite
  • Cat 文件与 HDFS 中的模式不匹配?

    我正在尝试 cat 与 hadoop HDFS 中的以下模式不匹配的文件 hdfs dfs cat gz 如何捕获所有不以 gz 结尾的文件 编辑 抱歉 但我需要在 Hadoop 中管理文件 显然 hdfs 附带的命令非常少 编辑2 所有文
  • Curl下载到HDFS

    我有这个代码 curl o fileName csv url xargs hdfs dfs moveFromLocal 1 somePath 当我执行此代码时 curl 将请求中的值放入 fileName csv 中 该文件将移动到 HDF
  • 连接到 Hive 时使用 Spark 进行 Kinit

    我正在尝试从独立的 Spark 连接到 Hive hadoop 集群具有 kerberos 身份验证 有人可以让我知道如何在 Spark 程序中执行 kinit 我可以连接到配置单元吗 更新 我的 Spark 与 Hadoop 位于不同的集
  • Namenode高可用客户端请求

    谁能告诉我 如果我使用java应用程序请求一些文件上传 下载操作到带有Namenode HA设置的HDFS 这个请求首先去哪里 我的意思是客户端如何知道哪个名称节点处于活动状态 如果您提供一些工作流程类型图或详细解释请求步骤 从开始到结束
  • MiniDFSCluster UnsatisfiedLinkError org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

    做时 new MiniDFSCluster Builder config build 我得到这个异常 java lang UnsatisfiedLinkError org apache hadoop io nativeio NativeIO
  • 使用Spring批处理从HDFS读取文件

    我必须编写一个 Spring 批处理 它将从 HDFS 读取文件并更新 MySQL DB 中的数据 HDFS 中的源文件包含一些 CSV 格式的报告数据 有人能给我举一个从 HDFS 读取文件的例子吗 Thanks The FlatFile
  • 如何从hdfs读取文件[重复]

    这个问题在这里已经有答案了 我在 project1目录下的hadoop文件系统中有一个文本文件名mr txt 我需要编写 python 代码来读取文本文件的第一行 而不将 mr txt 文件下载到本地 但我无法从 hdfs 打开 mr tx
  • 从 HDFS 传出文件

    我想将文件从 HDFS 传输到另一台服务器的本地文件系统 该服务器不在 hadoop 集群中 而是在网络中 我本可以这样做 hadoop fs copyToLocal
  • 运行 Sqoop 导入和导出时如何找到最佳映射器数量?

    我正在使用 Sqoop 版本 1 4 2 和 Oracle 数据库 运行 Sqoop 命令时 例如这样 sqoop import fs
  • 使用 Java API 在 Hadoop 中移动文件?

    我想使用 Java API 在 HDFS 中移动文件 我想不出办法做到这一点 FileSystem 类似乎只想允许在本地文件系统之间移动 但我想将它们保留在 HDFS 中并将它们移动到那里 我错过了一些基本的东西吗 我能想到的唯一方法是从输
  • 如何使用 python 从 Azure Data Lake Gen 2 读取文件

    我有一个文件位于 Azure Data Lake gen 2 文件系统中 我想读取文件的内容并进行一些低级更改 即从记录中的一些字段中删除一些字符 更明确地说 有些字段的最后一个字符也为反斜杠 由于该值包含在文本限定符 中 因此字段值会转义

随机推荐