HDFS中读取数据——Java接口实现

2023-05-16

这一篇文章,我们来深入了解一下Hadoop中的FileSystem类,它是与Hadoop的某一文件系统进行交互的API。虽然我们主要聚焦于HDFS实例,即DistributedFileSystem,但总体来说,还是应该集成FileSystem抽象类,并编写代码,使其在不同文件系统中可移植。这对测试你编写的程序非常重要,例如,我们可以使用本地文件系统中的存储数据快速进行测试。

从Hadoop URL读取数据

要从hadoop文件系统中读取文件,最简单的方法是使用java.net.URL对象打开数据流,从中读取数据,看下列代码:

InputStream in = null;
try{
    in = new URL("hdfs://host/path").openStream();
    //省略其他次要代码
}finally{
    IOUtils.closeStream(in);
}

让Java程序能够识别Hadoop中的hdfs URL方案还需要一些额外的工作。这里采用的方法是通过FsUrlStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHanderFactory()方法。每个Java虚拟机只能调用一次这个方法,因此通常在静态方法中调用。这个限制意味着如果程序的其他组件(如不受你控制的第三方组件)已经声明一个URLStreamHandlerFactory实例,你将无法使用这种方法从Hadoop中读取数据。

下面展示的程序以标准输出方式显示Hadoop文件系统中的文件,类似于Unix中的cat指令。

范例1,通过URLStreamHandler实例以标准输出方式显示Hadoop文件系统中的文件。

public class URLCat{
    static{
        URL.setURLStreamHandlerFactory(new FsUrlStreamHanderFactory());
    }

    public static void main(String[] args) throws Exception{
        InputStream in = null;
        try{
            in = new URL(args[0]).openStream();
            IOUtils.copyBytes(in,System.out,4096,false);
        }finally{
            IOUtils.closeStream(in);
        }
    }
}

我们可以调用Hadoop中简洁的IOUtils类,并在finally中关闭数据流,同时也可以在输入流和输出流之间复制数据(上述代码为System.out)。copyBytes方法的最后两个参数,第一个设置用于复制的缓冲区大小,第二个设置复制结束后是否关闭数据流。这里我们选择自行关闭输入流,因而System.out不必关闭输入流。

下面列举一个运行示例:

% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
    On th top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.

 

通过FileSystem API读取数据

Hadoop文件系统中通过Hadoop Path对象来代表文件,可以将路径视为一个Hadoop文件系统URI,如hdfs://localhost/user/tom/quangle.txt

FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS。获取FileSystem实例有以下几个静态工厂的方法。

public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri,Configuration conf) throws IOException
public static FileSystem get(URI uri,Configuration conf,String user) throws IOException

 Configuration对象封装了客户端或服务器的配置,通过配置文件读取类路径来实现(例如etc/hadoop/core-site.xml)。第一个方法返回的是默认文件系统(core-site.xml中指定的,如果没有限定,则使用默认的本地文件系统配置)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统。第三,作为给定用户来访问文件系统,对安全来说至关重要。

在某些情况下,你可能希望获取本地文件系统的运用实例,此时可以使用getLocal()方法更为便捷:

public static LocalFileSystem getLocal(Configuration conf) throws IOExcetion

 有了FileSystem实例之后,我们调用open()函数来获取文件的输入流:

public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStrem open(Path f, int bufferSize) throws IOException

第一个方法使用默认的缓冲区大小为4KB 

我们重写范例1,在上面的一段代码中。

范例2,直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件。

public class FileSystemCat{
    public static void main(String[] args) throws Excetion{
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri),conf);
        InputStream in = null;
        
        try{
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in,System.out,4096,false);
        }finally{
            IOUtils.closeStream(in);
        }
    }
}

程序运行的结果如下:

% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumptty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

FSDataInputStream对象

实际上,FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。这个类是继承了java.io.DataInputStream的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。

package org.apache.hadoop.fs;

public class FSDataInputStream extends DataInputStream implements Seekable,PositionedReadable{}

Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos方法)的查询方法:

public interface Seekable{
    void seek(long pos) throws IOException;
    long getPos() throws IOExcetion;
}

 调用seek()来定位大于文件长度的位置会引发IOException异常。与java.io.InputStream的skip()不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。

范例3,是范例2的扩展,它将一个文件写入标准输出2次,在一次写完之后,定位到文件的起始位置再次以流的方式读取该文件并输出。

范例3:

public class FileSystemDoubleCat{
    
    public static void main(String[] args) throws Exception{
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri),conf);
        FSDataInputStream in = null;
        
        try{
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in,System.out,4096,false);
            in.seek(0);
            IOUtils.copyBytes(in,System.out,4096,false);
        }finally{
            IOUtils.closeStream(in);
        }
    }
}

运行结果如下:

% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account if his Beaver Hat.
The Quangle Wangle sat,
But his face you could not see,
On account if his Beaver Hat.

FSDataInputStream类实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分:

