Elixir 向所有订阅者直播

2024-01-11

我正在尝试在 Elixir 中实现一个无线电服务器

一个进程始终在工作并读取文件(mp3)并发布到主题“:radio”,当前用于测试目的,当它完成时会重新开始

每个连接订阅主题“:radio”

我不明白如何将块发送到所有订阅的连接,连接在 2 或 3 个块后关闭

defmodule Plugtest do
  import Plug.Conn

  def init(opts), do: opts

  def start() do
    Plug.Adapters.Cowboy.http(Plugtest, [])
    {:ok, _pid} = PubSub.start_link()
    spawn(fn -> stream_from_file("./song.mp3", 128) end)
  end

  def call(conn, _opts) do
    conn = conn
    |> send_chunked(200)
    |> put_resp_content_type("audio/mpeg")

    :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio)
#    File.stream!("./song.mp3", [], 128) |> Enum.into(conn) # test purpose only
  end

  defp send_chunk_to_connection(conn) do
    receive do
      {:radio_data, data} ->
        IO.inspect "* #{inspect self()} * [ #{inspect conn.owner} ] [ #{inspect data} ]"
#        Enum.into(data, conn) # not working TODO send chunk to connection
        {:ok, conn} = chunk(conn, data)
        send_chunk_to_connection(conn)
    end
  end

  defp stream_from_file(fpath, bytes) do
    File.stream!(fpath, [], bytes)
    |> Enum.each(fn chunk ->
      PubSub.publish(:radio, {:radio_data, chunk})
    end)
    stream_from_file(fpath, bytes)
  end

end

堆栈跟踪 :

[error] Process #PID<0.274.0> raised an exception
        ** (MatchError) no match of right hand side value: {:error, :closed}    
            (plugtest) lib/plugtest.ex:26: Plugtest.send_chunk_to_connection/1

依赖项:

  defp deps do
    [{:plug, "~> 1.0"}, {:cowboy, "~> 1.0"}, {:pubsub, "~> 0.0.2"}]
  end

在 @maxdec 评论后编辑

defmodule Plugtest do
  import Plug.Conn

  @file_path "./song.mp3"
  @port 4000
  @chunk_size 128

  def init(opts), do: opts

  def start() do
    Plug.Adapters.Cowboy.http Plugtest, [], port: @port
    {:ok, _pid} = PubSub.start_link()
    spawn fn ->
        stream_from_file(@file_path, @chunk_size)
    end
  end

  def call(conn, _opts) do
    conn = conn
    |> send_chunked(200)
    |> put_resp_content_type("audio/mpeg")

    :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio)
#    File.stream!("./song.mp3", [], 128) |> Enum.into(conn) # test purpose only
    conn
  end
  defp send_chunk_to_connection(conn) do
    receive do
      {:radio_data, data} ->
        case chunk(conn, data) do
          {:ok, conn} -> send_chunk_to_connection(conn)
          {:error, err} -> IO.puts err # do nothing, as something went wrong (client disconnection or something else...)
        end
    end
  end

  defp stream_from_file(fpath, bytes) do
    File.stream!(fpath, [], bytes)
    |> Enum.each(fn chunk ->
      PubSub.publish(:radio, {:radio_data, chunk})
    end)
    stream_from_file(fpath, bytes)
  end

end

