From 1bbed2e47740dc75b696d8bf51cd862fad046030 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 3 Oct 2020 20:09:22 +0800 Subject: [PATCH] support binance depth event parsing --- bbgo/account.go | 2 +- bbgo/exchange/binance/exchange.go | 2 +- bbgo/exchange/binance/parse.go | 70 +++++++++++++++++++++++ bbgo/exchange/binance/stream.go | 43 ++++++++------ bbgo/exchange/binance/stream_callbacks.go | 12 ++++ bbgo/store.go | 2 +- bbgo/trader.go | 8 +-- bbgo/types/exchange.go | 2 +- bbgo/types/orderbook.go | 2 +- bbgo/types/stream.go | 2 +- 10 files changed, 118 insertions(+), 27 deletions(-) diff --git a/bbgo/account.go b/bbgo/account.go index a05828cad..d1a9c0058 100644 --- a/bbgo/account.go +++ b/bbgo/account.go @@ -24,7 +24,7 @@ func LoadAccount(ctx context.Context, exchange *binance.Exchange) (*Account, err }, err } -func (a *Account) BindPrivateStream(stream types.PrivateStream) { +func (a *Account) BindPrivateStream(stream types.Stream) { stream.OnBalanceSnapshot(func(snapshot map[string]types.Balance) { a.mu.Lock() defer a.mu.Unlock() diff --git a/bbgo/exchange/binance/exchange.go b/bbgo/exchange/binance/exchange.go index aa89fa236..42aa2c8e8 100644 --- a/bbgo/exchange/binance/exchange.go +++ b/bbgo/exchange/binance/exchange.go @@ -42,7 +42,7 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6 return util.MustParseFloat(resp.Price), nil } -func (e *Exchange) NewPrivateStream() (types.PrivateStream, error) { +func (e *Exchange) NewStream() types.Stream { return NewStream(e.Client) } diff --git a/bbgo/exchange/binance/parse.go b/bbgo/exchange/binance/parse.go index 3345a34c7..00312b149 100644 --- a/bbgo/exchange/binance/parse.go +++ b/bbgo/exchange/binance/parse.go @@ -232,6 +232,9 @@ func ParseEvent(message string) (interface{}, error) { err := json.Unmarshal([]byte(message), &event) return &event, err + case "depthUpdate": + return parseDepthEvent(val) + default: id := val.GetInt("id") if id > 0 { @@ -242,6 +245,73 @@ func ParseEvent(message string) (interface{}, error) { return nil, fmt.Errorf("unsupported message: %s", message) } +type DepthEntry struct { + PriceLevel string + Quantity string +} + +type DepthEvent struct { + EventBase + + Symbol string `json:"s"` + FirstUpdateID int64 `json:"U"` + FinalUpdateID int64 `json:"u"` + + Bids []DepthEntry + Asks []DepthEntry +} + +func parseDepthEntry(val *fastjson.Value) (*DepthEntry, error) { + arr, err := val.Array() + if err != nil { + return nil, err + } + + if len(arr) < 2 { + return nil, errors.New("incorrect depth entry element length") + } + + return &DepthEntry{ + PriceLevel: string(arr[0].GetStringBytes()), + Quantity: string(arr[1].GetStringBytes()), + }, nil +} + +func parseDepthEvent(val *fastjson.Value) (*DepthEvent, error) { + var err error + var depth = &DepthEvent{ + EventBase: EventBase{ + Event: string(val.GetStringBytes("e")), + Time: val.GetInt64("E"), + }, + Symbol: string(val.GetStringBytes("s")), + FirstUpdateID: val.GetInt64("U"), + FinalUpdateID: val.GetInt64("u"), + } + + for _, ev := range val.GetArray("b") { + entry, err2 := parseDepthEntry(ev) + if err2 != nil { + err = err2 + continue + } + + depth.Bids = append(depth.Bids, *entry) + } + + for _, ev := range val.GetArray("a") { + entry, err2 := parseDepthEntry(ev) + if err2 != nil { + err = err2 + continue + } + + depth.Asks = append(depth.Asks, *entry) + } + + return depth, err +} + type KLine struct { StartTime int64 `json:"t"` EndTime int64 `json:"T"` diff --git a/bbgo/exchange/binance/stream.go b/bbgo/exchange/binance/stream.go index 08d8720f9..4ce351f3d 100644 --- a/bbgo/exchange/binance/stream.go +++ b/bbgo/exchange/binance/stream.go @@ -32,6 +32,7 @@ type Stream struct { connectCallbacks []func(stream *Stream) // custom callbacks + depthEventCallbacks []func(e *DepthEvent) kLineEventCallbacks []func(e *KLineEvent) kLineClosedEventCallbacks []func(e *KLineEvent) @@ -40,7 +41,7 @@ type Stream struct { executionReportEventCallbacks []func(event *ExecutionReportEvent) } -func NewStream(client *binance.Client) (*Stream, error) { +func NewStream(client *binance.Client) *Stream { // binance BalanceUpdate = withdrawal or deposit changes /* stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) { @@ -91,9 +92,26 @@ func NewStream(client *binance.Client) (*Stream, error) { } }) - return stream, nil -} + stream.OnConnect(func(stream *Stream) { + var params []string + for _, subscription := range stream.Subscriptions { + params = append(params, convertSubscription(subscription)) + } + log.Infof("[binance] subscribing channels: %+v", params) + err := stream.Conn.WriteJSON(StreamRequest{ + Method: "SUBSCRIBE", + Params: params, + ID: 1, + }) + + if err != nil { + log.WithError(err).Error("subscribe error") + } + }) + + return stream +} func (s *Stream) dial(listenKey string) (*websocket.Conn, error) { url := "wss://stream.binance.com:9443/ws/" + listenKey @@ -124,18 +142,7 @@ func (s *Stream) connect(ctx context.Context) error { s.Conn = conn s.EmitConnect(s) - - var params []string - for _, subscription := range s.Subscriptions { - params = append(params, convertSubscription(subscription)) - } - - log.Infof("[binance] subscribing channels: %+v", params) - return conn.WriteJSON(StreamRequest{ - Method: "SUBSCRIBE", - Params: params, - ID: 1, - }) + return nil } func convertSubscription(s types.Subscription) string { @@ -245,6 +252,10 @@ func (s *Stream) read(ctx context.Context) { log.Info(e.Event, " ", e.KLine, " ", e.KLine.Interval) s.EmitKLineEvent(e) + case *DepthEvent: + log.Info(e.Event, " ", "asks:", e.Asks, "bids:", e.Bids) + s.EmitDepthEvent(e) + case *ExecutionReportEvent: log.Info(e.Event, " ", e) s.EmitExecutionReportEvent(e) @@ -277,5 +288,3 @@ func maskListenKey(listenKey string) string { maskKey := listenKey[0:5] return maskKey + strings.Repeat("*", len(listenKey)-1-5) } - - diff --git a/bbgo/exchange/binance/stream_callbacks.go b/bbgo/exchange/binance/stream_callbacks.go index 9472ff264..2e340d408 100644 --- a/bbgo/exchange/binance/stream_callbacks.go +++ b/bbgo/exchange/binance/stream_callbacks.go @@ -14,6 +14,16 @@ func (s *Stream) EmitConnect(stream *Stream) { } } +func (s *Stream) OnDepthEvent(cb func(e *DepthEvent)) { + s.depthEventCallbacks = append(s.depthEventCallbacks, cb) +} + +func (s *Stream) EmitDepthEvent(e *DepthEvent) { + for _, cb := range s.depthEventCallbacks { + cb(e) + } +} + func (s *Stream) OnKLineEvent(cb func(e *KLineEvent)) { s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb) } @@ -67,6 +77,8 @@ func (s *Stream) EmitExecutionReportEvent(event *ExecutionReportEvent) { type StreamEventHub interface { OnConnect(cb func(stream *Stream)) + OnDepthEvent(cb func(e *DepthEvent)) + OnKLineEvent(cb func(e *KLineEvent)) OnKLineClosedEvent(cb func(e *KLineEvent)) diff --git a/bbgo/store.go b/bbgo/store.go index c6fa00b4a..f9333a5cf 100644 --- a/bbgo/store.go +++ b/bbgo/store.go @@ -28,7 +28,7 @@ func NewMarketDataStore() *MarketDataStore { } } -func (store *MarketDataStore) BindPrivateStream(stream types.PrivateStream) { +func (store *MarketDataStore) BindPrivateStream(stream types.Stream) { stream.OnKLineClosed(store.handleKLineClosed) } diff --git a/bbgo/trader.go b/bbgo/trader.go index 6af52deb9..ac6f0e84f 100644 --- a/bbgo/trader.go +++ b/bbgo/trader.go @@ -20,7 +20,7 @@ import ( // MarketStrategy represents the single Exchange strategy type MarketStrategy interface { OnLoad(tradingContext *Context, trader types.Trader) error - OnNewStream(stream types.PrivateStream) error + OnNewStream(stream types.Stream) error } type ExchangeSession struct { @@ -28,7 +28,7 @@ type ExchangeSession struct { Account *Account - Stream types.PrivateStream + Stream types.Stream Subscriptions []types.Subscription @@ -181,7 +181,7 @@ func (trader *Trader) Connect(ctx context.Context) (err error) { return err } - session.Stream, err = session.Exchange.NewPrivateStream() + session.Stream = session.Exchange.NewStream() if err != nil { return err } @@ -354,7 +354,7 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy) return nil, err } - stream, err := trader.Exchange.NewPrivateStream() + stream := trader.Exchange.NewStream() if err != nil { return nil, err } diff --git a/bbgo/types/exchange.go b/bbgo/types/exchange.go index fbf12ad96..ce9a28f04 100644 --- a/bbgo/types/exchange.go +++ b/bbgo/types/exchange.go @@ -8,7 +8,7 @@ import ( type Exchange interface { PlatformFeeCurrency() string - NewPrivateStream() (PrivateStream, error) + NewStream() Stream QueryAccountBalances(ctx context.Context) (map[string]Balance, error) diff --git a/bbgo/types/orderbook.go b/bbgo/types/orderbook.go index 1d0879798..632bd9cae 100644 --- a/bbgo/types/orderbook.go +++ b/bbgo/types/orderbook.go @@ -227,7 +227,7 @@ func NewStreamBook(symbol string) *StreamOrderBook { } } -func (sb *StreamOrderBook) BindStream(stream PrivateStream) { +func (sb *StreamOrderBook) BindStream(stream Stream) { stream.OnBookSnapshot(func(book OrderBook) { sb.Load(book) sb.C.Emit() diff --git a/bbgo/types/stream.go b/bbgo/types/stream.go index ce6953233..bbe39f241 100644 --- a/bbgo/types/stream.go +++ b/bbgo/types/stream.go @@ -4,7 +4,7 @@ import ( "context" ) -type PrivateStream interface { +type Stream interface { StandardStreamEventHub Subscribe(channel Channel, symbol string, options SubscribeOptions)