使用 slick/scala 进行流式传输

2024-04-09

我正在研究 scala/slick 流,并试图了解它是如何工作的。这是我的测试代码

    val bigdata = TableQuery[BigData] 
    val x = db.stream(bigdata.result.transactionally.withStatementParameters(fetchSize = 100)).foreach {
      (tuple: (Int, UUID)) =>
        println(tuple._1 + " " + tuple._2)
        Thread.sleep(50)//emulating slow consumer.
    }

    Await.result(x, 100000 seconds)

当代码运行时,我启用了 postgresql 查询日志来了解幕后发生的事情。我发现每 100 个元素就会发生一次重新查询

2015-11-06 15:03:24 IST [24379-3] postgres@scala_test 日志:从 S_2/C_3 执行获取:从“bigdata”x2 选择 x2."id", x2."data" 2015-11-06 15:03:29 IST [24379-4] postgres@scala_test 日志:执行

fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:34 IST [24379-5] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:39 IST [24379-6] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:44 IST [24379-7] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:49 IST [24379-8] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2

然而,它看起来像是在获取整个数据集。我期待一个带有偏移量的查询。

ie SELECT * FROM bigdata  LIMIT 100 OFFSET 500

看起来一切都被查询并且发送数据是部分发送的。

然后,当上面的流正在运行时,我将新的数据集插入到同一个表中。

串流之前

SELECT count(*) FROM bigdata -> 500

然后插入几行

SELECT count(*) FROM bigdata  -> 700

