github
DocsBlog
fontcolor_theme
package

API @deepkit/rpc

npm install @deepkit/rpc

Classes

RpcControllerState [source]
export class RpcControllerState {
    peerId?: string;
    constructor(public controller: string);
    getState(method: string): ControllerStateActionState;
}
RpcActionClient [source]
export class RpcActionClient {
    entityState;
    constructor(protected client: WritableClient);
    action<T>(controller: RpcControllerState, method: string, args: any[], options: {
        timeout?: number;
        dontWaitForConnection?: true;
        typeReuseDisabled?: boolean;
    } = {});
    async loadActionTypes(controller: RpcControllerState, method: string, options: {
        timeout?: number;
        dontWaitForConnection?: true;
        typeReuseDisabled?: boolean;
    } = {}): Promise<ControllerStateActionTypes>;
}
DirectClient [source]
export class DirectClient extends RpcClient {
    constructor(rpcKernel: RpcKernel, injector?: InjectorContext);
}
RpcDirectClientAdapter [source]
export class RpcDirectClientAdapter implements ClientTransportAdapter {
    constructor(public rpcKernel: RpcKernel, protected injector?: InjectorContext);
    async connect(connection: TransportClientConnection);
}
AsyncDirectClient [source]
export class AsyncDirectClient extends RpcClient {
    constructor(rpcKernel: RpcKernel, injector?: InjectorContext);
}

This direct client includes in each outgoing/incoming message an async hop making the communication asynchronous.

RpcAsyncDirectClientAdapter [source]
export class RpcAsyncDirectClientAdapter implements ClientTransportAdapter {
    constructor(public rpcKernel: RpcKernel, protected injector?: InjectorContext);
    async connect(connection: TransportClientConnection);
}
RpcWebSocketClient [source]
export class RpcWebSocketClient extends RpcClient {
    constructor(url: string);
    static fromCurrentHost<T extends ClassType<RpcClient>>(this: T, baseUrl: string = ): InstanceType<T>;
}

A RpcClient that connects via WebSocket transport.

