github
DocsBlog
fontcolor_theme
package

API @deepkit/broker

npm install @deepkit/broker

This package contains the abstraction of Deepkit Broker, as well as a server implementation, and several client implementations as adapters (BrokerDeepkitAdapter, BrokerMemoryAdapter).

Classes

BrokerConnection [source]
export class BrokerConnection extends RpcKernelBaseConnection {
    constructor(stats: RpcStats, logger: Logger, transportConnection: TransportConnection, connections: RpcKernelConnections, injector: InjectorContext, eventDispatcher: EventDispatcher, protected state: BrokerState);
    close(): void;
    async onMessage(message: RpcMessage, response: RpcMessageBuilder): Promise<void>;
}
BrokerState [source]
export class BrokerState {
    /**
     * Simple key/value store.
     */
    keyStore;
    /**
     * Cache store.
     */
    cacheStore;
    subscriptions;
    entityFields;
    queues;
    locker;
    enableSnapshot;
    snapshotInterval;
    snapshotPath;
    snapshotting;
    /**
     * All connections in this list are notified about cache invalidations.
     */
    invalidationCacheMessageConnections: BrokerConnection[];
    getEntityFields(name: string): string[];
    publishEntityFields(name: string, fields: string[]): boolean;
    unsubscribeEntityFields(name: string, fields: string[]);
    lock(id: string, ttl: number, timeout: number = ): Promise<ProcessLock>;
    tryLock(id: string, ttl: number = ): Promise<ProcessLock | undefined>;
    isLocked(id: string): boolean;
    unsubscribe(channel: string, connection: BrokerConnection);
    subscribe(channel: string, connection: BrokerConnection);
    publish(channel: string, v: Uint8Array);
    queueSubscribe(queueName: string, connection: BrokerConnection, maxParallel: number);
    queueUnsubscribe(queueName: string, connection: BrokerConnection);
    queuePublish(body: BrokerQueuePublish);
    /**
     * When a queue message has been sent to a consumer and the consumer answers.
     */
    queueMessageHandled(queueName: string, connection: BrokerConnection, id: number, answer: {
        error?: string;
        success: boolean;
        delay?: number;
    });
    setCache(id: string, data: Uint8Array, ttl: number);
    getCache(id: string): {
        data: Uint8Array;
        ttl: number;
    } | undefined;
    deleteCache(id: string);
    increment(id: string, v?: number): number;
    setKey(id: string, data: Uint8Array, ttl: number);
    getKey(id: string): Uint8Array | undefined;
    deleteKey(id: string);
}
BrokerKernel [source]
export class BrokerKernel extends RpcKernel {
    createConnection(transport: TransportConnection): BrokerConnection;
}
BrokerQueueMessage [source]
export class BrokerQueueMessage<T> {
    state:  |  | ;
    error?: Error;
    tries: number;
    delayed: number;
    constructor(public channel: string, public data: T);
    failed(error: Error);
    delay(seconds: number);
}
BrokerQueue [source]
export class BrokerQueue {
    constructor(public adapter: BrokerAdapterQueue);
    channel<T>(name: string, options?: BrokerQueueChannelOptions, type?: ReceiveType<T>): BrokerQueueChannel<T>;
}
BrokerQueueChannel [source]
export class BrokerQueueChannel<T> {
    constructor(public name: string, private adapter: BrokerAdapterQueue, options: BrokerQueueChannelOptions | undefined, private type: Type);
    async produce<T>(message: T, options?: BrokerAdapterQueueProduceOptions): Promise<void>;
    async consume(callback: (message: BrokerQueueMessage<T>) => Promise<void> | void, options: {
        maxParallel?: number;
    } = {}): Promise<Release>;
}
BrokerBusSubject [source]
export class BrokerBusSubject<T> extends Subject<T> {
    constructor(public handle: BrokerBusSubjectHandle, protected onFirst: () => void, protected onLast: () => void, protected onPublish: (value: T) => void);
    override next(value: T, publish = true): void;
}
BusBrokerErrorHandler [source]
export class BusBrokerErrorHandler {
    constructor(protected logger?: Logger);
    publishFailed(path: string, message: unknown, type: Type, error: Error);
    subscribeFailed(path: string, error: Error);
}
BrokerBus [source]
export class BrokerBus {
    constructor(public adapter: BrokerAdapterBus, errorHandler?: BusBrokerErrorHandler);
    /**
     * Creates a broker channel handle for a given path and type.
     *
     * @param path Unique identifier of the broker channel.
     * @param type Optional message type for type-safe handling.
     */
    channel<T>(path: string, type?: ReceiveType<T>): BrokerBusChannel<T>;
    /**
     * Creates a Subject for the given broker channel.
     * Subscription to the broker is delayed until the first observer subscribes.
     *
     * Calling `.next()` publishes a message to the broker and does not forward
     * it to its observers immediately. Only messages from the broker are forwarded to observers.
     * This is to ensure consistent behaviour with other broker subjects from the same path.
     *
     * @param path Unique broker path.
     * @param type Optional type for strongly typed messages.
     *
     * @example
     * ```typescript
     * const subject = bus.subject<string>('updates');
     * subject.subscribe(console.log);
     * subject.next('hello');
     * ```
     */
    subject<T>(path: string, type?: ReceiveType<T>): Subject<T>;
    /**
     * Ensures the provided Subject is actively subscribed to the broker.
     * This guarantees that all messages from this point forward will be buffered,
     * and replayed to the first observer to avoid data loss.
     *
     * The subject unsubscribes from the broker automatically when all observers
     * are gone. To make it active, you need to subscribe to it first or call `activateSubject`.
     *
     * @throws Error when the channel could not be subscribed to.
     *
     * @example
     * ```typescript
     * const subject = bus.subject<string>('updates');
     * await bus.activateSubject(subject);
     * subject.subscribe(value => {
     *    // receives all messages from the time of activation
     * });
     * ```
     */
    async activateSubject<T extends Subject<any>>(subject: T): Promise<T>;
}
BrokerBusChannel [source]
export class BrokerBusChannel<T> {
    constructor(public name: string, protected adapter: BrokerAdapterBus, public type: Type);
    async publish(message: T);
    async subscribe(callback: (message: T) => void): Promise<Release>;
}
BrokerLock [source]
export class BrokerLock {
    constructor(public adapter: BrokerAdapterLock);
    item(id: string, options: Partial<BrokerTimeOptions> = {}): BrokerLockItem;
}
BrokerLockItem [source]
export class BrokerLockItem {
    constructor(private id: string, private adapter: BrokerAdapterLock, private options: BrokerTimeOptionsResolved);
    async [Symbol.asyncDispose]();
    /**
     * Disposable way of acquiring a lock. Automatically releases the lock when the returned object is disposed.
     *
     * @example
     * ```typescript
     * async function doSomething() {
     *   async using hold = lock.hold();
     *
     *   // do stuff
     *
     *   // when out of scope, lock is automatically released.
     * }
     * ```
     */
    async hold();
    /**
     * Returns true if the current lock object is the holder of the lock.
     *
     * This does not check whether the lock is acquired by someone else.
     * Use isReserved() if you want to check that.
     */
    get acquired(): boolean;
    /**
     * Acquires the lock. If the lock is already acquired by someone else, this method waits until the lock is released.
     *
     * @throws BrokerLockError when lock is already acquired by this object.
     */
    async acquire(): Promise<this>;
    /**
     * Checks if the lock is acquired by someone else.
     */
    async isReserved(): Promise<boolean>;
    /**
     * Tries to acquire the lock.
     * If the lock is already acquired, nothing happens.
     *
     * @throws BrokerLockError when lock is already acquired by this object.
     */
    async try(): Promise<this | undefined>;
    /**
     * Releases the lock.
     */
    async release(): Promise<void>;
}
BrokerCacheStore [source]
export class BrokerCacheStore {
    /**
     * This is a short-lived cache pool for the current process.
     * Values are fetched from the broker when not available and stored here for a short time (configurable).
     */
    cache;
    constructor(public config: BrokerCacheOptionsResolved);
    invalidate(key: string);
    set(key: string, value: CacheStoreEntry);
}
BrokerCacheItem [source]
export class BrokerCacheItem<T> {
    constructor(private key: string, private builder: CacheBuilder<T>, private options: BrokerCacheItemOptionsResolved, private adapter: BrokerAdapterCache, private store: BrokerCacheStore, private type: Type, private logger: LoggerInterface);
    async set(value: T);
    async invalidate();
    async exists(): Promise<boolean>;
    async get(): Promise<T>;
}
BrokerCache [source]
export class BrokerCache {
    constructor(private adapter: BrokerAdapterCache, config: Partial<BrokerCacheOptions> = {}, private logger: LoggerInterface = new ConsoleLogger());
    item<T>(key: string, builder: CacheBuilder<T>, options?: Partial<BrokerCacheItemOptions>, type?: ReceiveType<T>): BrokerCacheItem<T>;
}
BrokerKeyValueItem [source]
export class BrokerKeyValueItem<T> {
    constructor(private key: string, private type: Type, private adapter: BrokerAdapterKeyValue, private options: BrokerKeyValueOptionsResolved);
    /**
     * @see BrokerKeyValue.get
     */
    async get(): Promise<T>;
    /**
     * @see BrokerKeyValue.set
     */
    async set(value: T): Promise<void>;
    /**
     * @see BrokerKeyValue.increment
     */
    async increment(value: number): Promise<number>;
    async remove(): Promise<void>;
}
BrokerKeyValue [source]
export class BrokerKeyValue {
    constructor(private adapter: BrokerAdapterKeyValue, config: Partial<BrokerKeyValueOptions> = {}, private logger: LoggerInterface = new ConsoleLogger());
    /**
     * Returns a new BrokerKeyValueItem for the given key.
     */
    item<T>(key: string, options?: Partial<BrokerKeyValueOptions>, type?: ReceiveType<T>): BrokerKeyValueItem<T>;
    /**
     * Returns the value for the given key.
     */
    async get<T>(key: string, type?: ReceiveType<T>): Promise<T>;
    /**
     * Sets the value for the given key.
     */
    async set<T>(key: string, value: T, options?: Partial<BrokerKeyValueOptions>, type?: ReceiveType<T>): Promise<void>;
    async remove(key: string): Promise<void>;
    /**
     * Increments the value for the given key by the given value.
     * Note that this is not compatible to get/set, as it only works with numbers.
     * Since this an atomic increment, there is no way to get the current value via `get` and then increment it,
     * but you have to use `increment(0)` to get the current value.
     */
    async increment(key: string, value: number): Promise<number>;
}
BrokerDeepkitConnection [source]
export class BrokerDeepkitConnection extends RpcBaseClient {
    activeChannels;
    consumers;
    subscribedToInvalidations?: ((key: string) => void)[];
}
BrokerDeepkitAdapterOptions [source]
export class BrokerDeepkitAdapterOptions {
    servers: Server[];
}
BrokerDeepkitPool [source]
export class BrokerDeepkitPool {
    connections: {
        connection: BrokerDeepkitConnection;
        server: Server;
    }[];
    constructor(public options: BrokerDeepkitAdapterOptions);
    getConnection(key: string): BrokerDeepkitConnection;
    async disconnect(): Promise<void>;
}
BrokerDeepkitAdapter [source]
export class BrokerDeepkitAdapter implements BrokerAdapter {
    constructor(public options: BrokerDeepkitAdapterOptions);
    async disconnect(): Promise<void>;
    onInvalidateCache(callback: (key: string) => void): void;
    async invalidateCache(key: string): Promise<void>;
    async setCache(key: string, value: any, options: BrokerCacheItemOptionsResolved, type: Type): Promise<void>;
    async getCacheMeta(key: string): Promise<{
        ttl: number;
    } | undefined>;
    async getCache(key: string, type: Type): Promise<{
        value: any;
        ttl: number;
    } | undefined>;
    async set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise<void>;
    async get(key: string, type: Type): Promise<any>;
    async remove(key: string): Promise<any>;
    async increment(key: string, value: any): Promise<number>;
    async isLocked(id: string): Promise<boolean>;
    async lock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
    async tryLock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
    async publish(key: string, message: any, type: Type): Promise<void>;
    async subscribe(key: string, callback: (message: any) => void, type: Type): Promise<Release>;
    async produce<T>(key: string, message: T, type: Type, options?: BrokerAdapterQueueProduceOptionsResolved): Promise<void>;
    async consume(key: string, callback: (message: BrokerQueueMessage<any>) => Promise<void>, options: {
        maxParallel: number;
    }, type: Type): Promise<Release>;
}