但流在 500 处停止。这似乎表明新数据从未被提取和流回。关于流媒体如何在 slick 中工作的任何想法。


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 slick/scala 进行流式传输 的相关文章

  • Either 相当于受检查的异常吗?

    从 Scala 开始并阅读有关Either我很自然地将新概念与我所知道的东西 在本例中来自 Java 进行比较 与之前有什么区别吗concept检查异常和Either 在这两种情况下 失败的可能性在方法中明确注释 throws或返回Eith
  • scala sbt 在多项目上测试运行设置和清理命令一次

    我知道我可以通过修改 testOptions 在 sbt 中添加设置和清理代码以用于测试阶段 例如 val embedMongoTestSettings Seq Setting Seq testOptions in Test Tests S
  • 不使用 CPU 时 Cassandra 超时

    我使用 Phantom DSL 和 Datastax Cassandra 驱动程序时遇到 Cassandra 超时 然而 Cassandra 似乎并没有超载 以下是我得到的异常 com datastax driver core except
  • 为什么我的 Project Euler Problem 12 算法这么慢?

    我已经在 Scala 中为 PE P12 创建了解决方案 但速度非常非常慢 有人可以告诉我为什么吗 如何优化这个 calculateDevisors 简单的方法和calculateNumberOfDivisors 除数函数具有相同的速度 i
  • 如何在Dotty中使用given?

    我在看Dotty下的文档Contextual Abstractions页面 我看到了Given Instances 给定实例 或者简单地 给定 定义了 规范 值 用于合成给定子句的参数的某些类型 例子 trait Ord T def com
  • 将 Scala Dataframe 写入 CSV 文件时应用 UTF8 编码

    在 Spark2 Scala 中将数据帧写入 CSV 文件时如何正确应用 UTF8 编码 我正在使用这个 df repartition 1 write mode SaveMode Overwrite format csv option he
  • Scala 组合器解析器 - 区分数字字符串和变量字符串

    我正在做 Cay Horstmann 的组合器解析器练习 我想知道区分代表数字的字符串和代表匹配语句中变量的字符串的最佳方法 def factor Parser ExprTree wholeNumber expr ident case a
  • 新式(“内联”)宏需要 scala.meta

    我刚刚更新到 scala meta 2 0 0 M1 和最新的 scala 2 12 3 现在宏不再编译 我所做的唯一更改是将元版本从 1 8 0 更改为 2 0 0 M1 错误 新式 内联 宏需要 scala meta 有谁知道是否有快速
  • scala/spark 代码不允许在 hive 中添加列

    如果源数据有新列 我尝试在 Hive 表中添加一列 所有新列的检测都运行良好 但是 当我尝试将列添加到目标表时 我收到此错误 for f lt df schema fields if f name chk spark sqlContext
  • 如何使用 apply/unapply 方法重现案例类行为?

    我尝试用普通类和伴生对象替换案例类 但突然出现类型错误 编译良好的代码 综合示例 trait Elem A B def C other Elem C A Elem C B other match case Chain head tail g
  • Scala:具有复杂结构的树插入尾递归

    我正在 scala 中创建自定义对象树 并且我的插入方法引发堆栈溢出 因为它不是尾递归 但是 我不太清楚如何使其尾递归 我见过使用 累加器 变量的相关示例 但它们要么是只能相乘和覆盖的整数之类的东西 要么是我在适应树时遇到困难的列表 这是我
  • 宏:knownDirectSubclasses 被嵌套类型破坏?

    我有一个宏 它枚举密封特征的直接子类型 import scala reflect macros Context import language experimental macros object Checker def apply A U
  • Scala 中值类的隐式 Json 格式化程序

    我有许多值类组成了一个更大的对象案例类 final case class TopLevel foo Foo bar Bar final case class Foo foo String extends AnyVal final case
  • 如何发现 Scala 远程 Actor 已死亡?

    在 Scala 中 当另一个 远程 actor 终止时 可以通过设置 trapExit 标志并以第二个 actor 作为参数调用 link 方法来通知一个 actor 在这种情况下 当远程参与者通过调用 exit 结束其工作时 第一个参与者
  • Scala 如何忽略 Java 的检查异常?

    例如如果调用 JavaThread sleep这会抛出一个已检查的InterruptedException来自 Scala 源文件 然后不需要将调用包含在 Scala 中try catch Scala 如何删除将调用包围在 a 中的规则tr
  • Java 表达式树 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有相当于 net的 LINQ 下的表达式树JVM 我想实现一些类似 LINQ 的代码结构Scala
  • Scala:如何在超类上实现克隆方法,并在子类中使用它?

    我可能会以错误的方式处理这个问题 但我想要一个像这样的对象 class MyDataStructure def myClone val clone new MyDataStructure do stuff to make clone the
  • Scala 模式匹配变量绑定

    为什么提取器返回时不能以 样式绑定变量Option
  • Scala 解析器组合器的运算符优先级

    我正在研究需要考虑运算符优先级的解析逻辑 我的需求并不太复杂 首先 我需要乘法和除法比加法和减法具有更高的优先级 例如 1 2 3 应视为 1 2 3 这是一个简单的例子 但你明白了 我需要将更多自定义标记添加到优先级逻辑中 我可以根据此处
  • 在 Scala 中,使用“_”和使用命名标识符有什么区别?

    为什么当我尝试使用时会出现错误 而不是使用命名标识符 scala gt res0 res25 List Int List 1 2 3 4 5 scala gt res0 map gt item toString

