    // websocket.service.ts
    import {webSocket, WebSocketSubject} from 'rxjs/webSocket';
    
    import {BehaviorSubject, Observable, Subscription} from 'rxjs';
    
    import {map, share} from 'rxjs/operators';
    import {WebSocketMessage} from 'rxjs/internal/observable/dom/WebSocketSubject';
    
    /**
     * Shape of the messages that `getChannel` reads from the socket; `send`
     * writes objects with the same two fields.
     */
    interface WsMessage {
      /** Routing key: `getChannel` keeps a message only when this equals the channel name. */
      event: string;
      /** Payload: `getChannel` emits this value to the channel's subscribers. */
      data: any;
    }
    
    /**
     * Wrapper around a single RxJS `WebSocketSubject`.
     *
     * The socket is created lazily: `send` and `getChannel` call `connect` when
     * no subject exists yet. Messages are JSON-encoded objects of the form
     * `{event, data}`.
     */
    export class WebsocketService {

      /** URL handed to `webSocket()`; empty here, so a subclass is expected to set it. */
      protected wsUrl = '';

      /** Live socket subject, or null/undefined while disconnected. */
      private wsSubject$: WebSocketSubject<any>;

      /**
       * Connection state: starts as `null`, becomes `true` when the socket
       * opens and `false` whenever `closeConnection` runs.
       */
      public wsConnected$ = new BehaviorSubject<boolean>(null);

      /** Internal subscription that keeps the socket open and observes error/close. */
      private wsSubscription: Subscription;

      /**
       * Opens the socket unless one already exists.
       *
       * Subscribes internally so that a socket error or close tears the
       * connection down through `closeConnection`.
       */
      public connect(): void {
        if (this.wsSubject$) {
          return;
        }

        console.log('connecting...' + this.wsUrl);

        this.wsSubject$ = webSocket({
          // NOTE(review): the `url` entry was not visible in the recovered source;
          // `wsUrl` is the only URL this class holds and is the value logged above.
          url: this.wsUrl,

          deserializer(event: MessageEvent): any {
            return JSON.parse(event.data);
          },

          serializer(value: any): WebSocketMessage {
            return JSON.stringify(value);
          },

          openObserver: {
            next: () => {
              console.log('connection established');

              this.wsConnected$.next(true);
            },
          },
        });

        // Only error and completion matter here; incoming messages are consumed
        // by the streams returned from `getChannel`.
        this.wsSubscription = this.wsSubject$.subscribe({
          error: () => {
            console.error('connection error');

            this.closeConnection();
          },

          complete: () => {
            console.log('connection closed');

            this.closeConnection();
          },
        });
      }

      /**
       * Marks the service as disconnected and releases the socket.
       *
       * Safe to call when nothing is connected: the subscription and the
       * subject are only touched if they exist.
       */
      protected closeConnection(): void {
        this.wsConnected$.next(false);

        if (this.wsSubscription) {
          this.wsSubscription.unsubscribe();
        }

        if (this.wsSubject$) {
          this.wsSubject$.complete();
        }

        this.wsSubject$ = null;
      }

      /**
       * Sends `{event, data}` over the socket, connecting first if necessary.
       *
       * @param event name placed in the message's `event` field
       * @param data payload placed in the message's `data` field
       */
      public send(event: string, data: any): void {
        if (!this.wsSubject$) {
          this.connect();
        }

        this.wsSubject$.next({event, data});
      }

      /**
       * Returns a shared stream of the payloads of messages whose `event`
       * equals `channelName`, connecting first if necessary.
       *
       * Subscribing sends `subscribe:<channelName>` to the server; when the
       * last subscriber leaves, `unsubscribe:<channelName>` is sent.
       *
       * @param channelName event name to listen for
       * @returns observable of the `data` field of matching messages
       */
      public getChannel<T>(channelName: string): Observable<T> {
        if (!this.wsSubject$) {
          this.connect();
        }

        return this.wsSubject$
          .multiplex(
            () => ({event: `subscribe:${channelName}`}),
            () => ({event: `unsubscribe:${channelName}`}),
            message => (message.event === channelName)
          )
          .pipe(
            map((event: WsMessage): T => event.data),
            share()
          );
      }
    }