The documentation you are viewing is for Dapr v1.12 which is an older version of Dapr. For up-to-date documentation, see the latest version.

无 CloudEvents 的发布/订阅

在没有 CloudEvents 的情况下使用发布订阅

介绍

Dapr 使用 CloudEvents 为事件负载提供额外的上下文,从而启用以下功能:

  • 追踪
  • 按消息 Id 进行重复数据删除
  • 用于正确反序列化事件数据的 Content-type

更多关于 CloudEvents 的信息,查看 CloudEvents 规范

当添加 Dapr 到你的应用时,某些服务可能仍需要通过未封装在 CloudEvents 中的原始发布/订阅消息进行通信。 这可能是出于兼容性原因,或者因为某些应用程序没有使用 Dapr。 Dapr 允许应用程序发布和订阅未包装在 CloudEvent 中的原始事件。

发布原始消息

Dapr 应用能够在没有云事件封装的情况下将原始事件发布到 pub/sub,以便与非 Dapr 应用兼容。

图表展示了当订阅者没有使用 Dapr 或者 CloudEvent 时如何用 Dapr 进行发布。

要禁用 CloudEvent 包装,请将 rawPayload 元数据设置为 true ,作为发布的一部分。 这允许订阅者接收这些消息,而不必分析 CloudEvent 。


curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"order-number": "345"}'

from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {
        'order-number': '345'
    }
    # Create a typed message with content type and body
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic='TOPIC_A',
        data=json.dumps(req_data),
        publish_metadata={'rawPayload': 'true'}
    )
    # Print the request
    print(req_data, flush=True)

<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory) {
    $publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
    $publisher->topic('TOPIC_A')->publish('data', ['rawPayload' => 'true']);
});

订阅原始消息

Dapr 应用程序还能够订阅来自不使用 CloudEvent 封装的现有 pub/sub 的原始事件。

图表展示了当订阅者没有使用 Dapr 或者 CloudEvent 时如何用 Dapr 进行发布。

以编程方式订阅原始事件

在使用编程式订阅时,添加 rawPayload 元数据条目,以便 Dapr sidecar 自动将有效载荷包裹到与当前 Dapr SDK 兼容的 CloudEvent 中。


import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys

app = flask.Flask(__name__)
CORS(app)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'deathStarStatus',
                      'route': 'dsstatus',
                      'metadata': {
                          'rawPayload': 'true',
                      } }]
    return jsonify(subscriptions)

@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}

app.run()

<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
    new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus', metadata: [ 'rawPayload' => 'true'] ),
]]));

$app->post('/dsstatus', function(
    #[\Dapr\Attributes\FromBody]
    \Dapr\PubSub\CloudEvent $cloudEvent,
    \Psr\Log\LoggerInterface $logger
    ) {
        $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
        return ['status' => 'SUCCESS'];
    }
);

$app->start();

声明式订阅原始事件

同样,您可以通过将 rawPayload 元数据条目添加到订阅自定义资源定义 (CRD) 来声明方式订阅原始事件:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: deathStarStatus
  route: /dsstatus
  pubsubname: pubsub
  metadata:
    rawPayload: "true"
scopes:
- app1
- app2

下一步