Spark Scala 流式 CSV

2024-02-04

我是 Spark/Scala 的新手。我知道如何加载 CSV 文件:

    sqlContext.read.format("csv")

以及如何读取文本流和文件流:

    scc.textFileStream("""file:///c:\path\filename""");
    scc.fileStream[LongWritable, Text, TextInputFormat](...)

但如何阅读文本以 CSV 格式传输格式?谢谢,利维


干得好:

val ssc = new StreamingContext(sparkConf, Seconds(5))


    // Create the FileInputDStream on the directory
    val lines = ssc.textFileStream("file:///C:/foo/bar")

    lines.foreachRDD(rdd => {
        if (!rdd.isEmpty()) {
          println("RDD row count: " + rdd.count())
         // Now you can convert this RDD to DataFrame/DataSet and perform business logic.  

        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  } 
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Scala 流式 CSV 的相关文章

  • 如何在 T-SQL 中将 CSV 转换为记录集?

    在我的存储过程中 我传递一个过滤器 使用 WHERE Column IN 子句 作为参数 参数值以 CSV 形式给出 将此 CSV 转换为记录集的最佳方法是什么 例子 SELECT FROM Employee WHERE Name IN J
  • 使用 Akka 玩 2.5 - 找不到参数超时的隐式值:akka.util.Timeout

    我正在尝试使用 Play 2 5 测试 Akka 但遇到了一个似乎无法解决的编译错误 我正在关注 Play 文档中的此页面 https playframework com documentation 2 5 x ScalaAkka http
  • 总分配超过堆内存的 95.00%(960,285,889 字节)- pyspark 错误

    我用 python 2 7 编写了一个脚本 使用 pyspark 将 csv 转换为 parquet 和其他内容 当我在小数据上运行脚本时 它运行良好 但是当我在更大的数据 250GB 上运行脚本时 我遇到了以下错误 总分配超过堆内存的 9
  • 如何发现 Scala 远程 Actor 已死亡?

    在 Scala 中 当另一个 远程 actor 终止时 可以通过设置 trapExit 标志并以第二个 actor 作为参数调用 link 方法来通知一个 actor 在这种情况下 当远程参与者通过调用 exit 结束其工作时 第一个参与者
  • CSV 解析 - Swift 4

    我正在尝试解析 CSV 但遇到一些问题 下面是我用来解析 CSV 的代码 let fileURL Bundle main url forResource test application data Sheet 1 withExtension
  • 使用 Spray-json 解析简单数组

    我正在尝试 但失败了 了解 Spray json 如何将 json feed 转换为对象 如果我有一个简单的 key gt value json feed 那么它似乎可以正常工作 但是我想要读取的数据出现在如下列表中 name John a
  • 通用特征的隐式转换

    我正在实现一个数据结构 并希望用户能够使用任何类型作为密钥 只要他提供一个合适的密钥类型来包装它 我有这个关键类型的特质 这个想法是进行从基类型到键类型的隐式转换 反之亦然 实际上 只使用基类型 该特征看起来像这样 trait Key T
  • Scala 如何忽略 Java 的检查异常?

    例如如果调用 JavaThread sleep这会抛出一个已检查的InterruptedException来自 Scala 源文件 然后不需要将调用包含在 Scala 中try catch Scala 如何删除将调用包围在 a 中的规则tr
  • 有谁知道一种更快的方法来执行 String.Split() 吗?

    我正在读取 CSV 文件的每一行 并且需要获取每一列中的各个值 所以现在我只是使用 values line Split delimiter where line是保存由分隔符分隔的值的字符串 衡量我的表现ReadNextRow我注意到它花费
  • fetchsize和batchsize对Spark的影响

    我想通过以下方式控制 RDB 的读写速度Spark直接 但标题已经透露的相关参数似乎不起作用 我可以得出这样的结论吗fetchsize and batchsize我的测试方法不起作用 或者它们确实会影响阅读和写作方面 因为测量结果基于规模是
  • 如何在Spark结构化流中指定批处理间隔?

    我正在使用 Spark 结构化流并遇到问题 在 StreamingContext DStreams 中 我们可以定义批处理间隔 如下所示 from pyspark streaming import StreamingContext ssc
  • Spark的distinct()函数是否仅对每个分区中的不同元组进行洗牌

    据我了解 distinct 哈希分区 RDD 来识别唯一键 但它是否针对仅移动每个分区的不同元组进行了优化 想象一个具有以下分区的 RDD 1 2 2 1 4 2 2 1 3 3 5 4 5 5 5 在此 RDD 上的不同键上 所有重复键
  • 对 Scala Not Null 特征的库支持

    Notice 从 Scala 2 11 开始 NotNull已弃用 据我了解 如果您希望引用类型不可为空 则必须混合魔法NotNull特征 编译器会自动阻止你输入null 可以值在里面 看到这个邮件列表线程 http www nabble
  • 类型级编程有哪些示例? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我不明白 类型级编程 是什么意思 也无法使用Google找到合适的解释 有人可以提供一个演示类型级编程的示例吗 范式的解释和 或定义将
  • Scala 中的 Shapeless 结构编程:如何正确使用 SYB 实现?

    我想使用SYB http research microsoft com en us um people simonpj papers hmap 实施于无形图书馆 https github com milessabin shapeless编写
  • AWK:递归下降 CSV 解析器

    响应一个BASH 中的递归下降 CSV 解析器 https codereview stackexchange com questions 11727 need some advice or help with translation and
  • 懒惰背景下的变革与行动

    正如 Learning Spark 闪电般快速的大数据分析 一书中提到的 由于 Spark 计算 RDD 的方式不同 转换和操作也有所不同 在对惰性进行一些解释之后 我发现转换和操作都是惰性地进行的 那么问题来了 这句话的意思是什么 对比
  • 使用 on_bad_lines 将 pandas.read_csv 中的无效行写入文件

    我有一个 CSV 文件 我正在使用 Python 来解析该文件 我发现文件中的某些行具有不同的列数 001 Snow Jon 19801201 002 Crom Jake 19920103 003 Wise Frank 19880303 l
  • AWS EMR Spark Python 日志记录

    我正在 AWS EMR 上运行一个非常简单的 Spark 作业 但似乎无法从我的脚本中获取任何日志输出 我尝试过打印到 stderr from pyspark import SparkContext import sys if name m
  • 如何为 Spark RDD 中的元素分配唯一的连续编号

    我有一个数据集 user product review 并希望将其输入到 mllib 的 ALS 算法中 该算法需要用户和产品是数字 而我的是字符串用户名和字符串SKU 现在 我获取不同的用户和 SKU 然后在 Spark 外部为它们分配数

随机推荐