DeepkitClient [source]
export class DeepkitClient extends RpcWebSocketClient {
}
RpcWebSocketClientAdapter [source]
export class RpcWebSocketClientAdapter implements ClientTransportAdapter {
    constructor(public url: string, protected webSocketConstructor: typeof WebSocket = WebSocket);
    async getWebSocketConstructor(): Promise<typeof WebSocket>;
    async connect(connection: TransportClientConnection);
}
RpcClientToken [source]
export class RpcClientToken {
    constructor(protected token: any);
    get();
    set(v: any);
    has();
}
RpcClientTransporter [source]
export class RpcClientTransporter {
    connectionId: number;
    writerOptions: TransportOptions;
    id?: Uint8Array;
    /**
     * When the connection is established (including handshake and authentication).
     */
    readonly connection;
    /**
     * When the connection was reconnected. This is not called for the very first connection.
     */
    readonly reconnected;
    /**
     * When the connection was disconnected (due to error or close).
     * This increases the connectionId by one.
     */
    readonly disconnected;
    /**
     * Triggered for any onError call from the transporter.
     * Right after this event, onDisconnect is called (and thus connection.next(false) and disconnected.next()).
     */
    readonly errored;
    reader;
    constructor(public transport: ClientTransportAdapter, protected stats: RpcStats);
    bufferedAmount(): number;
    clientAddress(): string;
    /**
     * True when fully connected (after successful handshake and authentication)
     */
    isConnected(): boolean;
    onClose(error?: Error);
    /**
     * Optional handshake.
     * When peer messages are allowed, this needs to request the client id and returns id.
     */
    async onHandshake(): Promise<Uint8Array | undefined>;
    async onAuthenticate(token?: any): Promise<void>;
    onMessage(message: RpcMessage);
    async disconnect();
    /**
     * Simply connect with login using the token, without auto re-connect.
     */
    async connect(token?: any): Promise<void>;
    send(message: RpcMessageDefinition, progress?: SingleProgress);
}
RpcClientPeer [source]
export class RpcClientPeer {
    constructor(protected actionClient: RpcActionClient, protected peerId: string, protected onDisconnect: (peerId: string) => void);
    controller<T>(nameOrDefinition: string | ControllerDefinition<T>, options: {
        timeout?: number;
        dontWaitForConnection?: true;
    } = {}): RemoteController<T>;
    disconnect();
}
RpcBaseClient [source]
export class RpcBaseClient implements WritableClient {
    clientStats: RpcStats;
    actionClient;
    readonly token;
    readonly transporter: RpcClientTransporter;
    username?: string;
    typeReuseDisabled: boolean;
    events;
    constructor(protected transport: ClientTransportAdapter);
    onClose(error?: Error);
    /**
     * Per default entity types with a name (@entity.name()) will be reused. If a entity with a given name
     * was not loaded and error is thrown. This to ensure nominal typing (object instanceof T).
     * Use this method to disable this behavior and construct new nominal types if an entity is not loaded.
     */
    disableTypeReuse(): this;
    getId(): Uint8Array;
    debug();
    sendMessage<T>(type: number, body?: T, schema?: ReceiveType<T>, options: {
        dontWaitForConnection?: boolean;
        connectionId?: number;
        peerId?: string;
        timeout?: number;
    } = {}): RpcMessageSubject;
    async connect(): Promise<this>;
    async disconnect();
}
RpcClient [source]
export class RpcClient extends RpcBaseClient {
    /**
     * For server->client (us) communication.
     * This is automatically created when registerController is called.
     * Set this property earlier to work with a custom RpcKernel.
     */
    clientKernel?: RpcKernel;
    /**
     * For peer->client(us) communication.
     * This is automatically created when registerAsPeer is called.
     * Set this property earlier to work with a custom RpcKernel.
     */
    peerKernel?: RpcKernel;
    async ping(): Promise<void>;
    getId(): Uint8Array;
    /**
     * Registers a new controller for the peer's RPC kernel.
     * Use `registerAsPeer` first.
     */
    registerPeerController<T>(classType: ClassType<T>, nameOrDefinition: string | ControllerDefinition<T>);
    /**
     * Registers a new controller for the server's RPC kernel.
     * This is when the server wants to communicate actively with the client (us).
     */
    registerController<T>(classType: ClassType<T>, nameOrDefinition: string | ControllerDefinition<T>);
    async registerAsPeer(id: string);
    /**
     * Creates a new peer connection, or re-uses an existing non-disconnected one.
     *
     * Make sure to call disconnect() on it once you're done using it, otherwise the peer
     * will leak memory. (connection will be dropped if idle for too long automatically tough)
     */
    peer(peerId: string): RpcClientPeer;
    controller<T>(nameOrDefinition: string | ControllerDefinition<T>, options: {
        timeout?: number;
        dontWaitForConnection?: true;
        typeReuseDisabled?: boolean;
    } = {}): RemoteController<T>;
}
RpcHttpFetch [source]
export class RpcHttpFetch implements RpcHttpInterface {
    async fetch(url: string, options: {
        headers: {
            [name: string]: string;
        };
        method: string;
        body: any;
    }): Promise<RpcHttpResponseInterface>;
}
RpcHttpClientAdapter [source]
export class RpcHttpClientAdapter implements ClientTransportAdapter {
    constructor(public url: string, public headers: {
        [name: string]: string;
    } = {}, public http: RpcHttpInterface = new RpcHttpFetch());
    supportsPeers();
    supportsAuthentication();
    async connect(connection: TransportClientConnection): Promise<void>;
}
RpcMessageSubject [source]
export class RpcMessageSubject {
    /**
     * Releases this subject. It is necessary that eventually every created subject is released,
     * otherwise dramatic performance decrease and memory leak will happen.
     */
    constructor(private continuation: <T>(type: number, body?: T, schema?: ReceiveType<T>) => void, public release: () => void);
    /**
     * Called when the underlying transport disconnected.
     * This force-cleans everything up.
     */
    disconnect(error?: Error);
    next(next: RpcMessage);
    /**
     * Registers a callback that is called to handle unexpected rejections,
     * like disconnects, transport errors, or timeouts.
     */
    onRejected(callback: (error: any) => void): this;
    onReply(callback: (next: RpcMessage, subject: RpcMessageSubject) => void): this;
    /**
     * Sends a message to the server in the context of this created subject.
     * If the connection meanwhile has been reconnected, and completed MessageSubject.
     */
    send<T>(type: number, body?: T, schema?: ReceiveType<T>): this;
    /**
     * Waits for the Ack message from the server, then closes the subject.
     */
    async ackThenClose(): Promise<undefined>;
    /**
     * Wait for next message to arrive.
     */
    async waitNextMessage<T>(): Promise<RpcMessage>;
    /**
     * Wait for next message with body parse.
     */
    async waitNext<T>(type: number, schema?: ReceiveType<T>): Promise<T>;
    /**
     * Waits for the first message of a specific type, then closes the subject.
     */
    async firstThenClose<T = RpcMessage>(type: number, schema?: ReceiveType<T>): Promise<T>;
}
EntitySubjectStore [source]
export class EntitySubjectStore<T extends IdVersionInterface> {
    store;
    onCreation;
    constructor(protected classType: ClassType);
    isRegistered(id: IdType): boolean;
    register(item: T): void;
    deregister(id: IdType): void;
    getItem(id: IdType): T | undefined;
    onDelete(id: IdType): void;
    onSet(id: IdType, item: T): void;
    onPatch(id: IdType, version: number, patch: EntityPatch): void;
    /**
     * Before calling createFork you must be sure the item is already registered.
     */
    createFork(id: IdType): EntitySubject<T>;
    getForkCount(id: IdType): number;
    getEntitySubjectCount(): number;
}
EntityState [source]
export class EntityState {
    getStore<T extends IdVersionInterface>(classType: ClassType<T>): EntitySubjectStore<T>;
    getStoreByName<T extends IdVersionInterface>(name: string): EntitySubjectStore<T>;
    createEntitySubject(classType: ClassType, bodySchema: TypeObjectLiteral, message: RpcMessage);
    /**
     * Handles the RpcType.Entity, which is a composite per default.
     */
    handle(entityMessage: RpcMessage);
}
RpcServerAction [source]
export class RpcServerAction {
    constructor(protected stats: RpcStats, protected cache: RpcCache, protected connection: RpcKernelBaseConnection, protected controllers: Map<string, {
        controller: ClassType;
        module?: InjectorModule;
    }>, protected injector: InjectorContext, protected eventDispatcher: EventDispatcher, protected security: RpcKernelSecurity, protected sessionState: SessionState, protected logger: LoggerInterface);
    async handleActionTypes(message: RpcMessage, response: RpcMessageBuilder);
    async onClose();
    async handle(message: RpcMessage, response: RpcMessageBuilder);
    async handleAction(message: RpcMessage, response: RpcMessageBuilder);
}
RpcCompositeMessage [source]
export class RpcCompositeMessage {
    strictSerialization: boolean;
    logValidationErrors: boolean;
    errorLabel: string;
    constructor(protected stats: RpcTransportStats, protected logger: Logger, public type: number, protected id: number, protected writer: TransportMessageWriter, protected transportOptions: TransportOptions, protected clientId?: Uint8Array, protected source?: Uint8Array, protected routeType: RpcMessageRouteType.client | RpcMessageRouteType.server = RpcMessageRouteType.client);
    add<T>(type: number, body?: T, receiveType?: ReceiveType<T>): this;
    write(message: RpcMessageDefinition): void;
    send();
}
RpcMessageBuilder [source]
export class RpcMessageBuilder {
    routeType: RpcMessageRouteType.client | RpcMessageRouteType.server;
    strictSerialization: boolean;
    logValidationErrors: boolean;
    errorLabel: string;
    constructor(protected stats: RpcTransportStats, protected logger: Logger, protected writer: TransportMessageWriter, protected transportOptions: TransportOptions, protected id: number, protected clientId?: Uint8Array, protected source?: Uint8Array);
    write(message: RpcMessageDefinition): void;
    ack(): void;
    error(error: Error | string): void;
    reply<T>(type: number, body?: T, receiveType?: ReceiveType<T>): void;
    /**
     * @deprecated
     */
    replyBinary<T>(type: number, body?: Uint8Array): void;
    composite(type: number): RpcCompositeMessage;
}
RpcPeerExchange [source]
export class RpcPeerExchange {
    async isRegistered(id: string): Promise<boolean>;
    async deregister(id: string | Uint8Array): Promise<void>;
    register(id: string | Uint8Array, writer: TransportConnection): void;
    redirect(message: RpcMessage);
}

