如何标记一个TPL数据流周期完成?

2024-04-01

给定 TPL 数据流中的以下设置。

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");

var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);

var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
    return directory.GetDirectories();

});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
    return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);

var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
    var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
    return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
    XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
        (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
    () => new TransformBlock<string, string>((s) =>
    {
        Trace.TraceInformation("Combining {0}", s);
        return s;
    }));

tileFilder.LinkTo(block);


using (new TraceTimer("Time"))
{
    dirBroadcast.Post(directory);

    block.LinkTo(new ActionBlock<FileInfo>((s) =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);

    }));
    block.Complete();
    block.Completion.Wait();

}

我想知道由于循环的原因我如何才能将其标记为完成。目录被发布到 dirBroadcast 广播器,该广播器发布到 dirfinder,可能会将新目录发回广播器,因此我不能简单地将其标记为完整,因为它会阻止从 dirfinder 添加任何目录。我应该重新设计它以跟踪目录的数量,或者 TPL 中有任何相关内容吗?


如果您的代码的目的是使用某种并行性遍历目录结构,那么我建议不要使用 TPL Dataflow 并使用 Microsoft 的 Reactive Framework。我认为事情变得简单多了。

我就是这样做的。

首先定义一个递归函数来构建目录列表:

Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = di =>
    Observable
        .Return(di)
        .Concat(di.GetDirectories()
            .ToObservable()
            .SelectMany(di2 => recurse(di2)))
        .ObserveOn(Scheduler.Default);

这会执行目录的递归并使用默认的 Rx 调度程序,这会导致可观察对象并行运行。

所以通过调用recurse有输入DirectoryInfo我得到了输入目录及其所有子目录的可观察列表。

现在我可以构建一个相当直接的查询来获得我想要的结果:

var query =
    from di in recurse(new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles"))
    from fi in di.GetFiles().ToObservable()
    let zxy =
        fi
            .FullName
            .Split('\\')
            .Reverse()
            .Take(3)
            .Reverse()
            .Select(s => int.Parse(Path.GetFileNameWithoutExtension(s)))
            .ToArray()
    let suffix = String.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2])
    select new FileInfo(Path.Combine(di.FullName, suffix));

现在我可以像这样操作查询:

query
    .Subscribe(s =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);
    });

现在我可能在您的自定义代码中遗漏了一些内容,但如果这是您想要采用的方法,我相信您可以很容易地解决任何逻辑问题。

当子目录和文件用完时,此代码会自动处理完成。

