API @deepkit/core-rxjs
npm install @deepkit/core-rxjs
RxJS subscription collection, to easily collect multiple subscriptions and unsubscribe all at once.
Added subscriptions are automatically removed when they get unsubscribed.
Allows creating a Timer that is deactivated automatically when the observer is stopped.
This class allows tracking multiple progress states. Natively supported as a return type in … [remainder of this sentence is missing from the source text]
Handles incoming messages in batches. The handler is called when the observable completes or when a certain time has passed since the last message. This makes sure the handler is awaited before the next batch is processed.
Clone a given subject (BehaviorSubject, ProgressTracker, or Subject) and decouple it from the source, so that when the new object is completed or errored, the source is not affected. This is handy if you want to hand out a subject to a consumer, but you don't want the consumer to be able to complete or error the subject, which is usually required for RPC controllers.
Turns a ProgressTracker into a BehaviorSubject.
Classes
export class AsyncSubscription {
constructor(private cb: () => Promise<void>);
async unsubscribe(): Promise<void>;
}
export class Subscriptions {
readonly list: Subscription[];
constructor(protected teardown?: () => void | Promise<void>);
unsubscribe();
}
export class ObserverTimer extends Timer {
constructor(protected observer: Observer<any>);
setTimeout(cb: () => void, timeout: number): any;
}
export class ProgressTrackerGroup {
stopCallbacks: (() => void)[];
constructor(public state: ProgressTrackerState);
changed();
/**
* Registers a callback that is called when the progress is stopped.
*/
onStop(callback: () => void);
stop();
/**
* Number between 0 and 1.
*/
get progress(): number;
/**
* Total number of items to process.
*/
get total(): number;
/**
* True if the progress is finished (done === total).
* Same as progress === 1.
*/
get finished();
/**
* True if the progress is running (finished === false && stopped === false).
*/
get running();
/**
* True if the progress is ended (finished === true || stopped === true).
*/
get ended();
/**
* True if the progress is stopped (stopped === true), but might not be finished.
*/
get stopped();
get message(): string;
get done(): number;
}
export class ProgressTracker extends BehaviorSubject<ProgressTrackerState[]> {
groups: ProgressTrackerGroup[];
changed;
constructor(states: ProgressTrackerState[] = []);
next(states: ProgressTrackerState[]);
stop();
track(message: string = , total: number, current: number = ): ProgressTrackerGroup;
get progress(): number;
/**
* True if the progress is finished (done === total).
* Same as progress === 1.
*/
get finished(): boolean;
/**
* True if the progress is running (finished === false && stopped === false).
*/
get running(): boolean;
/**
* True if the progress is ended (finished === true || stopped === true).
*/
get ended(): boolean;
/**
* True if the progress is stopped (stopped === true), but might not be finished.
*/
get stopped();
get done(): number;
get total(): number;
get current(): ProgressTrackerGroup | undefined;
}
export class ProgressTrackerWatcher<T extends ProgressTracker = ProgressTracker> extends BehaviorSubject<T> {
}
Functions
(observer: Observer<any>): { closed: boolean; }
(v: any): v is Subject<any>
(v: any): v is BehaviorSubject<any>
<T>(subscription: Subscription): Promise<void>
<T>(o: Observable<T>): Promise<T>
<T>(o: Observable<T>, next?: (data: T) => void): Promise<T>
<T>(o: () => Promise<T>): Observable<T>
(teardown: TeardownLogic): Promise<void>
<T, R>(observable: Observable<T>, handler: (messages: T[]) => Promise<R>, options?: Partial<{ maxWait: number; batchSize: number; }>): Promise<R[]>
maxWait: in milliseconds; this makes sure that every maxWait ms the handler is called with the current messages, if there are any.
batchSize: the maximum number of messages that are passed to the handler.
<T extends Observable<any> | undefined>(observable: T): T
<T extends ProgressTracker>(tracker: T): ProgressTrackerWatcher<T>
Types
interface ProgressTrackerState {
total: number;
done: number;
message: string;
speed: number;
stopped: boolean;
}