快速浏览后,我认为您应该解决两件事:

  1. PlugTest is a Plug so call/2应该返回conn(但这不是你的问题)。它还应该在等待事件时阻塞(receive):
    def call(conn, _opts) do
      conn = conn
      |> send_chunked(200)
      |> put_resp_content_type("audio/mpeg")

      :ok = PubSub.subscribe(self(), :radio)
      send_chunk_to_connection(conn)
    end
  1. In send_chunk_to_connection你应该做:
    defp send_chunk_to_connection(conn) do
      receive do
        {:radio_data, data} ->
          case chunk(conn, data) do
            {:ok, conn} -> send_chunk_to_connection(conn)
            {:error, err} -> IO.puts err; conn # do nothing, as something went wrong (client disconnection or something else...)
          end
      end
    end
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Elixir 向所有订阅者直播 的相关文章

  • 使用 ImageIO 发送图像流?

    我设置了一个 ServerSocket 和一个 Socket 因此 ServerSocket 使用 ImageIO write 发送图像流 并且 Socket 尝试读取它们并用它们更新 JFrame 所以我想知道 ImageIO 是否可以检
  • 我应该将 FLV 文件放在哪里才能在本地 Red5 服务器上进行流式传输?

    我安装了最新的 Red5 服务器 但我不确定将 flv 文件放在哪里来进行流式传输 没有像我在网上找到的一些教程那样的 streams 或 ofla 目录 我应该将 flv 文件放在哪里来进行流式传输 Red5 附带了一些演示 但默认情况下
  • 使用流式 JSON 输出构建简单的 Nodejs API

    我正在尝试构建一个简单的基于 Node js 的流 API 我想做的就是当我点击服务器 URL 时 输出应该流式传输一组测试数据 JSON 如 Twitter 流 API var app require express var server
  • 使用 Servlet 启动 VLC HTTP Stream 时出现问题

    我正在为自己开发一个 VLC 项目 我的目标是创建一个 HTML 前端来启动流 我通过使用 Java Servlet 来完成此操作 概述 乌班图13 04 Java 7 21 冰茶 2 3 9 Eclipse JAVAEE IDE 雄猫7
  • GenServer 正常关闭

    我使用 GenServer 编写了一个 Elixir 应用程序 该应用程序在启动时启动外部应用程序 然后将其关闭 并在退出时进行其他清理 我在中添加了启动功能init 1 https hexdocs pm elixir GenServer
  • MySQL使用BLOB的二进制存储VS OS文件系统:大文件、大数量、大问题

    我正在运行的版本 基本上 最新的一切 PHP 5 3 1MySQL 5 1 41阿帕奇 2 2 14操作系统 CentOS 最新 情况是这样的 我有数千个非常重要的文档 从客户合同到语音签名 客户对合同的授权录音 文件类型包括但不限于jpg
  • 流媒体视频文件?

    我需要流式传输 flv 文件 流媒体应该看起来像直播 我应该有一种方法可以更改目标文件 抱歉我的英语不好 如果 流式传输 的意思是 显示 Flash 视频剪辑 则 flv streaming 并不是真正的流式传输 而是正常的文件传输 即使文
  • Ecto 中按日期时间查询

    这是我尝试过的 date Ecto DateTime from erl calendar universal time query gt where record record deadline gt date 我也尝试过 date Ect
  • 如何编写在日期时间字段上执行 group_by MONTH 的 Ecto 查询

    我正在执行 ecto 查询并尝试进行分组q created date 此查询成功执行了 GROUP BY 但它是按秒执行的 我正在尝试按月分组 MYQUERY gt group by q q created date q id 有没有类似的
  • Phoenix 中的 web/static 和 priv/static 有什么区别?

    我对长生不老药和凤凰是新手 现在我在凤凰城的静态资产方面遇到了麻烦 我想在我的页面中添加一个js文件 我在我的模板中添加以下代码 js test js gt gt 然后创建一个js文件web static js test js 但是 我收到
  • Python 2.7:支持一个端口上多个连接的流式 HTTP 服务器

    我正在寻找一个标准的Python 2 7包 提供一个同时执行的HTTP服务器流媒体同一端口号上的连接 嘿 各位版主 请停止将我的问题标记为想要以非流媒体方式提供服务的问题的重复项 例如 python 中的多线程 Web 服务器 https
  • 使用 Elixir 设置日期格式

    我正在尝试将 Timex 模块格式化为某种外观 我正在尝试获取今天的日期 但我希望它的格式如下 2017 12 12 年 月 日 在 ruby 中 我会去 strftime 类 但我不知道如何使用 Elixir 来做到这一点 目前的尝试 T
  • 预加载所有关系

    我有一个与此类似的 ERM ModelA ModelB ModelC 我得到模型及其模型如下 modela Repo get ModelA 1 modela preload modela modelb 现在我可以访问ModelBs with
  • 使用 Android Exoplayer 调整 Dash 流音量

    我正在尝试设置一个搜索栏来控制 exoplayer 流式破折号实例的级别 我正在使用的设置是演示项目的修改版本 并且无法确定我应该尝试影响搜索栏输出的哪个元素 即如何正确使用 MSG SET VOLUME 等 任何意见将不胜感激 我正在寻找
  • Elixir 中的小数四舍五入

    我有这个十进制数 Elixir c1 Decimal div a1 b1 gt Decimal lt 0 006453859622556080696687234694 gt 如何将其四舍五入为小数点后位数较少的较短数字 正如 Dogbert
  • Elixir 如何将 Map 结构转换为 Record 结构

    我有一个 Record 结构和一个 Map 结构 例如 defmodule Foo Bar do defstruct boo nil baz nil end defmodule Foo do require Record Record de
  • 与 ecto 的逆多态性

    当前的 Ecto 文档http hexdocs pm ecto Ecto Schema html http hexdocs pm ecto Ecto Schema html仅解释如何构建belongs to多态关联的类型 当多态Commen
  • Chrome 在传输一定量的数据后挂起 - 等待可用的套接字

    我有一个浏览器游戏 最近我开始向游戏添加音频 Chrome 无法加载整个页面并卡在 91 requests 8 1 MB transferred 并且不再加载任何内容 它甚至破坏了所有其他选项卡中的网站 说Waiting for avail
  • 通过 WiFi 将视频从一部 Android 手机直播到另一部手机

    我已经在互联网上搜索了好几天关于如何通过 WiFi 连接实现从 Android 手机到另一部 Android 手机的视频流功能 但我似乎找不到任何有用的东西 我查看了 Android 开发人员的示例代码 stackoverflow goog
  • Elixir 是否支持内省以显示函数起源?

    如果一个模块import由于有多个其他模块 因此给定函数的来源可能并不明显 例如 defmodule Aimable do import Camera import Gun def trigger do shoot which import

