异步任务、视频缓冲

2024-05-09

我正在尝试理解 C# 中的任务,但仍然遇到一些问题。我正在尝试创建一个包含视频的应用程序。主要目的是从文件中读取视频(我使用 Emgu.CV)并通过 TCP/IP 发送它以在板上进行处理,然后以流(实时)方式返回。首先,我是连续做的。所以,读一Bitmap、从板上发送-接收以及绘图。但是读取位图并绘制它们需要太多时间。我想要一个传输、接收 FIFO 缓冲区来保存视频帧,以及一个不同的任务来完成发送和接收每个帧的工作。所以我想并行进行。我想我应该创建 3 个任务:

        tasks.Add(Task.Run(() => Video_load(video_path)));
        tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
        tasks.Add(Task.Run(() => VideoDisp_hw(32)));

我想“并行”运行。我应该使用什么类型的对象?并发队列?缓冲区块?或者只是一个列表?

感谢您的建议!我想问一件事。我正在尝试创建一个带有 2 个 TPL 块的简单控制台程序。 1 个块将是 Transform 块(获取消息,即“开始”)并将数据加载到列表中,另一个块将是 ActionBlock(仅从列表中读取数据并打印它们)。下面是代码:

namespace TPL_Dataflow
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            Random randn = new Random();

            var loadData = new TransformBlock<string, List<int>>(async sample_string =>
           {
               List<int> input_data = new List<int>();
               int cnt = 0;

                if (sample_string == "start")
                {
                   Console.WriteLine("Inside loadData");
                   while (cnt < 16)
                   {
                       input_data.Add(randn.Next(1, 255));
                       await Task.Delay(1500);
                       Console.WriteLine("Cnt");
                       cnt++;
                   }
                                    }
                else
                {
                    Console.WriteLine("Not started yet");

                }
            return input_data;
           });


            var PrintData = new ActionBlock<List<int>>(async input_data =>
            {
                while(input_data.Count > 0)
                {


                    Console.WriteLine("output Data = " + input_data.First());
                    await Task.Delay(1000);
                    input_data.RemoveAt(0);
                    
                }
 

              });

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            loadData.LinkTo(PrintData, input_data => input_data.Count() >0  );
            //loadData.LinkTo(PrintData, linkOptions);
            
            loadData.SendAsync("start");
            loadData.Complete();
            PrintData.Completion.Wait();

        }
    }
}

但它似乎以串行方式工作..我做错了什么?我尝试异步执行 while 循环。我想并行做这两件事。当从列表中获得数据时,然后绘制。


你可以使用TransformManyBlock<string, int> https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.transformmanyblock-2作为生产者区块,以及ActionBlock<int>作为消费者块。这TransformManyBlock将使用接受的构造函数进行实例化Func<string, IEnumerable<int>>代表,并通过了迭代器方法 https://learn.microsoft.com/en-us/dotnet/csharp/iterators (the Produce方法(下例中的方法)会一一生成值:

Random random = new Random();

var producer = new TransformManyBlock<string, int>(Produce);

IEnumerable<int> Produce(string message)
{
    if (message == "start")
    {
        int cnt = 0;
        while (cnt < 16)
        {
            int value;
            lock (random) value = random.Next(1, 255);
            Console.WriteLine($"Producing #{value}");
            yield return value;
            Thread.Sleep(1500);
            cnt++;
        }
    }
    else
    {
        yield break;
    }
}

var consumer = new ActionBlock<int>(async value =>
{
    Console.WriteLine($"Received: {value}");
    await Task.Delay(1000);
});

producer.LinkTo(consumer, new() { PropagateCompletion = true });

producer.Post("start");
producer.Complete();
consumer.Completion.Wait();

不幸的是,生产者必须在产生每个值之间的空闲期间阻塞工作线程(Thread.Sleep(1500);),因为TransformManyBlock目前没有接受的构造函数Func<string, IAsyncEnumerable<int>>。这可能会在下一版本中修复TPL数据流 https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library图书馆。你可以追踪this https://github.com/dotnet/runtime/issues/30863GitHub 问题,了解此功能何时发布。


替代解决方案:您可以保持它们不链接,然后手动将生产者生成的值发送给消费者,而不是显式链接生产者和消费者。在这种情况下,两个块都是ActionBlock https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1s:

Random random = new Random();

var consumer = new ActionBlock<int>(async value =>
{
    Console.WriteLine($"Received: {value}");
    await Task.Delay(1000);
});

var producer = new ActionBlock<string>(async message =>
{
    if (message == "start")
    {
        int cnt = 0;
        while (cnt < 16)
        {
            int value;
            lock (random) value = random.Next(1, 255);
            Console.WriteLine($"Producing #{value}");
            var accepted = await consumer.SendAsync(value);
            if (!accepted) break; // The consumer has failed
            await Task.Delay(1500);
            cnt++;
        }
    }
});

PropagateCompletion(producer, consumer);

producer.Post("start");
producer.Complete();
consumer.Completion.Wait();

async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
    try { await source.Completion.ConfigureAwait(false); } catch { }
    var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
    if (ex != null) target.Fault(ex); else target.Complete();
}

这种方法的主要困难是如何将生产者的完成情况传播给消费者,以便最终两个块都完成。显然你不能使用new DataflowLinkOptions { PropagateCompletion = true }配置,因为块没有显式链接。你也不能Complete手动消费者,因为在这种情况下,它会过早地停止接受来自生产者的值。这个问题的解决方案是PropagateCompletion方法如上例所示。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

异步任务、视频缓冲 的相关文章

