diff --git a/bbgo/exchange/max/maxapi/public_parser.go b/bbgo/exchange/max/maxapi/public_parser.go index 653505327..203b176ba 100644 --- a/bbgo/exchange/max/maxapi/public_parser.go +++ b/bbgo/exchange/max/maxapi/public_parser.go @@ -17,7 +17,7 @@ const Buy = 1 const Sell = -1 // ParseMessage accepts the raw messages from max public websocket channels and parses them into market data -// Return types: *BookEvent, *TradeEvent, *SubscriptionEvent, *ErrorEvent +// Return types: *BookEvent, *PublicTradeEvent, *SubscriptionEvent, *ErrorEvent func ParseMessage(payload []byte) (interface{}, error) { parser := fastjson.Parser{} val, err := parser.ParseBytes(payload) @@ -30,7 +30,7 @@ func ParseMessage(payload []byte) (interface{}, error) { case "book": return parseBookEvent(val) case "trade": - return parseTradeEvent(val) + return parsePublicTradeEvent(val) case "user": return ParseUserEvent(val) } @@ -68,7 +68,7 @@ func parseTradeEntry(val *fastjson.Value) TradeEntry { } } -type TradeEvent struct { +type PublicTradeEvent struct { Event string `json:"e"` Market string `json:"M"` Channel string `json:"c"` @@ -76,12 +76,12 @@ type TradeEvent struct { Timestamp int64 `json:"T"` } -func (e *TradeEvent) Time() time.Time { +func (e *PublicTradeEvent) Time() time.Time { return time.Unix(0, e.Timestamp*int64(time.Millisecond)) } -func parseTradeEvent(val *fastjson.Value) (*TradeEvent, error) { - event := TradeEvent{ +func parsePublicTradeEvent(val *fastjson.Value) (*PublicTradeEvent, error) { + event := PublicTradeEvent{ Event: string(val.GetStringBytes("e")), Market: string(val.GetStringBytes("M")), Channel: string(val.GetStringBytes("c")), diff --git a/bbgo/exchange/max/maxapi/userdata.go b/bbgo/exchange/max/maxapi/userdata.go index e5ca14d1a..2f357cb27 100644 --- a/bbgo/exchange/max/maxapi/userdata.go +++ b/bbgo/exchange/max/maxapi/userdata.go @@ -6,18 +6,18 @@ import ( ) type BaseEvent struct { - Event string `json:"e"` - Timestamp int64 `json:"T"` + Event string `json:"e"` + Timestamp int64 `json:"T"` } type OrderUpdate struct { - Event string `json:"e"` - ID uint64 `json:"i"` - Side string `json:"sd"` - OrderType string `json:"ot"` + Event string `json:"e"` + ID uint64 `json:"i"` + Side string `json:"sd"` + OrderType string `json:"ot"` - Price string `json:"p"` - StopPrice string `json:"sp"` + Price string `json:"p"` + StopPrice string `json:"sp"` Volume string `json:"v"` AveragePrice string `json:"ap"` @@ -61,7 +61,8 @@ func parserOrderUpdate(v *fastjson.Value) OrderUpdate { } } -func parseOrderUpdateEvent(v *fastjson.Value) (e OrderUpdateEvent) { +func parseOrderUpdateEvent(v *fastjson.Value) *OrderUpdateEvent { + var e OrderUpdateEvent e.Event = string(v.GetStringBytes("e")) e.Timestamp = v.GetInt64("T") @@ -70,16 +71,17 @@ func parseOrderUpdateEvent(v *fastjson.Value) (e OrderUpdateEvent) { e.Orders = append(e.Orders, o) } - return e + return &e } type OrderSnapshotEvent struct { BaseEvent - Orders []OrderUpdate `json:"o"` + Orders []OrderUpdate `json:"o"` } -func parserOrderSnapshotEvent(v *fastjson.Value) (e OrderSnapshotEvent) { +func parserOrderSnapshotEvent(v *fastjson.Value) *OrderSnapshotEvent { + var e OrderSnapshotEvent e.Event = string(v.GetStringBytes("e")) e.Timestamp = v.GetInt64("T") @@ -88,7 +90,7 @@ func parserOrderSnapshotEvent(v *fastjson.Value) (e OrderSnapshotEvent) { e.Orders = append(e.Orders, o) } - return e + return &e } type TradeUpdate struct { @@ -125,7 +127,8 @@ type TradeUpdateEvent struct { Trades []TradeUpdate `json:"t"` } -func parseTradeUpdateEvent(v *fastjson.Value) (e TradeUpdateEvent) { +func parseTradeUpdateEvent(v *fastjson.Value) *TradeUpdateEvent { + var e TradeUpdateEvent e.Event = string(v.GetStringBytes("e")) e.Timestamp = v.GetInt64("T") @@ -133,7 +136,7 @@ func parseTradeUpdateEvent(v *fastjson.Value) (e TradeUpdateEvent) { e.Trades = append(e.Trades, parseTradeUpdate(tv)) } - return e + return &e } type TradeSnapshot []TradeUpdate @@ -144,7 +147,8 @@ type TradeSnapshotEvent struct { Trades []TradeUpdate `json:"t"` } -func parseTradeSnapshotEvent(v *fastjson.Value) (e TradeSnapshotEvent) { +func parseTradeSnapshotEvent(v *fastjson.Value) *TradeSnapshotEvent { + var e TradeSnapshotEvent e.Event = string(v.GetStringBytes("e")) e.Timestamp = v.GetInt64("T") @@ -152,7 +156,7 @@ func parseTradeSnapshotEvent(v *fastjson.Value) (e TradeSnapshotEvent) { e.Trades = append(e.Trades, parseTradeUpdate(tv)) } - return e + return &e } type BalanceMessage struct { @@ -169,13 +173,13 @@ func parseBalance(v *fastjson.Value) BalanceMessage { } } - type AccountUpdateEvent struct { BaseEvent Balances []BalanceMessage `json:"B"` } -func parserAccountUpdateEvent(v *fastjson.Value) (e AccountUpdateEvent) { +func parserAccountUpdateEvent(v *fastjson.Value) *AccountUpdateEvent { + var e AccountUpdateEvent e.Event = string(v.GetStringBytes("e")) e.Timestamp = v.GetInt64("T") @@ -183,7 +187,7 @@ func parserAccountUpdateEvent(v *fastjson.Value) (e AccountUpdateEvent) { e.Balances = append(e.Balances, parseBalance(bv)) } - return e + return &e } type AccountSnapshotEvent struct { @@ -191,7 +195,8 @@ type AccountSnapshotEvent struct { Balances []BalanceMessage `json:"B"` } -func parserAccountSnapshotEvent(v *fastjson.Value) (e AccountSnapshotEvent) { +func parserAccountSnapshotEvent(v *fastjson.Value) *AccountSnapshotEvent { + var e AccountSnapshotEvent e.Event = string(v.GetStringBytes("e")) e.Timestamp = v.GetInt64("T") @@ -199,18 +204,17 @@ func parserAccountSnapshotEvent(v *fastjson.Value) (e AccountSnapshotEvent) { e.Balances = append(e.Balances, parseBalance(bv)) } - return e + return &e } -func parseAuthEvent(v *fastjson.Value) AuthEvent { - return AuthEvent{ +func parseAuthEvent(v *fastjson.Value) *AuthEvent { + return &AuthEvent{ Event: string(v.GetStringBytes("e")), ID: string(v.GetStringBytes("i")), Timestamp: v.GetInt64("T"), } } - func ParseUserEvent(v *fastjson.Value) (interface{}, error) { eventType := string(v.GetStringBytes("e")) switch eventType { diff --git a/bbgo/exchange/max/maxapi/websocket.go b/bbgo/exchange/max/maxapi/websocket.go index 1e9ab1eb1..e9c56a03f 100644 --- a/bbgo/exchange/max/maxapi/websocket.go +++ b/bbgo/exchange/max/maxapi/websocket.go @@ -44,15 +44,23 @@ type WebSocketService struct { // Subscriptions is the subscription request payloads that will be used for sending subscription request Subscriptions []Subscription - connectCallbacks []func(conn *websocket.Conn) + connectCallbacks []func(conn *websocket.Conn) disconnectCallbacks []func(conn *websocket.Conn) errorCallbacks []func(err error) messageCallbacks []func(message []byte) bookEventCallbacks []func(e BookEvent) - tradeEventCallbacks []func(e TradeEvent) + tradeEventCallbacks []func(e PublicTradeEvent) errorEventCallbacks []func(e ErrorEvent) subscriptionEventCallbacks []func(e SubscriptionEvent) + + tradeUpdateEventCallbacks []func(e TradeUpdateEvent) + tradeSnapshotEventCallbacks []func(e TradeSnapshotEvent) + orderUpdateEventCallbacks []func(e OrderUpdateEvent) + orderSnapshotEventCallbacks []func(e OrderSnapshotEvent) + + accountSnapshotEventCallbacks []func(e AccountSnapshotEvent) + accountUpdateEventCallbacks []func(e AccountUpdateEvent) } func NewWebSocketService(wsURL string, key, secret string) *WebSocketService { @@ -80,7 +88,7 @@ func (s *WebSocketService) Connect(ctx context.Context) error { }) // pre-allocate the websocket client, the websocket client can be used for reconnecting. - if err := s.connect(ctx) ; err != nil { + if err := s.connect(ctx); err != nil { return err } go s.read(ctx) @@ -109,8 +117,6 @@ func (s *WebSocketService) connect(ctx context.Context) error { s.conn = conn s.EmitConnect(conn) - - return nil } @@ -160,14 +166,37 @@ func (s *WebSocketService) read(ctx context.Context) { func (s *WebSocketService) dispatch(msg interface{}) { switch e := msg.(type) { + case *BookEvent: s.EmitBookEvent(*e) - case *TradeEvent: + + case *PublicTradeEvent: s.EmitTradeEvent(*e) + case *ErrorEvent: s.EmitErrorEvent(*e) + case *SubscriptionEvent: s.EmitSubscriptionEvent(*e) + + case *TradeSnapshotEvent: + s.EmitTradeSnapshotEvent(*e) + + case *TradeUpdateEvent: + s.EmitTradeUpdateEvent(*e) + + case *AccountSnapshotEvent: + s.EmitAccountSnapshotEvent(*e) + + case *AccountUpdateEvent: + s.EmitAccountUpdateEvent(*e) + + case *OrderSnapshotEvent: + s.EmitOrderSnapshotEvent(*e) + + case *OrderUpdateEvent: + s.EmitOrderUpdateEvent(*e) + default: s.EmitError(errors.Errorf("unsupported %T event: %+v", e, e)) } diff --git a/bbgo/exchange/max/maxapi/websocketservice_callbacks.go b/bbgo/exchange/max/maxapi/websocketservice_callbacks.go index 0644dd761..7b60604dc 100644 --- a/bbgo/exchange/max/maxapi/websocketservice_callbacks.go +++ b/bbgo/exchange/max/maxapi/websocketservice_callbacks.go @@ -56,11 +56,11 @@ func (s *WebSocketService) EmitBookEvent(e BookEvent) { } } -func (s *WebSocketService) OnTradeEvent(cb func(e TradeEvent)) { +func (s *WebSocketService) OnTradeEvent(cb func(e PublicTradeEvent)) { s.tradeEventCallbacks = append(s.tradeEventCallbacks, cb) } -func (s *WebSocketService) EmitTradeEvent(e TradeEvent) { +func (s *WebSocketService) EmitTradeEvent(e PublicTradeEvent) { for _, cb := range s.tradeEventCallbacks { cb(e) } @@ -85,3 +85,63 @@ func (s *WebSocketService) EmitSubscriptionEvent(e SubscriptionEvent) { cb(e) } } + +func (s *WebSocketService) OnTradeUpdateEvent(cb func(e TradeUpdateEvent)) { + s.tradeUpdateEventCallbacks = append(s.tradeUpdateEventCallbacks, cb) +} + +func (s *WebSocketService) EmitTradeUpdateEvent(e TradeUpdateEvent) { + for _, cb := range s.tradeUpdateEventCallbacks { + cb(e) + } +} + +func (s *WebSocketService) OnTradeSnapshotEvent(cb func(e TradeSnapshotEvent)) { + s.tradeSnapshotEventCallbacks = append(s.tradeSnapshotEventCallbacks, cb) +} + +func (s *WebSocketService) EmitTradeSnapshotEvent(e TradeSnapshotEvent) { + for _, cb := range s.tradeSnapshotEventCallbacks { + cb(e) + } +} + +func (s *WebSocketService) OnOrderUpdateEvent(cb func(e OrderUpdateEvent)) { + s.orderUpdateEventCallbacks = append(s.orderUpdateEventCallbacks, cb) +} + +func (s *WebSocketService) EmitOrderUpdateEvent(e OrderUpdateEvent) { + for _, cb := range s.orderUpdateEventCallbacks { + cb(e) + } +} + +func (s *WebSocketService) OnOrderSnapshotEvent(cb func(e OrderSnapshotEvent)) { + s.orderSnapshotEventCallbacks = append(s.orderSnapshotEventCallbacks, cb) +} + +func (s *WebSocketService) EmitOrderSnapshotEvent(e OrderSnapshotEvent) { + for _, cb := range s.orderSnapshotEventCallbacks { + cb(e) + } +} + +func (s *WebSocketService) OnAccountSnapshotEvent(cb func(e AccountSnapshotEvent)) { + s.accountSnapshotEventCallbacks = append(s.accountSnapshotEventCallbacks, cb) +} + +func (s *WebSocketService) EmitAccountSnapshotEvent(e AccountSnapshotEvent) { + for _, cb := range s.accountSnapshotEventCallbacks { + cb(e) + } +} + +func (s *WebSocketService) OnAccountUpdateEvent(cb func(e AccountUpdateEvent)) { + s.accountUpdateEventCallbacks = append(s.accountUpdateEventCallbacks, cb) +} + +func (s *WebSocketService) EmitAccountUpdateEvent(e AccountUpdateEvent) { + for _, cb := range s.accountUpdateEventCallbacks { + cb(e) + } +}