随机推荐

  • 为什么此代码不能正确增加计数器?

    在下面的代码中 为什么它不每次将整数加一 例如 假设我有1 OF 5当我提交表格时 提交后应该是2 OF 5 但相反 它显示5 OF 5 即使我将最大值从 5 更改为 3 也会发生这种情况 它开始于1 OF 3并立即跳转到3 OF 3 这是
  • 使用UTF-8编码的VBA Excel宏写入文件[重复]

    这个问题在这里已经有答案了 我正在 Excel 中创建一个宏 用于处理电子表格并将内容 文本 写入文件 我需要将此文件编码为 UTF 8 我尝试使用 OpenTextFile TristateTrue 和 StrConv vbUnicode
  • 如何避免Angularjs中的大量依赖

    我有一个 Angular 应用程序 它运行良好 但随着我的应用程序变得越来越大 我担心必须在每个控制器中注入大量依赖项 例如 app controller viewapps scope Appfactory Menu timeout fil
  • 单独窗口中的 Android Studio 模拟器

    我的 Android Studio 遇到问题 这是我第一次安装Android Studio 问题是 当我安装任何设备时 它会在设备或模拟器周围出现灰色背景 我不知道它是否与最新的 Android Studio 版本有关 因为我已经更新了它
  • C++ 中的头文件和 Java 中的抽象类/接口实现都是相同的想法,这是否正确?

    我对 C 有点熟悉 我知道对于几乎每个头文件 我都必须创建源文件来配合它 现在我正在研究java接口和实现 它看起来是一样的 首先 您只需在一个类中命名变量和方法 然后在其他类中定义它们 C 和Java中的这些东西是不是基本相同或者相似 J
  • 本地站点的 HTML 图像

    我是 HTML 新手 有一个关于图像的简单问题 下面是我正在使用的一个简单的 html 文件 我想将图像放入其中 该图像存储在我网站的主目录中 该网站是本地的 在我的 MacBook 上 h1 My First Heading h1 p M
  • 使用字符串日期对列表进行排序 [Kotlin]

    我有数组列表typeBeanArrayList其中元素有点像日期 例如 30 03 2012 28 03 2013 31 03 2012 2 04 2012 我怎样才能排序降序 Code typeBeanArrayList database
  • 如何通过保持“templateurl”不变来编译 Angular 2 Webpack

    Webpack 通过在中生成 js 来编译 typescript 文件dist文件夹 我发现 webpack 正在将所有 templateurl 更改为 template 如下所示 我的打字稿组件 Component selector ap
  • CListCtrl:如何保持水平滚动位置?

    如何保持 CListCtrl 的水平滚动条位置 我定期转储并重新填充列表控件的内容 因此无需明确记住旧位置并恢复它 滚动就会返回到左上角 我问了一个相关问题 CListCtrl 如何保持滚动位置 https stackoverflow co
  • 雪豹上的 python 需要 32 位 libxml2 吗?

    我在我的 sl mbp 上安装 scrapy 真是太麻烦了 它需要 libxml2 所以我开始安装它 从 macports 安装它似乎并没有拉下 python 绑定 通过 scrapy 的说明从源代码安装它 here http doc sc
  • 在Eclipse中导入ant build.xml

    我有一个使用ant构建的android项目 是否可以在eclipse IDE中导入这个ant项目 更新 有一个选项可以使用 ant 创建项目build xml在 Eclipse 中 现有 ant Buildfile 中的文件 gt 新建 g
  • 按降序对两列最常见的组合进行排序

    我的数据框看起来像这样 A B C 1 3 1 2 1 1 2 3 1 1 2 1 3 1 1 1 2 1 2 1 1 1 3 1 1 2 1 我想将数据减少为仅按降序排序的两列 A 和 B 最常见的组合 输出应该看起来像 A B coun
  • 淘汰赛手风琴绑定断裂

    以下绑定在 1 9 之前有效 ko bindingHandlers accordion init function element valueAccessor var options valueAccessor setTimeout fun
  • 如何将版本信息添加到我的 powershell 脚本中?

    我有一个脚本 test ps1 如下所示 但我希望能够运行 test ps1 version并让它返回当前的version剧本给我 有没有办法做到这一点 lt SYNOPSIS Test DESCRIPTION Desc INPUTS No
  • 使用外部框架将 Vue3 自定义元素集成到 Vue2 应用程序中

    我有一个用 Vue2 编写的应用程序 它还没有真正准备好升级到 Vue3 但是 我想开始在 Vue3 中编写一个组件库 并将组件导入回 Vue2 以便在准备就绪后最终进行升级 Vue 3 2 引入defineCustomElement它工作
  • 更改选择标签中单独选项的 css font-family

    我不知道这是否可能 如果不可能 是否有人可以提出可选的想法 但我试图在选择标签中显示不同字体的下拉菜单 特别是来自 Google 字体目录的字体 在下拉列表中 我尝试通过使用其代表的字体设置每个选项的样式来显示预览
  • IFrame 内 ExtJS 5 应用程序的滚动问题

    Hy 这就是我的测试页面的样子 蓝色区域是父页面 绿色区域是运行 ExtJS 应用程序的 IFrame 内部带有标签的简单视口 如果网站在触摸设备 iPad Android 平板电脑等 上执行 则无法通过在 IFrame 绿色区域 上 擦拭
  • Django choicefield 的初始值

    我遇到一个奇怪的问题 我似乎无法在 django 中的表单中设置其中一个字段的初始值 我的模型字段是 section models CharField max length 255 choices Application Applicati
  • JTextField 与 HTML 标签一起

    我正在研究 Java 7 我正在尝试使用 HTML 标签来格式化文本 我将文本传递到 JTextField text new JTextField text setText p The program performs encryption
  • Elixir 向所有订阅者直播

    我正在尝试在 Elixir 中实现一个无线电服务器 一个进程始终在工作并读取文件 mp3 并发布到主题 radio 当前用于测试目的 当它完成时会重新开始 每个连接订阅主题 radio 我不明白如何将块发送到所有订阅的连接 连接在 2 或