/* * Copyright 2020 Oleg Borodin <borodin@unix7.org> * */ package mdcore import ( "context" "encoding/json" "net/http" "net/url" "sync" "app/pgquery" "app/pmlog" "github.com/gorilla/websocket" ) const ( gPayloadMessageKey string = "message" gwsConnectionInit string = "connection_init" gwsConnectionError string = "conn_err" gwsStart string = "start" gwsStop string = "stop" gwsError string = "error" gwsData string = "data" gwsComplete string = "complete" gwsConnectionKeepAlive string = "ka" gwsConnectionAck string = "connection_ack" gwsConnectionTerminate string = "connection_terminate" gwsUnknown string = "unknown" gwsInternal string = "internal" ) type GMessage struct { Id string `json:"id,omitempty"` Type string `json:"type"` Payload interface{} `json:"payload,omitempty"` } func NewGMessage(mtype string, payload interface{}) *GMessage { if payload == nil { payload = pgquery.NewGQuery() } return &GMessage{ Type: mtype, Payload: payload, } } func (core *GMessage) GetJson() string { result, _ := json.Marshal(core) return string(result) } func (core *GMessage) GetMessage() string { var message string if core.Payload != nil { payload := core.Payload.(map[string]interface{}) j, _ := json.Marshal(payload[gPayloadMessageKey]) message = string(j) } return message } type GwsDataHandlerFunc = func(string) error // // GetWsURL() // func (core *Mdcore) GetWsURL() (*url.URL, error) { var err error var url *url.URL schema := "ws://" hostname := core.gqURL.Hostname() port := core.gqURL.Port() path := core.gqURL.EscapedPath() wsRef := schema + hostname + ":" + port + path url, err = url.Parse(wsRef) if err != nil { return url, err } return url, err } type SubscribeLoopFunc = func() // // Subscribe // func (core *Mdcore) Subscribe(externalParentCtx context.Context, wg *sync.WaitGroup, gQuery *pgquery.GQuery, dataHandler GwsDataHandlerFunc) (SubscribeLoopFunc, context.CancelFunc, error) { var err error var loopFunc SubscribeLoopFunc externalCtx, extCancel := context.WithCancel(externalParentCtx) subCtx, subCancel := context.WithCancel(core.pixcoreCtx) wsCtx, wsCancel := context.WithCancel(subCtx) wsRef, err := core.GetWsURL() if err != nil { return loopFunc, extCancel, err } headers := make(http.Header) headers.Add("Sec-Websocket-Protocol", "graphql-ws") headers.Add("Authorization", "Bearer " + core.GetJWTToken()) // Start Level1 wsConn, _, err := websocket.DefaultDialer.DialContext(wsCtx, wsRef.String(), headers) if err != nil { return loopFunc, extCancel, err } closeHandler := func(code int, text string) error { var err error pmlog.LogInfo("web socket closed on level1 handler, code:", code, "text:", text) // Send terminate message on Level2, wo control response gMessage := NewGMessage(gwsConnectionTerminate, nil) err = wsConn.WriteJSON(gMessage) wsCancel() return err } wsConn.SetCloseHandler(closeHandler) // Start Level2 gMessage := NewGMessage(gwsConnectionInit, nil) err = wsConn.WriteJSON(gMessage) if err != nil { return loopFunc, extCancel, err } loopFunc = func() { defer wg.Done() for { // Check context select { case <- subCtx.Done(): pmlog.LogInfo("subsription canceled") // Send terminate message on Level2, wo control response gMessage := NewGMessage(gwsConnectionTerminate, nil) err = wsConn.WriteJSON(gMessage) if err != nil { pmlog.LogInfo("gws write connection terminate error:", err) } wsCancel() return case <- externalCtx.Done(): pmlog.LogInfo("subscription canceled from consumer") subCancel() continue default: } var gMessage GMessage err = wsConn.ReadJSON(&gMessage) if err != nil { pmlog.LogInfo("gws inloop reading error:", err) subCancel() continue } switch gMessage.Type { case gwsConnectionAck: gMessage := NewGMessage(gwsStart, gQuery) err = wsConn.WriteJSON(gMessage) if err != nil { pmlog.LogError("gws start writing error:", err) } case gwsConnectionKeepAlive: continue case gwsData: data, _ := json.MarshalIndent(gMessage.Payload, " ", " ") go dataHandler(string(data)) // Async handling case gwsConnectionError: pmlog.LogError("gws got connection error message: " + gMessage.GetMessage()) gMessage := NewGMessage(gwsConnectionTerminate, nil) err := wsConn.WriteJSON(gMessage) if err != nil { pmlog.LogError("gws connection terminate writing error:", err) } err = wsConn.Close() pmlog.LogInfo("gws close websocket") if err != nil { pmlog.LogError("gws closing websocket error:", err) } wsCancel() pmlog.LogInfo("gws canceled websocket and exit from loop") return case gwsError: pmlog.LogError("gws got generic error message: " + gMessage.GetMessage()) gMessage := NewGMessage(gwsConnectionTerminate, nil) err := wsConn.WriteJSON(gMessage) if err != nil { pmlog.LogError("gws unable write message:", err) } err = wsConn.Close() pmlog.LogInfo("gws close websocket") if err != nil { pmlog.LogError("gws closing websocket error:", err) } wsCancel() pmlog.LogInfo("gws canceled websocket and exit from loop") return default: pmlog.LogInfo("gws got unknown type of message:", gMessage.GetJson()) continue } } } return loopFunc, extCancel, err }