This is a reference implementation and only works in a single process. A real-life implementation would use an external message-bus, like Redis & co.

RpcKernelBaseConnection [source]
export abstract class RpcKernelBaseConnection {
    sessionState;
    writer: TransportMessageWriter;
    /**
     * Statistics about the server->client communication.
     */
    clientStats: RpcStats;
    transportOptions: TransportOptions;
    readonly onClose: Promise<void>;
    closed: boolean;
    constructor(public stats: RpcStats, protected logger: Logger, public transportConnection: TransportConnection, protected connections: RpcKernelConnections, protected injector: InjectorContext, protected eventDispatcher: EventDispatcher);
    write(message: RpcMessageDefinition): void;
    /**
     * Serializes the message (binary) and sends it to the client using
     * a chunk writer (splitting the message into smaller parts if necessary,
     * so they can be tracked).
     */
    sendBinary(message: RpcMessageDefinition, writer: RpcBinaryWriter): void;
    clientAddress(): string | undefined;
    createMessageBuilder(): RpcMessageBuilder;
    /**
     * Creates a regular timer using setTimeout() and automatically cancel it once the connection breaks or server stops.
     */
    setTimeout(cb: () => void, timeout: number): any;
    close(reason: string | Error = ): void;
    feed(buffer: Uint8Array, bytes?: number): void;
    handleMessage(message: RpcMessage): void;
    onRequest(basePath: string, request: RpcHttpRequest, response: RpcHttpResponse): void | Promise<void>;
    abstract onMessage(message: RpcMessage, response: RpcMessageBuilder): void | Promise<void>;
    controller<T>(nameOrDefinition: string | ControllerDefinition<T>, timeoutInSeconds = ): RemoteController<T>;
    sendMessage<T>(type: number, body?: T, receiveType?: ReceiveType<T>): RpcMessageSubject;
}
RpcKernelConnections [source]
export class RpcKernelConnections {
    connections: RpcKernelBaseConnection[];
    stats: RpcTransportStats;
    broadcast(buffer: RpcMessageDefinition);
}
RpcCache [source]
export class RpcCache {
    actionsTypes: {
        [id: string]: ActionTypes;
    };
    actions: {
        [id: string]: RpcCacheAction;
    };
}
RpcKernelConnection [source]
export class RpcKernelConnection extends RpcKernelBaseConnection {
    myPeerId?: string;
    routeType: RpcMessageRouteType.client | RpcMessageRouteType.server;
    constructor(stats: RpcStats, logger: Logger, transport: TransportConnection, connections: RpcKernelConnections, injector: InjectorContext, eventDispatcher: EventDispatcher, protected cache: RpcCache, protected controllers: Map<string, {
        controller: ClassType;
        module?: InjectorModule;
    }>, protected security = new RpcKernelSecurity(), protected peerExchange: RpcPeerExchange);
    close(): void;
    async onRequest(basePath: string, request: RpcHttpRequest, response: RpcHttpResponse);
    async onMessage(message: RpcMessage): Promise<void>;
}
RpcKernel [source]
export class RpcKernel {
    readonly controllers;
    stats;
    injector: InjectorContext;
    constructor(injector?: InjectorContext | NormalizedProvider[], protected logger: Logger = new Logger());
    /**
     * Register a new event listener for given token.
     *
     * order: The lower the order, the sooner the listener is called. Default is 0.
     */
    listen<T extends EventToken<any>>(eventToken: T, callback: EventListenerCallback<T>, order: number = ): EventDispatcherUnsubscribe;
    getEventDispatcher(): EventDispatcher;
    onConnection(callback: OnConnectionCallback);
    /**
     * This registers the controller and when no custom InjectorContext was given adds it as provider to the injector.
     *
     * Note: Controllers can not be added to the injector when the injector was already built.
     */
    registerController(controller: ClassType, id?: string | ControllerDefinition<any>, module?: InjectorModule);
    createConnection(transport: TransportConnection, injector?: InjectorContext): RpcKernelBaseConnection;
}

The kernel is responsible for parsing the message header, redirecting to peer if necessary, loading the body parser, and encode/send outgoing messages.

Session [source]
export class Session {
    constructor(public readonly username: string, public readonly token: any);
    isAnonymous(): boolean;
}
RpcKernelSecurity [source]
export class RpcKernelSecurity {
    async hasControllerAccess(session: Session, controllerAccess: RpcControllerAccess, connection: RpcKernelBaseConnection): Promise<boolean>;
    async isAllowedToRegisterAsPeer(session: Session, peerId: string): Promise<boolean>;
    async isAllowedToSendToPeer(session: Session, peerId: string): Promise<boolean>;
    async authenticate(token: any, connection: RpcKernelBaseConnection): Promise<Session>;
    transformError(err: Error);
}
SessionState [source]
export class SessionState {
    setSession(session: Session);
    getSession(): Session;
}
CollectionQueryModel [source]
export class CollectionQueryModel<T> implements CollectionQueryModelInterface<T> {
    filter?: FilterQuery<T>;
    skip?: number;
    itemsPerPage: number;
    limit?: number;
    parameters: {
        [name: string]: any;
    };
    sort?: Sort<T>;
    readonly change;
    set(model: CollectionQueryModelInterface<any>);
    changed(): void;
    hasSort(): boolean;
    /**
     * Whether limit/skip is activated.
     */
    hasPaging(): boolean;
}

