我正在使用 java 库从我的代码中订阅订阅。使用sbt:"com.google.cloud" % "google-cloud-pubsub" % "0.24.0-beta"
我按照本指南编写订阅者:https://cloud.google.com/pubsub/docs/pull https://cloud.google.com/pubsub/docs/pull
val projectId = "test-topic"
val subscriptionId = "test-sub"
def main(args: Array[String]): Unit = {
val subscriptionName = SubscriptionName.create(projectId, subscriptionId)
val subscriber = Subscriber.defaultBuilder(subscriptionName, new PastEventMessageReceiver()).build()
subscriber.startAsync()
System.in.read()
}
class PastEventMessageReceiver extends MessageReceiver {
override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
println(message)
consumer.ack()
}
它工作得很好,我能够拉出已发布的消息,但我每分钟都会在日志中多次看到此错误。
com.google.cloud.pubsub.v1.StreamingSubscriberConnection$1 onFailure
WARNING: Terminated streaming with exception
io.grpc.StatusRuntimeException: UNAVAILABLE: The service was unable to fulfill your request. Please try again. [code=8a75]
at io.grpc.Status.asRuntimeException(Status.java:526)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:385)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:422)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:61)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:504)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:425)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:536)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:102)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
当我第一次运行该应用程序时,有一个小的延迟(大约 1-2 分钟),我没有看到该错误,延迟之后我每分钟会看到多次该错误。我的应用程序似乎仍然能够艰难地拉取消息。
此消息是 Google Cloud Pub/Sub 库中的内部错误,当发送到 Pub/Sub 服务器的请求中出现断开连接或可重试错误时,就会发生此错误。客户端库应该无缝地重新创建连接并重试这些错误的请求。在客户端库的 0.26.0-beta 版及更高版本中,这些错误不应再在日志中打印出来,除非您将日志级别设置为 FINE https://github.com/GoogleCloudPlatform/google-cloud-java/commit/540734e6e9bce69f47854d2aabfbe61f1ac53631#diff-a876d55b1b7adce753a2915708a68ee8。一般来说,发生此错误后,消息仍应继续发送到您的 MessageReceiver。客户端库本身无法重试的任何错误(例如,未找到订阅时)都会传播回调用方。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)