fontcolor_theme
Deepkit Broker

Broker 队列

Deepkit Message Queue 是一个消息队列系统,它允许你将消息发送到队列服务器并由 worker 进行处理。

该系统设计为类型安全,并自动序列化和反序列化消息(使用 BSON)。

数据持久化在服务器上,因此即使服务器崩溃,数据也不会丢失。

使用

import { BrokerQueue, BrokerQueueChannel } from '@deepkit/broker';

const queue = new BrokerQueue(adapter);

type User = { id: number, username: string };

const registrationChannel = queue.channel<User>('user/registered', {
  process: QueueMessageProcessing.exactlyOnce,
  deduplicationInterval: '1s',
});
// 一个 worker 消费消息。
// 这通常在单独的进程中完成。
await registrationChannel.consume(async (user) => {
  console.log('User registered', user);
  // 如果 worker 在这里崩溃,消息不会丢失。
  // 它会自动重新投递给另一个 worker。
  // 如果这个回调未抛出错误就返回,该消息会被
  // 标记为已处理,并最终被移除。
});

// 应用程序发送消息
await registrationChannel.produce({ id: 1, username: 'Peter' });

在应用中的使用

一个在应用中如何使用 BrokerQueue 的完整示例。 如果你导入了 FrameworkModule,该类会自动在依赖注入容器中可用。 更多信息请参见“入门”页面。

为了最大化利用队列系统,建议启动多个 worker 来消费消息。 你会编写一个与主应用(可能有 http 路由等)不同的独立 App

通过共享的应用模块共享通用服务。通道定义放在一个公共文件中,供应用的其余部分共享。

// 文件:channels.ts

export type RegistrationChannel = BrokerQueueChannel<User>;
export const registrationChannelProvider = provide<RegistrationChannel>((queue: BrokerQueue) => queue.channel<User>('user/registered', {
  process: QueueMessageProcessing.exactlyOnce,
  deduplicationInterval: '1s',
}));
// 文件:worker.ts
import { RegistrationChannel, registrationChannelProvider } from './channels';

async function consumerCommand(
  channel: RegistrationChannel, 
  database: Database) {

  await channel.consume(async (user) => {
    // 对该用户执行一些操作,
    // 例如存储信息、发送邮件等。
  });

  // 与 broker 的连接会保持进程存活。
}

const app = new App({
  providers: [
    Database,
    registrationChannelProvider,
  ],
  imports: [
    new FrameworkModule({}),
  ],
});

app.command('consumer', consumerCommand);

// 直接启动上面的 worker 命令
void app.run('consumer');

在你的应用中,你可以像这样生产消息:

// 文件:app.ts
import { RegistrationChannel, registrationChannelProvider } from './channels';

class Service {
  constructor(private channel: RegistrationChannel) {
  }

  async registerUser(user: User) {
    await this.channel.produce(user);
  }
}

const app = new App({
  providers: [
    Service,
    registrationChannelProvider,
  ],
  imports: [
    new FrameworkModule({}),
  ],
});

void app.run();
English中文 (Chinese)한국어 (Korean)日本語 (Japanese)Deutsch (German)