This is the Broker adapter for Deepkit Broker server.

BrokerMemoryAdapter [source]
export class BrokerMemoryAdapter extends BrokerDeepkitAdapter {
    constructor();
}

This adapter is only for testing purposes. It uses the in-memory broker kernel (server) and communicates with it via in-memory RPC transport adapter.

Events

onBrokerLock [source]
EventToken<BaseEvent>

Errors

BrokerLockError [source]
export class BrokerLockError extends Error {
}
BrokerCacheError [source]
export class BrokerCacheError extends Error {
}

Functions

provideBusChannel [source]
<T extends BrokerBusChannel<any>>(path: string, type?: ReceiveType<T>): Provider

Provides a bus channel for the given path for

provideBusSubject [source]
<T extends Subject<any>>(path: string, type?: ReceiveType<T>): Provider

Provides a bus Subject for the given channel path for

isBrokerAdapterCache [source]
(adapter: AnyAdapter): adapter is BrokerAdapterCache
isBrokerAdapterBus [source]
(adapter: AnyAdapter): adapter is BrokerAdapterBus
isBrokerAdapterLock [source]
(adapter: AnyAdapter): adapter is BrokerAdapterLock
isBrokerAdapterQueue [source]
(adapter: AnyAdapter): adapter is BrokerAdapterQueue
isBrokerAdapterKeyValue [source]
(adapter: AnyAdapter): adapter is BrokerAdapterKeyValue
parseBrokerKeyValueOptions [source]
(options: Partial<BrokerKeyValueOptions>): BrokerKeyValueOptionsResolved

