User Tools

Site Tools


Apollo GraphQL web socket reader & parser

apollows.go
/*
 * Copyright 2020 Oleg Borodin  <borodin@unix7.org>
 *
 */
 
package mdcore
 
import (
    "context"
    "encoding/json"
 
    "net/http"
    "net/url"
    "sync"
 
    "app/pgquery"
    "app/pmlog"
 
    "github.com/gorilla/websocket"
)
 
// String constants for the GraphQL web socket exchange: one payload key and
// the values carried in the "type" field of a message.
const (
    // Payload entry read by GMessage.GetMessage.
    gPayloadMessageKey string = "message"

    // Message types written to the socket by Subscribe.
    gwsConnectionInit      string = "connection_init"
    gwsStart               string = "start"
    gwsConnectionTerminate string = "connection_terminate"

    // Message types Subscribe reacts to when they are read from the socket.
    gwsConnectionAck       string = "connection_ack"
    gwsConnectionKeepAlive string = "ka"
    gwsData                string = "data"
    gwsError               string = "error"
    // NOTE(review): the subscriptions-transport-ws protocol spells this type
    // "connection_error"; confirm the target server really sends "conn_err".
    gwsConnectionError string = "conn_err"

    // Declared here but not referenced by the code visible in this file.
    gwsStop     string = "stop"
    gwsComplete string = "complete"
    gwsUnknown  string = "unknown"
    gwsInternal string = "internal"
)
 
// GMessage is the JSON envelope read from and written to the web socket.
// The "id" and "payload" members are left out of the encoded form when the
// corresponding field is empty; "type" is always present.
type GMessage struct {
    Id      string      `json:"id,omitempty"`
    Type    string      `json:"type"`
    Payload interface{} `json:"payload,omitempty"`
}
 
// NewGMessage builds a message of type mtype carrying payload. A nil payload
// is replaced by the value returned from pgquery.NewGQuery(). Id is left
// empty.
func NewGMessage(mtype string, payload interface{}) *GMessage {
    msg := new(GMessage)
    msg.Type = mtype
    msg.Payload = payload
    if msg.Payload == nil {
        msg.Payload = pgquery.NewGQuery()
    }
    return msg
}
// GetJson returns the message encoded as JSON text. An encoding failure is
// not reported to the caller: the result is then the empty string.
func (core *GMessage) GetJson() string {
    encoded, err := json.Marshal(core)
    if err != nil {
        return ""
    }
    return string(encoded)
}
 
// GetMessage returns the JSON encoding of the payload entry stored under
// gPayloadMessageKey.
//
// Results:
//   - no payload: the empty string;
//   - payload is a JSON object: the encoded value of its gPayloadMessageKey
//     entry ("null" when the entry is absent);
//   - payload is any other JSON value (string, array, number, ...): the
//     encoding of the whole payload, so the caller still has something to log.
//
// An encoding failure yields the empty string.
func (core *GMessage) GetMessage() string {
    if core.Payload == nil {
        return ""
    }

    // Payload is an interface{} and may hold any decoded JSON value, so the
    // assertion must be checked: the unchecked form panics on non-objects.
    var subject interface{} = core.Payload
    if fields, ok := core.Payload.(map[string]interface{}); ok {
        subject = fields[gPayloadMessageKey]
    }

    encoded, err := json.Marshal(subject)
    if err != nil {
        return ""
    }
    return string(encoded)
}
 
// GwsDataHandlerFunc is the callback type Subscribe accepts for incoming data
// messages; it receives the payload as text and may return an error.
type GwsDataHandlerFunc = func(data string) error
 
//
// GetWsURL()
//
func (core *Mdcore) GetWsURL() (*url.URL, error) {
    var err error
    var url *url.URL
 
 
    schema      := "ws://"
    hostname    := core.gqURL.Hostname()
    port        := core.gqURL.Port()
    path        := core.gqURL.EscapedPath()
    wsRef       := schema + hostname + ":" + port + path
    url, err = url.Parse(wsRef)
    if err != nil {
        return url, err
    }
    return url, err
}
 
