fontcolor_theme
Deepkit Broker

Broker Bus

Deepkit Message Bus는 애플리케이션의 서로 다른 부분 간에 메시지나 이벤트를 전송할 수 있게 해주는 메시지 버스 시스템(Pub/Sub, 분산 이벤트 시스템)입니다.

마이크로서비스, 모놀리식, 혹은 그 밖의 어떤 형태의 애플리케이션에서도 사용할 수 있습니다. 이벤트 주도 아키텍처에 완벽히 적합합니다.

이는 프로세스 내 이벤트에 사용하는 Deepkit Event system과는 다릅니다. Broker Bus는 다른 프로세스나 서버로 보내야 하는 이벤트에 사용됩니다. 또한 FrameworkModule이 자동으로 시작한 여러 worker 간 통신이 필요할 때에도 Broker Bus는 매우 적합합니다(예: new FrameworkModule({workers: 4})).

이 시스템은 타입 안전성을 염두에 두고 설계되었으며 메시지를 자동으로 직렬화/역직렬화합니다(BSON 사용). 메시지 타입에 검증을 추가하면, 전송 전과 수신 후에 메시지를 검증합니다. 이를 통해 메시지가 항상 올바른 형식을 유지하고 기대한 데이터를 포함함을 보장합니다.

사용법

import { BrokerBus } from '@deepkit/broker';

const bus = new BrokerBus(adapter);

// move this type to a shared file
type UserEvent = { type: 'user-created', id: number } | { type: 'user-deleted', id: number };

const channel = bus.channel<Events>('user-events');

await channel.subscribe((event) => {
  if (event.type === 'user-created') {
    console.log('User created', event.id);
  } else if (event.type === 'user-deleted') {
    console.log('User deleted', event.id);
  }
});

await channel.publish({ type: 'user-created', id: 1 });

채널에 이름과 타입을 정의하면 올바른 타입의 메시지만 전송되고 수신되도록 보장할 수 있습니다. 데이터는 자동으로 직렬화/역직렬화됩니다(BSON 사용).

앱에서의 사용

애플리케이션에서 BrokerBus를 사용하는 전체 예제입니다. FrameworkModule을 가져오면 이 클래스는 의존성 주입 컨테이너에서 자동으로 사용할 수 있게 됩니다. 자세한 내용은 시작하기 페이지를 참고하세요.

Subject

메시지를 전송하고 수신하는 기본 접근 방식은 rxjs Subject 타입을 사용하는 것입니다. 그 subscribenext 메서드는 타입 안전한 방식으로 메시지를 보내고 받도록 해줍니다. 모든 Subject 인스턴스는 브로커에 의해 관리되며, Subject가 가비지 컬렉션되면 브로커 백엔드(예: Redis)에서 구독이 제거됩니다.

메시지 발행 또는 구독 실패를 처리하려면 BrokerBus의 BusBrokerErrorHandler를 오버라이드하세요.

이 접근 방식은 비즈니스 코드와 브로커 서버를 깔끔하게 분리해 주며, 브로커 서버 없이도 테스트 환경에서 동일한 코드를 사용할 수 있게 합니다.

import { BrokerBus, BrokerBusChannel, provideBusSubject } from '@deepkit/broker';
import { FrameworkModule } from '@deepkit/framework';
import { Subject } from 'rxjs';

// 이 타입은 공유 파일로 이동하세요
type MyChannel = Subject<{
    id: number;
    name: string;
}>;

class Service {
    // MyChannel은 singleton이 아니며, 요청마다 새 인스턴스가 생성됩니다.
    // 그 수명은 프레임워크가 모니터링하며, Subject가 가비지 컬렉션되면 
    // 브로커 백엔드(예: Redis)에서 구독이 제거됩니다.
    constructor(private channel: MyChannel) {
        this.channel.subscribe((message) => {
            console.log('received message', message);
        });
    }

    update() {
        this.channel.next({ id: 1, name: 'Peter' });
    }
}

@rpc.controller('my-controller')
class MyRpcController {
    constructor(private channel: MyChannel) {
    }

    @rpc.action()
    getChannelData(): MyChannel {
        return this.channel;
    }
}

const app = new App({
    controllers: [MyRpcController],
    providers: [
        Service,
        provideBusSubject<MyChannel>('my-channel'),
    ],
    imports: [
        new FrameworkModule(),
    ],
});

BusChannel

메시지 전송에 대한 확인이 필요하고 각 경우마다 에러를 처리하고 싶다면 BrokerBusChannel 타입을 사용할 수 있습니다. subscribepublish 메서드는 Promise를 반환합니다.

import { BrokerBus, BrokerBusChannel, provideBusChannel } from '@deepkit/broker';
import { FrameworkModule } from '@deepkit/framework';

// 이 타입은 공유 파일로 이동하세요
type MyChannel = BrokerBusChannel<{
    id: number;
    name: string;
}>;

class Service {
    constructor(private channel: MyChannel) {
        this.channel.subscribe((message) => {
            console.log('received message', message);
        }).catch(e => {
            console.error('Error while subscribing', e);
        });
    }

    async update() {
        await this.channel.publish({ id: 1, name: 'Peter' });
    }
}

const app = new App({
    providers: [
        Service,
        provideBusChannel<MyChannel>('my-channel'),
    ],
    imports: [
        new FrameworkModule(),
    ],
});
English中文 (Chinese)한국어 (Korean)日本語 (Japanese)Deutsch (German)