Deepkit Broker

Broker Queue

Deepkit Message Queue is a message queue system that allows you to send messages to the queue server and have workers process them.

The system is designed to be type-safe and automatically serializes and deserializes messages (using BSON).

The data is persisted on the server, so even if the server crashes, the data is not lost.

Usage

import { BrokerQueue, BrokerQueueChannel } from '@deepkit/broker';

const queue = new BrokerQueue(adapter);

type User = { id: number, username: string };

const registrationChannel = queue.channel<User>('user/registered', {
  process: QueueMessageProcessing.exactlyOnce,
  deduplicationInterval: '1s',
});

// a worker consumes the messages.
// this is usually done in a separate process.
await registrationChannel.consume(async (user) => {
  console.log('User registered', user);
  // if the worker crashes here, the message is not lost.
  // it will be automatically redelivered to another worker.
  // if this callback returns without an error, the message is 
  // marked as processed and eventually removed.
});

// the application sending the message
await registrationChannel.produce({ id: 1, username: 'Peter' });

App Usage

A full example of how to use the BrokerQueue in your application. The class is automatically available in the dependency injection container if you import the FrameworkModule. See the Getting started page for more information.

To utilize the queue systems the most, it is recommended to start several workers that consume the messages. You would write a separate App that is different to the main application that might have http routes etc.

You share common services via a shared app module. Channel definitions are shared via the rest of the application in a common file.

// file: channels.ts

export type RegistrationChannel = BrokerQueueChannel<User>;
export const registrationChannelProvider = provide<RegistrationChannel>((queue: BrokerQueue) => queue.channel<User>('user/registered', {
  process: QueueMessageProcessing.exactlyOnce,
  deduplicationInterval: '1s',
}));
// file: worker.ts
import { RegistrationChannel, registrationChannelProvider } from './channels';

async function consumerCommand(
  channel: RegistrationChannel, 
  database: Database) {

  await channel.consume(async (user) => {
    // do something with the user,
    // maybe store information, send emails, etc.
  });

  // the connection to the broker keeps the process alive.
}

const app = new App({
  providers: [
    Database,
    registrationChannelProvider,
  ],
  imports: [
    new FrameworkModule({}),
  ],
});

app.command('consumer', consumerCommand);

// start the worker command above directly
void app.run('consumer');

And in your application you produce messages like this:

// file: app.ts
import { RegistrationChannel, registrationChannelProvider } from './channels';

class Service {
  constructor(private channel: RegistrationChannel) {
  }

  async registerUser(user: User) {
    await this.channel.produce(user);
  }
}

const app = new App({
  providers: [
    Service,
    registrationChannelProvider,
  ],
  imports: [
    new FrameworkModule({}),
  ],
});

void app.run();