Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import {webSocket, WebSocketSubject} from 'rxjs/webSocket';
import {BehaviorSubject, Observable} from 'rxjs';
import {map, share} from 'rxjs/operators';
import {WebSocketMessage} from 'rxjs/internal/observable/dom/WebSocketSubject';
/** Envelope for messages read from the socket by `WebsocketService.getChannel`. */
interface WsMessage {
// Event name; `getChannel` compares it with the requested channel name.
event: string;
// Payload; `getChannel` emits this value to its subscribers.
data: any;
}
/**
 * Wrapper around a single RxJS `WebSocketSubject` that exchanges JSON
 * messages of the form `{event, data}` and exposes the connection state.
 *
 * The socket is created by `connect`, or on first use of `send` / `getChannel`
 * with the most recently used URL parameter.
 */
export class WebsocketService {
  /** Base address of the server; the URL parameter is appended as a path segment. */
  protected url = 'ws://127.0.0.1:3000';

  /** URL parameter of the latest `connect` call; reused for lazy connects. */
  protected urlParam = "XYZ";

  /** Current socket subject; undefined until the first connect. */
  private webSocketSubject$: WebSocketSubject<any>;

  /**
   * Connection state: `null` before any connection event, `true` once the
   * socket has opened, `false` after it errored or closed.
   */
  public serviceConnected$ = new BehaviorSubject<boolean>(null);

  constructor(
  ) {
  }

  /**
   * Returns the socket subject, creating it when none exists yet or when
   * `forceReconnect` is set.
   *
   * On a forced reconnect the previous subject is completed so its socket is
   * closed rather than leaked; streams obtained from the previous subject
   * complete with it.
   *
   * @param urlParam path segment appended to the base URL
   * @param forceReconnect replace an already existing connection
   * @returns the active socket subject
   */
  public connect(urlParam: string, forceReconnect: boolean = false): WebSocketSubject<any> {
    this.urlParam = urlParam;
    // const url = 'wss://echo.websocket.org';
    if (this.webSocketSubject$ && !forceReconnect) {
      return this.webSocketSubject$;
    }

    const previous$ = this.webSocketSubject$;
    console.log('connecting...' + urlParam);

    const socket$: WebSocketSubject<any> = webSocket({
      url: this.url + '/' + urlParam,
      deserializer: (event: MessageEvent): any => JSON.parse(event.data),
      serializer: (value: any): WebSocketMessage => JSON.stringify(value),
      openObserver: {
        next: () => {
          console.log('connection established');
          this.reportStatus(socket$, true);
        }
      }
    });

    // Swap the reference first so any event still arriving from the previous
    // subject is recognised as stale by reportStatus.
    this.webSocketSubject$ = socket$;
    if (previous$) {
      previous$.complete();
    }

    // Subscribing opens the socket; this subscription only tracks the state.
    socket$.subscribe({
      next: () => {},
      error: (error: unknown) => {
        console.error('connection error', error);
        this.reportStatus(socket$, false);
      },
      complete: () => {
        console.log('connection closed');
        this.reportStatus(socket$, false);
      }
    });

    return socket$;
  }

  /**
   * Sends an `{event, data}` message, connecting first if necessary.
   *
   * @param event event name placed in the message envelope
   * @param data payload placed in the message envelope
   */
  public send(event: string, data: any) {
    this.ensureConnected().next({event, data});
  }

  /**
   * Returns a stream of the `data` payloads of messages whose `event` equals
   * `channelName`, connecting first if necessary.
   *
   * A `subscribe:<channelName>` message is sent when the stream is subscribed
   * and an `unsubscribe:<channelName>` message when it is torn down. `share()`
   * lets all subscribers of the returned observable use one underlying
   * subscription; every call to this method builds a new stream.
   *
   * @param channelName event name to filter on
   * @returns observable of the matching payloads
   */
  public getChannel<T>(channelName: string): Observable<T> {
    return this.ensureConnected()
      .multiplex(
        () => ({event: `subscribe:${channelName}`}),
        () => ({event: `unsubscribe:${channelName}`}),
        message => (message.event === channelName)
      )
      .pipe(
        map((event: WsMessage): T => event.data),
        share()
      );
  }

  /** Returns the current socket subject, connecting with the stored URL parameter if none exists. */
  private ensureConnected(): WebSocketSubject<any> {
    if (!this.webSocketSubject$) {
      this.connect(this.urlParam);
    }
    return this.webSocketSubject$;
  }

  /** Publishes a connection state only when it originates from the subject currently in use. */
  private reportStatus(source$: WebSocketSubject<any>, connected: boolean): void {
    if (source$ === this.webSocketSubject$) {
      this.serviceConnected$.next(connected);
    }
  }
}