要将 Rx 添加到项目中,请在 NuGet 中查找“Rx-Main”。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何标记一个TPL数据流周期完成? 的相关文章

  • 在一个数据访问层中处理多个连接字符串

    我有一个有趣的困境 我目前有一个数据访问层 它必须与多个域一起使用 并且每个域都有多个数据库存储库 具体取决于所调用的存储过程 目前 我只需使用 SWITCH 语句来确定应用程序正在运行的计算机 并从 Web config 返回适当的连接字
  • 通过引用传递 [C++]、[Qt]

    我写了这样的东西 class Storage public Storage QString key const int value const void add item QString int private QMap
  • 机器Epsilon精度差异

    我正在尝试计算 C 中双精度数和浮点数的机器 epsilon 值 作为学校作业的一部分 我在 Windows 7 64 位中使用 Cygwin 代码如下 include
  • std::list 线程push_back、front、pop_front

    std list 线程安全吗 我假设不是这样 所以我添加了自己的同步机制 我认为我有正确的术语 但我仍然遇到问题 每个函数都由单独的线程调用 Thread1 不能等待 它必须尽可能快 std list
  • 如何从 Visual Studio 将视图导航到其控制器?

    问题是解决方案资源管理器上有 29 个项目 而且项目同时具有 ASP NET MVC 和 ASP NET Web 表单结构 在MVC部分中 Controller文件夹中有大约100个子文件夹 每个文件夹至少有3 4个控制器 视图完全位于不同
  • 如何使从 C# 调用的 C(P/invoke)代码“线程安全”

    我有一些简单的 C 代码 它使用单个全局变量 显然这不是线程安全的 所以当我使用 P invoke 从 C 中的多个线程调用它时 事情就搞砸了 如何为每个线程单独导入此函数 或使其线程安全 我尝试声明变量 declspec thread 但
  • 对类 static constexpr 结构的未定义引用,g++ 与 clang

    这是我的代码 a cp p struct int2 int x y struct Foo static constexpr int bar1 1 static constexpr int2 bar2 1 2 int foo1 return
  • 枚举扩展方法

    在vs2008中 是否可以编写适用于任何枚举的扩展方法 我知道您可以针对特定枚举编写扩展方法 但我希望能够使用单个扩展方法对每个枚举进行处理 这可能吗 是的 只需针对基础进行编码Enum类型 例如 public static void So
  • 需要帮助优化算法 - 两百万以下所有素数的总和

    我正在尝试做一个欧拉计划 http projecteuler net问题 我正在寻找 2 000 000 以下所有素数的总和 这就是我所拥有的 int main int argc char argv unsigned long int su
  • 两个静态变量同名(两个不同的文件),并在任何其他文件中 extern 其中一个

    在一个文件中将变量声明为 static 并在另一个文件中进行 extern 声明 我认为这会在链接时出现错误 因为 extern 变量不会在任何对象中看到 因为在其他文件中声明的变量带有限定符 static 但不知何故 链接器 瑞萨 没有显
  • 两个类可以使用 C++ 互相查看吗?

    所以我有一个 A 类 我想在其中调用一些 B 类函数 所以我包括 b h 但是 在 B 类中 我想调用 A 类函数 如果我包含 a h 它最终会陷入无限循环 对吗 我能做什么呢 仅将成员函数声明放在头文件 h 中 并将成员函数定义放在实现文
  • 实例化类时重写虚拟方法

    我有一个带有一些虚函数的类 让我们假设这是其中之一 public class AClassWhatever protected virtual string DoAThingToAString string inputString retu
  • 空指针与 int 等价

    Bjarne 在 C 编程语言 中写道 空指针与整数零不同 但 0 可以用作空指针的指针初始值设定项 这是否意味着 void voidPointer 0 int zero 0 int castPointer reinterpret cast
  • LINQ:使用 INNER JOIN、Group 和 SUM

    我正在尝试使用 LINQ 执行以下 SQL 最接近的是执行交叉联接和总和计算 我知道必须有更好的方法来编写它 所以我向堆栈团队寻求帮助 SELECT T1 Column1 T1 Column2 SUM T3 Column1 AS Amoun
  • C# 中最小化字符串长度

    我想减少字符串的长度 喜欢 这串 string foo Lorem ipsum dolor sit amet consectetur adipiscing elit Aenean in vehicula nulla Phasellus li
  • C++ 中的参考文献

    我偶尔会在 StackOverflow 上看到代码 询问一些涉及函数的重载歧义 例如 void foo int param 我的问题是 为什么会出现这种情况 或者更确切地说 你什么时候会有 对参考的参考 这与普通的旧参考有何不同 我从未在现
  • C# 使用“?” if else 语句设置值这叫什么

    嘿 我刚刚看到以下声明 return name null name NA 我只是想知道这在 NET 中叫什么 是吗 代表即然后执行此操作 这是一个俗称的 条件运算符 三元运算符 http en wikipedia org wiki Tern
  • DotNetZip:如何提取文件,但忽略zip文件中的路径?

    尝试将文件提取到给定文件夹 忽略 zip 文件中的路径 但似乎没有办法 考虑到其中实现的所有其他好东西 这似乎是一个相当基本的要求 我缺少什么 代码是 using Ionic Zip ZipFile zf Ionic Zip ZipFile
  • 在OpenGL中,我可以在坐标(5, 5)处精确地绘制一个像素吗?

    我所说的 5 5 正是指第五行第五列 我发现使用屏幕坐标来绘制东西非常困难 OpenGL 中的所有坐标都是相对的 通常范围从 1 0 到 1 0 为什么阻止程序员使用屏幕坐标 窗口坐标如此严重 最简单的方法可能是通过以下方式设置投影以匹配渲
  • 如何确定 CultureInfo 实例是否支持拉丁字符

    是否可以确定是否CultureInfo http msdn microsoft com en us library system globalization cultureinfo aspx我正在使用的实例是否基于拉丁字符集 我相信你可以使

随机推荐