随机推荐

  • 在android中使用Google Drive api获取在Google Drive上创建的文件的大小

    我创建了应用程序 使用该应用程序用户将能够从 Google 驱动器获取列表中的所有文件并能够下载它 现在 用户可以下载已上传到驱动器的文件 并带有进度计数 但我无法获取在 Goolge 驱动器上创建的文件的大小 例如文档 演示文稿 电子表格
  • Android UI 测试期间“未找到测试”

    如果我有时想通过右键单击测试然后选择运行来运行单个测试 测试结果将显示 未找到测试 并显示与 线程 main java lang NoClassDefFoundError 中的异常 相关的错误 我发现这种情况只发生过几次 为什么会发生这种情
  • C++迭代器和反向迭代器

    我正在写一个iterator 实际上是const iterator对于我当前的对象 我还想创建一个reverse const iterator also 我环顾四周 想看看如何做到这一点 然后我偶然发现this http www cplus
  • 在jsp页面中使用log4j的正确方法是什么

    我的意思是 我希望记录器名称反映 source jsp 文件 无论它是否包含在另一个文件中或编译为类或其他文件 首先 导入所需的包 即 then the jsppagename jsp根据您使用的服务器 可能会发生变化 然后 在 jsp 内
  • GZip 算法如何工作? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 GZip 文件压缩算法如何工作 如果有人有这方面的任何文档 我想阅读它 如果您仍在寻找更详细的概述 我已经在以下位置编写了 gzip de
  • Django 对象.update_or_create

    我有一个在 celery 中运行的 period task 来查询最新的加密货币价格 但由于某种原因 每次想要显示数据时 我没有得到更新的记录 我只是得到新的记录 而旧的记录由于某种原因被保留 tasks py periodic task
  • NavigationView如何处理动态标题内容

    我有一个非常标准的 NavigationView 当我在标题中使用静态布局 如下所示 时 效果非常好
  • 约束布局 - 具有最大宽度的两个视图

    我想创建一个布局 使用约束布局 如下所示 在不同的语言中 Button1 可能比 Button2 大 我怎样才能做到这一点 我只能在包含两个按钮的约束内使用 LinearLayout 来实现此目的 但我尝试仅使用布局 Thanks Upda
  • 如果主体参数以“@”开头,则发出 PowerShell POST 请求

    我想在 PowerShell 中发出 POST 请求 以下是 Postman 中的正文详细信息 type login username email protected cdn cgi l email protection password
  • 生成数字数组中有效的数字组合

    我正在尝试从数字数组中生成所有有效的数字组合 假设我们有以下内容 let arr 1 2 9 4 7 我们需要输出这样的内容 1 2 9 4 7 1 2 9 47 1 2 94 7 1 2 947 1 29 4 7 1 29 47 1 29
  • 我无法在我的 Visual C Express Edition 2008 中汇编电影 (MMX) 指令

    当我尝试编译时movd指令显示错误为 error A2085 instruction or register not accepted in current CPU mode 我的代码如下 386 model flat c code add
  • 我怎样才能让我的verilog移位器更通用?

    这里我有一个移位器 但现在它最多只能工作 3 位 我一直在寻找 但不知道如何让它工作最多 8 位 module shifter a b out input 7 0 a b output 7 0 out wire 7 0 out1 out2
  • 扩展 Eloquent 的类的构造函数

    我刚刚启动了一个新网站 我想使用 Eloquent 在为数据库播种的过程中 我注意到 如果我在扩展 eloquent 的模型上包含任何类型的构造函数 则会添加空行 例如 运行此播种器
  • 如何解决三向多态关联?

    首先我要说我正在使用 MySQL 不是事务型 并且这是无法更改的 另外 为了简洁和清晰起见 我简化了此处的表格 在此示例中 课程 由其内部属性和外部属性及其自己的属性 阅读 组成 阅读 有其自己的关键依赖属性和三个不同的外部属性 阅读源 我
  • 如何在 SQL Server 2008 中存储特定列的列值?

    基本上我正在映射字段 正如你所看到的GridView 2我选择了特定的列名称 让我们考虑第一条记录 即1 id Column0 For id我已选择Column0 所以在数据库中我想在 id 列下存储列值 1 2 3 4 像下面这样 id
  • 使用属性和不访问 ivars 之间的区别

    使用属性或直接访问 ivars 的特定性能和行为差异 对于全局变量 使用它有什么区别 interface myClass UIImageView myView void loadView super loadView myView UIIm
  • 如何动态添加 mixin 作为基类而不出现 MRO 错误?

    说我有课A B and C Class A and B都是 Class 的 mixin 类C class A object pass class B object pass class C object A B pass 这在实例化 C 类
  • 在angularjs中克隆html元素

    我正在尝试在 angularjs 中实现拖放系统 我希望在拖动开始时克隆拖动的对象 但是我不知道如何在 angularjs 中克隆元素及其范围和链接控制器 有什么建议么 不建议使用 Angular 来克隆 DOM 元素 通常是通过拖放完成的
  • 如何比较两个 pandas 数据帧并删除一个文件上的重复项而不附加其他文件中的数据[重复]

    这个问题在这里已经有答案了 我正在尝试使用 pandas 数据框比较两个 csv 文件 其中一个是每天都会附加数据的主表 test master csv 第二个是每日报告 test daily csv 其中包含我想要附加到 test mas
  • 使用 slick/scala 进行流式传输

    我正在研究 scala slick 流 并试图了解它是如何工作的 这是我的测试代码 val bigdata TableQuery BigData val x db stream bigdata result transactionally