exchange/stream : implement booktickerupdate event for ftx and

binance
This commit is contained in:
TonyQ 2021-12-22 21:01:11 +08:00
parent 388cfe0854
commit 16862e7208
12 changed files with 185 additions and 2 deletions

1
go.mod
View File

@ -7,6 +7,7 @@ go 1.13
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/adshao/go-binance/v2 v2.3.2
github.com/c9s/callbackgen v0.0.0-20211221175315-609bae1eec11 // indirect
github.com/c9s/rockhopper v1.2.1-0.20210217093258-2661955904a9
github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect

4
go.sum
View File

@ -34,6 +34,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8WK8raXaxBx6fRVTlJILwEwQGL1I/ByEI=
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/c9s/callbackgen v0.0.0-20211221175315-609bae1eec11 h1:RzBf5LlDphNVpr28T+7P4RE6zzAWA8EliTf8WhDaNFE=
github.com/c9s/callbackgen v0.0.0-20211221175315-609bae1eec11/go.mod h1:LKqRir4fL00uSbKpY3L2Tx8Uu65QrpbrZeKcYfZqPDE=
github.com/c9s/rockhopper v1.2.1-0.20210217093258-2661955904a9 h1:Wlr5DjDOf5Kygoo0LoUthxwAhNwLEXMWHqCKXbMHCsw=
github.com/c9s/rockhopper v1.2.1-0.20210217093258-2661955904a9/go.mod h1:KJnQjZSrWA83jjwGF/+O7Y96VCVirYTYEvXJJOc6kMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -434,6 +436,7 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -528,6 +531,7 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -498,9 +498,10 @@ func convertSubscription(s types.Subscription) string {
switch s.Channel {
case types.KLineChannel:
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
case types.BookChannel:
return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol))
case types.BookTickerChannel:
return fmt.Sprintf("%s@bookTicker", strings.ToLower(s.Symbol))
}
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)

View File

@ -264,12 +264,20 @@ func ParseEvent(message string) (interface{}, error) {
}
eventType := string(val.GetStringBytes("e"))
if eventType == "" && IsBookTicker(val) {
eventType = "bookticker"
}
switch eventType {
case "kline":
var event KLineEvent
err := json.Unmarshal([]byte(message), &event)
return &event, err
case "bookticker":
var event BookTickerEvent
err := json.Unmarshal([]byte(message), &event)
event.Event = eventType
return &event, err
case "outboundAccountPosition":
var event OutboundAccountPositionEvent
@ -320,6 +328,14 @@ func ParseEvent(message string) (interface{}, error) {
return nil, fmt.Errorf("unsupported message: %s", message)
}
// IsBookTicker document ref :https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
//use key recognition because there's no identify in the content.
func IsBookTicker(val *fastjson.Value) bool {
return !val.Exists("e") && val.Exists("u") &&
val.Exists("s") && val.Exists("b") &&
val.Exists("B") && val.Exists("a") && val.Exists("A")
}
type DepthEntry struct {
PriceLevel string
Quantity string
@ -718,3 +734,28 @@ type EventBase struct {
Event string `json:"e"` // event
Time int64 `json:"E"`
}
type BookTickerEvent struct {
EventBase
Symbol string `json:"s"`
Buy fixedpoint.Value `json:"b"`
BuySize fixedpoint.Value `json:"B"`
Sell fixedpoint.Value `json:"a"`
SellSize fixedpoint.Value `json:"A"`
//"u":400900217, // order book updateId
//"s":"BNBUSDT", // symbol
//"b":"25.35190000", // best bid price
//"B":"31.21000000", // best bid qty
//"a":"25.36520000", // best ask price
//"A":"40.66000000" // best ask qty
}
func (k *BookTickerEvent) BookTicker() types.BookTicker {
return types.BookTicker{
Symbol: k.Symbol,
Buy: k.Buy,
BuySize: k.BuySize,
Sell: k.Sell,
SellSize: k.SellSize,
}
}

View File

@ -88,6 +88,7 @@ type Stream struct {
outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent)
outboundAccountPositionEventCallbacks []func(event *OutboundAccountPositionEvent)
executionReportEventCallbacks []func(event *ExecutionReportEvent)
bookTickerEventCallbacks []func(event *BookTickerEvent)
orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent)
@ -182,6 +183,10 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
}
})
stream.OnBookTickerEvent(func(e *BookTickerEvent) {
stream.EmitBookTickerUpdate(e.BookTicker())
})
stream.OnExecutionReportEvent(func(e *ExecutionReportEvent) {
switch e.CurrentExecutionType {
@ -608,6 +613,9 @@ func (s *Stream) read(ctx context.Context) {
case *KLineEvent:
s.EmitKLineEvent(e)
case *BookTickerEvent:
s.EmitBookTickerEvent(e)
case *DepthEvent:
s.EmitDepthEvent(e)

View File

@ -104,6 +104,16 @@ func (s *Stream) EmitExecutionReportEvent(event *ExecutionReportEvent) {
}
}
func (s *Stream) OnBookTickerEvent(cb func(event *BookTickerEvent)) {
s.bookTickerEventCallbacks = append(s.bookTickerEventCallbacks, cb)
}
func (s *Stream) EmitBookTickerEvent(event *BookTickerEvent) {
for _, cb := range s.bookTickerEventCallbacks {
cb(event)
}
}
func (s *Stream) OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent)) {
s.orderTradeUpdateEventCallbacks = append(s.orderTradeUpdateEventCallbacks, cb)
}
@ -135,5 +145,7 @@ type StreamEventHub interface {
OnExecutionReportEvent(cb func(event *ExecutionReportEvent))
OnBookTickerEvent(cb func(event *BookTickerEvent))
OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent))
}