internal note: This is aligned with

CollectionState [source]
export class CollectionState {
    /**
     * Total count in the database for the current query, regardless of paging (skip/limit) count.
     *
     * Use count() to get the items count on the current page (which is equal to all().length)
     */
    total: number;
}
Collection [source]
export class Collection<T extends IdInterface> extends ReplaySubject<T[]> {
    readonly event: Subject<CollectionEvent<T>>;
    readonly removed;
    readonly added;
    state: CollectionState;
    readonly deepChange;
    model: CollectionQueryModel<T>;
    readonly entitySubjects;
    constructor(public readonly classType: ClassType<T>);
    getTotal();
    getItemsPerPage();
    getPages();
    getSort();
    getParameter(name: string);
    setParameter(name: string, value: any): this;
    orderByField(name: keyof T & string, order: SORT_ORDER = );
    setPage(page: number);
    getPage();
    apply();
    getEntitySubject(idOrItem: string | number | T): EntitySubject<T> | undefined;
    has(id: string | number);
    get(id: string | number): T | undefined;
    setState(state: CollectionState);
    setSort(ids: (string | number)[]);
    /**
     * Resolves when next change happened.
     */
    get nextStateChange(): Promise<void>;
    unsubscribe();
    addTeardown(teardown: TeardownLogic);
    index(item: T): number;
    /**
     * Returns the page zero-based of the current item.
     */
    getPageOf(item: T, itemsPerPage = ): number;
    reset();
    all(): T[];
    /**
     * Count of current page if paging is used, otherwise total count.
     */
    count();
    ids(): (string | number)[];
    empty();
    /**
     * All items from id -> value map.
     */
    map();
    loaded();
    set(items: T[], withEvent = true);
    removeMany(ids: (string | number)[], withEvent = true);
    update(items: T | T[], withEvent = true);
    add(items: T | T[], withEvent = true);
    remove(ids: (string | number) | (string | number)[], withEvent = true);
}
RpcController [source]
export class RpcController {
    name: string;
    classType?: ClassType;
    definition?: ControllerDefinition<any>;
    strictSerialization: boolean;
    logValidationErrors: boolean;
    actions;
    getPath(): string;
}
RpcAction [source]
export class RpcAction {
    name: string;
    classType: ClassType;
    category: string;
    description: string;
    strictSerialization?: boolean;
    logValidationErrors?: boolean;
    groups: string[];
    data: {
        [name: string]: any;
    };
}
RpcTransportStats [source]
export class RpcTransportStats {
    readonly incomingBytes: number;
    readonly outgoingBytes: number;
    /**
     * Amount of incoming and outgoing messages.
     */
    readonly incoming: number;
    readonly outgoing: number;
    increase(name: NumericKeys<RpcTransportStats>, count: number);
}
ActionStats [source]
export class ActionStats {
    readonly observables: number;
    readonly subjects: number;
    readonly behaviorSubjects: number;
    readonly progressTrackers: number;
    readonly subscriptions: number;
    increase(name: NumericKeys<ActionStats>, count: number);
}
ForwardedActionStats [source]
export class ForwardedActionStats extends ActionStats {
    constructor(protected forward: ActionStats);
    increase(name: NumericKeys<ActionStats>, count: number);
}
RpcStats [source]
export class RpcStats extends RpcTransportStats {
    readonly connections: number;
    readonly totalConnections: number;
    /**
     * How many actions have been executed.
     */
    readonly actions: number;
    readonly active: ActionStats;
    readonly total: ActionStats;
    increase(name: NumericKeys<RpcStats>, count: number);
}
ForwardedRpcStats [source]
export class ForwardedRpcStats extends RpcStats {
    readonly active;
    readonly total;
    constructor(protected forward: RpcStats);
    increase(name: NumericKeys<RpcStats>, count: number);
}
StreamBehaviorSubject [source]
export class StreamBehaviorSubject<T> extends BehaviorSubject<T> {
    readonly appendSubject;
    constructor(item: T, teardown?: TeardownLogic);
    isUnsubscribed(): boolean;
    get nextStateChange();
    addTearDown(teardown: TeardownLogic);
    /**
     * This method differs to BehaviorSubject in the way that this does not throw an error
     * when the subject is closed/unsubscribed.
     */
    getValue(): T;
    next(value: T): void;
    activateNextOnAppend();
    toUTF8();
    append(value: T): void;
    unsubscribe(): void;
}
EntitySubject [source]
export class EntitySubject<T extends IdInterface> extends StreamBehaviorSubject<T> {
    /**
     * Patches are in class format.
     */
    readonly patches;
    readonly delete;
    [IsEntitySubject];
    deleted: boolean;
    get id(): string | number;
    get onDeletion(): Observable<void>;
    next(value: T | undefined): void;
}
ControllerDefinition [source]
export class ControllerDefinition<T> {
    constructor(public path: string, public entities: ClassType[] = []);
}
JSONError [source]
@entity.name()
export class JSONError {
    constructor(public readonly json: any);
}
RpcMessage [source]
export class RpcMessage {
    constructor(public id: number, public composite: boolean, public type: number, public routeType: RpcMessageRouteType, public bodyOffset: number = , public bodySize: number = , public buffer?: Uint8Array);
    debug();
    getBuffer(): Uint8Array;
    getPeerId(): string;
    getSource(): Uint8Array;
    getDestination(): Uint8Array;
    getError(): Error;
    isError(): boolean;
    parseGenericBody(): object;
    parseBody<T>(type?: ReceiveType<T>): T;
    decodeBody<T>(decoder: BodyDecoder<T>): T;
    getBodies(): RpcMessage[];
}
ErroredRpcMessage [source]
export class ErroredRpcMessage extends RpcMessage {
    constructor(public id: number, public error: Error);
    getError(): Error;
}
RpcBinaryMessageReader [source]
export class RpcBinaryMessageReader {
    constructor(protected readonly onMessage: (response: RpcMessage) => void, protected readonly onChunk?: (id: number, abort: boolean) => void);
    cleanUp();
    onChunkAck(id: number, callback: (active: boolean) => void);
    removeChunkAck(id: number, aborted = false);
    registerProgress(id: number, progress: SingleProgress);
    feed(buffer: Uint8Array, bytes?: number);
}
SingleProgress [source]
export class SingleProgress extends Subject<SingleProgress> {
    done;
    total;
    current;
    stats;
    abortController: AbortController;
    finished;
    constructor();
    /**
     * Aborts the current progress (either upload or download).
     */
    abort();
    get aborted();
    /**
     * Acts like a BehaviorSubject.
     */
    _subscribe(subscriber: Subscriber<SingleProgress>): Subscription;
    setStart(total: number);
    setBatch(size: number);
    get progress(): number;
    set(total: number, current: number);
}
Progress [source]
export class Progress extends BehaviorSubject<number> {
    readonly upload;
    readonly download;
    /**
     * Aborts both upload and download progress.
     */
    abort();
    constructor();
}
ClientProgress [source]
export class ClientProgress {
    static nextProgress?: Progress;
    /**
     * Returns the current stack and sets a new one.
     */
    static getNext(): Progress | undefined;
    /**
     * Sets up a new Progress object for the next API request to be made.
     * Only the very next API call will be tracked.
     *
     * @example
     * ```typescript
     *
     * const progress = ClientProgress.track();
     * await api.myMethod();
     *
     * ```
     */
    static track(): Progress;
}
TransportOptions [source]
export class TransportOptions {
    /**
     * Stores big buffers to the file system and stream it from there.
     * In bytes.
     * note: not implemented yet
     */
    cacheOnFileSystemWhenSizeIsAtLeast: number;
    /**
     * When back-pressure is bigger than this value, we wait with sending new data.
     * In bytes.
     * note: not implemented yet
     */
    stepBackWhenBackPressureBiggerThan: number;
    /**
     * Chunk size.
     * In bytes.
     */
    chunkSize: number;
}
TransportBinaryMessageChunkWriter [source]
export class TransportBinaryMessageChunkWriter {
    constructor(protected reader: RpcBinaryMessageReader, protected options: TransportOptions);
    /**
     * Writes a message buffer to the connection and chunks if necessary.
     */
    write(writer: RpcBinaryWriter, message: Uint8Array, progress?: SingleProgress): void;
    async writeFull(writer: RpcBinaryWriter, buffer: Uint8Array, progress?: SingleProgress): Promise<void>;
}

