Newer
Older
/* eslint-disable @typescript-eslint/no-explicit-any */
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { BehaviorSubject, Observable, Subscription } from 'rxjs';
import { map, share } from 'rxjs/operators';
import { WebSocketMessage } from 'rxjs/internal/observable/dom/WebSocketSubject';
interface WsMessage {
event: string;
data: any;
}
export class WebsocketService {
private wsSubject$: WebSocketSubject<any>;
if (!this.wsSubject$) {
this.wsSubject$ = webSocket({
deserializer(event: MessageEvent): any {
return JSON.parse(event.data);
},
serializer(value: any): WebSocketMessage {
return JSON.stringify(value);
},
openObserver: {
next: () => {
this.wsConnected$.next(true);
url: this.wsUrl
this.wsSubscription = this.wsSubject$.subscribe(
() => {},
() => {
this.closeConnection();
},
() => {
this.closeConnection();
}
protected closeConnection(): void {
this.wsConnected$.next(false);
if (this.wsSubscription) {
this.wsSubscription.unsubscribe();
}
if (this.wsSubject$) {
this.wsSubject$.complete();
this.wsSubject$ = null;
}
if (!this.wsSubject$) {
this.connect();
if (!this.wsSubject$) {
return this.wsSubject$
.multiplex(
() => ({ event: `subscribe:${channelName}` }),
() => ({ event: `unsubscribe:${channelName}` }),
message => (message.event === channelName)
)
.pipe(map((event: WsMessage): T => event.data))
.pipe(share());