在 grpc python 中处理异步流请求

2024-05-11

我试图了解如何使用双向流处理 grpc api(使用 Python API)。

假设我有以下简单的服务器定义:

syntax = "proto3";
package simple;

service TestService {
  rpc Translate(stream Msg) returns (stream Msg){}
}

message Msg
{
 string msg = 1;
}

假设从客户端发送的消息是异步发送的(由于用户选择了一些 ui 元素)。

为客户端生成的 python 存根将包含一个方法Translate它将接受一个生成器函数并返回一个迭代器。

我不清楚的是如何编写生成器函数来返回用户创建的消息。在等待消息时在线程上休眠听起来并不是最好的解决方案。


现在这有点笨拙,但您可以按如下方式完成您的用例:

#!/usr/bin/env python

from __future__ import print_function

import time
import random
import collections
import threading

from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
import grpc

from translate_pb2 import Msg
from translate_pb2_grpc import TestServiceStub
from translate_pb2_grpc import TestServiceServicer
from translate_pb2_grpc import add_TestServiceServicer_to_server


def translate_next(msg):
    return ''.join(reversed(msg))


class Translator(TestServiceServicer):
  def Translate(self, request_iterator, context):
    for req in request_iterator:
      print("Translating message: {}".format(req.msg))
      yield Msg(msg=translate_next(req.msg))

class TranslatorClient(object):
  def __init__(self):
    self._stop_event = threading.Event()
    self._request_condition = threading.Condition()
    self._response_condition = threading.Condition()
    self._requests = collections.deque()
    self._last_request = None
    self._expected_responses = collections.deque()
    self._responses = {}

  def _next(self):
    with self._request_condition:
      while not self._requests and not self._stop_event.is_set():
        self._request_condition.wait()
      if len(self._requests) > 0:
        return self._requests.popleft()
      else:
        raise StopIteration()

  def next(self):
    return self._next()

  def __next__(self):
    return self._next()

  def add_response(self, response):
    with self._response_condition:
      request = self._expected_responses.popleft()
      self._responses[request] = response
      self._response_condition.notify_all()

  def add_request(self, request):
    with self._request_condition:
      self._requests.append(request)
      with self._response_condition:
        self._expected_responses.append(request.msg)
      self._request_condition.notify()

  def close(self):
    self._stop_event.set()
    with self._request_condition:
      self._request_condition.notify()

  def translate(self, to_translate):
    self.add_request(to_translate)
    with self._response_condition:
      while True:
        self._response_condition.wait()
        if to_translate.msg in self._responses:
          return self._responses[to_translate.msg]


def _run_client(address, translator_client):
  with grpc.insecure_channel('localhost:50054') as channel:
    stub = TestServiceStub(channel)
    responses = stub.Translate(translator_client)
    for resp in responses:
      translator_client.add_response(resp)

def main():
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  add_TestServiceServicer_to_server(Translator(), server)
  server.add_insecure_port('[::]:50054')
  server.start()
  translator_client = TranslatorClient()
  client_thread = threading.Thread(
      target=_run_client, args=('localhost:50054', translator_client))
  client_thread.start()

  def _translate(to_translate):
    return translator_client.translate(Msg(msg=to_translate)).msg

  translator_pool = futures.ThreadPoolExecutor(max_workers=4)
  to_translate = ("hello", "goodbye", "I", "don't", "know", "why",)
  translations = translator_pool.map(_translate, to_translate)
  print("Translations: {}".format(zip(to_translate, translations)))

  translator_client.close()
  client_thread.join()
  server.stop(None)


if __name__ == "__main__":
  main()

基本思想是有一个名为TranslatorClient在单独的线程上运行,关联请求和响应。它期望响应将按照请求发送的顺序返回。它还实现了迭代器接口,以便您可以将其直接传递给Translate存根上的方法。

我们启动一个正在运行的线程_run_client从中提取响应TranslatorClient并在另一端反馈它们add_response.

The main我在这里包含的函数实际上只是一个稻草人,因为我没有您的 UI 代码的详细信息。我在跑_translate in a ThreadPoolExecutor为了证明这一点,尽管translator_client.translate是同步的,它会产生,允许您同时处理多个正在进行的请求。

我们认识到,对于这样一个简单的用例,需要编写大量代码。最终的答案将是asyncio支持。我们在不久的将来对此有计划。但就目前而言,无论您运行的是 python 2 还是 python 3,这种解决方案都应该能让您继续下去。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 grpc python 中处理异步流请求 的相关文章