Types

BrokerTimeOptions [source]
interface BrokerTimeOptions {
    /**
     * Time to live in milliseconds. 0 means no ttl.
     * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
     */
    ttl: string | number;

    /**
     * Timeout in milliseconds. 0 means no timeout.
     * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
     */
    timeout: number | string;
}
BrokerTimeOptionsResolved [source]
interface BrokerTimeOptionsResolved {
    /**
     * Time to live in milliseconds. 0 means no ttl.
     */
    ttl: number;

    /**
     * Timeout in milliseconds. 0 means no timeout.
     */
    timeout: number;
}
BrokerQueueMessageProcessingOptions [source]
type BrokerQueueMessageProcessingOptions = {
    process: QueueMessageProcessing.atLeastOnce
} | {
    process: QueueMessageProcessing.exactlyOnce;
    deduplicationInterval?: string,
    hash?: string | number;
};
BrokerQueueMessageProcessingOptionsResolved [source]
interface BrokerQueueMessageProcessingOptionsResolved {
    process: QueueMessageProcessing;
    deduplicationInterval: number;
    hash?: string | number;
}
BrokerAdapterQueueProduceOptions [source]
type BrokerAdapterQueueProduceOptions = { delay?: string; priority?: number } & BrokerQueueMessageProcessingOptions;
BrokerAdapterQueueProduceOptionsResolved [source]
interface BrokerAdapterQueueProduceOptionsResolved extends BrokerQueueMessageProcessingOptionsResolved {
    delay?: number;
    priority?: number;
}
Release [source]
type Release = () => Promise<void>;
BrokerInvalidateCacheMessage [source]
interface BrokerInvalidateCacheMessage {
    key: string;
    ttl: number;
}
BrokerAdapterBase [source]
interface BrokerAdapterBase {
    disconnect(): Promise<void>;

