fontcolor_theme
Deepkit Broker

Broker Queue

Deepkit Message Queue는 메시지를 큐 서버로 전송하고 워커가 이를 처리할 수 있게 해주는 메시지 큐 시스템입니다.

이 시스템은 type-safe하도록 설계되어 있으며 메시지를 자동으로 직렬화/역직렬화합니다(BSON 사용).

데이터는 서버에 영구 저장되므로 서버가 크래시하더라도 데이터가 유실되지 않습니다.

사용법

import { BrokerQueue, BrokerQueueChannel } from '@deepkit/broker';

const queue = new BrokerQueue(adapter);

type User = { id: number, username: string };

const registrationChannel = queue.channel<User>('user/registered', {
  process: QueueMessageProcessing.exactlyOnce,
  deduplicationInterval: '1s',
});

// 워커가 메시지를 소비합니다.
// 일반적으로 별도의 프로세스에서 수행됩니다.
await registrationChannel.consume(async (user) => {
  console.log('User registered', user);
  // 워커가 여기서 크래시하더라도 메시지는 유실되지 않습니다.
  // 다른 워커에게 자동으로 다시 전달됩니다.
  // 이 콜백이 에러 없이 반환되면, 
  // 메시지는 처리 완료로 표시되고 결국 제거됩니다.
});

// 메시지를 전송하는 애플리케이션
await registrationChannel.produce({ id: 1, username: 'Peter' });

앱 사용법

애플리케이션에서 BrokerQueue를 사용하는 전체 예시입니다. FrameworkModule를 임포트하면 해당 Class는 의존성 주입 컨테이너에서 자동으로 사용 가능합니다. 자세한 내용은 Getting started 페이지를 참조하세요.

큐 시스템을 최대한 활용하려면 메시지를 소비하는 여러 워커를 실행하는 것이 좋습니다. http 라우트 등을 가진 메인 애플리케이션과는 별도의 App을 작성합니다.

공통 Service는 공유 앱 모듈을 통해 공유합니다. Channel 정의는 애플리케이션 전반에서 공용 파일을 통해 공유합니다.

// file: channels.ts

export type RegistrationChannel = BrokerQueueChannel<User>;
export const registrationChannelProvider = provide<RegistrationChannel>((queue: BrokerQueue) => queue.channel<User>('user/registered', {
  process: QueueMessageProcessing.exactlyOnce,
  deduplicationInterval: '1s',
}));
// file: worker.ts
import { RegistrationChannel, registrationChannelProvider } from './channels';

async function consumerCommand(
  channel: RegistrationChannel, 
  database: Database) {

  await channel.consume(async (user) => {
    // user로 무언가를 수행하고,
    // 정보를 저장하거나 이메일을 보내는 등 작업을 수행합니다.
  });

  // broker에 대한 연결이 프로세스를 유지합니다.
}

const app = new App({
  providers: [
    Database,
    registrationChannelProvider,
  ],
  imports: [
    new FrameworkModule({}),
  ],
});

app.command('consumer', consumerCommand);

// 위의 worker 커맨드를 직접 시작합니다
void app.run('consumer');

그리고 애플리케이션에서는 다음과 같이 메시지를 생성(produce)합니다:

// file: app.ts
import { RegistrationChannel, registrationChannelProvider } from './channels';

class Service {
  constructor(private channel: RegistrationChannel) {
  }

  async registerUser(user: User) {
    await this.channel.produce(user);
  }
}

const app = new App({
  providers: [
    Service,
    registrationChannelProvider,
  ],
  imports: [
    new FrameworkModule({}),
  ],
});

void app.run();
English中文 (Chinese)한국어 (Korean)日本語 (Japanese)Deutsch (German)