随机推荐

  • 如何使用 javascript 禁用组合键?

    I would like to disable view source shortcut key for IE using JavaScript To disable Ctrl C I am using the following func
  • 超慢的表格布局性能

    我遇到了糟糕的 TableLayout 性能 我在这里读过一些帖子 谈论同样的事情 Android 动态创建表 性能不佳 https stackoverflow com questions 9813427 android dynamical
  • pdflatex: \includegraphics{} -> 找不到文件

    首先 我知道这个问题已经存在了成百上千次 但我在过去四个小时内找到的给出的答案都没有解决我的具体问题 我在这里变得疯狂 我将非常感谢任何帮助和建议 尝试编译一个非常简单的 tex 文件 其中包括 包括图形命令 我最终收到 文件未找到 错误
  • AS3 [Event(name="", type="")],有什么意义?

    我使用 FlashDevelop3 R2 和 Flex 3 3 SDK 进行开发 在很多情况下我必须使用嵌入元数据标签 如下所示 Embed source path to file private var Asset Class 我很好地理
  • Spring Oauth2. DaoAuthenticationProvider 中未设置密码编码器

    我对 Spring Oauth 和 Spring Security 很陌生 我正在尝试在我的项目中使用 client credentials 流程 现在 我设法使用自己的 CustomDetailsS ervice 来从系统中已存在的数据库
  • 通过API更新Twitter背景

    我在通过 Twitter 的 API 更新背景时遇到了一些问题 target url http www google com logos 11th birthday gif ch curl init curl setopt ch CURLO
  • 用于验证 InetSocketAddresses 的正则表达式(ipv4/v6 + 端口地址)

    我在寻找testedipv4 和 ipv6 的正则表达式InetSocket地址 http download oracle com javase 6 docs api java net InetSocketAddress html toSt
  • 在 WebView 中打开 PDF 文件

    大约 2 天 我尝试在我的自定义中打开 PDF 文件WebvView 这是我的WebView code import android app AlertDialog import android app ProgressDialog imp
  • 当我移动我的 pygame 角色时,它会留下痕迹[重复]

    这个问题在这里已经有答案了 我一直在尝试用 Python 制作一个游戏 但是当我移动我的角色时 它会留下痕迹 我知道它并没有显示出那么多 但如果你靠近的话 你可以看到这条踪迹 这真的让我很困扰 这是我的代码 import pygame im
  • jQuery异步ajax查询和返回值问题(范围、闭包)

    由于异步查询和变量范围问题 代码无法正常工作 我不明白如何解决这个问题 使用 async false 更改为 ajax 方法 不是一个选项 我知道闭包 但我如何在这里实现它 不知道 我已经在这里看到了有关 js 中的闭包和 jQuery 异
  • Win7下Jupyter Notebook中撤消文本输入

    我很惊讶我没有在 Win7 下的 Jupyter Notebook 上或在网络上找到这个问题的答案 我只是错误地覆盖了调用块的大部分 但不是全部 自动保存可以追溯到之前的一些我不想恢复的更改 Jupyter Notebook 中文本输入的撤
  • android中ScrollView中的图像

    在我的应用程序中 我想放置一个 png 文件 并且希望它在横向和纵向模式下都被视为滚动图像 请建议代码或示例 要使您的 Imageview 在高度不适合时滚动 您可以在 xml 中的 ScrollView 内添加一个 ImageView 并
  • 即使在 Excel 2007 中插入行时也保持绝对引用

    我有一个电子表格 我希望单元格公式始终查看特定单元格 即使插入行或列并且特定单元格移动也是如此 实际上 我总是想查看表格的 顶部 单元格 即使在表格顶部插入了新行 例如 单元格 A2 的公式为 E 2 现在我突出显示第 1 行并执行 插入行
  • std::make_pair 与浮点数组(float2,无符号整数)

    我有一个用 float2 unsigned int 对模板化的向量 例如 std vector
  • 当我们第一次部署 WAR 文件时,某些代码可以运行吗?

    是否有任何方法或 API 可以使用 以便每当我部署新的 WAR 文件时 部分代码都应该执行 或者当 Tomcat 启动时 相应的 servlet 应该连续启动或运行一些代码 恢复一个老问题 因为唯一的答案没有显示任何例子 为了在部署 取消部
  • 返回即将推出的 YouTube API V3 视频安排日期?

    我想要返回 YouTube 中的直播的安排日期 Example of scheduled streams YT链接 https www youtube com channel UCP7jMXSY2xbc3KCAE0MHQ A https w
  • Pandas 使用什么规则来生成视图和副本?

    我对 Pandas 在决定数据帧中的选择是原始数据帧的副本或原始数据帧的视图时使用的规则感到困惑 例如 如果我有 df pd DataFrame np random randn 8 8 columns list ABCDEFGH index
  • Inno Setup安装先决条件[重复]

    这个问题在这里已经有答案了 我正在通过 Inno Setup 创建一个安装程序 我看到很多关于如何检测先决条件是否存在的代码示例 但没有任何关于当我找不到先决条件时如何实际安装先决条件的代码示例 我确信它非常简单 但是我该如何安装先决条件呢
  • 原子存储抛出错误

    我最近升级到了 C 11 兼容编译器 并且尝试将一些代码从 boost 更新到 c 11 标准 我在使用atomic store转换一些代码时遇到了问题 这是一些简单的测试代码 似乎会引发编译器错误 int main std shared
  • 在 grpc python 中处理异步流请求

    我试图了解如何使用双向流处理 grpc api 使用 Python API 假设我有以下简单的服务器定义 syntax proto3 package simple service TestService rpc Translate stre