This class acts as a layer between kernel/client and a connection writer. It automatically chunks long messages into multiple smaller one using the RpcType.Chunks type.

todo: It keeps track of the back-pressure and sends only when the pressure is not too big. It automatically saves big buffer to the file system and streams data from there to not block valuable memory.

Events

onRpcConnection [source]
EventTokenSync<DataEvent<RpcConnectionEvent>>
onRpcConnectionClose [source]
EventTokenSync<DataEvent<RpcConnectionCloseEvent>>
onRpcAction [source]
EventTokenSync<DataEvent<RpcActionEvent>>
onRpcAuth [source]
EventToken<DataEvent<RpcAuthEvent>>
onRpcControllerAccess [source]
EventToken<DataEvent<RpcControllerAccessEvent>>

Errors

OfflineError [source]
export class OfflineError extends CustomError {
}
UnexpectedMessageType [source]
export class UnexpectedMessageType extends CustomError {
}
RpcError [source]
export class RpcError extends CustomError {
}
AuthenticationError [source]
export class AuthenticationError extends CustomError {
}

Const

RpcHttpHeaderNames [source]
string[]

Functions

webSocketFromBaseUrl [source]
(baseUrl: string, portMapping?: { [name: number]: number; }): string

Returns the WebSocket URL for the given base URL and allows port mapping. Default port-mapping maps Angular server :4200 to :8080

provideRpcWebSocketClient [source]
(baseUrl?: string, portMapping?: { [name: number]: number; }): { provide: typeof RpcWebSocketClient; useFactory: () => RpcWebSocketClient; }

Creates a provider for RpcWebSocketClient that is compatible with Angular and Deepkit. baseUrl needs to be absolute.