    logger?: Logger;
}
BrokerAdapterLock [source]
interface BrokerAdapterLock extends BrokerAdapterBase {
    /**
     * Acquires the lock. If the lock is already acquired by someone else, this method waits until the lock is released.
     */
    lock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;

    /**
     * Checks if the lock is acquired by someone else.
     */
    isLocked(id: string): Promise<boolean>;

    /**
     * Tries to acquire the lock.
     * If the lock is already acquired, nothing happens.
     */
    tryLock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
}
BrokerAdapterBus [source]
interface BrokerAdapterBus extends BrokerAdapterBase {
    /**
     * Publish a message on the bus aka pub/sub.
     */
    publish(name: string, message: any, type: Type): Promise<void>;

    /**
     * Subscribe to messages on the bus aka pub/sub.
     */
    subscribe(name: string, callback: (message: any) => void, type: Type): Promise<Release>;
}
BrokerAdapterQueue [source]
interface BrokerAdapterQueue extends BrokerAdapterBase {
    /**
     * Consume messages from a queue.
     */
    consume(name: string, callback: (message: any) => Promise<void>, options: { maxParallel: number }, type: Type): Promise<Release>;

    /**
     * Produce a message to a queue.
     */
    produce(name: string, message: any, type: Type, options?: BrokerAdapterQueueProduceOptionsResolved): Promise<void>;
}
BrokerQueueChannelOptions [source]
type BrokerQueueChannelOptions = BrokerQueueMessageProcessingOptions;
BrokerQueueChannelOptionsResolved [source]
type BrokerQueueChannelOptionsResolved = BrokerQueueMessageProcessingOptionsResolved;
TypeOfSubject [source]
type TypeOfSubject<T extends Subject<any>> = T extends Subject<infer U> ? U : never;
BrokerAdapter [source]
type BrokerAdapter = BrokerAdapterCache & BrokerAdapterBus & BrokerAdapterLock & BrokerAdapterQueue & BrokerAdapterKeyValue;
AnyAdapter [source]
type AnyAdapter = BrokerAdapterCache | BrokerAdapterBus | BrokerAdapterLock | BrokerAdapterQueue | BrokerAdapterKeyValue;
BrokerCacheOptions [source]
interface BrokerCacheOptions {
    /**
     * Relative time to live in milliseconds. 0 means no ttl.
     *
     * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
     */
    ttl: number | string;

