fontcolor_theme
Deepkit Broker

Broker 总线

Deepkit 消息总线是一种消息总线系统(发布/订阅,分布式事件系统),它允许你在应用的不同部分之间发送消息或事件。

它可用于微服务、单体或任何其他类型的应用。非常适合事件驱动架构。

它不同于 Deepkit 事件系统,后者用于进程内事件。Broker 总线用于需要发送到其他进程或服务器的事件。当你希望在由 FrameworkModule 自动启动的多个 worker 之间通信时,Broker 总线也非常适合,例如 new FrameworkModule({workers: 4})

该系统旨在保持类型安全,并会自动序列化与反序列化消息(使用 BSON)。如果你为消息类型添加了验证,它还会在发送前和接收后验证消息。这确保消息始终具有正确的格式并包含预期的数据。

用法

import { BrokerBus } from '@deepkit/broker';

const bus = new BrokerBus(adapter);

// 将此类型移动到共享文件
type UserEvent = { type: 'user-created', id: number } | { type: 'user-deleted', id: number };

const channel = bus.channel<Events>('user-events');

await channel.subscribe((event) => {
  if (event.type === 'user-created') {
    console.log('User created', event.id);
  } else if (event.type === 'user-deleted') {
    console.log('User deleted', event.id);
  }
});

await channel.publish({ type: 'user-created', id: 1 });

通过为通道定义名称和类型,你可以确保只发送和接收正确类型的消息。 数据会被自动序列化与反序列化(使用 BSON)。

在应用中使用

一个在你的应用中使用 BrokerBus 的完整示例。 如果你导入 FrameworkModule,该类会自动在依赖注入容器中可用。 有关更多信息,请参阅入门页面。

Subject

发送和监听消息的默认方式是使用 rxjs 的 Subject 类型。它的 subscribenext 方法使你能够以类型安全的方式发送和接收消息。所有 Subject 实例都由代理管理,一旦该 Subject 被垃圾回收,其订阅将从代理后端(例如 Redis)中移除。

覆盖 BrokerBus 的 BusBrokerErrorHandler 以处理发布或订阅消息时的失败。

这种方法很好地将你的业务代码与代理服务器解耦,并允许你在没有代理服务器的测试环境中使用相同的代码。

import { BrokerBus, BrokerBusChannel, provideBusSubject } from '@deepkit/broker';
import { FrameworkModule } from '@deepkit/framework';
import { Subject } from 'rxjs';

// 将此类型移动到共享文件
type MyChannel = Subject<{
    id: number;
    name: string;
}>;

class Service {
    // MyChannel 不是单例,而是为每个请求创建一个新实例。
    // 其生命周期由框架监控,一旦该 subject 被垃圾回收, 
    // 订阅将从代理后端(例如 Redis)中移除。
    constructor(private channel: MyChannel) {
        this.channel.subscribe((message) => {
            console.log('received message', message);
        });
    }

    update() {
        this.channel.next({ id: 1, name: 'Peter' });
    }
}

@rpc.controller('my-controller')
class MyRpcController {
    constructor(private channel: MyChannel) {
    }

    @rpc.action()
    getChannelData(): MyChannel {
        return this.channel;
    }
}

const app = new App({
    controllers: [MyRpcController],
    providers: [
        Service,
        provideBusSubject<MyChannel>('my-channel'),
    ],
    imports: [
        new FrameworkModule(),
    ],
});

总线通道

如果你需要确认消息已发送,并希望在每种情况下处理错误,可以使用 BrokerBusChannel 类型。它的 subscribepublish 方法会返回一个 Promise。

import { BrokerBus, BrokerBusChannel, provideBusChannel } from '@deepkit/broker';
import { FrameworkModule } from '@deepkit/framework';

// 将此类型移动到共享文件
type MyChannel = BrokerBusChannel<{
    id: number;
    name: string;
}>;

class Service {
    constructor(private channel: MyChannel) {
        this.channel.subscribe((message) => {
            console.log('received message', message);
        }).catch(e => {
            console.error('Error while subscribing', e);
        });
    }

    async update() {
        await this.channel.publish({ id: 1, name: 'Peter' });
    }
}

const app = new App({
    providers: [
        Service,
        provideBusChannel<MyChannel>('my-channel'),
    ],
    imports: [
        new FrameworkModule(),
    ],
});
English中文 (Chinese)한국어 (Korean)日本語 (Japanese)Deutsch (German)