// SubscribeLoopFunc is the type of the parameterless loop function that
// Subscribe returns to its caller.
type SubscribeLoopFunc = func()
//
// Subscribe
//
func (core *Mdcore) Subscribe(externalParentCtx context.Context, wg *sync.WaitGroup, gQuery *pgquery.GQuery, dataHandler GwsDataHandlerFunc) (SubscribeLoopFunc, context.CancelFunc, error) {
    var err error
    var loopFunc SubscribeLoopFunc
 
    externalCtx, extCancel := context.WithCancel(externalParentCtx)
    subCtx, subCancel := context.WithCancel(core.pixcoreCtx)
    wsCtx, wsCancel := context.WithCancel(subCtx)
 
    wsRef, err := core.GetWsURL()
    if err != nil {
        return loopFunc, extCancel, err
    }
 
    headers := make(http.Header)
    headers.Add("Sec-Websocket-Protocol", "graphql-ws")
    headers.Add("Authorization", "Bearer " + core.GetJWTToken())
 
    // Start Level1
    wsConn, _, err := websocket.DefaultDialer.DialContext(wsCtx, wsRef.String(), headers)
    if err != nil {
        return loopFunc, extCancel, err
    }
 
    closeHandler := func(code int, text string) error {
        var err error
        pmlog.LogInfo("web socket closed on level1 handler, code:", code, "text:", text)
        // Send terminate message on Level2, wo control response
        gMessage := NewGMessage(gwsConnectionTerminate, nil)
        err = wsConn.WriteJSON(gMessage)
        wsCancel()
        return err
    }
    wsConn.SetCloseHandler(closeHandler)
 
 
    // Start Level2
    gMessage := NewGMessage(gwsConnectionInit, nil)
    err = wsConn.WriteJSON(gMessage)
    if err != nil {
        return loopFunc, extCancel, err
    }
 
    loopFunc = func() {
        defer wg.Done()
        for {
            // Check context
            select {
                case <- subCtx.Done():
                    pmlog.LogInfo("subsription canceled")
                    // Send terminate message on Level2, wo control response
                    gMessage := NewGMessage(gwsConnectionTerminate, nil)
                    err = wsConn.WriteJSON(gMessage)
                    if err != nil {
                        pmlog.LogInfo("gws write connection terminate error:", err)
                    }
                    wsCancel()
                    return
                case <- externalCtx.Done():
                    pmlog.LogInfo("subscription canceled from consumer")
                    subCancel()
                    continue
                default:
            }
 
            var gMessage GMessage
            err = wsConn.ReadJSON(&gMessage)
            if err != nil {
                pmlog.LogInfo("gws inloop reading error:", err)
                subCancel()
                continue
            }
 
            switch gMessage.Type {
                case gwsConnectionAck:
                    gMessage := NewGMessage(gwsStart, gQuery)
                    err = wsConn.WriteJSON(gMessage)
                    if err != nil {
                        pmlog.LogError("gws start writing error:", err)
                    }
                case gwsConnectionKeepAlive:
                    continue
 
                case gwsData:
                    data, _ := json.MarshalIndent(gMessage.Payload, " ", "    ")
                    go dataHandler(string(data))        // Async handling
 
                case gwsConnectionError:
                    pmlog.LogError("gws got connection error message: "  + gMessage.GetMessage())
                    gMessage := NewGMessage(gwsConnectionTerminate, nil)
                    err := wsConn.WriteJSON(gMessage)
                    if err != nil {
                        pmlog.LogError("gws connection terminate writing error:", err)
                    }
                    err = wsConn.Close()
                    pmlog.LogInfo("gws close websocket")
                    if err != nil {
                        pmlog.LogError("gws closing websocket error:", err)
                    }
                    wsCancel()
                    pmlog.LogInfo("gws canceled websocket and exit from loop")
                    return
 
                case gwsError:
                    pmlog.LogError("gws got generic error message: " + gMessage.GetMessage())
                    gMessage := NewGMessage(gwsConnectionTerminate, nil)
                    err := wsConn.WriteJSON(gMessage)
                    if err != nil {
                        pmlog.LogError("gws unable write message:", err)
                    }
                    err = wsConn.Close()
                    pmlog.LogInfo("gws close websocket")
                    if err != nil {
                        pmlog.LogError("gws closing websocket error:", err)
                    }
                    wsCancel()
                    pmlog.LogInfo("gws canceled websocket and exit from loop")
                    return
                default:
                    pmlog.LogInfo("gws got unknown type of message:", gMessage.GetJson())
                    continue
            }
        }
    }
    return loopFunc, extCancel, err
}