    /**
     * How many ms the cache is allowed to be stale. Set to 0 to disable stale cache.
     * Default is 1000ms.
     * Improves performance by serving slightly stale cache while the cache is being rebuilt.
     */
    maxStale: number | string;

    /**
     * How many ms the cache is allowed to be stored in-memory. Set to 0 to disable in-memory cache.
     */
    inMemoryTtl: number | string;
}
BrokerCacheOptionsResolved [source]
interface BrokerCacheOptionsResolved extends BrokerCacheOptions {
    ttl: number;
    maxStale: number;
    inMemoryTtl: number;
}
BrokerCacheItemOptions [source]
interface BrokerCacheItemOptions {
    /**
     * Relative time to live in milliseconds. 0 means no ttl.
     *
     * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
     */
    ttl: number | string;

    tags: string[];

    /**
     * How many ms the cache is allowed to be stale. Set to 0 to disable stale cache.
     * Default is 1000ms.
     * Improves performance by serving slightly stale cache while the cache is being rebuilt.
     */
    maxStale: number | string;
}
BrokerCacheItemOptionsResolved [source]
interface BrokerCacheItemOptionsResolved extends BrokerCacheItemOptions {
    ttl: number;
    maxStale: number;
}
BrokerAdapterCache [source]
interface BrokerAdapterCache extends BrokerAdapterBase {
    getCache(key: string, type: Type): Promise<{ value: any, ttl: number } | undefined>;

    getCacheMeta(key: string): Promise<{ ttl: number } | undefined>;

    setCache(key: string, value: any, options: BrokerCacheItemOptionsResolved, type: Type): Promise<void>;

    invalidateCache(key: string): Promise<void>;

    onInvalidateCache(callback: (key: string) => void): void;
}
CacheBuilder [source]
type CacheBuilder<T> = () => T | Promise<T>;
BrokerKeyValueOptions [source]
interface BrokerKeyValueOptions {
    /**
     * Relative time to live in milliseconds. 0 means no ttl.
     *
     * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
     */
    ttl: number | string;
}
BrokerKeyValueOptionsResolved [source]
interface BrokerKeyValueOptionsResolved extends BrokerKeyValueOptions {
    ttl: number;
}
BrokerAdapterKeyValue [source]
interface BrokerAdapterKeyValue extends BrokerAdapterBase {
    get(key: string, type: Type): Promise<any>;

    set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise<any>;

    remove(key: string): Promise<any>;

    increment(key: string, value: number): Promise<number>;
}
Server [source]
interface Server {
    url: string;
    weight?: number;
    secretKey?: string;
    transport?: ClientTransportAdapter;
}