src/app/shared/websocket.service.ts
Properties |
|
Methods |
|
wsConnected$ |
Default value : new BehaviorSubject<boolean>(null)
|
Defined in src/app/shared/websocket.service.ts:15
|
Private wsSubject$ |
Type : WebSocketSubject<any>
|
Defined in src/app/shared/websocket.service.ts:14
|
Private wsSubscription |
Type : Subscription
|
Defined in src/app/shared/websocket.service.ts:16
|
Protected wsUrl |
Type : string
|
Default value : ''
|
Defined in src/app/shared/websocket.service.ts:13
|
Protected closeConnection |
closeConnection()
|
Defined in src/app/shared/websocket.service.ts:47
|
Returns :
void
|
connect |
connect()
|
Defined in src/app/shared/websocket.service.ts:18
|
Returns :
void
|
getChannel |
getChannel(channelName: string)
|
Defined in src/app/shared/websocket.service.ts:66
|
Type parameters :
T
|
Parameters :
channelName : string
|
Returns :
Observable<T>
|
send |
send(event: string, data: any)
|
Defined in src/app/shared/websocket.service.ts:58
|
Returns :
void
|
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { BehaviorSubject, Observable, Subscription } from 'rxjs';
import { map, share } from 'rxjs/operators';
import { WebSocketMessage } from 'rxjs/internal/observable/dom/WebSocketSubject';
/**
 * Envelope of a message exchanged over the socket, as consumed by
 * `WebsocketService`: an event name plus an arbitrary payload.
 */
interface WsMessage {
  /** Event name; `getChannel` compares it against the requested channel name. */
  event: string;

  /** Payload carried by the message; its shape is not constrained here. */
  data: any;
}
/**
 * Wrapper around a single RxJS `WebSocketSubject`.
 *
 * The socket is opened lazily by `connect()`, which `send()` and
 * `getChannel()` call on demand. Messages are JSON-encoded objects of the
 * form `{ event, data }`. Connection state is published on `wsConnected$`.
 */
export class WebsocketService {
  /**
   * URL handed to `webSocket()`. Empty by default and `protected`, so a
   * subclass can supply the real endpoint.
   */
  protected wsUrl = '';

  /** The live socket subject, or `null`/unset while no connection exists. */
  private wsSubject$: WebSocketSubject<any> | null = null;

  /**
   * Connection state: `null` until the first state change, `true` once the
   * socket has opened, `false` after `closeConnection()` has run.
   */
  wsConnected$ = new BehaviorSubject<boolean>(null);

  /** Service-owned subscription created by `connect()` to watch for error/completion. */
  private wsSubscription: Subscription | null = null;

  /**
   * Creates the socket subject and subscribes to it, unless one already
   * exists. Any error or completion of the socket stream tears the
   * connection down through `closeConnection()`.
   *
   * Note: the error notification may arrive synchronously inside
   * `subscribe()` (e.g. when the WebSocket constructor rejects the URL), in
   * which case `wsSubject$` is already `null` again when this method returns.
   */
  connect(): void {
    if (this.wsSubject$) {
      return;
    }

    const subject = webSocket({
      deserializer(event: MessageEvent): any {
        return JSON.parse(event.data);
      },
      serializer(value: any): WebSocketMessage {
        return JSON.stringify(value);
      },
      openObserver: {
        next: () => {
          this.wsConnected$.next(true);
        }
      },
      url: this.wsUrl
    });
    this.wsSubject$ = subject;

    // Incoming messages are ignored here; this subscription only exists to
    // react to the end of the stream.
    const teardown = (): void => this.closeConnection();
    this.wsSubscription = subject.subscribe({
      error: teardown,
      complete: teardown
    });
  }

  /**
   * Publishes `false` on `wsConnected$`, drops the service-owned
   * subscription and completes and forgets the socket subject, so that a
   * later `connect()` starts from scratch.
   */
  protected closeConnection(): void {
    this.wsConnected$.next(false);
    if (this.wsSubscription) {
      this.wsSubscription.unsubscribe();
    }
    if (this.wsSubject$) {
      this.wsSubject$.complete();
      this.wsSubject$ = null;
    }
  }

  /**
   * Sends `{ event, data }` over the socket, connecting first if needed.
   *
   * If the connection attempt failed synchronously there is no subject to
   * send through; the message is dropped (the failure has already been
   * reported as `false` on `wsConnected$`) instead of throwing a TypeError
   * on a `null` subject.
   *
   * @param event Event name placed in the message envelope.
   * @param data  Payload placed in the message envelope.
   */
  send(event: string, data: any): void {
    this.connect();

    const subject = this.wsSubject$;
    if (!subject) {
      return;
    }
    subject.next({ event, data });
  }

  /**
   * Returns a shared stream of the `data` payloads of all messages whose
   * `event` equals `channelName`, connecting first if needed.
   *
   * Built on `WebSocketSubject.multiplex`: `{ event: 'subscribe:<name>' }`
   * and `{ event: 'unsubscribe:<name>' }` are the messages produced for the
   * start and the end of the channel subscription respectively.
   *
   * If the connection attempt failed synchronously, the returned Observable
   * errors on subscription rather than this method throwing a TypeError.
   *
   * @param channelName Event name identifying the channel.
   * @returns Observable of the channel's payloads, typed as `T` (unchecked).
   */
  getChannel<T>(channelName: string): Observable<T> {
    this.connect();

    const subject = this.wsSubject$;
    if (!subject) {
      const url = this.wsUrl;
      return new Observable<T>(subscriber => {
        subscriber.error(
          new Error(`WebSocket connection to "${url}" could not be established`)
        );
      });
    }

    return subject
      .multiplex(
        () => ({ event: `subscribe:${channelName}` }),
        () => ({ event: `unsubscribe:${channelName}` }),
        message => message.event === channelName
      )
      .pipe(
        map((event: WsMessage): T => event.data),
        share()
      );
  }
}