fontcolor_theme

Deepkit Broker

Deepkit Broker 是一个针对消息队列、消息总线、事件总线、发布/订阅、键值存储、缓存以及原子操作的高性能抽象。它以类型安全为核心,具备自动序列化与验证、高性能和可扩展性。

Deepkit Broker 同时是客户端和服务器。它可以作为独立服务器使用,或作为客户端连接到其他 Deepkit Broker 服务器。它被设计用于微服务架构,也可以用于单体应用。

客户端使用适配器模式以支持不同的后端。你可以用相同的代码在不同后端之间切换,甚至可以同时使用多个后端。

目前有 3 个可用适配器。一个是默认的 BrokerDeepkitAdapter,它与 Deepkit Broker 服务器通信,并随 Deepkit Broker 开箱即用(包括服务器)。 第二个是位于 @deepkit/broker-redisBrokerRedisAdapter,它与 Redis 服务器通信。第三个是 BrokerMemoryAdapter,用于测试的内存适配器。

安装

使用 Deepkit 框架 时会默认安装并启用 Deepkit Broker。否则,可以通过以下方式安装:

npm install @deepkit/broker

Broker 类

Deepkit Broker 提供以下主要的 broker 类:

  • BrokerCache - 二级缓存(L2)抽象与缓存失效
  • BrokerBus - 消息总线(发布/订阅)
  • BrokerQueue - 队列系统
  • BrokerLock - 分布式锁
  • BrokerKeyValue - 键值存储

这些类通过接收一个 broker 适配器,以类型安全的方式与 broker 服务器通信。

import { BrokerBus, BrokerMemoryAdapter } from '@deepkit/broker';

const bus = new BrokerBus(new BrokerMemoryAdapter());
const channel = bus.channel<{ foo: string }>('my-channel-name');
await channel.subscribe((message) => {
  console.log('received message', message);
});

await channel.publish({ foo: 'bar' });

FrameworkModule 提供并注册默认适配器并连接到 Deepkit Broker 服务器,该服务器默认在同一进程中运行。

借助运行时类型以及调用 channel<Type>('my-channel-name');,一切都是类型安全的,消息可在适配器中直接自动验证与序列化。 默认实现 BrokerDeepkitAdapter 会为你自动处理这些(并使用 BSON 进行序列化)。

请注意,每个 broker 类都有自己的适配器接口,因此你可以只实现所需的方法。BrokerDeepkitAdapter 实现了所有这些接口,可用于所有 broker API。

应用集成

要在使用依赖注入的 Deepkit 应用中使用这些类,可以使用 FrameworkModule,它提供以下内容:

  • broker 服务器的默认适配器
  • broker 服务器本身(并自动启动)
  • 将所有 broker 类注册为提供者

FrameworkModule 会基于给定配置为已配置的 broker 服务器提供默认的 broker 适配器。 它还会为所有 broker 类注册提供者,因此你可以将它们(例如 BrokerBus)直接注入到你的服务和提供者工厂中。

// 在单独文件中,例如 broker-channels.ts
type MyBusChannel = BrokerBusChannel<MyMessage>;

const app = new App({
  providers: [
    Service,
    provide<MyBusChannel>((bus: BrokerBus) => bus.channel<MyMessage>('my-channel-name')),
  ],
  imports: [new FrameworkModule({
    broker: {
      // 如果 startOnBootstrap 为 true,broker 服务器将启动在此地址。可以是 Unix 套接字路径或 host:port 组合
      listen: 'localhost:8811', // 或 'var/broker.sock';
      // 若要使用不同的 broker 服务器,这里是其地址。可以是 Unix 套接字路径或 host:port 组合。
      host: 'localhost:8811', //或 'var/broker.sock';
      // 在主进程中自动启动单个 broker。若有自定义 broker 节点,请禁用它。
      startOnBootstrap: true,
    },
  })],
});

然后你可以将派生的 broker 类(或直接使用 broker 类)注入到你的服务中:

import { MyBusChannel } from './broker-channels.ts';

class Service {
  constructor(private bus: MyBusChannel) {
  }

  async addUser() {
    await this.bus.publish({ foo: 'bar' });
  }
}

为你的通道创建一个派生类型(如上所示的 MyBusChannel)通常是个好主意,这样你就可以轻松地将它们注入到服务中。 否则,你每次需要时都必须注入 BrokerBus 并调用 channel<MyMessage>('my-channel-name'),这既容易出错,也不符合 DRY 原则。

几乎所有 broker 类都支持这种派生方式,你可以在一个地方轻松定义并在各处使用。有关更多信息,请参阅相应的 broker 类文档。

自定义适配器

如果你需要自定义适配器,可以通过实现 @deepkit/broker 中以下接口之一或多个来创建自己的适配器:

export type BrokerAdapter = BrokerAdapterCache & BrokerAdapterBus & BrokerAdapterLock & BrokerAdapterQueue & BrokerAdapterKeyValue;
import { BrokerAdapterBus, BrokerBus, Release } from '@deepkit/broker';
import { Type } from '@deepkit/type';

class MyAdapter implements BrokerAdapterBus {
  disconnect(): Promise<void> {
    // 实现:从 broker 服务器断开连接
  }
  async publish(name: string, message: any, type: Type): Promise<void> {
    // 实现:向 broker 服务器发送消息,name 为 'my-channel-name',message 为 { foo: 'bar' }
  }
  async subscribe(name: string, callback: (message: any) => void, type: Type): Promise<Release> {
    // 实现:订阅 broker 服务器,name 为 'my-channel-name'
  }
}

// 或使用默认适配器 BrokerDeepkitAdapter
const adapter = new MyAdapter;
const bus = new BrokerBus(adapter);

Broker 服务器

默认情况下,一旦引入 FrameworkModule 并运行 server:start 命令,Broker 服务器就会自动启动。 所有 Broker 类默认都配置为连接到该服务器。

在生产环境中,你通常会在单独的进程或不同的机器上运行 broker 服务器。 你会使用 server:broker:start 来启动 broker 服务器,而不是 server:start

如果你的流量很大并且需要扩展性,应该改用 Redis 适配器。它的设置更复杂, 因为你需要运行一个 Redis 服务器,但性能更好,可以处理更多的流量。

ts-node app.ts server:broker:start

这会在(例如通过 new FrameworkModule({broker: {listen: 'localhost:8811'}}))配置的地址上启动服务器。 如果启用了 app.loadConfigFromEnv({prefix: 'APP_', namingStrategy: 'upper'});,你也可以通过环境变量修改地址:

APP_FRAMEWORK_BROKER_LISTEN=localhost:8811 ts-node app.ts server:broker:start

如果你手动启动服务器,请确保在应用配置中通过 startOnBootstrap: false 禁用自动启动 broker 服务器。

English中文 (Chinese)한국어 (Korean)日本語 (Japanese)Deutsch (German)