File

src/app/shared/websocket-backend.service.ts

Extends

WebsocketService

Implements

OnDestroy

Index

Properties
Methods

Constructor

constructor(serverUrl: string, http: HttpClient)
Parameters :
Name Type Optional
serverUrl string No
http HttpClient No

Properties

Protected connectionClosed
Default value : true
connectionStatus$
Type : BehaviorSubject<ConnectionStatus>
Default value : new BehaviorSubject<ConnectionStatus>('initial')
data$
Type : BehaviorSubject<T>
Protected Abstract initialData
Type : T
Protected Abstract pollingEndpoint
Type : string
Protected Abstract pollingInterval
Type : number
Private pollingTimeoutId
Type : number
Default value : null
Protected Abstract wsChannelName
Type : string
Private wsConnectionStatusSubscription
Type : Subscription
Default value : null
Private wsDataSubscription
Type : Subscription
Default value : null
wsConnected$
Default value : new BehaviorSubject<boolean>(null)
Inherited from WebsocketService
Private wsSubject$
Type : WebSocketSubject<any>
Inherited from WebsocketService
Private wsSubscription
Type : Subscription
Inherited from WebsocketService
Protected wsUrl
Type : string
Default value : ''
Inherited from WebsocketService

Methods

cutConnection
cutConnection()
Returns : void
ngOnDestroy
ngOnDestroy()
Returns : void
Protected observeEndpointAndChannel
observeEndpointAndChannel()
Returns : Observable<T>
Private pollNext
pollNext()
Returns : void
Private scheduleNextPoll
scheduleNextPoll()
Returns : void
Private subScribeToWsChannel
subScribeToWsChannel()
Returns : void
Private unsubscribeFromWebsocket
unsubscribeFromWebsocket()
Returns : void
Protected closeConnection
closeConnection()
Inherited from WebsocketService
Returns : void
connect
connect()
Inherited from WebsocketService
Returns : void
getChannel
getChannel(channelName: string)
Inherited from WebsocketService
Type parameters :
  • T
Parameters :
Name Type Optional
channelName string No
Returns : Observable<T>
send
send(event: string, data: any)
Inherited from WebsocketService
Parameters :
Name Type Optional
event string No
data any No
Returns : void
import { Inject, OnDestroy } from '@angular/core';
import { BehaviorSubject, Observable, Subscription } from 'rxjs';
import {
  catchError, map, skipWhile, tap
} from 'rxjs/operators';
import { HttpClient, HttpResponse } from '@angular/common/http';
import { ApiError } from '../app.interfaces';
import { WebsocketService } from './websocket.service';

/**
 * Connection states published on `WebsocketBackendService.connectionStatus$`.
 *
 * - `initial`: value the status subject is created with, before any poll has started.
 * - `polling-fetch`: emitted right before the HTTP request to the polling endpoint is issued.
 * - `polling-sleep`: emitted when a poll response carries no `SubscribeURI` header and the next poll is scheduled.
 * - `ws-online` / `ws-offline`: derived from the websocket's connected flag (`true` / `false`).
 * - `error`: emitted when the polling HTTP request fails.
 */
export type ConnectionStatus = 'initial' | 'ws-offline' | 'ws-online' | 'polling-sleep' | 'polling-fetch' | 'error';

/**
 * Base class for services that keep a piece of backend data up to date.
 *
 * The data is first fetched via HTTP from `pollingEndpoint`. If the response carries a
 * `SubscribeURI` header, the service switches to the websocket channel `wsChannelName`
 * at that URL; otherwise it keeps polling every `pollingInterval` milliseconds.
 * When the websocket reports a disconnect, polling is scheduled again.
 *
 * Subclasses provide the four abstract members and expose `observeEndpointAndChannel()`
 * under a domain-specific name.
 */
export abstract class WebsocketBackendService<T> extends WebsocketService implements OnDestroy {
  /** Path appended to `serverUrl` for the polling request. */
  protected abstract pollingEndpoint: string;
  /** Delay between two polls, passed to `setTimeout` (milliseconds). */
  protected abstract pollingInterval: number;
  /** Name of the websocket channel that delivers data updates. */
  protected abstract wsChannelName: string;
  /** Value `data$` is seeded with before the first response arrives. */
  protected abstract initialData: T;

  /** Latest known data; created lazily by `observeEndpointAndChannel()`, reset to `null` by `cutConnection()`. */
  data$: BehaviorSubject<T>;
  /** Current state of the polling / websocket state machine. */
  connectionStatus$: BehaviorSubject<ConnectionStatus> = new BehaviorSubject<ConnectionStatus>('initial');

  private wsConnectionStatusSubscription: Subscription | null = null;
  private wsDataSubscription: Subscription | null = null;
  /** Subscription of the HTTP request currently in flight, if any. */
  private pollingSubscription: Subscription | null = null;
  private pollingTimeoutId: number | null = null;

