From 0708cee962f6a8510dfc41f76e8c40f50e4205bc Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 1 Oct 2020 16:48:08 +0800 Subject: [PATCH] implement private message parsing --- bbgo/exchange/max/maxapi/auth.go | 31 +++ bbgo/exchange/max/maxapi/public_websocket.go | 64 +++-- .../publicwebsocketservice_callbacks.go | 26 +- bbgo/exchange/max/maxapi/userdata.go | 236 ++++++++++++++++++ 4 files changed, 321 insertions(+), 36 deletions(-) create mode 100644 bbgo/exchange/max/maxapi/auth.go create mode 100644 bbgo/exchange/max/maxapi/userdata.go diff --git a/bbgo/exchange/max/maxapi/auth.go b/bbgo/exchange/max/maxapi/auth.go new file mode 100644 index 000000000..492cf3b3e --- /dev/null +++ b/bbgo/exchange/max/maxapi/auth.go @@ -0,0 +1,31 @@ +package max + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" +) + +type AuthMessage struct { + Action string `json:"action"` + APIKey string `json:"apiKey"` + Nonce int64 `json:"nonce"` + Signature string `json:"signature"` + ID string `json:"id"` +} + +type AuthEvent struct { + Event string + ID string + Timestamp int64 +} + +func signPayload(payload string, secret string) string { + var sig = hmac.New(sha256.New, []byte(secret)) + _, err := sig.Write([]byte(payload)) + if err != nil { + return "" + } + return hex.EncodeToString(sig.Sum(nil)) +} + diff --git a/bbgo/exchange/max/maxapi/public_websocket.go b/bbgo/exchange/max/maxapi/public_websocket.go index e6693f8fb..5fdb9faf0 100644 --- a/bbgo/exchange/max/maxapi/public_websocket.go +++ b/bbgo/exchange/max/maxapi/public_websocket.go @@ -2,8 +2,11 @@ package max import ( "context" + "fmt" "time" + "github.com/google/uuid" + "github.com/gorilla/websocket" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -32,10 +35,10 @@ var SubscribeAction = "subscribe" var UnsubscribeAction = "unsubscribe" //go:generate callbackgen -type PublicWebSocketService -type PublicWebSocketService struct { - BaseURL string +type WebSocketService struct { + baseURL, key, secret string - Conn *websocket.Conn + conn *websocket.Conn reconnectC chan struct{} @@ -52,38 +55,53 @@ type PublicWebSocketService struct { subscriptionEventCallbacks []func(e SubscriptionEvent) } -func NewPublicWebSocketService(wsURL string) *PublicWebSocketService { - return &PublicWebSocketService{ +func NewWebSocketService(wsURL string, key, secret string) *WebSocketService { + return &WebSocketService{ + key: key, + secret: secret, reconnectC: make(chan struct{}, 1), - BaseURL: wsURL, + baseURL: wsURL, } } -func (s *PublicWebSocketService) Connect(ctx context.Context) error { +func (s *WebSocketService) Connect(ctx context.Context) error { // pre-allocate the websocket client, the websocket client can be used for reconnecting. go s.read(ctx) return s.connect(ctx) } -func (s *PublicWebSocketService) connect(ctx context.Context) error { +func (s *WebSocketService) sendAuthMessage() error { + nonce := time.Now().UnixNano() / int64(time.Millisecond) + + auth := &AuthMessage{ + Action: "auth", + APIKey: s.key, + Nonce: nonce, + Signature: signPayload(fmt.Sprintf("%d", nonce), s.secret), + ID: uuid.New().String(), + } + return s.conn.WriteJSON(auth) +} + +func (s *WebSocketService) connect(ctx context.Context) error { dialer := websocket.DefaultDialer - conn, _, err := dialer.DialContext(ctx, s.BaseURL, nil) + conn, _, err := dialer.DialContext(ctx, s.baseURL, nil) if err != nil { return err } - s.Conn = conn + s.conn = conn return nil } -func (s *PublicWebSocketService) emitReconnect() { +func (s *WebSocketService) emitReconnect() { select { case s.reconnectC <- struct{}{}: default: } } -func (s *PublicWebSocketService) read(ctx context.Context) { +func (s *WebSocketService) read(ctx context.Context) { for { select { case <-ctx.Done(): @@ -96,7 +114,7 @@ func (s *PublicWebSocketService) read(ctx context.Context) { } default: - mt, msg, err := s.Conn.ReadMessage() + mt, msg, err := s.conn.ReadMessage() if err != nil { s.emitReconnect() @@ -120,7 +138,7 @@ func (s *PublicWebSocketService) read(ctx context.Context) { } } -func (s *PublicWebSocketService) dispatch(msg interface{}) { +func (s *WebSocketService) dispatch(msg interface{}) { switch e := msg.(type) { case *BookEvent: s.EmitBookEvent(*e) @@ -135,18 +153,18 @@ func (s *PublicWebSocketService) dispatch(msg interface{}) { } } -func (s *PublicWebSocketService) ClearSubscriptions() { +func (s *WebSocketService) ClearSubscriptions() { s.Subscriptions = nil } -func (s *PublicWebSocketService) Reconnect() { +func (s *WebSocketService) Reconnect() { logger.Info("reconnecting...") s.emitReconnect() } // Subscribe is a helper method for building subscription request from the internal mapping types. // (Internal public method) -func (s *PublicWebSocketService) Subscribe(channel string, market string) error { +func (s *WebSocketService) Subscribe(channel string, market string) error { s.AddSubscription(Subscription{ Channel: channel, Market: market, @@ -155,11 +173,11 @@ func (s *PublicWebSocketService) Subscribe(channel string, market string) error } // AddSubscription adds the subscription request to the buffer, these requests will be sent to the server right after connecting to the endpoint. -func (s *PublicWebSocketService) AddSubscription(subscription Subscription) { +func (s *WebSocketService) AddSubscription(subscription Subscription) { s.Subscriptions = append(s.Subscriptions, subscription) } -func (s *PublicWebSocketService) Resubscribe() { +func (s *WebSocketService) Resubscribe() { // Calling Resubscribe() by websocket is not enough to refresh orderbook. // We still need to get orderbook snapshot by rest client. // Therefore Reconnect() is used to simplify implementation. @@ -173,7 +191,7 @@ func (s *PublicWebSocketService) Resubscribe() { } } -func (s *PublicWebSocketService) SendSubscriptionRequest(action string) error { +func (s *WebSocketService) SendSubscriptionRequest(action string) error { request := WebsocketCommand{ Action: action, Subscriptions: s.Subscriptions, @@ -181,7 +199,7 @@ func (s *PublicWebSocketService) SendSubscriptionRequest(action string) error { logger.Debugf("sending websocket subscription: %+v", request) - if err := s.Conn.WriteJSON(request); err != nil { + if err := s.conn.WriteJSON(request); err != nil { return errors.Wrap(err, "Failed to send subscribe event") } @@ -189,6 +207,6 @@ func (s *PublicWebSocketService) SendSubscriptionRequest(action string) error { } // Close web socket connection -func (s *PublicWebSocketService) Close() error { - return s.Conn.Close() +func (s *WebSocketService) Close() error { + return s.conn.Close() } diff --git a/bbgo/exchange/max/maxapi/publicwebsocketservice_callbacks.go b/bbgo/exchange/max/maxapi/publicwebsocketservice_callbacks.go index 2d75cd583..f21792091 100644 --- a/bbgo/exchange/max/maxapi/publicwebsocketservice_callbacks.go +++ b/bbgo/exchange/max/maxapi/publicwebsocketservice_callbacks.go @@ -1,64 +1,64 @@ -// Code generated by "callbackgen -type PublicWebSocketService"; DO NOT EDIT. +// Code generated by "callbackgen -type WebSocketService"; DO NOT EDIT. package max import () -func (s *PublicWebSocketService) OnError(cb func(err error)) { +func (s *WebSocketService) OnError(cb func(err error)) { s.errorCallbacks = append(s.errorCallbacks, cb) } -func (s *PublicWebSocketService) EmitError(err error) { +func (s *WebSocketService) EmitError(err error) { for _, cb := range s.errorCallbacks { cb(err) } } -func (s *PublicWebSocketService) OnMessage(cb func(message []byte)) { +func (s *WebSocketService) OnMessage(cb func(message []byte)) { s.messageCallbacks = append(s.messageCallbacks, cb) } -func (s *PublicWebSocketService) EmitMessage(message []byte) { +func (s *WebSocketService) EmitMessage(message []byte) { for _, cb := range s.messageCallbacks { cb(message) } } -func (s *PublicWebSocketService) OnBookEvent(cb func(e BookEvent)) { +func (s *WebSocketService) OnBookEvent(cb func(e BookEvent)) { s.bookEventCallbacks = append(s.bookEventCallbacks, cb) } -func (s *PublicWebSocketService) EmitBookEvent(e BookEvent) { +func (s *WebSocketService) EmitBookEvent(e BookEvent) { for _, cb := range s.bookEventCallbacks { cb(e) } } -func (s *PublicWebSocketService) OnTradeEvent(cb func(e TradeEvent)) { +func (s *WebSocketService) OnTradeEvent(cb func(e TradeEvent)) { s.tradeEventCallbacks = append(s.tradeEventCallbacks, cb) } -func (s *PublicWebSocketService) EmitTradeEvent(e TradeEvent) { +func (s *WebSocketService) EmitTradeEvent(e TradeEvent) { for _, cb := range s.tradeEventCallbacks { cb(e) } } -func (s *PublicWebSocketService) OnErrorEvent(cb func(e ErrorEvent)) { +func (s *WebSocketService) OnErrorEvent(cb func(e ErrorEvent)) { s.errorEventCallbacks = append(s.errorEventCallbacks, cb) } -func (s *PublicWebSocketService) EmitErrorEvent(e ErrorEvent) { +func (s *WebSocketService) EmitErrorEvent(e ErrorEvent) { for _, cb := range s.errorEventCallbacks { cb(e) } } -func (s *PublicWebSocketService) OnSubscriptionEvent(cb func(e SubscriptionEvent)) { +func (s *WebSocketService) OnSubscriptionEvent(cb func(e SubscriptionEvent)) { s.subscriptionEventCallbacks = append(s.subscriptionEventCallbacks, cb) } -func (s *PublicWebSocketService) EmitSubscriptionEvent(e SubscriptionEvent) { +func (s *WebSocketService) EmitSubscriptionEvent(e SubscriptionEvent) { for _, cb := range s.subscriptionEventCallbacks { cb(e) } diff --git a/bbgo/exchange/max/maxapi/userdata.go b/bbgo/exchange/max/maxapi/userdata.go new file mode 100644 index 000000000..55f14931a --- /dev/null +++ b/bbgo/exchange/max/maxapi/userdata.go @@ -0,0 +1,236 @@ +package max + +import ( + "github.com/pkg/errors" + "github.com/valyala/fastjson" +) + +func ParsePrivateEvent(message []byte) (interface{}, error) { + var fp fastjson.Parser + var v, err = fp.ParseBytes(message) + if err != nil { + return nil, errors.Wrap(err, "fail to parse account info raw message") + } + + eventType := string(v.GetStringBytes("e")) + switch eventType { + case "order_snapshot": + return parserOrderSnapshotEvent(v) + + case "order_update": + return parseOrderUpdateEvent(v) + + case "trade_snapshot": + return parseTradeSnapshotEvent(v) + + case "trade_update": + return parseTradeUpdateEvent(v) + + case "account_snapshot": + return parserAccountSnapshotEvent(v) + + case "account_update": + return parserAccountUpdateEvent(v) + + case "authenticated": + return parserAuthEvent(v) + + case "error": + logger.Errorf("error %s", message) + } + + return nil, errors.Wrapf(ErrMessageTypeNotSupported, "private message %s", message) +} + +var ( + errParseOrder = errors.New("failed parse order") + errParseTrade = errors.New("failed parse trade") + errParseAccount = errors.New("failed parse account") +) + +type OrderUpdate struct { + Event string `json:"e"` + ID uint64 `json:"i"` + Side string `json:"sd"` + OrderType string `json:"ot"` + Price string `json:"p"` + Volume string `json:"v"` + AveragePrice string `json:"ap"` + State string `json:"S"` + Market string `json:"M"` + + RemainingVolume string `json:"rv"` + ExecutedVolume string `json:"ev"` + + TradesCount int64 `json:"tc"` + + GroupID int64 `json:"gi"` + ClientOID string `json:"ci"` + CreatedAtMs int64 `json:"T"` +} + +func parserOrderUpdate(v *fastjson.Value) (OrderUpdate, error) { + return OrderUpdate{ + Event: string(v.GetStringBytes("e")), + ID: v.GetUint64("i"), + Side: string(v.GetStringBytes("sd")), + Market: string(v.GetStringBytes("M")), + OrderType: string(v.GetStringBytes("ot")), + State: string(v.GetStringBytes("S")), + Price: string(v.GetStringBytes("p")), + AveragePrice: string(v.GetStringBytes("ap")), + Volume: string(v.GetStringBytes("v")), + RemainingVolume: string(v.GetStringBytes("rv")), + ExecutedVolume: string(v.GetStringBytes("ev")), + TradesCount: v.GetInt64("tc"), + GroupID: v.GetInt64("gi"), + ClientOID: string(v.GetStringBytes("ci")), + CreatedAtMs: v.GetInt64("T"), + }, nil +} + +func parseOrderUpdateEvent(v *fastjson.Value) (OrderUpdate, error) { + rawOrders := v.GetArray("o") + if len(rawOrders) == 0 { + return OrderUpdate{}, errParseOrder + } + + return parserOrderUpdate(rawOrders[0]) +} + +type OrderSnapshot []OrderUpdate + +func parserOrderSnapshotEvent(v *fastjson.Value) (orderSnapshot OrderSnapshot, err error) { + var errCount int + + rawOrders := v.GetArray("o") + for _, ov := range rawOrders { + o, e := parserOrderUpdate(ov) + if e != nil { + errCount++ + err = e + } else { + orderSnapshot = append(orderSnapshot, o) + } + } + + if errCount > 0 { + err = errors.Wrapf(err, "failed to parse order snapshot. %d errors in order snapshot. The last error: ", errCount) + } + + return +} + +type TradeUpdate struct { + ID uint64 `json:"i"` + Side string `json:"sd"` + Price string `json:"p"` + Volume string `json:"v"` + Market string `json:"M"` + + Fee string `json:"f"` + FeeCurrency string `json:"fc"` + Timestamp int64 `json:"T"` + + OrderID uint64 `json:"oi"` +} + +func parseTradeUpdate(v *fastjson.Value) (TradeUpdate, error) { + return TradeUpdate{ + ID: v.GetUint64("i"), + Side: string(v.GetStringBytes("sd")), + Price: string(v.GetStringBytes("p")), + Volume: string(v.GetStringBytes("v")), + Market: string(v.GetStringBytes("M")), + Fee: string(v.GetStringBytes("f")), + FeeCurrency: string(v.GetStringBytes("fc")), + Timestamp: v.GetInt64("T"), + OrderID: v.GetUint64("oi"), + }, nil +} + +func parseTradeUpdateEvent(v *fastjson.Value) (TradeUpdate, error) { + rawTrades := v.GetArray("t") + if len(rawTrades) == 0 { + return TradeUpdate{}, errParseTrade + } + + return parseTradeUpdate(rawTrades[0]) +} + +type TradeSnapshot []TradeUpdate + +func parseTradeSnapshotEvent(v *fastjson.Value) (tradeSnapshot TradeSnapshot, err error) { + var errCount int + + rawTrades := v.GetArray("t") + for _, tv := range rawTrades { + t, e := parseTradeUpdate(tv) + if e != nil { + errCount++ + err = e + } else { + tradeSnapshot = append(tradeSnapshot, t) + } + } + + if errCount > 0 { + err = errors.Wrapf(err, "failed to parse trade snapshot. %d errors in trade snapshot. The last error: ", errCount) + } + + return +} + +type Balance struct { + Currency string `json:"cu"` + Available string `json:"av"` + Locked string `json:"l"` +} + +func parseBalance(v *fastjson.Value) (Balance, error) { + return Balance{ + Currency: string(v.GetStringBytes("cu")), + Available: string(v.GetStringBytes("av")), + Locked: string(v.GetStringBytes("l")), + }, nil +} + +func parserAccountUpdateEvent(v *fastjson.Value) (Balance, error) { + rawBalances := v.GetArray("B") + if len(rawBalances) == 0 { + return Balance{}, errParseAccount + } + + return parseBalance(rawBalances[0]) +} + +type BalanceSnapshot []Balance + +func parserAccountSnapshotEvent(v *fastjson.Value) (balanceSnapshot BalanceSnapshot, err error) { + var errCount int + + rawBalances := v.GetArray("B") + for _, bv := range rawBalances { + b, e := parseBalance(bv) + if e != nil { + errCount++ + err = e + } else { + balanceSnapshot = append(balanceSnapshot, b) + } + } + + if errCount > 0 { + err = errors.Wrapf(err, "failed to parse balance snapshot. %d errors in balance snapshot. The last error: ", errCount) + } + + return +} + +func parserAuthEvent(v *fastjson.Value) (AuthEvent, error) { + return AuthEvent{ + Event: string(v.GetStringBytes("e")), + ID: string(v.GetStringBytes("i")), + Timestamp: v.GetInt64("T"), + }, nil +}