File

src/app/shared/websocket.service.ts

Index

Properties
Methods

Properties

wsConnected$
Default value : new BehaviorSubject<boolean>(null)
Private wsSubject$
Type : WebSocketSubject<any>
Private wsSubscription
Type : Subscription
Protected wsUrl
Type : string
Default value : ''

Methods

Protected closeConnection
closeConnection()
Returns : void
connect
connect()
Returns : void
getChannel
getChannel(channelName: string)
Type parameters :
  • T
Parameters :
Name Type Optional
channelName string No
Returns : Observable<T>
send
send(event: string, data: any)
Parameters :
Name Type Optional
event string No
data any No
Returns : void
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { BehaviorSubject, Observable, Subscription } from 'rxjs';
import { map, share } from 'rxjs/operators';
import { WebSocketMessage } from 'rxjs/internal/observable/dom/WebSocketSubject';

interface WsMessage {
  event: string;
  data: any;
}

export class WebsocketService {
  protected wsUrl = '';
  private wsSubject$: WebSocketSubject<any>;
  wsConnected$ = new BehaviorSubject<boolean>(null);
  private wsSubscription: Subscription;

  connect(): void {
    if (!this.wsSubject$) {
      this.wsSubject$ = webSocket({
        deserializer(event: MessageEvent): any {
          return JSON.parse(event.data);
        },
        serializer(value: any): WebSocketMessage {
          return JSON.stringify(value);
        },
        openObserver: {
          next: () => {
            this.wsConnected$.next(true);
          }
        },
        url: this.wsUrl
      });

      this.wsSubscription = this.wsSubject$.subscribe(
        () => {},
        () => {
          this.closeConnection();
        },
        () => {
          this.closeConnection();
        }
      );
    }
  }

  protected closeConnection(): void {
    this.wsConnected$.next(false);
    if (this.wsSubscription) {
      this.wsSubscription.unsubscribe();
    }
    if (this.wsSubject$) {
      this.wsSubject$.complete();
      this.wsSubject$ = null;
    }
  }

  send(event: string, data: any): void {
    if (!this.wsSubject$) {
      this.connect();
    }

    this.wsSubject$.next({ event, data });
  }

  getChannel<T>(channelName: string): Observable<T> {
    if (!this.wsSubject$) {
      this.connect();
    }

    return this.wsSubject$
      .multiplex(
        () => ({ event: `subscribe:${channelName}` }),
        () => ({ event: `unsubscribe:${channelName}` }),
        message => (message.event === channelName)
      )
      .pipe(map((event: WsMessage): T => event.data))
      .pipe(share());
  }
}

result-matching ""

    No results matching ""