Angular, EventSource, Go and wasted lifetime

If you have ever used EventSource and Angular and have read all the other blog posts and github issues and Stackoverflow posts and nothing worked, then you’ve probably come here and will happily leave knowing you solved your problem.

First of all I use cobra, I know, get to the point. As part of cobra I add the serve command, so I can `go run main.go serve`

This is the serve.go file

package cmd

import (
"git.icod.de/dalu/eventsrc/server/handler"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"github.com/spf13/cobra"
)

// serveCmd represents the serve command
var serveCmd = &cobra.Command{
Use: "serve",
Short: "",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
r := gin.Default()
r.Use(cors.Default())
h := handler.NewHandler()
defer h.Close()
r.GET("/api/v1/events/", h.Stream)
return r.Run(":8080")
},
}

func init() {
rootCmd.AddCommand(serveCmd)
}

I set up the route, cors and run it

The server/handler/handler.go file

package handler

import (
"fmt"
"log"
"time"

"github.com/gin-contrib/sse"
"github.com/gin-gonic/gin"
)

type Handler struct {
t *time.Ticker
}

func NewHandler() *Handler {
h := new(Handler)
h.t = time.NewTicker(time.Second * 1)

return h
}

func (h *Handler) Close() {
h.t.Stop()
}

func (h *Handler) Stream(cx *gin.Context) {
i := 0
w := cx.Writer
clientGone := w.CloseNotify()

for {
select {
case <-clientGone: return case t := <-h.t.C: type M struct { Id int `json:"id"` Model string `json:"model"` Action string `json:"action"` Time time.Time `json:"time"` } m := new(M) m.Model = "profile" m.Action = "update" m.Id = 1 m.Time = t h := w.Header() h.Set("Cache-Control", "no-cache") h.Set("Connection", "keep-alive") h.Set("Content-Type", "text/event-stream") h.Set("X-Accel-Buffering", "no") ev := sse.Event{ Id: fmt.Sprintf("%d", i), Event: "message", Data: m, } if e := sse.Encode(w, ev); e != nil { log.Println(e.Error()) return } w.Flush() i++ } } }

Here is the important part. I wasted the last 6 hours and previous to that 2 days on this issue.
If you're serving this via nginx, you have to set this header `X-Accel-Buffering = no`.
If you don't send this header responses will get buffered by nginx until the timeout it met then flushed to the client.
The above code has a ticker that ticks every second and sends a new "Server Sent Event".

Why it didn't work for me was, as you see above `Event: "message"`. I had that set to "darko".

The Angular service

import {Injectable, NgZone} from '@angular/core';
import {Observable} from 'rxjs';

@Injectable({
providedIn: 'root'
})
export class NotiService {

constructor(private zone: NgZone) {
this.zone = new NgZone({ enableLongStackTrace: false });
}

watch(): Observable {
return Observable.create((observer) => {
const eventSource = new EventSource('/api/v1/events/');

eventSource.onmessage = (event) => this.zone.run(() => {
console.log(event);
observer.next(JSON.parse(event.data));
});
eventSource.addEventListener('darko', (event: any) => this.zone.run(() =>{
console.log('darko event', event);
observer.next(JSON.parse(event.data));
}));

eventSource.onerror = error => this.zone.run(() => {
if (eventSource.readyState === eventSource.CLOSED) {
console.log('The stream has been closed by the server.');
eventSource.close();
observer.complete();
} else {
observer.error(error);
}
});
return () => eventSource.close();
});
}
}

eventSource.onmessage expects a message with the `Event: "message"` content. Since I had it set to "darko",
the onmessage event never fired. If you for whatever reason need to send an event that is not a message type,
the `eventSource.addEventListener` is how you listen for that event.
As you might have seen in other blog posts or github issues, zonejs and EventSource aren't the best of friends.
So you have to wrap it all in `zone.run()` so you can have real time updates, and not just when you unsubscribe from the Observable.

Finally, the component

import {Component, OnDestroy, OnInit} from '@angular/core';
import {NotiService} from '../noti.service';

@Component({
selector: 'icod-home',
templateUrl: './home.component.html',
styleUrls: ['./home.component.scss']
})
export class HomeComponent implements OnInit, OnDestroy {

msgSub;
msgs = [];

constructor(private notiService: NotiService) {
}

ngOnInit() {
}

ngOnDestroy(): void {
if (this.msgSub) {
this.msgSub.unsubscribe();
}
}

watchEvents() {
this.msgSub = this.notiService.watch().subscribe( data => {
console.log(data);
this.msgs.push(data);
});
}

stopWatching() {
this.msgSub.unsubscribe();
}

}

and the component html


{{msg|json}}

Finally, the nginx configuration for the development server. To serve it all.
Here I'm using es.dev.luketic on the local network.

server {
listen 80;
listen [::]:80;
server_name es.dev.luketic;
root /home/darko/WebProjects/es/src;
index index.html;

error_log /var/log/nginx/es.error;

location / {
proxy_pass http://localhost:4200;
proxy_read_timeout 30;
proxy_connect_timeout 30;
proxy_redirect off;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}

location /sockjs-node/ {
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
rewrite ^/(.*)$ /$1 break;
proxy_set_header Host localhost;
proxy_pass http://localhost:4200/;
}

location ~ ^/api/v1/.* {
proxy_pass http://localhost:8080;
proxy_read_timeout 30;
proxy_connect_timeout 30;
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}

Schreib einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.