createRpcHttpClientProvider [source]
(baseUrl?: string, headers?: { [name: string]: string; }, http?: RpcHttpInterface): { provide: typeof RpcClient; useFactory: () => RpcClient; }
getActions [source]
<T>(target: ClassType<T>): Map<string, RpcAction>
rpcClass [source]
ClassDecoratorResult<typeof RpcClass>
rpcProperty [source]
PropertyDecoratorResult<typeof RpcProperty>
rpc [source]
FluidDecorator<RpcClass, ClassDecoratorFn> & ClassDecoratorFn & { _fetch: (classType: AbstractClassType, property?: string, parameterIndexOrDescriptor?: any) => RpcController; } & FluidDecorator<...> & PropertyDecoratorFn & { ...; }
isEntitySubject [source]
(v: any): v is EntitySubject<any>
ControllerSymbol [source]
<T>(path: string, entities?: ClassType[]): ControllerDefinition<T>
createBodyDecoder [source]
<T>(type?: ReceiveType<T>): BodyDecoder<T>
readBinaryRpcMessage [source]
(buffer: Uint8Array): RpcMessage
createRpcCompositeMessage [source]
<T>(id: number, type: number, messages: RpcCreateMessageDef<any>[], routeType?: RpcMessageRouteType.client | RpcMessageRouteType.server): RpcMessageDefinition
serializeBinaryRpcCompositeMessage [source]
(message: RpcMessageDefinition): Uint8Array
createRpcCompositeMessageSourceDest [source]
(id: number, source: Uint8Array, destination: Uint8Array, type: number, messages: RpcCreateMessageDef<any>[]): RpcMessageDefinition
serializeBinaryRpcCompositeMessageSourceDest [source]
(message: RpcMessageDefinition): Uint8Array
createRpcMessage [source]
<T>(id: number, type: number, body?: T, routeType?: RpcMessageRouteType.client | RpcMessageRouteType.server, schema?: ReceiveType<T>): RpcMessageDefinition
serializeBinaryRpcMessage [source]
(message: RpcMessageDefinition): Uint8Array
serializeBinaryRpcMessageSingleBody [source]
(message: RpcMessageDefinition): Uint8Array
createRpcMessagePeer [source]
<T>(id: number, type: number, source: Uint8Array, peerId: string, body?: T, schema?: ReceiveType<T>): RpcMessageDefinition
serializeBinaryRpcMessagePeer [source]
(message: RpcMessageDefinition): Uint8Array
createRpcMessageSourceDest [source]
<T>(id: number, type: number, source: Uint8Array, destination: Uint8Array, body?: T, schema?: ReceiveType<T>): RpcMessageDefinition
serializeBinaryRpcMessageSourceDest [source]
(message: RpcMessageDefinition): Uint8Array
createRpcMessageSourceDestForBody [source]
<T>(id: number, type: number, source: Uint8Array, destination: Uint8Array, body: Uint8Array): Uint8Array
readUint32LE [source]
(buffer: Uint8Array, offset?: number): number
rpcEncodeError [source]
(error: Error | string): EncodedError
rpcDecodeError [source]
(error: EncodedError): Error
createErrorMessage [source]
(id: number, error: Error | string, routeType: RpcMessageRouteType.client | RpcMessageRouteType.server): RpcMessageDefinition
createWriter [source]
(transport: TransportConnection, options: TransportOptions, reader: RpcBinaryMessageReader): TransportMessageWriter
createSubject [source]
<T>(producer: (subject: Subject<T>) => void | Promise<void>, teardown?: () => void): Subject<T>

Create a Subject with teardown function that is called when client disconnects or completes the subject.

The producer is called in the next tick, no matter if the client subscribes to the subject or not. This is fundamentally different from Observables, where the producer is only called when the Observable is subscribed to.

Teardown is also called when the producer errors or completes.

You should normally prefer using Observables (with instantObservable) over Subjects, as Subjects are not lazy and start emitting values immediately when created.

class Controller {
instantSubject [source]
<T>(producer: InstanceProducer<Subject<T>>): Subject<T>

Returns a Subject that is immediately subscribed to the given producer.

The producer can be a Promise or a function that returns a Promise.

Note that the Subject will be requested from the Promise right away.

const client = new RpcClient();
const controller = client.controller<MyController>('controller');

// normally you would do this:
const subject = await controller.subscribeChats('asd');

// but with instantSubject you can do this, allowing you to get
// a Subject in a synchronous way.
const subject = instantSubject(controller.subscribeChats('asd'));

Types

RemoteController [source]
type RemoteController<T> = {
    [P in keyof T]: T[P] extends (...args: any[]) => any ? PromisifyFn<T[P]> : never
};
ClientTransportAdapter [source]
interface ClientTransportAdapter {
    connect(connection: TransportClientConnection): Promise<void> | void;

    /**
     * Whether ClientId call is needed to get a client id.
     * This is disabled for http adapter.
     */
    supportsPeers?(): boolean;

    /**
     * Whether Authentication call is needed to authenticate the client.
     * This is disabled for http adapter (Authorization header is used).
     */
    supportsAuthentication?(): boolean;
}
WritableClient [source]
interface WritableClient {
    clientStats: RpcStats;

