如果IO可以取代Scala的Future,我们如何创建异步IO任务
首先,我们需要澄清什么是“异步任务。通常async意味着“不阻塞操作系统线程”,但既然你提到Future
,有点模糊。比如说,如果我写:
Future { (1 to 1000000).foreach(println) }
它不会是async,因为它是一个阻塞循环和阻塞输出,但它可能会在不同的操作系统线程上执行,由隐式 ExecutionContext 管理。等效的猫效果代码是:
for {
_ <- IO.shift
_ <- IO.delay { (1 to 1000000).foreach(println) }
} yield ()
(这不是较短的版本)
So,
-
IO.shift
用于更改线程/线程池。Future
每次操作都会执行此操作,但它在性能方面并不是免费的。
-
IO.delay
{ ... }(又名IO { ... }
) does NOT使任何事情异步并执行NOT切换线程。它用于创建简单的IO
来自同步副作用 API 的值
现在,让我们回到真正的异步。这里要理解的是:
每个异步计算都可以表示为一个接受回调的函数。
是否使用返回的APIFuture
或Java的CompletableFuture
,或者像NIO这样的东西CompletionHandler
,这一切都可以转换为回调。这是什么IO.async
用于:您可以转换任何函数接受回调 to an IO
。如果是这样的话:
for {
_ <- IO.async { ... }
_ <- IO(println("Done"))
} yield ()
Done
仅当(并且如果)计算时才会打印...
回电。您可以将其视为阻塞绿色线程,但不阻塞操作系统线程。
So,
-
IO.async
用于转换任何已经异步了计算为IO
.
-
IO.delay
用于转换任何完全同步计算为IO
.
- 具有真正异步计算的代码的行为就像阻塞绿色线程一样。
工作时最接近的类比Future
s 正在创建一个scala.concurrent.Promise
并返回p.future
.
或者当我们使用 unsafeToAsync 或 unsafeToFuture 调用 IO 时会发生异步?
有点。和IO
, nothing除非您调用其中之一(或使用IOApp
)。但是 IO 并不保证您会在不同的操作系统线程上执行,甚至异步执行,除非您明确要求这样做IO.shift
or IO.async
.
您可以保证线程随时切换,例如(IO.shift *> myIO).unsafeRunAsyncAndForget()
。这是可能的,正是因为myIO
除非有要求,否则不会被执行,无论您是否将其作为val myIO
or def myIO
.
但是,您无法神奇地将阻塞操作转换为非阻塞操作。这也是不可能的Future
也不与IO
.
猫效应中的异步和并发有什么意义?他们为什么分开?
Async
and Concurrent
(and Sync
) 是类型类。它们的设计是为了让程序员可以避免被锁定cats.effect.IO
并且可以为您提供支持您选择的任何 API,例如 monix Task 或 Scalaz 8 ZIO,甚至 monad 转换器类型,例如OptionT[Task, *something*]
。 fs2、monix 和 http4s 等库都利用它们,让您有更多选择来使用它们。
Concurrent
在上面添加额外的东西Async
,其中最重要的是.cancelable
and .start
。这些与Future
,因为这根本不支持取消。
.cancelable
是一个版本.async
这还允许您指定一些逻辑来取消您正在包装的操作。一个常见的例子是网络请求 - 如果您对结果不再感兴趣,您可以中止它们而无需等待服务器响应,并且不会在读取响应上浪费任何套接字或处理时间。你可能永远不会直接使用它,但它有它的用处。
但是,如果您无法取消可取消的操作,那么它们有什么用呢?这里的关键观察是您无法从操作本身取消操作。其他人必须做出这个决定,这就会发生同时与操作本身(这是类型类的名称的来源)。那就是那里.start
进来。简而言之,
.start
是绿色线程的显式分叉。
Doing someIO.start
类似于做val t = new Thread(someRunnable); t.start()
,除了现在是绿色的。和Fiber
本质上是一个精简版本Thread
API:你可以做.join
,就像Thread#join()
,但它不会阻塞操作系统线程;和.cancel
,这是安全版本.interrupt()
.
请注意,还有其他方法可以分叉绿色线程。例如,进行并行操作:
val ids: List[Int] = List.range(1, 1000)
def processId(id: Int): IO[Unit] = ???
val processAll: IO[Unit] = ids.parTraverse_(processId)
会将所有 ID 的处理分叉到绿色线程,然后将它们全部连接起来。或者使用.race
:
val fetchFromS3: IO[String] = ???
val fetchFromOtherNode: IO[String] = ???
val fetchWhateverIsFaster = IO.race(fetchFromS3, fetchFromOtherNode).map(_.merge)
将并行执行提取,为您提供第一个完成的结果,并自动取消较慢的提取。所以,做.start
并使用Fiber
不是分叉更多绿色线程的唯一方法,只是最明确的方法。答案是:
IO是绿线程吗?如果是的话,为什么猫效应中有一个 Fiber 对象?据我了解,Fiber 是绿色线程,但文档声称我们可以将 IO 视为绿色线程。