  /** `true` while no polling/websocket cycle is active (initially and after `cutConnection()`). */
  protected connectionClosed = true;

  constructor(
    @Inject('SERVER_URL') protected serverUrl: string,
    protected http: HttpClient
  ) {
    super();
  }

  /** Angular lifecycle hook: tears the connection down together with the service. */
  ngOnDestroy(): void {
    this.cutConnection();
  }

  /**
   * Returns the data stream, starting the poll/websocket cycle on first use
   * (or on first use after `cutConnection()`).
   *
   * @returns the shared `data$` subject, seeded with `initialData`
   */
  protected observeEndpointAndChannel(): Observable<T> {
    if (!this.data$) {
      this.data$ = new BehaviorSubject<T>(this.initialData);
      this.pollNext();
    }
    return this.data$;
  }

  /**
   * Fetches the data once via HTTP and decides how to continue: switch to the websocket
   * if the backend announces one via the `SubscribeURI` header, otherwise sleep and poll again.
   */
  private pollNext(): void {
    this.connectionClosed = false;

    this.unsubscribeFromWebsocket();
    // never keep two requests in flight for the same service
    this.cancelPendingPoll();

    this.connectionStatus$.next('polling-fetch');

    this.pollingSubscription = this.http
      .get<T>(this.serverUrl + this.pollingEndpoint, { observe: 'response' })
      .pipe(
        // TODO interceptor should have interfered and moved to error-page
        // https://github.com/iqb-berlin/testcenter-frontend/issues/53
        catchError((err: ApiError) => {
          this.connectionStatus$.next('error');
          // silent observable: nothing is emitted, so no further poll is scheduled after an error
          return new Observable<HttpResponse<T>>();
        })
      )
      .subscribe((response: HttpResponse<T>) => {
        // a response that arrives after cutConnection() must not touch the (nulled) data$
        if (this.connectionClosed || !this.data$) {
          return;
        }
        this.data$.next(response.body);
        if (response.headers.has('SubscribeURI')) {
          this.wsUrl = response.headers.get('SubscribeURI');
          this.subScribeToWsChannel();
        } else {
          this.connectionStatus$.next('polling-sleep');
          this.scheduleNextPoll();
        }
      });
  }

  /**
   * Stops everything: websocket subscriptions, the websocket connection itself, a pending
   * HTTP request and a scheduled poll. Afterwards `observeEndpointAndChannel()` starts from scratch.
   */
  cutConnection(): void {
    // mark as closed first so that any callback firing during teardown becomes a no-op
    this.connectionClosed = true;

    this.unsubscribeFromWebsocket();
    this.cancelPendingPoll();
    this.closeConnection();

    if (this.pollingTimeoutId) {
      clearTimeout(this.pollingTimeoutId);
      this.pollingTimeoutId = null;
    }

    this.data$ = null;
  }

  /** (Re-)arms the timer for the next poll; a previously armed timer is replaced. */
  private scheduleNextPoll(): void {
    if (this.pollingTimeoutId) {
      clearTimeout(this.pollingTimeoutId);
    }

    this.pollingTimeoutId = window.setTimeout(
      () => {
        this.pollingTimeoutId = null;
        if (!this.connectionClosed) { this.pollNext(); }
      },
      this.pollingInterval
    );
  }

  /** Cancels the HTTP request currently in flight, if there is one. */
  private cancelPendingPoll(): void {
    if (this.pollingSubscription) {
      this.pollingSubscription.unsubscribe();
      this.pollingSubscription = null;
    }
  }

  /** Drops the data and connection-status subscriptions of the websocket channel. */
  private unsubscribeFromWebsocket(): void {
    if (this.wsConnectionStatusSubscription) {
      this.wsConnectionStatusSubscription.unsubscribe();
      this.wsConnectionStatusSubscription = null;
    }

    if (this.wsDataSubscription) {
      this.wsDataSubscription.unsubscribe();
      this.wsDataSubscription = null;
    }
  }

  /**
   * Forwards channel messages into `data$` and mirrors the websocket's connected flag
   * into `connectionStatus$`; a reported disconnect schedules a fallback poll.
   */
  private subScribeToWsChannel(): void {
    this.wsDataSubscription = this.getChannel<T>(this.wsChannelName)
      .subscribe((dataObject: T) => this.data$.next(dataObject)); // subscribe only next, not complete!

    this.wsConnectionStatusSubscription = this.wsConnected$
      .pipe(
        skipWhile((item: boolean) => item === null), // skip pre-init-state
        tap((wsConnected: boolean) => {
          if (!wsConnected) {
            this.scheduleNextPoll();
          }
        }),
        map((wsConnected: boolean): ConnectionStatus => (wsConnected ? 'ws-online' : 'ws-offline'))
      )
      // forward only next: a complete/error of wsConnected$ must not terminate connectionStatus$
      .subscribe((status: ConnectionStatus) => this.connectionStatus$.next(status));
  }
}

result-matching ""

    No results matching ""