    sendMessage<T>(
        type: number,
        body?: T,
        receiveType?: ReceiveType<T>,
        options?: {
            dontWaitForConnection?: boolean,
            connectionId?: number,
            peerId?: string,
            timeout?: number
        },
    ): RpcMessageSubject;
}
RpcEventMessage [source]
type RpcEventMessage = { id: number, date: Date, type: number, body: any };
RpcClientEventIncomingMessage [source]
type RpcClientEventIncomingMessage =
    { event: 'incoming', composite: boolean, messages: RpcEventMessage[] }
    & RpcEventMessage;
RpcClientEventOutgoingMessage [source]
type RpcClientEventOutgoingMessage =
    { event: 'outgoing', composite: boolean, messages: RpcEventMessage[] }
    & RpcEventMessage;
RpcClientEvent [source]
type RpcClientEvent = RpcClientEventIncomingMessage | RpcClientEventOutgoingMessage;
RpcHttpResponseInterface [source]
interface RpcHttpResponseInterface {
    status: number;
    headers: { [name: string]: string };
    body?: any;
}
RpcHttpInterface [source]
interface RpcHttpInterface {
    fetch(url: string, options: {
        headers: { [name: string]: string },
        method: string,
        body: any
    }): Promise<RpcHttpResponseInterface>;
}
ActionTypes [source]
type ActionTypes = {
    strictSerialization: boolean;
    actionCallSchema: TypeObjectLiteral, //with args as property
    parametersValidate: Guard<any>,

    parameters: TypeTuple,
    mode: ActionMode;
    type: Type; //the type T of Collection<T>, EntitySubject<T>, Observable<T>, or return type of the function if mode=arbitrary

    resultSchema: TypeObjectLiteral, //with v as property
    observableNextSchema?: TypeObjectLiteral, //with v as property
    collectionSchema?: Type, //with v as array property
    collectionQueryModel?: Type,

    noTypeWarned: boolean;
};
RpcServerActionObservableSubject [source]
interface RpcServerActionObservableSubject {
    subject: Subject<any>,
    trackingType: NumericKeys<ActionStats>,
    completed: boolean,
    completedByClient: boolean,
    subscription: Subscription
}
RpcCacheAction [source]
interface RpcCacheAction {
    controller: RpcControllerAccess;
    fn: Function;
    types: ActionTypes;
    action: RpcAction;
    resolver: Resolver<any>;
    bodyDecoder: BodyDecoder<any>;
    label: string; //controller.action
}
OnConnectionCallback [source]
type OnConnectionCallback = (connection: RpcKernelConnection, injector: InjectorContext, logger: LoggerInterface) => void;
RpcControllerAccess [source]
interface RpcControllerAccess {
    controllerName: string;
    controllerClassType: ClassType;
    actionName: string;
    actionGroups: string[];
    actionData: { [name: string]: any };
}
FilterParameters [source]
type FilterParameters = { [name: string]: any | undefined };
QuerySelector [source]
type QuerySelector<T> = {
    // Comparison
    $eq?: T;
    $gt?: T;
    $gte?: T;
    $in?: T[];
    $lt?: T;
    $lte?: T;
    $ne?: T;
    $nin?: T[];
    // Logical
    $not?: T extends string ? (QuerySelector<T> | RegExp) : QuerySelector<T>;
    $regex?: T extends string ? (RegExp | string) : never;

    //special deepkit/type type
    $parameter?: string;
};
RootQuerySelector [source]
type RootQuerySelector<T> = {
    $and?: Array<FilterQuery<T>>;
    $nor?: Array<FilterQuery<T>>;
    $or?: Array<FilterQuery<T>>;
    // we could not find a proper TypeScript generic to support nested queries e.g. 'user.friends.name'
    // this will mark all unrecognized properties as any (including nested queries)
    [key: string]: any;
};
Condition [source]
type Condition<T> = MongoAltQuery<T> | QuerySelector<MongoAltQuery<T>>;
FilterQuery [source]
type FilterQuery<T> = {
    [P in keyof T & string]?: Condition<T[P]>;
} &
    RootQuerySelector<T>;
SORT_ORDER [source]
type SORT_ORDER = 'asc' | 'desc' | any;
Sort [source]
type Sort<T, ORDER extends SORT_ORDER = SORT_ORDER> = { [P in keyof T & string]?: ORDER };
CollectionEventAdd [source]
interface CollectionEventAdd<T> {
    type: 'add';
    items: T[];
}
CollectionEventState [source]
interface CollectionEventState {
    type: 'state';
    state: CollectionState;
}
CollectionEventRemove [source]
interface CollectionEventRemove {
    type: 'remove';
    ids: (string | number)[];
}
CollectionEventSet [source]
interface CollectionEventSet {
    type: 'set';
    items: any[];
}
CollectionEventUpdate [source]
interface CollectionEventUpdate {
    type: 'update';
    items: any[];
}
CollectionSetSort [source]
interface CollectionSetSort {
    type: 'sort';
    ids: (string | number)[];
}
CollectionEvent [source]
type CollectionEvent<T> = CollectionEventAdd<T> | CollectionEventRemove | CollectionEventSet | CollectionEventState | CollectionEventUpdate | CollectionSetSort;
CollectionSortDirection [source]
type CollectionSortDirection = 'asc' | 'desc';
CollectionSort [source]
interface CollectionSort {
    field: string;
    direction: CollectionSortDirection;
}
CollectionEntitySubjectFetcher [source]
interface CollectionEntitySubjectFetcher {
    fetch<T extends IdInterface>(classType: ClassType<T>, id: string | number): EntitySubject<T>;
}
CollectionQueryModelInterface [source]
interface CollectionQueryModelInterface<T> {
    filter?: FilterQuery<T>;
    skip?: number;
    itemsPerPage: number;
    limit?: number;
    parameters: { [name: string]: any };
    sort?: Sort<T>;
}
IdType [source]
type IdType = string | number;
IdInterface [source]
interface IdInterface {
    id: IdType;
}
IdVersionInterface [source]
interface IdVersionInterface extends IdInterface {
    version: number;
}
NumericKeys [source]
type NumericKeys<T> = {
    [K in keyof T]: T[K] extends number ? K : never;
}[keyof T];
rpcClientId [source]
interface rpcClientId {
    id: Uint8Array;
}
rpcChunk [source]
interface rpcChunk {
    seq: number; //sequence number of this chunk
    total: number; //size in bytes
    v: Uint8Array;
}
rpcActionObservableSubscribeId [source]
interface rpcActionObservableSubscribeId {
    id: number;
}
rpcActionObservableNext [source]
interface rpcActionObservableNext {
    id: number;
    v: any;
}
rpcError [source]
interface rpcError {
    classType: string;
    message: string;
    stack: string;
    properties?: Record<string, any>;
}
rpcResponseActionObservableSubscriptionError [source]
interface rpcResponseActionObservableSubscriptionError extends rpcError {
    id: number;
}
rpcSort [source]
interface rpcSort {
    field: string;
    direction: 'asc' | 'desc';
}
rpcResponseActionObservable [source]
interface rpcResponseActionObservable {
    type: ActionObservableTypes;
}
rpcAuthenticate [source]
interface rpcAuthenticate {
    token: any;
}
rpcResponseAuthenticate [source]
interface rpcResponseAuthenticate {
    username: string;
}
rpcAction [source]
interface rpcAction {
    controller: string;
    method: string;
}
rpcActionType [source]
interface rpcActionType {
    controller: string;
    method: string;
    disableTypeReuse?: boolean;
}
ActionMode [source]
type ActionMode = 'arbitrary' | 'collection' | 'entitySubject' | 'observable';
rpcResponseActionType [source]
interface rpcResponseActionType {
    mode: ActionMode;
    type: any; //Type as SerializedTypes
    parameters: any; //TypeTuple as SerializedTypes
}
rpcPeerRegister [source]
interface rpcPeerRegister {
    id: string;
}
rpcPeerDeregister [source]
interface rpcPeerDeregister {
    id: string;
}
rpcResponseActionCollectionRemove [source]
interface rpcResponseActionCollectionRemove {
    ids: (string | number)[];
}
rpcResponseActionCollectionSort [source]
interface rpcResponseActionCollectionSort {
    ids: (string | number)[];
}
rpcEntityRemove [source]
interface rpcEntityRemove {
    entityName: string;
    ids: (string | number)[];
}
EntityPatch [source]
interface EntityPatch {
    $set?: { [path: string]: any },
    $unset?: { [path: string]: number }
    $inc?: { [path: string]: number }
}
rpcEntityPatch [source]
interface rpcEntityPatch {
    entityName: string;
    id: string | number;
    version: number;
    patch: {
        $set?: Record<string, any>,
        $unset?: Record<string, number>,
        $inc?: Record<string, number>,
    };
}
WrappedV [source]
interface WrappedV {
    v: any;
}
BodyDecoder [source]
interface BodyDecoder<T> {
    type: Type;

