From 16862e72087b7bd4a506603d878398e33e1087cb Mon Sep 17 00:00:00 2001 From: TonyQ Date: Wed, 22 Dec 2021 21:01:11 +0800 Subject: [PATCH] exchange/stream : implement booktickerupdate event for ftx and binance --- go.mod | 1 + go.sum | 4 ++ pkg/exchange/binance/convert.go | 3 +- pkg/exchange/binance/parse.go | 41 ++++++++++++++++++++ pkg/exchange/binance/stream.go | 8 ++++ pkg/exchange/binance/stream_callbacks.go | 12 ++++++ pkg/exchange/ftx/stream.go | 7 +++- pkg/exchange/ftx/stream_message_handler.go | 30 +++++++++++++++ pkg/exchange/ftx/websocket_messages.go | 44 ++++++++++++++++++++++ pkg/types/bookticker.go | 22 +++++++++++ pkg/types/standardstream_callbacks.go | 12 ++++++ pkg/types/stream.go | 3 ++ 12 files changed, 185 insertions(+), 2 deletions(-) create mode 100644 pkg/types/bookticker.go diff --git a/go.mod b/go.mod index 2a5b2f218..0a73d6a9b 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ go 1.13 require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/adshao/go-binance/v2 v2.3.2 + github.com/c9s/callbackgen v0.0.0-20211221175315-609bae1eec11 // indirect github.com/c9s/rockhopper v1.2.1-0.20210217093258-2661955904a9 github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect diff --git a/go.sum b/go.sum index 6ffb010cb..63a854071 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8WK8raXaxBx6fRVTlJILwEwQGL1I/ByEI= github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/c9s/callbackgen v0.0.0-20211221175315-609bae1eec11 h1:RzBf5LlDphNVpr28T+7P4RE6zzAWA8EliTf8WhDaNFE= +github.com/c9s/callbackgen v0.0.0-20211221175315-609bae1eec11/go.mod h1:LKqRir4fL00uSbKpY3L2Tx8Uu65QrpbrZeKcYfZqPDE= github.com/c9s/rockhopper v1.2.1-0.20210217093258-2661955904a9 h1:Wlr5DjDOf5Kygoo0LoUthxwAhNwLEXMWHqCKXbMHCsw= github.com/c9s/rockhopper v1.2.1-0.20210217093258-2661955904a9/go.mod h1:KJnQjZSrWA83jjwGF/+O7Y96VCVirYTYEvXJJOc6kMU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -434,6 +436,7 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= +golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -528,6 +531,7 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/exchange/binance/convert.go b/pkg/exchange/binance/convert.go index 2d28d2360..b9c44a38a 100644 --- a/pkg/exchange/binance/convert.go +++ b/pkg/exchange/binance/convert.go @@ -498,9 +498,10 @@ func convertSubscription(s types.Subscription) string { switch s.Channel { case types.KLineChannel: return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String()) - case types.BookChannel: return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol)) + case types.BookTickerChannel: + return fmt.Sprintf("%s@bookTicker", strings.ToLower(s.Symbol)) } return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel) diff --git a/pkg/exchange/binance/parse.go b/pkg/exchange/binance/parse.go index ec7ccb14c..76aa64be2 100644 --- a/pkg/exchange/binance/parse.go +++ b/pkg/exchange/binance/parse.go @@ -264,12 +264,20 @@ func ParseEvent(message string) (interface{}, error) { } eventType := string(val.GetStringBytes("e")) + if eventType == "" && IsBookTicker(val) { + eventType = "bookticker" + } switch eventType { case "kline": var event KLineEvent err := json.Unmarshal([]byte(message), &event) return &event, err + case "bookticker": + var event BookTickerEvent + err := json.Unmarshal([]byte(message), &event) + event.Event = eventType + return &event, err case "outboundAccountPosition": var event OutboundAccountPositionEvent @@ -320,6 +328,14 @@ func ParseEvent(message string) (interface{}, error) { return nil, fmt.Errorf("unsupported message: %s", message) } +// IsBookTicker document ref :https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams +//use key recognition because there's no identify in the content. +func IsBookTicker(val *fastjson.Value) bool { + return !val.Exists("e") && val.Exists("u") && + val.Exists("s") && val.Exists("b") && + val.Exists("B") && val.Exists("a") && val.Exists("A") +} + type DepthEntry struct { PriceLevel string Quantity string @@ -718,3 +734,28 @@ type EventBase struct { Event string `json:"e"` // event Time int64 `json:"E"` } + +type BookTickerEvent struct { + EventBase + Symbol string `json:"s"` + Buy fixedpoint.Value `json:"b"` + BuySize fixedpoint.Value `json:"B"` + Sell fixedpoint.Value `json:"a"` + SellSize fixedpoint.Value `json:"A"` + //"u":400900217, // order book updateId + //"s":"BNBUSDT", // symbol + //"b":"25.35190000", // best bid price + //"B":"31.21000000", // best bid qty + //"a":"25.36520000", // best ask price + //"A":"40.66000000" // best ask qty +} + +func (k *BookTickerEvent) BookTicker() types.BookTicker { + return types.BookTicker{ + Symbol: k.Symbol, + Buy: k.Buy, + BuySize: k.BuySize, + Sell: k.Sell, + SellSize: k.SellSize, + } +} diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index ced2d9e24..50f308eae 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -88,6 +88,7 @@ type Stream struct { outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent) outboundAccountPositionEventCallbacks []func(event *OutboundAccountPositionEvent) executionReportEventCallbacks []func(event *ExecutionReportEvent) + bookTickerEventCallbacks []func(event *BookTickerEvent) orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent) @@ -182,6 +183,10 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { } }) + stream.OnBookTickerEvent(func(e *BookTickerEvent) { + stream.EmitBookTickerUpdate(e.BookTicker()) + }) + stream.OnExecutionReportEvent(func(e *ExecutionReportEvent) { switch e.CurrentExecutionType { @@ -608,6 +613,9 @@ func (s *Stream) read(ctx context.Context) { case *KLineEvent: s.EmitKLineEvent(e) + case *BookTickerEvent: + s.EmitBookTickerEvent(e) + case *DepthEvent: s.EmitDepthEvent(e) diff --git a/pkg/exchange/binance/stream_callbacks.go b/pkg/exchange/binance/stream_callbacks.go index 5c7c3c051..a5b533123 100644 --- a/pkg/exchange/binance/stream_callbacks.go +++ b/pkg/exchange/binance/stream_callbacks.go @@ -104,6 +104,16 @@ func (s *Stream) EmitExecutionReportEvent(event *ExecutionReportEvent) { } } +func (s *Stream) OnBookTickerEvent(cb func(event *BookTickerEvent)) { + s.bookTickerEventCallbacks = append(s.bookTickerEventCallbacks, cb) +} + +func (s *Stream) EmitBookTickerEvent(event *BookTickerEvent) { + for _, cb := range s.bookTickerEventCallbacks { + cb(event) + } +} + func (s *Stream) OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent)) { s.orderTradeUpdateEventCallbacks = append(s.orderTradeUpdateEventCallbacks, cb) } @@ -135,5 +145,7 @@ type StreamEventHub interface { OnExecutionReportEvent(cb func(event *ExecutionReportEvent)) + OnBookTickerEvent(cb func(event *BookTickerEvent)) + OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent)) } diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index bb35d7761..ef5e1fe37 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -123,7 +123,12 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su Channel: orderBookChannel, Market: toLocalSymbol(TrimUpperString(symbol)), }) - + } else if channel == types.BookTickerChannel { + s.addSubscription(websocketRequest{ + Operation: subscribe, + Channel: bookTickerChannel, + Market: toLocalSymbol(TrimUpperString(symbol)), + }) } else if channel == types.KLineChannel { // FTX does not support kline channel, do polling interval := types.Interval(option.Interval) diff --git a/pkg/exchange/ftx/stream_message_handler.go b/pkg/exchange/ftx/stream_message_handler.go index 894abce65..13f32295d 100644 --- a/pkg/exchange/ftx/stream_message_handler.go +++ b/pkg/exchange/ftx/stream_message_handler.go @@ -29,6 +29,8 @@ func (h *messageHandler) handleMessage(message []byte) { switch r.Channel { case orderBookChannel: h.handleOrderBook(r) + case bookTickerChannel: + h.handleBookTicker(r) case privateOrdersChannel: h.handlePrivateOrders(r) case privateTradesChannel: @@ -81,6 +83,34 @@ func (h *messageHandler) handleOrderBook(response websocketResponse) { } } +func (h *messageHandler) handleBookTicker(response websocketResponse) { + if response.Type == subscribedRespType { + h.handleSubscribedMessage(response) + return + } + + r, err := response.toBookTickerResponse() + if err != nil { + logger.WithError(err).Errorf("failed to convert the book ticker") + return + } + + globalBookTicker, err := toGlobalBookTicker(r) + if err != nil { + logger.WithError(err).Errorf("failed to generate book ticker") + return + } + + switch r.Type { + case updateRespType: + // emit updates, not the whole orderbook + h.EmitBookTickerUpdate(globalBookTicker) + default: + logger.Errorf("unsupported book ticker data type %s", r.Type) + return + } +} + func (h *messageHandler) handlePrivateOrders(response websocketResponse) { if response.Type == subscribedRespType { h.handleSubscribedMessage(response) diff --git a/pkg/exchange/ftx/websocket_messages.go b/pkg/exchange/ftx/websocket_messages.go index 1dbaad05a..217a6b90e 100644 --- a/pkg/exchange/ftx/websocket_messages.go +++ b/pkg/exchange/ftx/websocket_messages.go @@ -23,6 +23,7 @@ const unsubscribe operation = "unsubscribe" type channel string const orderBookChannel channel = "orderbook" +const bookTickerChannel channel = "ticker" const privateOrdersChannel channel = "orders" const privateTradesChannel channel = "fills" @@ -194,6 +195,24 @@ func (r websocketResponse) toErrResponse() errResponse { } } +//sample :{"bid": 49194.0, "ask": 49195.0, "bidSize": 0.0775, "askSize": 0.0247, "last": 49200.0, "time": 1640171788.9339821} +func (r websocketResponse) toBookTickerResponse() (bookTickerResponse, error) { + if r.Channel != bookTickerChannel { + return bookTickerResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion) + } + + var o bookTickerResponse + if err := json.Unmarshal(r.Data, &o); err != nil { + return bookTickerResponse{}, err + } + + o.mandatoryFields = r.mandatoryFields + o.Market = r.Market + o.Timestamp = nanoToTime(o.Time) + + return o, nil +} + func (r websocketResponse) toPublicOrderBookResponse() (orderBookResponse, error) { if r.Channel != orderBookChannel { return orderBookResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion) @@ -236,6 +255,18 @@ type orderBookResponse struct { Asks [][]json.Number `json:"asks"` } +type bookTickerResponse struct { + mandatoryFields + Market string `json:"market"` + Bid fixedpoint.Value `json:"bid"` + Ask fixedpoint.Value `json:"ask"` + BidSize fixedpoint.Value `json:"bidSize"` + AskSize fixedpoint.Value `json:"askSize"` + Last fixedpoint.Value `json:"last"` + Time float64 `json:"time"` + Timestamp time.Time +} + // only 100 orders so we use linear search here func (r *orderBookResponse) update(orderUpdates orderBookResponse) { r.Checksum = orderUpdates.Checksum @@ -369,6 +400,19 @@ func toGlobalOrderBook(r orderBookResponse) (types.SliceOrderBook, error) { }, nil } +func toGlobalBookTicker(r bookTickerResponse) (types.BookTicker, error) { + return types.BookTicker{ + // ex. BTC/USDT + Symbol: toGlobalSymbol(strings.ToUpper(r.Market)), + //Time: r.Timestamp, + Buy: r.Bid, + BuySize: r.BidSize, + Sell: r.Ask, + SellSize: r.AskSize, + //Last: r.Last, + }, nil +} + func toPriceVolumeSlice(orders [][]json.Number) (types.PriceVolumeSlice, error) { var pv types.PriceVolumeSlice for _, o := range orders { diff --git a/pkg/types/bookticker.go b/pkg/types/bookticker.go new file mode 100644 index 000000000..5bb6b4e33 --- /dev/null +++ b/pkg/types/bookticker.go @@ -0,0 +1,22 @@ +package types + +import ( + "fmt" + "github.com/c9s/bbgo/pkg/fixedpoint" +) + +// BookTicker time exists in ftx, not in binance +// last exists in ftx, not in binance +type BookTicker struct { + //Time time.Time + Symbol string + Buy fixedpoint.Value // `buy` from Max, `bidPrice` from binance + BuySize fixedpoint.Value + Sell fixedpoint.Value // `sell` from Max, `askPrice` from binance + SellSize fixedpoint.Value + //Last fixedpoint.Value +} + +func (b BookTicker) String() string { + return fmt.Sprintf("BookTicker { Symbol: %s,Buy: %f , BuySize: %f, Sell: %f, SellSize :%f } ", b.Symbol, b.Buy.Float64(), b.BuySize.Float64(), b.Sell.Float64(), b.SellSize.Float64()) +} diff --git a/pkg/types/standardstream_callbacks.go b/pkg/types/standardstream_callbacks.go index 52f7760fe..1be8fdb2b 100644 --- a/pkg/types/standardstream_callbacks.go +++ b/pkg/types/standardstream_callbacks.go @@ -104,6 +104,16 @@ func (stream *StandardStream) EmitBookUpdate(book SliceOrderBook) { } } +func (stream *StandardStream) OnBookTickerUpdate(cb func(bookTicker BookTicker)) { + stream.bookTickerUpdateCallbacks = append(stream.bookTickerUpdateCallbacks, cb) +} + +func (stream *StandardStream) EmitBookTickerUpdate(bookTicker BookTicker) { + for _, cb := range stream.bookTickerUpdateCallbacks { + cb(bookTicker) + } +} + func (stream *StandardStream) OnBookSnapshot(cb func(book SliceOrderBook)) { stream.bookSnapshotCallbacks = append(stream.bookSnapshotCallbacks, cb) } @@ -155,6 +165,8 @@ type StandardStreamEventHub interface { OnBookUpdate(cb func(book SliceOrderBook)) + OnBookTickerUpdate(cb func(bookTicker BookTicker)) + OnBookSnapshot(cb func(book SliceOrderBook)) OnPositionUpdate(cb func(position PositionMap)) diff --git a/pkg/types/stream.go b/pkg/types/stream.go index 7d10d905a..19d278426 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -20,6 +20,7 @@ type Channel string var BookChannel = Channel("book") var KLineChannel = Channel("kline") +var BookTickerChannel = Channel("bookticker") //go:generate callbackgen -type StandardStream -interface type StandardStream struct { @@ -50,6 +51,8 @@ type StandardStream struct { bookUpdateCallbacks []func(book SliceOrderBook) + bookTickerUpdateCallbacks []func(bookTicker BookTicker) + bookSnapshotCallbacks []func(book SliceOrderBook) // Futures