max: dispatch private messages

This commit is contained in:
c9s 2020-10-02 21:29:56 +08:00
parent 26674effa1
commit 189a1f7fe7
4 changed files with 132 additions and 39 deletions

View File

@ -17,7 +17,7 @@ const Buy = 1
const Sell = -1
// ParseMessage accepts the raw messages from max public websocket channels and parses them into market data
// Return types: *BookEvent, *TradeEvent, *SubscriptionEvent, *ErrorEvent
// Return types: *BookEvent, *PublicTradeEvent, *SubscriptionEvent, *ErrorEvent
func ParseMessage(payload []byte) (interface{}, error) {
parser := fastjson.Parser{}
val, err := parser.ParseBytes(payload)
@ -30,7 +30,7 @@ func ParseMessage(payload []byte) (interface{}, error) {
case "book":
return parseBookEvent(val)
case "trade":
return parseTradeEvent(val)
return parsePublicTradeEvent(val)
case "user":
return ParseUserEvent(val)
}
@ -68,7 +68,7 @@ func parseTradeEntry(val *fastjson.Value) TradeEntry {
}
}
type TradeEvent struct {
type PublicTradeEvent struct {
Event string `json:"e"`
Market string `json:"M"`
Channel string `json:"c"`
@ -76,12 +76,12 @@ type TradeEvent struct {
Timestamp int64 `json:"T"`
}
func (e *TradeEvent) Time() time.Time {
func (e *PublicTradeEvent) Time() time.Time {
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
}
func parseTradeEvent(val *fastjson.Value) (*TradeEvent, error) {
event := TradeEvent{
func parsePublicTradeEvent(val *fastjson.Value) (*PublicTradeEvent, error) {
event := PublicTradeEvent{
Event: string(val.GetStringBytes("e")),
Market: string(val.GetStringBytes("M")),
Channel: string(val.GetStringBytes("c")),

View File

@ -6,18 +6,18 @@ import (
)
type BaseEvent struct {
Event string `json:"e"`
Timestamp int64 `json:"T"`
Event string `json:"e"`
Timestamp int64 `json:"T"`
}
type OrderUpdate struct {
Event string `json:"e"`
ID uint64 `json:"i"`
Side string `json:"sd"`
OrderType string `json:"ot"`
Event string `json:"e"`
ID uint64 `json:"i"`
Side string `json:"sd"`
OrderType string `json:"ot"`
Price string `json:"p"`
StopPrice string `json:"sp"`
Price string `json:"p"`
StopPrice string `json:"sp"`
Volume string `json:"v"`
AveragePrice string `json:"ap"`
@ -61,7 +61,8 @@ func parserOrderUpdate(v *fastjson.Value) OrderUpdate {
}
}
func parseOrderUpdateEvent(v *fastjson.Value) (e OrderUpdateEvent) {
func parseOrderUpdateEvent(v *fastjson.Value) *OrderUpdateEvent {
var e OrderUpdateEvent
e.Event = string(v.GetStringBytes("e"))
e.Timestamp = v.GetInt64("T")
@ -70,16 +71,17 @@ func parseOrderUpdateEvent(v *fastjson.Value) (e OrderUpdateEvent) {
e.Orders = append(e.Orders, o)
}
return e
return &e
}
type OrderSnapshotEvent struct {
BaseEvent
Orders []OrderUpdate `json:"o"`
Orders []OrderUpdate `json:"o"`
}
func parserOrderSnapshotEvent(v *fastjson.Value) (e OrderSnapshotEvent) {
func parserOrderSnapshotEvent(v *fastjson.Value) *OrderSnapshotEvent {
var e OrderSnapshotEvent
e.Event = string(v.GetStringBytes("e"))
e.Timestamp = v.GetInt64("T")
@ -88,7 +90,7 @@ func parserOrderSnapshotEvent(v *fastjson.Value) (e OrderSnapshotEvent) {
e.Orders = append(e.Orders, o)
}
return e
return &e
}
type TradeUpdate struct {
@ -125,7 +127,8 @@ type TradeUpdateEvent struct {
Trades []TradeUpdate `json:"t"`
}
func parseTradeUpdateEvent(v *fastjson.Value) (e TradeUpdateEvent) {
func parseTradeUpdateEvent(v *fastjson.Value) *TradeUpdateEvent {
var e TradeUpdateEvent
e.Event = string(v.GetStringBytes("e"))
e.Timestamp = v.GetInt64("T")
@ -133,7 +136,7 @@ func parseTradeUpdateEvent(v *fastjson.Value) (e TradeUpdateEvent) {
e.Trades = append(e.Trades, parseTradeUpdate(tv))
}
return e
return &e
}
type TradeSnapshot []TradeUpdate
@ -144,7 +147,8 @@ type TradeSnapshotEvent struct {
Trades []TradeUpdate `json:"t"`
}
func parseTradeSnapshotEvent(v *fastjson.Value) (e TradeSnapshotEvent) {
func parseTradeSnapshotEvent(v *fastjson.Value) *TradeSnapshotEvent {
var e TradeSnapshotEvent
e.Event = string(v.GetStringBytes("e"))
e.Timestamp = v.GetInt64("T")
@ -152,7 +156,7 @@ func parseTradeSnapshotEvent(v *fastjson.Value) (e TradeSnapshotEvent) {
e.Trades = append(e.Trades, parseTradeUpdate(tv))
}
return e
return &e
}
type BalanceMessage struct {
@ -169,13 +173,13 @@ func parseBalance(v *fastjson.Value) BalanceMessage {
}
}
type AccountUpdateEvent struct {
BaseEvent
Balances []BalanceMessage `json:"B"`
}
func parserAccountUpdateEvent(v *fastjson.Value) (e AccountUpdateEvent) {
func parserAccountUpdateEvent(v *fastjson.Value) *AccountUpdateEvent {
var e AccountUpdateEvent
e.Event = string(v.GetStringBytes("e"))
e.Timestamp = v.GetInt64("T")
@ -183,7 +187,7 @@ func parserAccountUpdateEvent(v *fastjson.Value) (e AccountUpdateEvent) {
e.Balances = append(e.Balances, parseBalance(bv))
}
return e
return &e
}
type AccountSnapshotEvent struct {
@ -191,7 +195,8 @@ type AccountSnapshotEvent struct {
Balances []BalanceMessage `json:"B"`
}
func parserAccountSnapshotEvent(v *fastjson.Value) (e AccountSnapshotEvent) {
func parserAccountSnapshotEvent(v *fastjson.Value) *AccountSnapshotEvent {
var e AccountSnapshotEvent
e.Event = string(v.GetStringBytes("e"))
e.Timestamp = v.GetInt64("T")
@ -199,18 +204,17 @@ func parserAccountSnapshotEvent(v *fastjson.Value) (e AccountSnapshotEvent) {
e.Balances = append(e.Balances, parseBalance(bv))
}
return e
return &e
}
func parseAuthEvent(v *fastjson.Value) AuthEvent {
return AuthEvent{
func parseAuthEvent(v *fastjson.Value) *AuthEvent {
return &AuthEvent{
Event: string(v.GetStringBytes("e")),
ID: string(v.GetStringBytes("i")),
Timestamp: v.GetInt64("T"),
}
}
func ParseUserEvent(v *fastjson.Value) (interface{}, error) {
eventType := string(v.GetStringBytes("e"))
switch eventType {

View File

@ -44,15 +44,23 @@ type WebSocketService struct {
// Subscriptions is the subscription request payloads that will be used for sending subscription request
Subscriptions []Subscription
connectCallbacks []func(conn *websocket.Conn)
connectCallbacks []func(conn *websocket.Conn)
disconnectCallbacks []func(conn *websocket.Conn)
errorCallbacks []func(err error)
messageCallbacks []func(message []byte)
bookEventCallbacks []func(e BookEvent)
tradeEventCallbacks []func(e TradeEvent)
tradeEventCallbacks []func(e PublicTradeEvent)
errorEventCallbacks []func(e ErrorEvent)
subscriptionEventCallbacks []func(e SubscriptionEvent)
tradeUpdateEventCallbacks []func(e TradeUpdateEvent)
tradeSnapshotEventCallbacks []func(e TradeSnapshotEvent)
orderUpdateEventCallbacks []func(e OrderUpdateEvent)
orderSnapshotEventCallbacks []func(e OrderSnapshotEvent)
accountSnapshotEventCallbacks []func(e AccountSnapshotEvent)
accountUpdateEventCallbacks []func(e AccountUpdateEvent)
}
func NewWebSocketService(wsURL string, key, secret string) *WebSocketService {
@ -80,7 +88,7 @@ func (s *WebSocketService) Connect(ctx context.Context) error {
})
// pre-allocate the websocket client, the websocket client can be used for reconnecting.
if err := s.connect(ctx) ; err != nil {
if err := s.connect(ctx); err != nil {
return err
}
go s.read(ctx)
@ -109,8 +117,6 @@ func (s *WebSocketService) connect(ctx context.Context) error {
s.conn = conn
s.EmitConnect(conn)
return nil
}
@ -160,14 +166,37 @@ func (s *WebSocketService) read(ctx context.Context) {
func (s *WebSocketService) dispatch(msg interface{}) {
switch e := msg.(type) {
case *BookEvent:
s.EmitBookEvent(*e)
case *TradeEvent:
case *PublicTradeEvent:
s.EmitTradeEvent(*e)
case *ErrorEvent:
s.EmitErrorEvent(*e)
case *SubscriptionEvent:
s.EmitSubscriptionEvent(*e)
case *TradeSnapshotEvent:
s.EmitTradeSnapshotEvent(*e)
case *TradeUpdateEvent:
s.EmitTradeUpdateEvent(*e)
case *AccountSnapshotEvent:
s.EmitAccountSnapshotEvent(*e)
case *AccountUpdateEvent:
s.EmitAccountUpdateEvent(*e)
case *OrderSnapshotEvent:
s.EmitOrderSnapshotEvent(*e)
case *OrderUpdateEvent:
s.EmitOrderUpdateEvent(*e)
default:
s.EmitError(errors.Errorf("unsupported %T event: %+v", e, e))
}

View File

@ -56,11 +56,11 @@ func (s *WebSocketService) EmitBookEvent(e BookEvent) {
}
}
func (s *WebSocketService) OnTradeEvent(cb func(e TradeEvent)) {
func (s *WebSocketService) OnTradeEvent(cb func(e PublicTradeEvent)) {
s.tradeEventCallbacks = append(s.tradeEventCallbacks, cb)
}
func (s *WebSocketService) EmitTradeEvent(e TradeEvent) {
func (s *WebSocketService) EmitTradeEvent(e PublicTradeEvent) {
for _, cb := range s.tradeEventCallbacks {
cb(e)
}
@ -85,3 +85,63 @@ func (s *WebSocketService) EmitSubscriptionEvent(e SubscriptionEvent) {
cb(e)
}
}
func (s *WebSocketService) OnTradeUpdateEvent(cb func(e TradeUpdateEvent)) {
s.tradeUpdateEventCallbacks = append(s.tradeUpdateEventCallbacks, cb)
}
func (s *WebSocketService) EmitTradeUpdateEvent(e TradeUpdateEvent) {
for _, cb := range s.tradeUpdateEventCallbacks {
cb(e)
}
}
func (s *WebSocketService) OnTradeSnapshotEvent(cb func(e TradeSnapshotEvent)) {
s.tradeSnapshotEventCallbacks = append(s.tradeSnapshotEventCallbacks, cb)
}
func (s *WebSocketService) EmitTradeSnapshotEvent(e TradeSnapshotEvent) {
for _, cb := range s.tradeSnapshotEventCallbacks {
cb(e)
}
}
func (s *WebSocketService) OnOrderUpdateEvent(cb func(e OrderUpdateEvent)) {
s.orderUpdateEventCallbacks = append(s.orderUpdateEventCallbacks, cb)
}
func (s *WebSocketService) EmitOrderUpdateEvent(e OrderUpdateEvent) {
for _, cb := range s.orderUpdateEventCallbacks {
cb(e)
}
}
func (s *WebSocketService) OnOrderSnapshotEvent(cb func(e OrderSnapshotEvent)) {
s.orderSnapshotEventCallbacks = append(s.orderSnapshotEventCallbacks, cb)
}
func (s *WebSocketService) EmitOrderSnapshotEvent(e OrderSnapshotEvent) {
for _, cb := range s.orderSnapshotEventCallbacks {
cb(e)
}
}
func (s *WebSocketService) OnAccountSnapshotEvent(cb func(e AccountSnapshotEvent)) {
s.accountSnapshotEventCallbacks = append(s.accountSnapshotEventCallbacks, cb)
}
func (s *WebSocketService) EmitAccountSnapshotEvent(e AccountSnapshotEvent) {
for _, cb := range s.accountSnapshotEventCallbacks {
cb(e)
}
}
func (s *WebSocketService) OnAccountUpdateEvent(cb func(e AccountUpdateEvent)) {
s.accountUpdateEventCallbacks = append(s.accountUpdateEventCallbacks, cb)
}
func (s *WebSocketService) EmitAccountUpdateEvent(e AccountUpdateEvent) {
for _, cb := range s.accountUpdateEventCallbacks {
cb(e)
}
}