add websocket message parser

This commit is contained in:
c9s 2021-12-23 02:37:11 +08:00
parent 3fb2e12c24
commit cec4b3dd1e
2 changed files with 162 additions and 67 deletions

View File

@ -1,82 +1,118 @@
package kucoinapi package kucoinapi
import "encoding/json" import (
"encoding/json"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type WebSocketMessageType string
const (
WebSocketMessageTypePing WebSocketMessageType = "ping"
WebSocketMessageTypeSubscribe WebSocketMessageType = "subscribe"
WebSocketMessageTypeUnsubscribe WebSocketMessageType = "unsubscribe"
WebSocketMessageTypeAck WebSocketMessageType = "ack"
WebSocketMessageTypePong WebSocketMessageType = "pong"
WebSocketMessageTypeWelcome WebSocketMessageType = "welcome"
WebSocketMessageTypeMessage WebSocketMessageType = "message"
)
type WebSocketSubject string
const (
WebSocketSubjectTradeTicker WebSocketSubject = "trade.ticker"
WebSocketSubjectTradeSnapshot WebSocketSubject = "trade.snapshot" // ticker snapshot
WebSocketSubjectTradeL2Update WebSocketSubject = "trade.l2update" // order book L2
WebSocketSubjectLevel2 WebSocketSubject = "level2" // level2
WebSocketSubjectTradeCandlesUpdate WebSocketSubject = "trade.candles.update"
// private subjects
WebSocketSubjectOrderChange WebSocketSubject = "orderChange"
WebSocketSubjectAccountBalance WebSocketSubject = "account.balance"
WebSocketSubjectStopOrder WebSocketSubject = "stopOrder"
)
type WebSocketCommand struct { type WebSocketCommand struct {
Id int64 `json:"id"` Id int64 `json:"id"`
Type string `json:"type"` Type string `json:"type"`
Topic string `json:"topic"` Topic string `json:"topic"`
PrivateChannel bool `json:"privateChannel"` PrivateChannel bool `json:"privateChannel"`
Response bool `json:"response"` Response bool `json:"response"`
} }
func (c *WebSocketCommand) JSON() ([]byte, error) { func (c *WebSocketCommand) JSON() ([]byte, error) {
type tt WebSocketCommand type tt WebSocketCommand
var a = (*tt)(c) var a = (*tt)(c)
return json.Marshal(a) return json.Marshal(a)
} }
type WebSocketResponse struct { type WebSocketResponse struct {
Type string `json:"type"` Type WebSocketMessageType `json:"type"`
Topic string `json:"topic"` Topic string `json:"topic"`
Subject string `json:"subject"` Subject WebSocketSubject `json:"subject"`
Data json.RawMessage `json:"data"`
// Object is used for storing the parsed Data
Object interface{} `json:"-"`
} }
type WebSocketTicker struct { type WebSocketTicker struct {
Sequence string `json:"sequence"` Sequence string `json:"sequence"`
Price string `json:"price"` Price fixedpoint.Value `json:"price"`
Size string `json:"size"` Size fixedpoint.Value `json:"size"`
BestAsk string `json:"bestAsk"` BestAsk fixedpoint.Value `json:"bestAsk"`
BestAskSize string `json:"bestAskSize"` BestAskSize fixedpoint.Value `json:"bestAskSize"`
BestBid string `json:"bestBid"` BestBid fixedpoint.Value `json:"bestBid"`
BestBidSize string `json:"bestBidSize"` BestBidSize fixedpoint.Value `json:"bestBidSize"`
} }
type WebSocketOrderBook struct { type WebSocketOrderBookL2 struct {
SequenceStart int64 `json:"sequenceStart"` SequenceStart int64 `json:"sequenceStart"`
SequenceEnd int64 `json:"sequenceEnd"` SequenceEnd int64 `json:"sequenceEnd"`
Symbol string `json:"symbol"` Symbol string `json:"symbol"`
Changes struct { Changes struct {
Asks [][]string `json:"asks"` Asks [][]string `json:"asks"`
Bids [][]string `json:"bids"` Bids [][]string `json:"bids"`
} `json:"changes"` } `json:"changes"`
} }
type WebSocketKLine struct { type WebSocketKLine struct {
Symbol string `json:"symbol"` Symbol string `json:"symbol"`
Candles []string `json:"candles"` Candles []string `json:"candles"`
Time int64 `json:"time"` Time int64 `json:"time"`
} }
type WebSocketPrivateOrder struct { type WebSocketPrivateOrder struct {
Symbol string `json:"symbol"` Symbol string `json:"symbol"`
OrderType string `json:"orderType"` OrderType string `json:"orderType"`
Side string `json:"side"` Side string `json:"side"`
OrderId string `json:"orderId"` OrderId string `json:"orderId"`
Type string `json:"type"` Type string `json:"type"`
OrderTime int64 `json:"orderTime"` OrderTime types.MillisecondTimestamp `json:"orderTime"`
Size string `json:"size"` Price fixedpoint.Value `json:"price"`
FilledSize string `json:"filledSize"` Size fixedpoint.Value `json:"size"`
Price string `json:"price"` FilledSize fixedpoint.Value `json:"filledSize"`
ClientOid string `json:"clientOid"` RemainSize fixedpoint.Value `json:"remainSize"`
RemainSize string `json:"remainSize"` ClientOid string `json:"clientOid"`
Status string `json:"status"` Status string `json:"status"`
Ts int64 `json:"ts"` Ts types.MillisecondTimestamp `json:"ts"`
} }
type WebSocketAccountBalance struct { type WebSocketAccountBalance struct {
Total string `json:"total"` Total fixedpoint.Value `json:"total"`
Available string `json:"available"` Available fixedpoint.Value `json:"available"`
AvailableChange string `json:"availableChange"` AvailableChange fixedpoint.Value `json:"availableChange"`
Currency string `json:"currency"` Currency string `json:"currency"`
Hold string `json:"hold"` Hold fixedpoint.Value `json:"hold"`
HoldChange string `json:"holdChange"` HoldChange fixedpoint.Value `json:"holdChange"`
RelationEvent string `json:"relationEvent"` RelationEvent string `json:"relationEvent"`
RelationEventId string `json:"relationEventId"` RelationEventId string `json:"relationEventId"`
RelationContext struct { RelationContext struct {
Symbol string `json:"symbol"` Symbol string `json:"symbol"`
TradeId string `json:"tradeId"` TradeId string `json:"tradeId"`
OrderId string `json:"orderId"` OrderId string `json:"orderId"`
} `json:"relationContext"` } `json:"relationContext"`
Time string `json:"time"` Time string `json:"time"`
} }

View File

@ -2,6 +2,7 @@ package kucoin
import ( import (
"context" "context"
"encoding/json"
"net" "net"
"sync" "sync"
"time" "time"
@ -235,18 +236,23 @@ func (s *Stream) read(ctx context.Context) {
continue continue
} }
e, err := parseWebsocketPayload(string(message)) e, err := parseWebsocketPayload(message)
if err != nil { if err != nil {
log.WithError(err).Error("message parse error") log.WithError(err).Error("message parse error")
continue
} }
if e != nil { log.Infof("event: %+v", e)
switch et := e.(type) {
if e != nil && e.Object != nil {
switch et := e.Object.(type) {
case *kucoinapi.WebSocketTicker:
case *kucoinapi.WebSocketOrderBookL2:
case *kucoinapi.WebSocketKLine:
case *kucoinapi.WebSocketAccountBalance:
case *kucoinapi.WebSocketPrivateOrder:
/*
case *AccountEvent:
s.EmitOrderDetails(et)
*/
default: default:
log.Warnf("unhandled event: %+v", et) log.Warnf("unhandled event: %+v", et)
@ -300,6 +306,59 @@ func ping(ctx context.Context, w WebSocketConnector, interval time.Duration) {
} }
} }
func parseWebsocketPayload(in string) (interface{}, error) { func parseWebsocketPayload(in []byte) (*kucoinapi.WebSocketResponse, error) {
return nil, nil var resp kucoinapi.WebSocketResponse
var err = json.Unmarshal(in, &resp)
if err != nil {
return nil, err
}
switch resp.Type {
case kucoinapi.WebSocketMessageTypeAck:
return &resp, nil
case kucoinapi.WebSocketMessageTypeMessage:
switch resp.Subject {
case kucoinapi.WebSocketSubjectOrderChange:
var o kucoinapi.WebSocketPrivateOrder
if err := json.Unmarshal(resp.Data, &o) ; err != nil {
return &resp, err
}
resp.Object = &o
case kucoinapi.WebSocketSubjectAccountBalance:
var o kucoinapi.WebSocketAccountBalance
if err := json.Unmarshal(resp.Data, &o) ; err != nil {
return &resp, err
}
resp.Object = &o
case kucoinapi.WebSocketSubjectTradeCandlesUpdate:
var o kucoinapi.WebSocketKLine
if err := json.Unmarshal(resp.Data, &o) ; err != nil {
return &resp, err
}
resp.Object = &o
case kucoinapi.WebSocketSubjectTradeL2Update:
var o kucoinapi.WebSocketOrderBookL2
if err := json.Unmarshal(resp.Data, &o) ; err != nil {
return &resp, err
}
resp.Object = &o
case kucoinapi.WebSocketSubjectTradeTicker:
var o kucoinapi.WebSocketTicker
if err := json.Unmarshal(resp.Data, &o) ; err != nil {
return &resp, err
}
resp.Object = &o
default:
// return nil, fmt.Errorf("kucoin: unsupported subject: %s", resp.Subject)
}
}
return &resp, nil
} }