API @deepkit/broker
npm install @deepkit/broker
이 패키지는 Deepkit Broker의 추상화와 서버 구현, 그리고 adapter로서의 여러 클라이언트 구현(BrokerDeepkitAdapter, BrokerMemoryAdapter)을 포함합니다.
This is the Broker adapter for Deepkit Broker server. This adapter is only for testing purposes. It uses the in-memory broker kernel (server)
and communicates with it via in-memory RPC transport adapter. Provides a bus channel for the given path for Provides a bus Subject for the given channel path forClasses
export class BrokerConnection extends RpcKernelBaseConnection {
constructor(stats: RpcStats, logger: Logger, transportConnection: TransportConnection, connections: RpcKernelConnections, injector: InjectorContext, eventDispatcher: EventDispatcher, protected state: BrokerState);
close(): void;
async onMessage(message: RpcMessage, response: RpcMessageBuilder): Promise<void>;
}
export class BrokerState {
/**
* Simple key/value store.
*/
keyStore;
/**
* Cache store.
*/
cacheStore;
subscriptions;
entityFields;
queues;
locker;
enableSnapshot;
snapshotInterval;
snapshotPath;
snapshotting;
/**
* All connections in this list are notified about cache invalidations.
*/
invalidationCacheMessageConnections: BrokerConnection[];
getEntityFields(name: string): string[];
publishEntityFields(name: string, fields: string[]): boolean;
unsubscribeEntityFields(name: string, fields: string[]);
lock(id: string, ttl: number, timeout: number = ): Promise<ProcessLock>;
tryLock(id: string, ttl: number = ): Promise<ProcessLock | undefined>;
isLocked(id: string): boolean;
unsubscribe(channel: string, connection: BrokerConnection);
subscribe(channel: string, connection: BrokerConnection);
publish(channel: string, v: Uint8Array);
queueSubscribe(queueName: string, connection: BrokerConnection, maxParallel: number);
queueUnsubscribe(queueName: string, connection: BrokerConnection);
queuePublish(body: BrokerQueuePublish);
/**
* When a queue message has been sent to a consumer and the consumer answers.
*/
queueMessageHandled(queueName: string, connection: BrokerConnection, id: number, answer: {
error?: string;
success: boolean;
delay?: number;
});
setCache(id: string, data: Uint8Array, ttl: number);
getCache(id: string): {
data: Uint8Array;
ttl: number;
} | undefined;
deleteCache(id: string);
increment(id: string, v?: number): number;
setKey(id: string, data: Uint8Array, ttl: number);
getKey(id: string): Uint8Array | undefined;
deleteKey(id: string);
}
export class BrokerKernel extends RpcKernel {
createConnection(transport: TransportConnection): BrokerConnection;
}
export class BrokerQueueMessage<T> {
state: | | ;
error?: Error;
tries: number;
delayed: number;
constructor(public channel: string, public data: T);
failed(error: Error);
delay(seconds: number);
}
export class BrokerQueue {
constructor(public adapter: BrokerAdapterQueue);
channel<T>(name: string, options?: BrokerQueueChannelOptions, type?: ReceiveType<T>): BrokerQueueChannel<T>;
}
export class BrokerQueueChannel<T> {
constructor(public name: string, private adapter: BrokerAdapterQueue, options: BrokerQueueChannelOptions | undefined, private type: Type);
async produce<T>(message: T, options?: BrokerAdapterQueueProduceOptions): Promise<void>;
async consume(callback: (message: BrokerQueueMessage<T>) => Promise<void> | void, options: {
maxParallel?: number;
} = {}): Promise<Release>;
}
export class BrokerBusSubject<T> extends Subject<T> {
constructor(public handle: BrokerBusSubjectHandle, protected onFirst: () => void, protected onLast: () => void, protected onPublish: (value: T) => void);
override next(value: T, publish = true): void;
}
export class BusBrokerErrorHandler {
constructor(protected logger?: Logger);
publishFailed(path: string, message: unknown, type: Type, error: Error);
subscribeFailed(path: string, error: Error);
}
export class BrokerBus {
constructor(public adapter: BrokerAdapterBus, errorHandler?: BusBrokerErrorHandler);
/**
* Creates a broker channel handle for a given path and type.
*
* @param path Unique identifier of the broker channel.
* @param type Optional message type for type-safe handling.
*/
channel<T>(path: string, type?: ReceiveType<T>): BrokerBusChannel<T>;
/**
* Creates a Subject for the given broker channel.
* Subscription to the broker is delayed until the first observer subscribes.
*
* Calling `.next()` publishes a message to the broker and does not forward
* it to its observers immediately. Only messages from the broker are forwarded to observers.
* This is to ensure consistent behaviour with other broker subjects from the same path.
*
* @param path Unique broker path.
* @param type Optional type for strongly typed messages.
*
* @example
* ```typescript
* const subject = bus.subject<string>('updates');
* subject.subscribe(console.log);
* subject.next('hello');
* ```
*/
subject<T>(path: string, type?: ReceiveType<T>): Subject<T>;
/**
* Ensures the provided Subject is actively subscribed to the broker.
* This guarantees that all messages from this point forward will be buffered,
* and replayed to the first observer to avoid data loss.
*
* The subject unsubscribes from the broker automatically when all observers
* are gone. To make it active, you need to subscribe to it first or call `activateSubject`.
*
* @throws Error when the channel could not be subscribed to.
*
* @example
* ```typescript
* const subject = bus.subject<string>('updates');
* await bus.activateSubject(subject);
* subject.subscribe(value => {
* // receives all messages from the time of activation
* });
* ```
*/
async activateSubject<T extends Subject<any>>(subject: T): Promise<T>;
}
export class BrokerBusChannel<T> {
constructor(public name: string, protected adapter: BrokerAdapterBus, public type: Type);
async publish(message: T);
async subscribe(callback: (message: T) => void): Promise<Release>;
}
export class BrokerLock {
constructor(public adapter: BrokerAdapterLock);
item(id: string, options: Partial<BrokerTimeOptions> = {}): BrokerLockItem;
}
export class BrokerLockItem {
constructor(private id: string, private adapter: BrokerAdapterLock, private options: BrokerTimeOptionsResolved);
async [Symbol.asyncDispose]();
/**
* Disposable way of acquiring a lock. Automatically releases the lock when the returned object is disposed.
*
* @example
* ```typescript
* async function doSomething() {
* async using hold = lock.hold();
*
* // do stuff
*
* // when out of scope, lock is automatically released.
* }
* ```
*/
async hold();
/**
* Returns true if the current lock object is the holder of the lock.
*
* This does not check whether the lock is acquired by someone else.
* Use isReserved() if you want to check that.
*/
get acquired(): boolean;
/**
* Acquires the lock. If the lock is already acquired by someone else, this method waits until the lock is released.
*
* @throws BrokerLockError when lock is already acquired by this object.
*/
async acquire(): Promise<this>;
/**
* Checks if the lock is acquired by someone else.
*/
async isReserved(): Promise<boolean>;
/**
* Tries to acquire the lock.
* If the lock is already acquired, nothing happens.
*
* @throws BrokerLockError when lock is already acquired by this object.
*/
async try(): Promise<this | undefined>;
/**
* Releases the lock.
*/
async release(): Promise<void>;
}
export class BrokerCacheStore {
/**
* This is a short-lived cache pool for the current process.
* Values are fetched from the broker when not available and stored here for a short time (configurable).
*/
cache;
constructor(public config: BrokerCacheOptionsResolved);
invalidate(key: string);
set(key: string, value: CacheStoreEntry);
}
export class BrokerCacheItem<T> {
constructor(private key: string, private builder: CacheBuilder<T>, private options: BrokerCacheItemOptionsResolved, private adapter: BrokerAdapterCache, private store: BrokerCacheStore, private type: Type, private logger: LoggerInterface);
async set(value: T);
async invalidate();
async exists(): Promise<boolean>;
async get(): Promise<T>;
}
export class BrokerCache {
constructor(private adapter: BrokerAdapterCache, config: Partial<BrokerCacheOptions> = {}, private logger: LoggerInterface = new ConsoleLogger());
item<T>(key: string, builder: CacheBuilder<T>, options?: Partial<BrokerCacheItemOptions>, type?: ReceiveType<T>): BrokerCacheItem<T>;
}
export class BrokerKeyValueItem<T> {
constructor(private key: string, private type: Type, private adapter: BrokerAdapterKeyValue, private options: BrokerKeyValueOptionsResolved);
/**
* @see BrokerKeyValue.get
*/
async get(): Promise<T>;
/**
* @see BrokerKeyValue.set
*/
async set(value: T): Promise<void>;
/**
* @see BrokerKeyValue.increment
*/
async increment(value: number): Promise<number>;
async remove(): Promise<void>;
}
export class BrokerKeyValue {
constructor(private adapter: BrokerAdapterKeyValue, config: Partial<BrokerKeyValueOptions> = {}, private logger: LoggerInterface = new ConsoleLogger());
/**
* Returns a new BrokerKeyValueItem for the given key.
*/
item<T>(key: string, options?: Partial<BrokerKeyValueOptions>, type?: ReceiveType<T>): BrokerKeyValueItem<T>;
/**
* Returns the value for the given key.
*/
async get<T>(key: string, type?: ReceiveType<T>): Promise<T>;
/**
* Sets the value for the given key.
*/
async set<T>(key: string, value: T, options?: Partial<BrokerKeyValueOptions>, type?: ReceiveType<T>): Promise<void>;
async remove(key: string): Promise<void>;
/**
* Increments the value for the given key by the given value.
* Note that this is not compatible to get/set, as it only works with numbers.
* Since this an atomic increment, there is no way to get the current value via `get` and then increment it,
* but you have to use `increment(0)` to get the current value.
*/
async increment(key: string, value: number): Promise<number>;
}
export class BrokerDeepkitConnection extends RpcBaseClient {
activeChannels;
consumers;
subscribedToInvalidations?: ((key: string) => void)[];
}
export class BrokerDeepkitAdapterOptions {
servers: Server[];
}
export class BrokerDeepkitPool {
connections: {
connection: BrokerDeepkitConnection;
server: Server;
}[];
constructor(public options: BrokerDeepkitAdapterOptions);
getConnection(key: string): BrokerDeepkitConnection;
async disconnect(): Promise<void>;
}
export class BrokerDeepkitAdapter implements BrokerAdapter {
constructor(public options: BrokerDeepkitAdapterOptions);
async disconnect(): Promise<void>;
onInvalidateCache(callback: (key: string) => void): void;
async invalidateCache(key: string): Promise<void>;
async setCache(key: string, value: any, options: BrokerCacheItemOptionsResolved, type: Type): Promise<void>;
async getCacheMeta(key: string): Promise<{
ttl: number;
} | undefined>;
async getCache(key: string, type: Type): Promise<{
value: any;
ttl: number;
} | undefined>;
async set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise<void>;
async get(key: string, type: Type): Promise<any>;
async remove(key: string): Promise<any>;
async increment(key: string, value: any): Promise<number>;
async isLocked(id: string): Promise<boolean>;
async lock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
async tryLock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
async publish(key: string, message: any, type: Type): Promise<void>;
async subscribe(key: string, callback: (message: any) => void, type: Type): Promise<Release>;
async produce<T>(key: string, message: T, type: Type, options?: BrokerAdapterQueueProduceOptionsResolved): Promise<void>;
async consume(key: string, callback: (message: BrokerQueueMessage<any>) => Promise<void>, options: {
maxParallel: number;
}, type: Type): Promise<Release>;
}
export class BrokerMemoryAdapter extends BrokerDeepkitAdapter {
constructor();
}
Events
EventToken<BaseEvent>
Errors
export class BrokerLockError extends Error {
}
export class BrokerCacheError extends Error {
}
Functions
<T extends BrokerBusChannel<any>>(path: string, type?: ReceiveType<T>): Provider
<T extends Subject<any>>(path: string, type?: ReceiveType<T>): Provider
(adapter: AnyAdapter): adapter is BrokerAdapterCache
(adapter: AnyAdapter): adapter is BrokerAdapterBus
(adapter: AnyAdapter): adapter is BrokerAdapterLock
(adapter: AnyAdapter): adapter is BrokerAdapterQueue
(adapter: AnyAdapter): adapter is BrokerAdapterKeyValue
(options: Partial<BrokerKeyValueOptions>): BrokerKeyValueOptionsResolved
Types
interface BrokerTimeOptions {
/**
* Time to live in milliseconds. 0 means no ttl.
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
ttl: string | number;
/**
* Timeout in milliseconds. 0 means no timeout.
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
timeout: number | string;
}
interface BrokerTimeOptionsResolved {
/**
* Time to live in milliseconds. 0 means no ttl.
*/
ttl: number;
/**
* Timeout in milliseconds. 0 means no timeout.
*/
timeout: number;
}
type BrokerQueueMessageProcessingOptions = {
process: QueueMessageProcessing.atLeastOnce
} | {
process: QueueMessageProcessing.exactlyOnce;
deduplicationInterval?: string,
hash?: string | number;
};
interface BrokerQueueMessageProcessingOptionsResolved {
process: QueueMessageProcessing;
deduplicationInterval: number;
hash?: string | number;
}
type BrokerAdapterQueueProduceOptions = { delay?: string; priority?: number } & BrokerQueueMessageProcessingOptions;
interface BrokerAdapterQueueProduceOptionsResolved extends BrokerQueueMessageProcessingOptionsResolved {
delay?: number;
priority?: number;
}
type Release = () => Promise<void>;
interface BrokerInvalidateCacheMessage {
key: string;
ttl: number;
}
interface BrokerAdapterBase {
disconnect(): Promise<void>;
logger?: Logger;
}
interface BrokerAdapterLock extends BrokerAdapterBase {
/**
* Acquires the lock. If the lock is already acquired by someone else, this method waits until the lock is released.
*/
lock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
/**
* Checks if the lock is acquired by someone else.
*/
isLocked(id: string): Promise<boolean>;
/**
* Tries to acquire the lock.
* If the lock is already acquired, nothing happens.
*/
tryLock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release>;
}
interface BrokerAdapterBus extends BrokerAdapterBase {
/**
* Publish a message on the bus aka pub/sub.
*/
publish(name: string, message: any, type: Type): Promise<void>;
/**
* Subscribe to messages on the bus aka pub/sub.
*/
subscribe(name: string, callback: (message: any) => void, type: Type): Promise<Release>;
}
interface BrokerAdapterQueue extends BrokerAdapterBase {
/**
* Consume messages from a queue.
*/
consume(name: string, callback: (message: any) => Promise<void>, options: { maxParallel: number }, type: Type): Promise<Release>;
/**
* Produce a message to a queue.
*/
produce(name: string, message: any, type: Type, options?: BrokerAdapterQueueProduceOptionsResolved): Promise<void>;
}
type BrokerQueueChannelOptions = BrokerQueueMessageProcessingOptions;
type BrokerQueueChannelOptionsResolved = BrokerQueueMessageProcessingOptionsResolved;
type TypeOfSubject<T extends Subject<any>> = T extends Subject<infer U> ? U : never;
type BrokerAdapter = BrokerAdapterCache & BrokerAdapterBus & BrokerAdapterLock & BrokerAdapterQueue & BrokerAdapterKeyValue;
type AnyAdapter = BrokerAdapterCache | BrokerAdapterBus | BrokerAdapterLock | BrokerAdapterQueue | BrokerAdapterKeyValue;
interface BrokerCacheOptions {
/**
* Relative time to live in milliseconds. 0 means no ttl.
*
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
ttl: number | string;
/**
* How many ms the cache is allowed to be stale. Set to 0 to disable stale cache.
* Default is 1000ms.
* Improves performance by serving slightly stale cache while the cache is being rebuilt.
*/
maxStale: number | string;
/**
* How many ms the cache is allowed to be stored in-memory. Set to 0 to disable in-memory cache.
*/
inMemoryTtl: number | string;
}
interface BrokerCacheOptionsResolved extends BrokerCacheOptions {
ttl: number;
maxStale: number;
inMemoryTtl: number;
}
interface BrokerCacheItemOptions {
/**
* Relative time to live in milliseconds. 0 means no ttl.
*
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
ttl: number | string;
tags: string[];
/**
* How many ms the cache is allowed to be stale. Set to 0 to disable stale cache.
* Default is 1000ms.
* Improves performance by serving slightly stale cache while the cache is being rebuilt.
*/
maxStale: number | string;
}
interface BrokerCacheItemOptionsResolved extends BrokerCacheItemOptions {
ttl: number;
maxStale: number;
}
interface BrokerAdapterCache extends BrokerAdapterBase {
getCache(key: string, type: Type): Promise<{ value: any, ttl: number } | undefined>;
getCacheMeta(key: string): Promise<{ ttl: number } | undefined>;
setCache(key: string, value: any, options: BrokerCacheItemOptionsResolved, type: Type): Promise<void>;
invalidateCache(key: string): Promise<void>;
onInvalidateCache(callback: (key: string) => void): void;
}
type CacheBuilder<T> = () => T | Promise<T>;
interface BrokerKeyValueOptions {
/**
* Relative time to live in milliseconds. 0 means no ttl.
*
* Value is either milliseconds or a string like '2 minutes', '8s', '24hours'.
*/
ttl: number | string;
}
interface BrokerKeyValueOptionsResolved extends BrokerKeyValueOptions {
ttl: number;
}
interface BrokerAdapterKeyValue extends BrokerAdapterBase {
get(key: string, type: Type): Promise<any>;
set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise<any>;
remove(key: string): Promise<any>;
increment(key: string, value: number): Promise<number>;
}
interface Server {
url: string;
weight?: number;
secretKey?: string;
transport?: ClientTransportAdapter;
}