Go database changes notifications over websocket. Generic solution. Reactive apps.

Do you know the problem when you’re writing single page apps and you have to manually update your data from the backend because you know you have just made a change and want the current state to update in “real time”? So you either load the data again by hand/initialite a refresh of the data, or you reload the page or you navigate to a redirect component to land back at your page so it’s loaded anew?

Well there are databases who can notify you of changes done. There’s RethinkDB, there’s Postgresql’s NOTIFY and LISTEN, there’s MongoDB’s change stream that never worked for me and I could never receive any help because IRC was always dead when I joined.

Anyhow, instead of relying on database to notify us that changes were made why not just have the changes in our handler or repository?

I wrote

https://git.icod.de/dalu/notifier

a library that deals with the problem.

func main() {
r := gin.Default()
m := melody.New()
n := notifier.NewNotifier(m)
h := NewHandler(n)

r.GET("/", h.Index)
r.GET("/ws", func(cx *gin.Context) {
m.HandleRequest(cx.Writer, cx.Request)
})
r.GET("/acme/ins", h.InsertDB)

go n.Listen(func(notification *notifier.Notification) {
b, e := json.Marshal(notification)
if e != nil {
log.Println(e.Error())
return
}
m.Broadcast(b)
})
r.Run(":5000")
}

So you need an instance of melody, created with `melody.New()`
You pass this instance to create a new Notifier instance and you pass this notifier instance to the handler if you use that.

GET / serves the index.html page that’s essentially a copy/paste of melody’s chat app and does connect and receive notifications via WebSocket. I mean that’s what it does, it returns the HTML that does it.

GET /ws is the WebSocket that is used to connect with

Now

go n.Listen()

with the paramter

func(notification *notifier.Notification) {}

That parameter is the callback function that is called when a notification is sent.
In this case the notification is marshalled into []byte and then broadcast by melody to all connected WebSockets.

Later in your handler, in this example we only have an INSERT handler without database code.

func (h *Handler) InsertDB(cx *gin.Context) {
// Do database insert stuff
h.notifier.Notify("acme", "INSERT", []byte(`{"id": 1, "name": acme}`))
cx.JSON(201, map[string]interface{}{"status": "ok"})
}

“Do database insert stuff” could be for instance

ms := h.ms.Copy()
defer ms.Close()

acmeC := ms.DB("database").C("acme")
model = new(Acme)
cx.BindJSON(model)
acmeC.Insert(model)

and then you continue with notifying the connected clients via WebSocket by writing

modelJSON, _ := json.Marshal(model)
h.notifier.Notify("acme", "INSERT", modelJSON)
// and finally
cx.JSON(201, nil)

The very moment the notification is emitted it is sent(broadcast) to clients via WebSocket.
In your client-side webapp you receive that notification and filter by collection and do stuff like reloading entities or lists of entities.

By the way, if you don’t want to pass any data in the Notify function you can just write

h.notifier.Notify("acme", "INSERT", nil)

Because you see the Notification type has the `,omitempty` JSON tag.

The overhead of this whole ordeal is between 2-3 µs per notification.

You can also write

go h.notifier.Notify("acme", "INSERT", nil)

if you’d like it to be done at the end, when your response finished.

Note the go in front of the Notify function

Now in case you were wondering how to use a notification by WebSocket, here’s an example for Angular 6.

First the base web-socket-service-ts

import {Injectable} from '@angular/core';
import {Observable, Observer, Subject} from 'rxjs';

@Injectable({
providedIn: 'root'
})
export class WebSocketService {
private subject: Subject;

constructor() {
}

public connect(url: string): Subject {
if (!this.subject) {
this.subject = this.create(url);
}
return this.subject;
}

private create(url: string): Subject {
const ws = new WebSocket(url);
const observable = Observable.create(
(obs: Observer) => {
ws.onmessage = obs.next.bind(obs);
ws.onerror = obs.error.bind(obs);
ws.onclose = obs.complete.bind(obs);
return ws.close.bind(ws);
}
);
const observer = {
next: (data: Object) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
}
};
return Subject.create(observer, observable);
}
}

Building upon the WebSocketService is the NoficiationService

import {Injectable} from '@angular/core';
import {WebSocketService} from './web-socket.service';
import {environment} from '../../environments/environment';
import {map} from 'rxjs/operators';
import {Subject} from 'rxjs';

@Injectable({
providedIn: 'root'
})
export class NotificationService {

public messages: Subject;

constructor(ws: WebSocketService) {
this.messages = >ws.connect(environment.wsRoot).pipe(
map((response: MessageEvent): Notification => {
const data = JSON.parse(response.data);
return {
collection: data.collection,
action: data.action,
data: data.data
};
})
);
}
}

export interface Notification {
collection: string;
action: string;
data?: any;
}

And how you use this in your component, you do as you did before, you have an update data function where you grab the data from your endpoints, e.g.

private update(): void {
this.route.paramMap.subscribe(value => {
const id = value.get('id');
this.id = id;
this.communityService.find(id).subscribe(data => {
this.community = data;
this.containerService.list({community_id: data.id, preload_depth: '2'}).subscribe(containerdata => {
this.containers = containerdata;
});
});
});
}

Let’s keep it simple for now, not go crazy with switchMap etc.

In NgOnInit then you initially call this update function but also subscribe to the notification service.

ngOnInit() {
this.update();

this.notifyService.messages.subscribe(data => {
switch (data.collection) {
case 'containers':
case 'forums': {
this.update();
}
}
});
}

The above means, if the updated collection is either containers or forums, grab fresh data from the endpoints.

Don’t forget to unsubscribe in NgOnDestroy() in any case.

ngOnDestroy(): void {
this.notifyService.messages.unsubscribe();
}

So an example work-in-progress component.ts would look something like this

import {ActivatedRoute} from '@angular/router';
import {Community} from '../../models/community';
import {Container} from '../../models/container';
import {NotificationService} from '../../services/notification.service';

@Component({
selector: 'icod-community-home',
templateUrl: './community-home.component.html',
styleUrls: ['./community-home.component.scss']
})
export class CommunityHomeComponent implements OnInit, OnDestroy {

private _id: string;
private _community: Community = {};
private _containers: Container[] = [];

constructor(
private route: ActivatedRoute,
private notifyService: NotificationService,
private communityService: CommunityService,
private containerService: ContainerService,
) {
}

ngOnInit() {
this.update();

this.notifyService.messages.subscribe(data => {
switch (data.collection) {
case 'containers':
case 'forums': {
this.update();
}
}
});
}

ngOnDestroy(): void {
this.notifyService.messages.unsubscribe();
}

private update(): void {
this.route.paramMap.subscribe(value => {
const id = value.get('id');
this.id = id;
this.communityService.find(id).subscribe(data => {
this.community = data;
this.containerService.list({community_id: data.id, preload_depth: '2'}).subscribe(containerdata => {
this.containers = containerdata;
});
});
});
}

get community() {
return this._community;
}

set community(m: Community) {
this._community = m;
}

get containers() {
return this._containers;
}

set containers(m: Container[]) {
this._containers = m;
}

get id(): string {
return this._id;
}

set id(value: string) {
this._id = value;
}
}

Schreib einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.