这一篇文章,我们来深入了解一下Hadoop中的FileSystem类,它是与Hadoop的某一文件系统进行交互的API。虽然我们主要聚焦于HDFS实例,即DistributedFileSystem,但总体来说,还是应该集成FileSystem抽象类,并编写代码,使其在不同文件系统中可移植。这对测试你编写的程序非常重要,例如,我们可以使用本地文件系统中的存储数据快速进行测试。
从Hadoop URL读取数据
要从hadoop文件系统中读取文件,最简单的方法是使用java.net.URL对象打开数据流,从中读取数据,看下列代码:
InputStream in = null;
try{
in = new URL("hdfs://host/path").openStream();
//省略其他次要代码
}finally{
IOUtils.closeStream(in);
}
让Java程序能够识别Hadoop中的hdfs URL方案还需要一些额外的工作。这里采用的方法是通过FsUrlStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHanderFactory()方法。每个Java虚拟机只能调用一次这个方法,因此通常在静态方法中调用。这个限制意味着如果程序的其他组件(如不受你控制的第三方组件)已经声明一个URLStreamHandlerFactory实例,你将无法使用这种方法从Hadoop中读取数据。
下面展示的程序以标准输出方式显示Hadoop文件系统中的文件,类似于Unix中的cat指令。
范例1,通过URLStreamHandler实例以标准输出方式显示Hadoop文件系统中的文件。
public class URLCat{
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHanderFactory());
}
public static void main(String[] args) throws Exception{
InputStream in = null;
try{
in = new URL(args[0]).openStream();
IOUtils.copyBytes(in,System.out,4096,false);
}finally{
IOUtils.closeStream(in);
}
}
}
我们可以调用Hadoop中简洁的IOUtils类,并在finally中关闭数据流,同时也可以在输入流和输出流之间复制数据(上述代码为System.out)。copyBytes方法的最后两个参数,第一个设置用于复制的缓冲区大小,第二个设置复制结束后是否关闭数据流。这里我们选择自行关闭输入流,因而System.out不必关闭输入流。
下面列举一个运行示例:
% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On th top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
通过FileSystem API读取数据
Hadoop文件系统中通过Hadoop Path对象来代表文件,可以将路径视为一个Hadoop文件系统URI,如hdfs://localhost/user/tom/quangle.txt
FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS。获取FileSystem实例有以下几个静态工厂的方法。
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri,Configuration conf) throws IOException
public static FileSystem get(URI uri,Configuration conf,String user) throws IOException
Configuration对象封装了客户端或服务器的配置,通过配置文件读取类路径来实现(例如etc/hadoop/core-site.xml)。第一个方法返回的是默认文件系统(core-site.xml中指定的,如果没有限定,则使用默认的本地文件系统配置)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统。第三,作为给定用户来访问文件系统,对安全来说至关重要。
在某些情况下,你可能希望获取本地文件系统的运用实例,此时可以使用getLocal()方法更为便捷:
public static LocalFileSystem getLocal(Configuration conf) throws IOExcetion
有了FileSystem实例之后,我们调用open()函数来获取文件的输入流:
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStrem open(Path f, int bufferSize) throws IOException
第一个方法使用默认的缓冲区大小为4KB
我们重写范例1,在上面的一段代码中。
范例2,直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件。
public class FileSystemCat{
public static void main(String[] args) throws Excetion{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri),conf);
InputStream in = null;
try{
in = fs.open(new Path(uri));
IOUtils.copyBytes(in,System.out,4096,false);
}finally{
IOUtils.closeStream(in);
}
}
}
程序运行的结果如下:
% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumptty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
FSDataInputStream对象
实际上,FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。这个类是继承了java.io.DataInputStream的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。
package org.apache.hadoop.fs;
public class FSDataInputStream extends DataInputStream implements Seekable,PositionedReadable{}
Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos方法)的查询方法:
public interface Seekable{
void seek(long pos) throws IOException;
long getPos() throws IOExcetion;
}
调用seek()来定位大于文件长度的位置会引发IOException异常。与java.io.InputStream的skip()不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。
范例3,是范例2的扩展,它将一个文件写入标准输出2次,在一次写完之后,定位到文件的起始位置再次以流的方式读取该文件并输出。
范例3:
public class FileSystemDoubleCat{
public static void main(String[] args) throws Exception{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri),conf);
FSDataInputStream in = null;
try{
in = fs.open(new Path(uri));
IOUtils.copyBytes(in,System.out,4096,false);
in.seek(0);
IOUtils.copyBytes(in,System.out,4096,false);
}finally{
IOUtils.closeStream(in);
}
}
}
运行结果如下:
% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account if his Beaver Hat.
The Quangle Wangle sat,
But his face you could not see,
On account if his Beaver Hat.
FSDataInputStream类实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分:
public interface PositionedReadable{
public int read(long position, byte[] buffer, int offset, int length) throws IOException;
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
public void readFully(long position, byte[] buffer) throws IOException;
}
read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏移量offset处。返回值是实际读到的字节数,调用者需要检查这个值,它有可能小于指定的length长度。readFully()方法将指定length长度的字节数的数据读取到buffer中(或在只接受buffer字节数组的版本中,读取buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出EOFException异常。
所有这些方法会保留文件当前偏移量,并且是线程安全的,因此它们提供了在读文件的主体时,访问文件其他部分的便利方法。
最后请牢记,seek()方法是一个相对高开销的操作,需要谨慎使用,建议用流数据来构建应用的访问模式(例如MapReduce),而非执行大量seek()方法。
--摘自《Hadoop权威指南》
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)