View File

@ -123,7 +123,12 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su
Channel: orderBookChannel,
Market: toLocalSymbol(TrimUpperString(symbol)),
})
} else if channel == types.BookTickerChannel {
s.addSubscription(websocketRequest{
Operation: subscribe,
Channel: bookTickerChannel,
Market: toLocalSymbol(TrimUpperString(symbol)),
})
} else if channel == types.KLineChannel {
// FTX does not support kline channel, do polling
interval := types.Interval(option.Interval)

View File

@ -29,6 +29,8 @@ func (h *messageHandler) handleMessage(message []byte) {
switch r.Channel {
case orderBookChannel:
h.handleOrderBook(r)
case bookTickerChannel:
h.handleBookTicker(r)
case privateOrdersChannel:
h.handlePrivateOrders(r)
case privateTradesChannel:
@ -81,6 +83,34 @@ func (h *messageHandler) handleOrderBook(response websocketResponse) {
}
}
func (h *messageHandler) handleBookTicker(response websocketResponse) {
if response.Type == subscribedRespType {
h.handleSubscribedMessage(response)
return
}
r, err := response.toBookTickerResponse()
if err != nil {
logger.WithError(err).Errorf("failed to convert the book ticker")
return
}
globalBookTicker, err := toGlobalBookTicker(r)
if err != nil {
logger.WithError(err).Errorf("failed to generate book ticker")
return
}
switch r.Type {
case updateRespType:
// emit updates, not the whole orderbook
h.EmitBookTickerUpdate(globalBookTicker)
default:
logger.Errorf("unsupported book ticker data type %s", r.Type)
return
}
}
func (h *messageHandler) handlePrivateOrders(response websocketResponse) {
if response.Type == subscribedRespType {
h.handleSubscribedMessage(response)

View File

@ -23,6 +23,7 @@ const unsubscribe operation = "unsubscribe"
type channel string
const orderBookChannel channel = "orderbook"
const bookTickerChannel channel = "ticker"
const privateOrdersChannel channel = "orders"
const privateTradesChannel channel = "fills"
@ -194,6 +195,24 @@ func (r websocketResponse) toErrResponse() errResponse {
}
}
//sample :{"bid": 49194.0, "ask": 49195.0, "bidSize": 0.0775, "askSize": 0.0247, "last": 49200.0, "time": 1640171788.9339821}
func (r websocketResponse) toBookTickerResponse() (bookTickerResponse, error) {
if r.Channel != bookTickerChannel {
return bookTickerResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion)
}
var o bookTickerResponse
if err := json.Unmarshal(r.Data, &o); err != nil {
return bookTickerResponse{}, err
}
o.mandatoryFields = r.mandatoryFields
o.Market = r.Market
o.Timestamp = nanoToTime(o.Time)
return o, nil
}
func (r websocketResponse) toPublicOrderBookResponse() (orderBookResponse, error) {
if r.Channel != orderBookChannel {
return orderBookResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion)
@ -236,6 +255,18 @@ type orderBookResponse struct {
Asks [][]json.Number `json:"asks"`
}
type bookTickerResponse struct {
mandatoryFields
Market string `json:"market"`
Bid fixedpoint.Value `json:"bid"`
Ask fixedpoint.Value `json:"ask"`
BidSize fixedpoint.Value `json:"bidSize"`
AskSize fixedpoint.Value `json:"askSize"`
Last fixedpoint.Value `json:"last"`
Time float64 `json:"time"`
Timestamp time.Time
}
// only 100 orders so we use linear search here
func (r *orderBookResponse) update(orderUpdates orderBookResponse) {
r.Checksum = orderUpdates.Checksum
@ -369,6 +400,19 @@ func toGlobalOrderBook(r orderBookResponse) (types.SliceOrderBook, error) {
}, nil
}
func toGlobalBookTicker(r bookTickerResponse) (types.BookTicker, error) {
return types.BookTicker{
// ex. BTC/USDT
Symbol: toGlobalSymbol(strings.ToUpper(r.Market)),
//Time: r.Timestamp,
Buy: r.Bid,
BuySize: r.BidSize,
Sell: r.Ask,
SellSize: r.AskSize,
//Last: r.Last,
}, nil
}
func toPriceVolumeSlice(orders [][]json.Number) (types.PriceVolumeSlice, error) {
var pv types.PriceVolumeSlice
for _, o := range orders {

22
pkg/types/bookticker.go Normal file
View File

@ -0,0 +1,22 @@
package types
import (
"fmt"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
// BookTicker time exists in ftx, not in binance
// last exists in ftx, not in binance
type BookTicker struct {
//Time time.Time
Symbol string
Buy fixedpoint.Value // `buy` from Max, `bidPrice` from binance
BuySize fixedpoint.Value
Sell fixedpoint.Value // `sell` from Max, `askPrice` from binance
SellSize fixedpoint.Value
//Last fixedpoint.Value
}
func (b BookTicker) String() string {
return fmt.Sprintf("BookTicker { Symbol: %s,Buy: %f , BuySize: %f, Sell: %f, SellSize :%f } ", b.Symbol, b.Buy.Float64(), b.BuySize.Float64(), b.Sell.Float64(), b.SellSize.Float64())
}

View File

@ -104,6 +104,16 @@ func (stream *StandardStream) EmitBookUpdate(book SliceOrderBook) {
}
}
func (stream *StandardStream) OnBookTickerUpdate(cb func(bookTicker BookTicker)) {
stream.bookTickerUpdateCallbacks = append(stream.bookTickerUpdateCallbacks, cb)
}
func (stream *StandardStream) EmitBookTickerUpdate(bookTicker BookTicker) {
for _, cb := range stream.bookTickerUpdateCallbacks {
cb(bookTicker)
}
}
func (stream *StandardStream) OnBookSnapshot(cb func(book SliceOrderBook)) {
stream.bookSnapshotCallbacks = append(stream.bookSnapshotCallbacks, cb)
}
@ -155,6 +165,8 @@ type StandardStreamEventHub interface {
OnBookUpdate(cb func(book SliceOrderBook))
OnBookTickerUpdate(cb func(bookTicker BookTicker))
OnBookSnapshot(cb func(book SliceOrderBook))
OnPositionUpdate(cb func(position PositionMap))

View File

@ -20,6 +20,7 @@ type Channel string
var BookChannel = Channel("book")
var KLineChannel = Channel("kline")
var BookTickerChannel = Channel("bookticker")
//go:generate callbackgen -type StandardStream -interface
type StandardStream struct {
@ -50,6 +51,8 @@ type StandardStream struct {
bookUpdateCallbacks []func(book SliceOrderBook)
bookTickerUpdateCallbacks []func(bookTicker BookTicker)
bookSnapshotCallbacks []func(book SliceOrderBook)
// Futures