API @deepkit/broker
npm install @deepkit/broker
This package contains the abstraction of Deepkit Broker, as well as a server implementation, and several client implementations as adapters (BrokerDeepkitAdapter
, BrokerMemoryAdapter
).
This is the Broker adapter for Deepkit Broker server. This adapter is only for testing purposes. It uses the in-memory broker kernel (server)
and communicates with it via in-memory RPC transport adapter. Provides a bus channel for the given path for Provides a bus Subject for the given channel path forClasses
export class BrokerConnection extends RpcKernelBaseConnection {
constructor(stats: RpcStats, logger: Logger, transportConnection: TransportConnection, connections: RpcKernelConnections, injector: InjectorContext, eventDispatcher: EventDispatcher, protected state: BrokerState);
close(): void;
async onMessage(message: RpcMessage, response: RpcMessageBuilder): Promise<void>;
}
export class BrokerState {
/**
* Simple key/value store.
*/
keyStore;
/**
* Cache store.
*/
cacheStore;
subscriptions;
entityFields;
queues;
locker;
enableSnapshot;
snapshotInterval;
snapshotPath;
snapshotting;
/**
* All connections in this list are notified about cache invalidations.
*/
invalidationCacheMessageConnections: BrokerConnection[];
getEntityFields(name: string): string[];
publishEntityFields(name: string, fields: string[]): boolean;
unsubscribeEntityFields(name: string, fields: string[]);
lock(id: string, ttl: number, timeout: number = ): Promise<ProcessLock>;
tryLock(id: string, ttl: number = ): Promise<ProcessLock | undefined>;
isLocked(id: string): boolean;
unsubscribe(channel: string, connection: BrokerConnection);
subscribe(channel: string, connection: BrokerConnection);
publish(channel: string, v: Uint8Array);
queueSubscribe(queueName: string, connection: BrokerConnection, maxParallel: number);
queueUnsubscribe(queueName: string, connection: BrokerConnection);
queuePublish(body: BrokerQueuePublish);
/**
* When a queue message has been sent to a consumer and the consumer answers.
*/
queueMessageHandled(queueName: string, connection: BrokerConnection, id: number, answer: {
error?: string;
success: boolean;
delay?: number;
});
setCache(id: string, data: Uint8Array, ttl: number);
getCache(id: string): {
data: Uint8Array;
ttl: number;
} | undefined;
deleteCache(id: string);
increment(id: string, v?: number): number;
setKey(id: string, data: Uint8Array, ttl: number);
getKey(id: string): Uint8Array | undefined;
deleteKey(id: string);
}
export class BrokerKernel extends RpcKernel {
createConnection(transport: TransportConnection): BrokerConnection;
}
export class BrokerQueueMessage<T> {
state: | | ;
error?: Error;
tries: number;
delayed: number;
constructor(public channel: string, public data: T);
failed(error: Error);
delay(seconds: number);
}
export class BrokerQueue {
constructor(public adapter: BrokerAdapterQueue);
channel<T>(name: string, options?: BrokerQueueChannelOptions, type?: ReceiveType<T>): BrokerQueueChannel<T>;
}
export class BrokerQueueChannel<T> {
constructor(public name: string, private adapter: BrokerAdapterQueue, options: BrokerQueueChannelOptions | undefined, private type: Type);
async produce<T>(message: T, options?: BrokerAdapterQueueProduceOptions): Promise<void>;
async consume(callback: (message: BrokerQueueMessage<T>) => Promise<void> | void, options: {
maxParallel?: number;
} = {}): Promise<Release>;
}
export class BrokerBusSubject<T> extends Subject<T> {
constructor(public handle: BrokerBusSubjectHandle, protected onFirst: () => void, protected onLast: () => void, protected onPublish: (value: T) => void);
override next(value: T, publish = true): void;
}
export class BusBrokerErrorHandler {
constructor(protected logger?: Logger);
publishFailed(path: string, message: unknown, type: Type, error: Error);
subscribeFailed(path: string, error: Error);
}
export class BrokerBus {
constructor(public adapter: BrokerAdapterBus, errorHandler?: BusBrokerErrorHandler);
/**
* Creates a broker channel handle for a given path and type.
*
* @param path Unique identifier of the broker channel.
* @param type Optional message type for type-safe handling.
*/
channel<T>(path: string, type?: ReceiveType<T>): BrokerBusChannel<T>;
/**
* Creates a Subject for the given broker channel.
* Subscription to the broker is delayed until the first observer subscribes.
*
* Calling `.next()` publishes a message to the broker and does not forward
* it to its observers immediately. Only messages from the broker are forwarded to observers.
* This is to ensure consistent behaviour with other broker subjects from the same path.
*
* @param path Unique broker path.
* @param type Optional type for strongly typed messages.
*
* @example
* ```typescript
* const subject = bus.subject<string>('updates');
* subject.subscribe(console.log);
* subject.next('hello');
* ```
*/
subject<T>(path: string, type?: ReceiveType<T>): Subject<T>;
/**
* Ensures the provided Subject is actively subscribed to the broker.
* This guarantees that all messages from this point forward will be buffered,
* and replayed to the first observer to avoid data loss.
*
* The subject unsubscribes from the broker automatically when all observers
* are gone. To make it active, you need to subscribe to it first or call `activateSubject`.
*
* @throws Error when the channel could not be subscribed to.
*
* @example
* ```typescript
* const subject = bus.subject<string>('updates');
* await bus.activateSubject(subject);
* subject.subscribe(value => {
* // receives all messages from the time of activation
* });
* ```
*/
async activateSubject<T extends Subject<any>>(subject: T): Promise<T>;
}
export class BrokerBusChannel<T> {
constructor(public name: string, protected adapter: BrokerAdapterBus, public type: Type);
async publish(message: T);
async subscribe(callback: (message: T) => void): Promise<Release>;
}
export class BrokerLock {
constructor(public adapter: BrokerAdapterLock);
item(id: string, options: Partial<BrokerTimeOptions> = {}): BrokerLockItem;
}
export class BrokerLockItem {
constructor(private id: string, private adapter: BrokerAdapterLock, private options: BrokerTimeOptionsResolved);
async [Symbol.asyncDispose]();
/**
* Disposable way of acquiring a lock. Automatically releases the lock when the returned object is disposed.
*
* @example
* ```typescript
* async function doSomething() {
* async using hold = lock.hold();
*
* // do stuff
*
* // when out of scope, lock is automatically released.
* }
* ```
*/
async hold();
/**
* Returns true if the current lock object is the holder of the lock.
*
* This does not check whether the lock is acquired by someone else.
* Use isReserved() if you want to check that.
*/
get acquired(): boolean;
/**
* Acquires the lock. If the lock is already acquired by someone else, this method waits until the lock is released.
*
* @throws BrokerLockError when lock is already acquired by this object.
*/
async acquire(): Promise<this>;
/**
* Checks if the lock is acquired by someone else.
*/
async isReserved(): Promise<boolean>;
/**
* Tries to acquire the lock.
* If the lock is already acquired, nothing happens.
*
* @throws BrokerLockError when lock is already acquired by this object.
*/
async try(): Promise<this | undefined>;
/**
* Releases the lock.
*/
async release(): Promise<void>;
}
export class BrokerCacheStore {
/**
* This is a short-lived cache pool for the current process.
* Values are fetched from the broker when not available and stored here for a short time (configurable).
*/
cache;
constructor(public config: BrokerCacheOptionsResolved);
invalidate(key: string);
set(key: string, value: CacheStoreEntry);
}
export class BrokerCacheItem<T> {
constructor(private key: string, private builder: CacheBuilder<T>, private options: BrokerCacheItemOptionsResolved, private adapter: BrokerAdapterCache, private store: BrokerCacheStore, private type: Type, private logger: LoggerInterface);
async set(value: T);
async invalidate();
async exists(): Promise<boolean>;
async get(): Promise<T>;
}
export class BrokerCache {
constructor(private adapter: BrokerAdapterCache, config: Partial<BrokerCacheOptions> = {}, private logger: LoggerInterface = new ConsoleLogger());
item<T>(key: string, builder: CacheBuilder<T>, options?: Partial<BrokerCacheItemOptions>, type?: ReceiveType<T>): BrokerCacheItem<T>;
}
export class BrokerKeyValueItem<T> {
constructor(private key: string, private type: Type, private adapter: BrokerAdapterKeyValue, private options: BrokerKeyValueOptionsResolved);
/**
* @see BrokerKeyValue.get
*/
async get(): Promise<T>;
/**
* @see BrokerKeyValue.set
*/
async set(value: T): Promise<void>;
/**
* @see BrokerKeyValue.increment
*/
async increment(value: number): Promise<number>;
async remove(): Promise<void>;
}
export class BrokerKeyValue {
constructor(private adapter: BrokerAdapterKeyValue, config: Partial<BrokerKeyValueOptions> = {}, private logger: LoggerInterface = new ConsoleLogger());
/**
* Returns a new BrokerKeyValueItem for the given key.
*/
item<T>(key: string, options?: Partial<BrokerKeyValueOptions>, type?: ReceiveType<T>): BrokerKeyValueItem<T>;
/**
* Returns the value for the given key.
*/
async get<T>(key: string, type?: ReceiveType<T>): Promise<T>;
/**
* Sets the value for the given key.
*/
async set<T>(key: string, value: T, options?: Partial<BrokerKeyValueOptions>, type?: ReceiveType<T>): Promise<void>;
async remove(key: string): Promise<void>;
/**
* Increments the value for the given key by the given value.
* Note that this is not compatible to get/set, as it only works with numbers.
* Since this an atomic increment, there is no way to get the current value via `get` and then increment it,
* but you have to use `increment(0)` to get the current value.
*/
async increment(key: string, value: number): Promise<number>;
}
export class BrokerDeepkitConnection extends RpcBaseClient {
activeChannels;
consumers;
subscribedToInvalidations?: ((key: string) => void)[];
}
export class BrokerDeepkitAdapterOptions {
servers: Server[];
}
export class BrokerDeepkitPool {
connections: {
connection: BrokerDeepkitConnection;
server: Server;
}[];
constructor(public options: BrokerDeepkitAdapterOptions);
getConnection(key: string): BrokerDeepkitConnection;
async disconnect(): Promise<void>;
}
export class BrokerDeepkitAdapter implements BrokerAdapter {
constructor(public options: BrokerDeepkitAdapterOptions);
async disconnect(): Promise<void>;
onInvalidateCache(callback: (key: string) => void): void;
async invalidateCache(key: string): Promise<void>;
async setCache(key: string, value: any, options: BrokerCacheItemOptionsResolved, type: Type): Promise<void>;
async getCacheMeta(key: string): Promise<{
ttl: number;
} | undefined>;
async getCache(key: string, type: Type): Promise<{
value: any;
ttl: number;
} | undefined>;
async set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise<void>;
async get(key: string, type: Type): Promise<any>;
async remove(key: string): Promise<any>;
async increment(key: string, value: any): Promise<number>;
async isLocked(id: string): Promise<boolean>;
async lock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
async tryLock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
async publish(key: string, message: any, type: Type): Promise<void>;
async subscribe(key: string, callback: (message: any) => void, type: Type): Promise<Release>;
async produce<T>(key: string, message: T, type: Type, options?: BrokerAdapterQueueProduceOptionsResolved): Promise<void>;
async consume(key: string, callback: (message: BrokerQueueMessage<any>) => Promise<void>, options: {
maxParallel: number;
}, type: Type): Promise<Release>;
}
export class BrokerMemoryAdapter extends BrokerDeepkitAdapter {
constructor();
}
Events
EventToken<BaseEvent>
Errors
export class BrokerLockError extends Error {
}
export class BrokerCacheError extends Error {
}
Functions
<T extends BrokerBusChannel<any>>(path: string, type?: ReceiveType<T>): Provider
<T extends Subject<any>>(path: string, type?: ReceiveType<T>): Provider
(adapter: AnyAdapter): adapter is BrokerAdapterCache
(adapter: AnyAdapter): adapter is BrokerAdapterBus
(adapter: AnyAdapter): adapter is BrokerAdapterLock
(adapter: AnyAdapter): adapter is BrokerAdapterQueue
(adapter: AnyAdapter): adapter is BrokerAdapterKeyValue
(options: Partial<BrokerKeyValueOptions>): BrokerKeyValueOptionsResolved
Types
interface BrokerTimeOptions {
/**
* Time to live in milliseconds. 0 means no ttl.
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
ttl: string | number;
/**
* Timeout in milliseconds. 0 means no timeout.
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
timeout: number | string;
}
interface BrokerTimeOptionsResolved {
/**
* Time to live in milliseconds. 0 means no ttl.
*/
ttl: number;
/**
* Timeout in milliseconds. 0 means no timeout.
*/
timeout: number;
}
type BrokerQueueMessageProcessingOptions = {
process: QueueMessageProcessing.atLeastOnce
} | {
process: QueueMessageProcessing.exactlyOnce;
deduplicationInterval?: string,
hash?: string | number;
};
interface BrokerQueueMessageProcessingOptionsResolved {
process: QueueMessageProcessing;
deduplicationInterval: number;
hash?: string | number;
}
type BrokerAdapterQueueProduceOptions = { delay?: string; priority?: number } & BrokerQueueMessageProcessingOptions;
interface BrokerAdapterQueueProduceOptionsResolved extends BrokerQueueMessageProcessingOptionsResolved {
delay?: number;
priority?: number;
}
type Release = () => Promise<void>;
interface BrokerInvalidateCacheMessage {
key: string;
ttl: number;
}
interface BrokerAdapterBase {
disconnect(): Promise<void>;
logger?: Logger;
}
interface BrokerAdapterLock extends BrokerAdapterBase {
/**
* Acquires the lock. If the lock is already acquired by someone else, this method waits until the lock is released.
*/
lock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
/**
* Checks if the lock is acquired by someone else.
*/
isLocked(id: string): Promise<boolean>;
/**
* Tries to acquire the lock.
* If the lock is already acquired, nothing happens.
*/
tryLock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
}
interface BrokerAdapterBus extends BrokerAdapterBase {
/**
* Publish a message on the bus aka pub/sub.
*/
publish(name: string, message: any, type: Type): Promise<void>;
/**
* Subscribe to messages on the bus aka pub/sub.
*/
subscribe(name: string, callback: (message: any) => void, type: Type): Promise<Release>;
}
interface BrokerAdapterQueue extends BrokerAdapterBase {
/**
* Consume messages from a queue.
*/
consume(name: string, callback: (message: any) => Promise<void>, options: { maxParallel: number }, type: Type): Promise<Release>;
/**
* Produce a message to a queue.
*/
produce(name: string, message: any, type: Type, options?: BrokerAdapterQueueProduceOptionsResolved): Promise<void>;
}
type BrokerQueueChannelOptions = BrokerQueueMessageProcessingOptions;
type BrokerQueueChannelOptionsResolved = BrokerQueueMessageProcessingOptionsResolved;
type TypeOfSubject<T extends Subject<any>> = T extends Subject<infer U> ? U : never;
type BrokerAdapter = BrokerAdapterCache & BrokerAdapterBus & BrokerAdapterLock & BrokerAdapterQueue & BrokerAdapterKeyValue;
type AnyAdapter = BrokerAdapterCache | BrokerAdapterBus | BrokerAdapterLock | BrokerAdapterQueue | BrokerAdapterKeyValue;
interface BrokerCacheOptions {
/**
* Relative time to live in milliseconds. 0 means no ttl.
*
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
ttl: number | string;
/**
* How many ms the cache is allowed to be stale. Set to 0 to disable stale cache.
* Default is 1000ms.
* Improves performance by serving slightly stale cache while the cache is being rebuilt.
*/
maxStale: number | string;
/**
* How many ms the cache is allowed to be stored in-memory. Set to 0 to disable in-memory cache.
*/
inMemoryTtl: number | string;
}
interface BrokerCacheOptionsResolved extends BrokerCacheOptions {
ttl: number;
maxStale: number;
inMemoryTtl: number;
}
interface BrokerCacheItemOptions {
/**
* Relative time to live in milliseconds. 0 means no ttl.
*
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
ttl: number | string;
tags: string[];
/**
* How many ms the cache is allowed to be stale. Set to 0 to disable stale cache.
* Default is 1000ms.
* Improves performance by serving slightly stale cache while the cache is being rebuilt.
*/
maxStale: number | string;
}
interface BrokerCacheItemOptionsResolved extends BrokerCacheItemOptions {
ttl: number;
maxStale: number;
}
interface BrokerAdapterCache extends BrokerAdapterBase {
getCache(key: string, type: Type): Promise<{ value: any, ttl: number } | undefined>;
getCacheMeta(key: string): Promise<{ ttl: number } | undefined>;
setCache(key: string, value: any, options: BrokerCacheItemOptionsResolved, type: Type): Promise<void>;
invalidateCache(key: string): Promise<void>;
onInvalidateCache(callback: (key: string) => void): void;
}
type CacheBuilder<T> = () => T | Promise<T>;
interface BrokerKeyValueOptions {
/**
* Relative time to live in milliseconds. 0 means no ttl.
*
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
ttl: number | string;
}
interface BrokerKeyValueOptionsResolved extends BrokerKeyValueOptions {
ttl: number;
}
interface BrokerAdapterKeyValue extends BrokerAdapterBase {
get(key: string, type: Type): Promise<any>;
set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise<any>;
remove(key: string): Promise<any>;
increment(key: string, value: number): Promise<number>;
}
interface Server {
url: string;
weight?: number;
secretKey?: string;
transport?: ClientTransportAdapter;
}