implement private message parsing

This commit is contained in:
c9s 2020-10-01 16:48:08 +08:00
parent 94fb026149
commit 0708cee962
4 changed files with 321 additions and 36 deletions

View File

@ -0,0 +1,31 @@
package max
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
)
type AuthMessage struct {
Action string `json:"action"`
APIKey string `json:"apiKey"`
Nonce int64 `json:"nonce"`
Signature string `json:"signature"`
ID string `json:"id"`
}
type AuthEvent struct {
Event string
ID string
Timestamp int64
}
func signPayload(payload string, secret string) string {
var sig = hmac.New(sha256.New, []byte(secret))
_, err := sig.Write([]byte(payload))
if err != nil {
return ""
}
return hex.EncodeToString(sig.Sum(nil))
}

View File

@ -2,8 +2,11 @@ package max
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/google/uuid"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -32,10 +35,10 @@ var SubscribeAction = "subscribe"
var UnsubscribeAction = "unsubscribe" var UnsubscribeAction = "unsubscribe"
//go:generate callbackgen -type PublicWebSocketService //go:generate callbackgen -type PublicWebSocketService
type PublicWebSocketService struct { type WebSocketService struct {
BaseURL string baseURL, key, secret string
Conn *websocket.Conn conn *websocket.Conn
reconnectC chan struct{} reconnectC chan struct{}
@ -52,38 +55,53 @@ type PublicWebSocketService struct {
subscriptionEventCallbacks []func(e SubscriptionEvent) subscriptionEventCallbacks []func(e SubscriptionEvent)
} }
func NewPublicWebSocketService(wsURL string) *PublicWebSocketService { func NewWebSocketService(wsURL string, key, secret string) *WebSocketService {
return &PublicWebSocketService{ return &WebSocketService{
key: key,
secret: secret,
reconnectC: make(chan struct{}, 1), reconnectC: make(chan struct{}, 1),
BaseURL: wsURL, baseURL: wsURL,
} }
} }
func (s *PublicWebSocketService) Connect(ctx context.Context) error { func (s *WebSocketService) Connect(ctx context.Context) error {
// pre-allocate the websocket client, the websocket client can be used for reconnecting. // pre-allocate the websocket client, the websocket client can be used for reconnecting.
go s.read(ctx) go s.read(ctx)
return s.connect(ctx) return s.connect(ctx)
} }
func (s *PublicWebSocketService) connect(ctx context.Context) error { func (s *WebSocketService) sendAuthMessage() error {
nonce := time.Now().UnixNano() / int64(time.Millisecond)
auth := &AuthMessage{
Action: "auth",
APIKey: s.key,
Nonce: nonce,
Signature: signPayload(fmt.Sprintf("%d", nonce), s.secret),
ID: uuid.New().String(),
}
return s.conn.WriteJSON(auth)
}
func (s *WebSocketService) connect(ctx context.Context) error {
dialer := websocket.DefaultDialer dialer := websocket.DefaultDialer
conn, _, err := dialer.DialContext(ctx, s.BaseURL, nil) conn, _, err := dialer.DialContext(ctx, s.baseURL, nil)
if err != nil { if err != nil {
return err return err
} }
s.Conn = conn s.conn = conn
return nil return nil
} }
func (s *PublicWebSocketService) emitReconnect() { func (s *WebSocketService) emitReconnect() {
select { select {
case s.reconnectC <- struct{}{}: case s.reconnectC <- struct{}{}:
default: default:
} }
} }
func (s *PublicWebSocketService) read(ctx context.Context) { func (s *WebSocketService) read(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -96,7 +114,7 @@ func (s *PublicWebSocketService) read(ctx context.Context) {
} }
default: default:
mt, msg, err := s.Conn.ReadMessage() mt, msg, err := s.conn.ReadMessage()
if err != nil { if err != nil {
s.emitReconnect() s.emitReconnect()
@ -120,7 +138,7 @@ func (s *PublicWebSocketService) read(ctx context.Context) {
} }
} }
func (s *PublicWebSocketService) dispatch(msg interface{}) { func (s *WebSocketService) dispatch(msg interface{}) {
switch e := msg.(type) { switch e := msg.(type) {
case *BookEvent: case *BookEvent:
s.EmitBookEvent(*e) s.EmitBookEvent(*e)
@ -135,18 +153,18 @@ func (s *PublicWebSocketService) dispatch(msg interface{}) {
} }
} }
func (s *PublicWebSocketService) ClearSubscriptions() { func (s *WebSocketService) ClearSubscriptions() {
s.Subscriptions = nil s.Subscriptions = nil
} }
func (s *PublicWebSocketService) Reconnect() { func (s *WebSocketService) Reconnect() {
logger.Info("reconnecting...") logger.Info("reconnecting...")
s.emitReconnect() s.emitReconnect()
} }
// Subscribe is a helper method for building subscription request from the internal mapping types. // Subscribe is a helper method for building subscription request from the internal mapping types.
// (Internal public method) // (Internal public method)
func (s *PublicWebSocketService) Subscribe(channel string, market string) error { func (s *WebSocketService) Subscribe(channel string, market string) error {
s.AddSubscription(Subscription{ s.AddSubscription(Subscription{
Channel: channel, Channel: channel,
Market: market, Market: market,
@ -155,11 +173,11 @@ func (s *PublicWebSocketService) Subscribe(channel string, market string) error
} }
// AddSubscription adds the subscription request to the buffer, these requests will be sent to the server right after connecting to the endpoint. // AddSubscription adds the subscription request to the buffer, these requests will be sent to the server right after connecting to the endpoint.
func (s *PublicWebSocketService) AddSubscription(subscription Subscription) { func (s *WebSocketService) AddSubscription(subscription Subscription) {
s.Subscriptions = append(s.Subscriptions, subscription) s.Subscriptions = append(s.Subscriptions, subscription)
} }
func (s *PublicWebSocketService) Resubscribe() { func (s *WebSocketService) Resubscribe() {
// Calling Resubscribe() by websocket is not enough to refresh orderbook. // Calling Resubscribe() by websocket is not enough to refresh orderbook.
// We still need to get orderbook snapshot by rest client. // We still need to get orderbook snapshot by rest client.
// Therefore Reconnect() is used to simplify implementation. // Therefore Reconnect() is used to simplify implementation.
@ -173,7 +191,7 @@ func (s *PublicWebSocketService) Resubscribe() {
} }
} }
func (s *PublicWebSocketService) SendSubscriptionRequest(action string) error { func (s *WebSocketService) SendSubscriptionRequest(action string) error {
request := WebsocketCommand{ request := WebsocketCommand{
Action: action, Action: action,
Subscriptions: s.Subscriptions, Subscriptions: s.Subscriptions,
@ -181,7 +199,7 @@ func (s *PublicWebSocketService) SendSubscriptionRequest(action string) error {
logger.Debugf("sending websocket subscription: %+v", request) logger.Debugf("sending websocket subscription: %+v", request)
if err := s.Conn.WriteJSON(request); err != nil { if err := s.conn.WriteJSON(request); err != nil {
return errors.Wrap(err, "Failed to send subscribe event") return errors.Wrap(err, "Failed to send subscribe event")
} }
@ -189,6 +207,6 @@ func (s *PublicWebSocketService) SendSubscriptionRequest(action string) error {
} }
// Close web socket connection // Close web socket connection
func (s *PublicWebSocketService) Close() error { func (s *WebSocketService) Close() error {
return s.Conn.Close() return s.conn.Close()
} }

View File

@ -1,64 +1,64 @@
// Code generated by "callbackgen -type PublicWebSocketService"; DO NOT EDIT. // Code generated by "callbackgen -type WebSocketService"; DO NOT EDIT.
package max package max
import () import ()
func (s *PublicWebSocketService) OnError(cb func(err error)) { func (s *WebSocketService) OnError(cb func(err error)) {
s.errorCallbacks = append(s.errorCallbacks, cb) s.errorCallbacks = append(s.errorCallbacks, cb)
} }
func (s *PublicWebSocketService) EmitError(err error) { func (s *WebSocketService) EmitError(err error) {
for _, cb := range s.errorCallbacks { for _, cb := range s.errorCallbacks {
cb(err) cb(err)
} }
} }
func (s *PublicWebSocketService) OnMessage(cb func(message []byte)) { func (s *WebSocketService) OnMessage(cb func(message []byte)) {
s.messageCallbacks = append(s.messageCallbacks, cb) s.messageCallbacks = append(s.messageCallbacks, cb)
} }
func (s *PublicWebSocketService) EmitMessage(message []byte) { func (s *WebSocketService) EmitMessage(message []byte) {
for _, cb := range s.messageCallbacks { for _, cb := range s.messageCallbacks {
cb(message) cb(message)
} }
} }
func (s *PublicWebSocketService) OnBookEvent(cb func(e BookEvent)) { func (s *WebSocketService) OnBookEvent(cb func(e BookEvent)) {
s.bookEventCallbacks = append(s.bookEventCallbacks, cb) s.bookEventCallbacks = append(s.bookEventCallbacks, cb)
} }
func (s *PublicWebSocketService) EmitBookEvent(e BookEvent) { func (s *WebSocketService) EmitBookEvent(e BookEvent) {
for _, cb := range s.bookEventCallbacks { for _, cb := range s.bookEventCallbacks {
cb(e) cb(e)
} }
} }
func (s *PublicWebSocketService) OnTradeEvent(cb func(e TradeEvent)) { func (s *WebSocketService) OnTradeEvent(cb func(e TradeEvent)) {
s.tradeEventCallbacks = append(s.tradeEventCallbacks, cb) s.tradeEventCallbacks = append(s.tradeEventCallbacks, cb)
} }
func (s *PublicWebSocketService) EmitTradeEvent(e TradeEvent) { func (s *WebSocketService) EmitTradeEvent(e TradeEvent) {
for _, cb := range s.tradeEventCallbacks { for _, cb := range s.tradeEventCallbacks {
cb(e) cb(e)
} }
} }
func (s *PublicWebSocketService) OnErrorEvent(cb func(e ErrorEvent)) { func (s *WebSocketService) OnErrorEvent(cb func(e ErrorEvent)) {
s.errorEventCallbacks = append(s.errorEventCallbacks, cb) s.errorEventCallbacks = append(s.errorEventCallbacks, cb)
} }
func (s *PublicWebSocketService) EmitErrorEvent(e ErrorEvent) { func (s *WebSocketService) EmitErrorEvent(e ErrorEvent) {
for _, cb := range s.errorEventCallbacks { for _, cb := range s.errorEventCallbacks {
cb(e) cb(e)
} }
} }
func (s *PublicWebSocketService) OnSubscriptionEvent(cb func(e SubscriptionEvent)) { func (s *WebSocketService) OnSubscriptionEvent(cb func(e SubscriptionEvent)) {
s.subscriptionEventCallbacks = append(s.subscriptionEventCallbacks, cb) s.subscriptionEventCallbacks = append(s.subscriptionEventCallbacks, cb)
} }
func (s *PublicWebSocketService) EmitSubscriptionEvent(e SubscriptionEvent) { func (s *WebSocketService) EmitSubscriptionEvent(e SubscriptionEvent) {
for _, cb := range s.subscriptionEventCallbacks { for _, cb := range s.subscriptionEventCallbacks {
cb(e) cb(e)
} }

View File

@ -0,0 +1,236 @@
package max
import (
"github.com/pkg/errors"
"github.com/valyala/fastjson"
)
func ParsePrivateEvent(message []byte) (interface{}, error) {
var fp fastjson.Parser
var v, err = fp.ParseBytes(message)
if err != nil {
return nil, errors.Wrap(err, "fail to parse account info raw message")
}
eventType := string(v.GetStringBytes("e"))
switch eventType {
case "order_snapshot":
return parserOrderSnapshotEvent(v)
case "order_update":
return parseOrderUpdateEvent(v)
case "trade_snapshot":
return parseTradeSnapshotEvent(v)
case "trade_update":
return parseTradeUpdateEvent(v)
case "account_snapshot":
return parserAccountSnapshotEvent(v)
case "account_update":
return parserAccountUpdateEvent(v)
case "authenticated":
return parserAuthEvent(v)
case "error":
logger.Errorf("error %s", message)
}
return nil, errors.Wrapf(ErrMessageTypeNotSupported, "private message %s", message)
}
var (
errParseOrder = errors.New("failed parse order")
errParseTrade = errors.New("failed parse trade")
errParseAccount = errors.New("failed parse account")
)
type OrderUpdate struct {
Event string `json:"e"`
ID uint64 `json:"i"`
Side string `json:"sd"`
OrderType string `json:"ot"`
Price string `json:"p"`
Volume string `json:"v"`
AveragePrice string `json:"ap"`
State string `json:"S"`
Market string `json:"M"`
RemainingVolume string `json:"rv"`
ExecutedVolume string `json:"ev"`
TradesCount int64 `json:"tc"`
GroupID int64 `json:"gi"`
ClientOID string `json:"ci"`
CreatedAtMs int64 `json:"T"`
}
func parserOrderUpdate(v *fastjson.Value) (OrderUpdate, error) {
return OrderUpdate{
Event: string(v.GetStringBytes("e")),
ID: v.GetUint64("i"),
Side: string(v.GetStringBytes("sd")),
Market: string(v.GetStringBytes("M")),
OrderType: string(v.GetStringBytes("ot")),
State: string(v.GetStringBytes("S")),
Price: string(v.GetStringBytes("p")),
AveragePrice: string(v.GetStringBytes("ap")),
Volume: string(v.GetStringBytes("v")),
RemainingVolume: string(v.GetStringBytes("rv")),
ExecutedVolume: string(v.GetStringBytes("ev")),
TradesCount: v.GetInt64("tc"),
GroupID: v.GetInt64("gi"),
ClientOID: string(v.GetStringBytes("ci")),
CreatedAtMs: v.GetInt64("T"),
}, nil
}
func parseOrderUpdateEvent(v *fastjson.Value) (OrderUpdate, error) {
rawOrders := v.GetArray("o")
if len(rawOrders) == 0 {
return OrderUpdate{}, errParseOrder
}
return parserOrderUpdate(rawOrders[0])
}
type OrderSnapshot []OrderUpdate
func parserOrderSnapshotEvent(v *fastjson.Value) (orderSnapshot OrderSnapshot, err error) {
var errCount int
rawOrders := v.GetArray("o")
for _, ov := range rawOrders {
o, e := parserOrderUpdate(ov)
if e != nil {
errCount++
err = e
} else {
orderSnapshot = append(orderSnapshot, o)
}
}
if errCount > 0 {
err = errors.Wrapf(err, "failed to parse order snapshot. %d errors in order snapshot. The last error: ", errCount)
}
return
}
type TradeUpdate struct {
ID uint64 `json:"i"`
Side string `json:"sd"`
Price string `json:"p"`
Volume string `json:"v"`
Market string `json:"M"`
Fee string `json:"f"`
FeeCurrency string `json:"fc"`
Timestamp int64 `json:"T"`
OrderID uint64 `json:"oi"`
}
func parseTradeUpdate(v *fastjson.Value) (TradeUpdate, error) {
return TradeUpdate{
ID: v.GetUint64("i"),
Side: string(v.GetStringBytes("sd")),
Price: string(v.GetStringBytes("p")),
Volume: string(v.GetStringBytes("v")),
Market: string(v.GetStringBytes("M")),
Fee: string(v.GetStringBytes("f")),
FeeCurrency: string(v.GetStringBytes("fc")),
Timestamp: v.GetInt64("T"),
OrderID: v.GetUint64("oi"),
}, nil
}
func parseTradeUpdateEvent(v *fastjson.Value) (TradeUpdate, error) {
rawTrades := v.GetArray("t")
if len(rawTrades) == 0 {
return TradeUpdate{}, errParseTrade
}
return parseTradeUpdate(rawTrades[0])
}
type TradeSnapshot []TradeUpdate
func parseTradeSnapshotEvent(v *fastjson.Value) (tradeSnapshot TradeSnapshot, err error) {
var errCount int
rawTrades := v.GetArray("t")
for _, tv := range rawTrades {
t, e := parseTradeUpdate(tv)
if e != nil {
errCount++
err = e
} else {
tradeSnapshot = append(tradeSnapshot, t)
}
}
if errCount > 0 {
err = errors.Wrapf(err, "failed to parse trade snapshot. %d errors in trade snapshot. The last error: ", errCount)
}
return
}
type Balance struct {
Currency string `json:"cu"`
Available string `json:"av"`
Locked string `json:"l"`
}
func parseBalance(v *fastjson.Value) (Balance, error) {
return Balance{
Currency: string(v.GetStringBytes("cu")),
Available: string(v.GetStringBytes("av")),
Locked: string(v.GetStringBytes("l")),
}, nil
}
func parserAccountUpdateEvent(v *fastjson.Value) (Balance, error) {
rawBalances := v.GetArray("B")
if len(rawBalances) == 0 {
return Balance{}, errParseAccount
}
return parseBalance(rawBalances[0])
}
type BalanceSnapshot []Balance
func parserAccountSnapshotEvent(v *fastjson.Value) (balanceSnapshot BalanceSnapshot, err error) {
var errCount int
rawBalances := v.GetArray("B")
for _, bv := range rawBalances {
b, e := parseBalance(bv)
if e != nil {
errCount++
err = e
} else {
balanceSnapshot = append(balanceSnapshot, b)
}
}
if errCount > 0 {
err = errors.Wrapf(err, "failed to parse balance snapshot. %d errors in balance snapshot. The last error: ", errCount)
}
return
}
func parserAuthEvent(v *fastjson.Value) (AuthEvent, error) {
return AuthEvent{
Event: string(v.GetStringBytes("e")),
ID: string(v.GetStringBytes("i")),
Timestamp: v.GetInt64("T"),
}, nil
}