我想使用使用 java nio 编写的 comet 服务器来发送实时更新。当接收信息时,我希望它扫描数据,并通过rabbitmq将任务发送到工作线程。理想情况下,我希望 celery 服务器位于rabbit的另一端,管理一个处理这些任务的工作线程池。
然而,根据我的理解,celery 的工作原理是位于rabbitmq 的两端,它本质上通过嵌入到消费者和生产者的代码中来接管生产者和消费者的角色。有没有办法像我上面描述的那样设置芹菜?谢谢
是的当然 !
你可以加Custom Message Consumers
到芹菜应用程序。
请参阅扩展和引导步骤在芹菜文件中。
这是上面链接中的示例代码的一部分:
from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue
my_queue = Queue('custom', Exchange('custom'), 'routing_key')
app = Celery(broker='amqp://')
class MyConsumerStep(bootsteps.ConsumerStep):
def get_consumers(self, channel):
return [Consumer(channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=['json'])]
def handle_message(self, body, message):
print('Received message: {0!r}'.format(body))
message.ack()
app.steps['consumer'].add(MyConsumerStep)
Test it:
python -m celery -A 主要工作人员
也可以看看:将 Celery 与现有 RabbitMQ 消息结合使用
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)