随机推荐

  • 通过 cocoa pods 创建的 .xcworkspace 中的 .xcodeproj 文件被禁用

    我浏览互联网没有找到答案 所以这就是问题所在 我有一个项目 我在其中使用了 cocoa pods 因此通过 cocoa pods 为我创建的工作区文件打开它 但是在将我的操作系统 全新安装 更新到 MacOS Sierra 后 当我在 xc
  • Haskell Cabal 包 - 找不到 Paths_ 模块

    我正在开发一个 Haskell 项目 Happstack 服务器 Blaze HTML 前端作为主要库 我想添加一个静态数据目录 看起来你可以使用 Cabal 使用自动生成的Path
  • 在 django Rest 框架中实现角色

    我正在构建一个 API 应该拥有以下类型的用户 super user 创建 管理管理员 admin 管理事件 模型 和事件参与者 participants 参加活动 受管理员邀请参加活动 另外我想让每种类型的用户都有电话号码字段 I tri
  • 有没有办法设置一个变量一次并在多个地方使用它而不给它模块级别的范围?

    我有一个循环将用户窗体控件添加到集合中 由于多个地方都需要该集合 因此我将其放入模块中并在需要时调用它 这意味着该集合仅在需要时才位于内存中 但这也意味着我每次想要使用它时都会运行一个循环 I could已给出集合模块级别范围并在第一次需要
  • 类型“Promise”上不存在属性“finally”

    我试图对承诺使用finally 方法 但我不断收到此错误 Property finally does not exist on type Promise
  • 向 TextField 添加提示

    我想添加一个TextField带有集成的提示文本 用户提示 占位符 直到用户输入文本 当 TextField 获得焦点时 提示文本会消失 如果 TextField 失去焦点且未输入任何文本 则提示文本会重新出现 我最初认为这将是 Vaadi
  • Inno Setup:允许用户只选择可以安装软件的驱动器?

    我可以允许用户只选择要安装软件的驱动器吗 例如 他们可以选择C or D drive C Software D Software 但用户不能指定任何其他内容 就像他们不能选择安装下面的软件一样Downloads or MyDocumnets
  • 如何停止 CTE 中的递归?

    我有一个数据库表 如下所示 ID PredecessorID Data 43b1e103 d8c6 40f9 b031 e5d9ef18a739 null 55f6951b 5ed3 46c8 9ad5 64e496cb521a 43b1e
  • 为什么 hibernate 在 SAVE 之前执行 SELECT?

    为什么 hibernate 在保存对象之前要进行选择 我在互联网上找不到有用的信息 这是每次保存之前的正常行为吗 我发现这个话题 选择 hibernateTemplate save 的查询运行 https stackoverflow com
  • 如何在没有 SyncAdapter 的 Android 上实现帐户

    我正在利用内置帐户系统 使用 AccountManager API 为 Android 应用程序实现一个登录系统 在 Android 2 2 上一切都很好 但在 Android 2 1 上不包含 SyncAdapter 会导致帐户设置屏幕中
  • 新的 Windows 应用程序 - 什么语言?

    我们目前正处于开发 Windows 桌面应用程序的前期阶段 但当听到有关 Windows 8 Silverlight WPF Jupiter 的所有最新讨论时 我不知道该相信什么了 现在用WPF启动一个新项目是不是有问题 我应该切换到 Si
  • 检查多个位置的值并仅在源唯一时返回匹配项

    假设我有一个清单Vendors 阿斯达 乐购 Spar 我有一个清单Sources 或者这个类比中的供应商 家乐氏 Kellogg 吉百利 Cadbury 雀巢 Nestle 强生 Johnsons 帮宝适 Pampers Simple 等
  • Git推送更新远程服务器信息失败

    当我尝试将新分支推送到远程源时 出现以下错误 我能够在现有分支上推送提交 并且现有分支上没有问题 git push u origin master1 Fetching remote heads refs refs tags refs hea
  • Ajax 将文件上传到内容类型为 Multipart 的 GoLang 服务器

    我正在尝试使用多部分表单将音频文件上传到 Golang 服务器 然而 Go 返回错误 multipart NextPart bufio buffer full 我相信这表明我的 Javascript 请求中存在不属于多部分格式的内容 这是我
  • Java 中的递归下降解析器

    我想在序言中说这是我三年级编程语言课的家庭作业 我正在寻求一些帮助 我的作业如下 截止日期 2013年2月22日晚上11点55分提交 请将以下内容上传到CMS 1 源代码2 程序执行的屏幕截图 包括您使用的输入文件 使用您喜欢的任何编程语言
  • Firebase 云消息传递 requireInteraction 不起作用

    参考 https github com firebase quickstart js tree master messaging https github com firebase quickstart js tree master mes
  • 调整双 JTable 中列大小的问题

    我正在创建一个包含 2 个 JTable 的自定义组件 一个作为主数据网格 另一个作为始终可见的摘要栏 我已经提出了这个解决方案 但是调整列大小并没有按应有的方式工作 任何想法我做错了什么 import java awt BorderLay
  • C 编程:正向变量参数列表

    我正在尝试编写一个函数 它接受可变数量的参数 如 printf 执行一些操作 然后将变量列表传递给 printf 我不知道如何做到这一点 因为它似乎必须将它们推入堆栈 大约是这样的 http pastie org 694844 http p
  • C++ 中的默认初始化

    我有一个关于 C 中默认初始化的问题 我被告知非 POD 对象将自动初始化 但我对下面的代码感到困惑 为什么当我使用指针时 变量 i 被初始化为 0 但是当我声明局部变量时 却不是 我使用 g 作为编译器 class INT public
  • 异步任务、视频缓冲

    我正在尝试理解 C 中的任务 但仍然遇到一些问题 我正在尝试创建一个包含视频的应用程序 主要目的是从文件中读取视频 我使用 Emgu CV 并通过 TCP IP 发送它以在板上进行处理 然后以流 实时 方式返回 首先 我是连续做的 所以 读