The documentation you are viewing is for Dapr v1.12 which is an older version of Dapr. For up-to-date documentation, see the latest version.
开始使用 Dapr Python gRPC 服务扩展
如何使用 Dapr Python gRPC 扩展包启动和运行
Dapr Python SDK 提供了一个内置的 gRPC 服务器扩展模块 dapr.ext.grpc
,用于创建 Dapr 服务。
安装
您可以使用以下命令下载并安装 Dapr gRPC 服务器扩展模块:
pip install dapr-ext-grpc
Note
开发包包含的功能和行为将兼容此前发行的 Dapr 运行时。 在安装 dapr-dev 包之前,请务必卸载以前任意稳定版本的 Python SDK 扩展包。
pip3 install dapr-ext-grpc-dev
示例
App
对象可以用来创建服务器。
监听服务调用请求
InvokeMethodReqest
和 InvokeMethodResponse
对象可用于处理传入请求。
监听并响应请求的简单服务将如下所示:
from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse
app = App()
@app.method(name='my-method')
def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
print(request.metadata, flush=True)
print(request.text(), flush=True)
return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")
app.run(50051)
完整的示例可以在 这里 找到。
订阅主题
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
app = App()
# Default subscription for a topic
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> None:
print(event.Data(),flush=True)
# Specific handler using Pub/Sub routing
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
print(event.Data(),flush=True)
app.run(50051)
完整的示例可以在 这里 找到。
设置输入绑定触发器
from dapr.ext.grpc import App, BindingRequest
app = App()
@app.binding('kafkaBinding')
def binding(request: BindingRequest):
print(request.text(), flush=True)
app.run(50051)
完整的示例可以在 这里 找到。
相关链接
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.