    (buffer: Uint8Array, offset: number): T;
}
RpcCreateMessageDef [source]
interface RpcCreateMessageDef<T> {
    type: number;
    schema?: Type;
    body?: T;
}
RpcMessageDefinition [source]
interface RpcMessageDefinition {
    id: number;
    type: number;
    routeType: RpcMessageRouteType;
    composite?: RpcCreateMessageDef<any>[];
    peerId?: string;
    source?: Uint8Array;
    destination?: Uint8Array;
    body?: {
        type: Type;
        body: any;
    };
}
EncodedError [source]
interface EncodedError {
    classType: string;
    message: string;
    stack: string;
    properties?: { [name: string]: any };
}
TransportMessageWriter [source]
interface TransportMessageWriter {
    (message: RpcMessageDefinition, options: TransportOptions, stats: RpcTransportStats, progress?: SingleProgress): void;
}
TransportConnection [source]
interface TransportConnection {
    /**
     * Write is used either by Client->Server, or Server->Client.
     * The method is responsible to serialize the message and send it over the wire.
     */
    write?: TransportMessageWriter;

    /**
     * Same as write, but sends binary directly. This enables chunking automatically.
     */
    writeBinary?(message: Uint8Array): void;

    bufferedAmount?(): number;

    clientAddress?(): string;

    close(): void;
}
TransportClientConnection [source]
interface TransportClientConnection {
    token?: any;

    onConnected(established: TransportConnection): void;

    onClose(reason: string): void;

    onError(error: Error): void;

    /**
     * Called when data is received from the other side.
     * The method is responsible to deserialize the message.
     */
    read(message: RpcMessage): void;

    readBinary(message: Uint8Array, bytes?: number): void;
}
RpcBinaryWriter [source]
type RpcBinaryWriter = (buffer: Uint8Array) => void;
InstanceProducer [source]
type InstanceProducer<T> = T | Promise<T> | (() => T) | (() => Promise<T>);
WithFail [source]
type WithFail<T> = Omit<T, 'phase'> & { phase: 'fail'; error: Error };
RpcConnectionEvent [source]
interface RpcConnectionEvent {
    context: {
        connection: RpcKernelBaseConnection;
        injector: InjectorContext;
    };
}
RpcConnectionCloseEvent [source]
interface RpcConnectionCloseEvent extends RpcConnectionEvent {
    /**
     * There could be multiple reasons why a connection is closed. This could be a normal
     * close, an error, or a timeout.
     * If the connection was closed because of an error, this property contains the error object.
     */
    reason: string | Error;
}
RpcActionTimings [source]
interface RpcActionTimings {
    /**
     * The start of the action (performance.now())
     */
    start: number;

    /**
     * The end of the action (performance.now())
     */
    end: number;

    /**
     * Time it took to check the controller access (since start)
     */
    types?: number;

    /**
     * Time it took to parse the body (since start)
     */
    parseBody?: number;

    /**
     * Time it took to validate the parameters (since start)
     */
    validate?: number;

    /**
     * Time it took to check the controller access (since start)
     */
    controllerAccess?: number;
}

All times are in milliseconds (using performance.now()).

RpcActionEventBase [source]
interface RpcActionEventBase extends RpcConnectionEvent {
    phase: 'start' | 'success';
    controller: RpcControllerAccess;
    timing: RpcActionTimings;
}
RpcActionEvent [source]
type RpcActionEvent = RpcActionEventBase | WithFail<RpcActionEventBase>;
RpcAuthEventStart [source]
interface RpcAuthEventStart extends RpcConnectionEvent {
    token: any;
    phase: 'start';
    /**
     * Set this to the session object if the authentication was successful.
     * If set this will bypass RpcKernelSecurity and directly use this session.
     */
    session?: Session;
}
RpcAuthEventSuccess [source]
interface RpcAuthEventSuccess extends RpcConnectionEvent {
    token: any;
    phase: 'success';
    session: Session;
}
RpcAuthEvent [source]
type RpcAuthEvent = RpcAuthEventStart | RpcAuthEventSuccess | WithFail<RpcAuthEventStart>;
RpcControllerAccessEventStart [source]
interface RpcControllerAccessEventStart extends RpcConnectionEvent {
    phase: 'start';
    session: Session;
    controller: RpcControllerAccess;
    /**
     * Set this to true if you want to grant access to the controller.
     * If set this will bypass RpcKernelSecurity and directly grant access.
     */
    granted?: boolean;
}
RpcControllerAccessEventBase [source]
interface RpcControllerAccessEventBase extends RpcConnectionEvent {
    phase: 'success' | 'denied';
    session: Session;
    controller: RpcControllerAccess;
}
RpcControllerAccessEvent [source]
type RpcControllerAccessEvent = RpcControllerAccessEventStart | RpcControllerAccessEventBase | WithFail<RpcControllerAccessEventBase>;