我正在尝试使用 Spark Streaming 的简单文件流示例(spark-streaming_2.10,版本:1.5.1)
public class DStreamExample {
public static void main(final String[] args) {
final SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SparkJob");
sparkConf.setMaster("local[4]"); // for local
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
final JavaStreamingContext ssc = new JavaStreamingContext(sc,
new Duration(2000));
final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
lines.print();
ssc.start();
ssc.awaitTermination();
}
}
当我在单个文件或控制器上运行此代码时,它不会从文件中打印任何内容,我在日志中看到它不断轮询,但没有打印任何内容。我尝试在该程序运行时将文件移动到目录。
我有什么遗漏的吗?我尝试在 RDD 行上应用映射函数,但也不起作用。
The API 文本文件流不应该读取现有目录内容,相反,它的目的是监视给定的 Hadoop 兼容文件系统路径的更改,必须通过将文件从同一文件系统中的另一个位置“移动”到受监视的位置。
简而言之,您正在订阅目录更改并将接收受监控位置内新出现的文件的内容 - 在该状态下,文件出现在监控快照时(在您的情况下为 2000 毫秒的持续时间),并且任何进一步的文件更新都不会到达流中,只有目录更新(新文件)才可以。
模拟更新的方法是在监视会话期间创建新文件:
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class DStreamExample {
public static void main(final String[] args) throws IOException {
final SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SparkJob");
sparkConf.setMaster("local[4]"); // for local
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
final JavaStreamingContext ssc = new JavaStreamingContext(sc,
new Duration(2000));
final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
// spawn the thread which will create new file within the monitored directory soon
Runnable r = () -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
FileUtils.write(new File("/opt/test/newfile1"), "whatever");
} catch (IOException e) {
e.printStackTrace();
}
};
new Thread(r).start();
lines.foreachRDD((Function<JavaRDD<String>, Void>) rdd -> {
List<String> lines1 = rdd.collect();
lines1.stream().forEach(l -> System.out.println(l));
return null;
});
ssc.start();
ssc.awaitTermination();
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)