public interface PositionedReadable{
    public int read(long position, byte[] buffer, int offset, int length) throws IOException;
    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
    public void readFully(long position, byte[] buffer) throws IOException;
} 

 read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏移量offset处。返回值是实际读到的字节数,调用者需要检查这个值,它有可能小于指定的length长度。readFully()方法将指定length长度的字节数的数据读取到buffer中(或在只接受buffer字节数组的版本中,读取buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出EOFException异常。

所有这些方法会保留文件当前偏移量,并且是线程安全的,因此它们提供了在读文件的主体时,访问文件其他部分的便利方法。

最后请牢记,seek()方法是一个相对高开销的操作,需要谨慎使用,建议用流数据来构建应用的访问模式(例如MapReduce),而非执行大量seek()方法。

 

--摘自《Hadoop权威指南》

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

HDFS中读取数据——Java接口实现 的相关文章

  • Hibernate注解放置问题

    我有一个我认为很简单的问题 我见过两种方式的例子 问题是 为什么我不能将注释放在字段上 让我举一个例子 Entity Table name widget public class Widget private Integer id Id G
  • 使用 JPA Criteria API 进行分页的总行数

    我正在系统中为实体实现 高级搜索 功能 以便用户可以使用该实体的属性上的多个条件 eq ne gt lt 等 来搜索该实体 我正在使用 JPA 的 Criteria API 动态生成 Criteria 查询 然后使用setFirstResu
  • 这个函数(for循环)空间复杂度是O(1)还是O(n)?

    public void check 10 for string i list Integer a hashtable get i if a gt 10 hashtable remove i 这是 O 1 还是 O n 我猜测 O n 但不是
  • Android在排序列表时忽略大小写

    我有一个名为路径的列表 我目前正在使用以下代码对字符串进行排序 java util Collections sort path 这工作正常 它对我的 列表进行排序 但是它以不同的方式处理第一个字母的情况 即它用大写字母对列表进行排序 然后用
  • JAVA - Xuggler - 组合 MP3 音频文件和 MP4 电影时播放视频

    使用 JAVA 和 Xuggler 以下代码组合 MP3 音频文件和 MP4 电影文件并输出组合的 mp4 文件 我希望在合并音频和视频文件时应自动播放输出视频文件 String inputVideoFilePath in mp4 Stri
  • 如何在不超过最大值的情况下增加变量?

    我正在为学校开发一个简单的视频游戏程序 我创建了一个方法 如果调用该方法 玩家将获得 15 点生命值 我必须将生命值保持在最大值 100 并且由于我目前的编程能力有限 我正在做这样的事情 public void getHealed if h
  • 如何模拟从抽象类继承的受保护子类方法?

    如何使用 Mockito 或 PowerMock 模拟由子类实现但从抽象超类继承的受保护方法 换句话说 我想在模拟 doSomethingElse 的同时测试 doSomething 方法 抽象超类 public abstract clas
  • 匿名类上的 NotSerializedException

    我有一个用于过滤项目的界面 public interface KeyValFilter extends Serializable public static final long serialVersionUID 7069537470113
  • Java 中的“Lambdifying”scala 函数

    使用Java和Apache Spark 已用Scala重写 面对旧的API方法 org apache spark rdd JdbcRDD构造函数 其参数为 AbstractFunction1 abstract class AbstractF
  • 在游戏视图下添加 admob

    我一直试图将 admob 放在我的游戏视图下 这是我的代码 public class HoodStarGame extends AndroidApplication Override public void onCreate Bundle
  • 如何在 Java 中测试一个类是否正确实现了 Serialized(不仅仅是 Serialized 的实例)

    我正在实现一个可序列化的类 因此它是一个与 RMI 一起使用的值对象 但我需要测试一下 有没有办法轻松做到这一点 澄清 我正在实现该类 因此在类定义中添加 Serialized 很简单 我需要手动序列化 反序列化它以查看它是否有效 我找到了
  • IntelliJ - 调试模式 - 在程序内存中搜索文本

    我正在与无证的第三方库合作 我知道有一定的String存储在库深处的某个字段中的某处 我可以预测的动态值 但我想从库的 API 中获取它 有没有一种方法可以通过以下方式进行搜索 类似于全文搜索 full程序内存处于调试模式并在某个断点处停止
  • 如何知道抛出了哪个异常

    我正在对我们的代码库进行审查 有很多这样的陈述 try doSomething catch Exception e 但我想要一种方法来知道 doSomething 抛出了哪个异常 在 doSomething 的实现中没有 throw 语句
  • 如何在JSTL中调​​用java方法? [复制]

    这个问题在这里已经有答案了 这可能是重复的问题 我只想调用不是 getter 或 setter 方法的方法例如 xyz 类的 makeCall someObj stringvalue Java类 Class XYZ public Strin
  • Cucumber Java 与 Spring Boot 集成 - Spring @Autowired 抛出 NullPointer 异常

    我正在为 Spring boot 应用程序编写 cucumber java 单元测试来测试每个功能 当我与 Spring Boot 集成时 Autowired 类抛出 NullPointer 异常 Spring Boot应用程序类 Spri
  • 游戏内的java.awt.Robot?

    我正在尝试使用下面的代码来模拟击键 当我打开记事本时 它工作正常 但当我打开我想使用它的游戏时 它没有执行任何操作 所以按键似乎不起作用 我尝试模拟鼠标移动和点击 这些动作确实有效 有谁知道如何解决这个问题 我发现这个问题 如何在游戏中使用
  • Java中的Object类是什么?

    什么是或什么类型private Object obj Object http download oracle com javase 6 docs api java lang Object html是Java继承层次结构中每个类的最终祖先 从
  • 具有特定参数的 Spring AOP 切入点

    我需要创建一个我觉得很难描述的方面 所以让我指出一下想法 com x y 包 或任何子包 中的任何方法 一个方法参数是接口 javax portlet PortletRequest 的实现 该方法中可能有更多参数 它们可以是任何顺序 我需要
  • 如何从 Maven 存储库引用本机 DLL?

    如果 JAR 附带 Maven 存储库中的本机 DLL 我需要在 pom xml 中放入什么才能将该 DLL 放入打包中 更具体地举个例子Jacob http search maven org artifactdetails 7Cnet s
  • 带有 Maven Wrapper 的 Java 17 导致无法识别的 VM 选项“MaxPermSize=512m”

    I use OpenJDK 17 https jdk java net 17 使用 Maven Wrapper 3 8 2 从春季初始化 https start spring io Maven项目 JAR打包 Java 17 Spring

随机推荐