mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 06:53:52 +00:00
Merge pull request #389 from tony1223/feature/388-bookticker
exchange/stream : implement booktickerupdate event for ftx and binance
This commit is contained in:
commit
bcbf7c3f3b
1
go.mod
1
go.mod
|
@ -7,6 +7,7 @@ go 1.13
|
|||
require (
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0
|
||||
github.com/adshao/go-binance/v2 v2.3.2
|
||||
github.com/c9s/callbackgen v0.0.0-20211221175315-609bae1eec11 // indirect
|
||||
github.com/c9s/rockhopper v1.2.1-0.20210217093258-2661955904a9
|
||||
github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482
|
||||
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
|
||||
|
|
4
go.sum
4
go.sum
|
@ -34,6 +34,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
|
|||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8WK8raXaxBx6fRVTlJILwEwQGL1I/ByEI=
|
||||
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
|
||||
github.com/c9s/callbackgen v0.0.0-20211221175315-609bae1eec11 h1:RzBf5LlDphNVpr28T+7P4RE6zzAWA8EliTf8WhDaNFE=
|
||||
github.com/c9s/callbackgen v0.0.0-20211221175315-609bae1eec11/go.mod h1:LKqRir4fL00uSbKpY3L2Tx8Uu65QrpbrZeKcYfZqPDE=
|
||||
github.com/c9s/rockhopper v1.2.1-0.20210217093258-2661955904a9 h1:Wlr5DjDOf5Kygoo0LoUthxwAhNwLEXMWHqCKXbMHCsw=
|
||||
github.com/c9s/rockhopper v1.2.1-0.20210217093258-2661955904a9/go.mod h1:KJnQjZSrWA83jjwGF/+O7Y96VCVirYTYEvXJJOc6kMU=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
|
@ -434,6 +436,7 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
|
|||
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
|
||||
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
|
||||
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
|
||||
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
|
@ -528,6 +531,7 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
|
|||
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE=
|
||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
|
|
@ -498,9 +498,10 @@ func convertSubscription(s types.Subscription) string {
|
|||
switch s.Channel {
|
||||
case types.KLineChannel:
|
||||
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
|
||||
|
||||
case types.BookChannel:
|
||||
return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol))
|
||||
case types.BookTickerChannel:
|
||||
return fmt.Sprintf("%s@bookTicker", strings.ToLower(s.Symbol))
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
|
||||
|
|
|
@ -264,12 +264,20 @@ func ParseEvent(message string) (interface{}, error) {
|
|||
}
|
||||
|
||||
eventType := string(val.GetStringBytes("e"))
|
||||
if eventType == "" && IsBookTicker(val) {
|
||||
eventType = "bookticker"
|
||||
}
|
||||
|
||||
switch eventType {
|
||||
case "kline":
|
||||
var event KLineEvent
|
||||
err := json.Unmarshal([]byte(message), &event)
|
||||
return &event, err
|
||||
case "bookticker":
|
||||
var event BookTickerEvent
|
||||
err := json.Unmarshal([]byte(message), &event)
|
||||
event.Event = eventType
|
||||
return &event, err
|
||||
|
||||
case "outboundAccountPosition":
|
||||
var event OutboundAccountPositionEvent
|
||||
|
@ -320,6 +328,14 @@ func ParseEvent(message string) (interface{}, error) {
|
|||
return nil, fmt.Errorf("unsupported message: %s", message)
|
||||
}
|
||||
|
||||
// IsBookTicker document ref :https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
|
||||
//use key recognition because there's no identify in the content.
|
||||
func IsBookTicker(val *fastjson.Value) bool {
|
||||
return !val.Exists("e") && val.Exists("u") &&
|
||||
val.Exists("s") && val.Exists("b") &&
|
||||
val.Exists("B") && val.Exists("a") && val.Exists("A")
|
||||
}
|
||||
|
||||
type DepthEntry struct {
|
||||
PriceLevel string
|
||||
Quantity string
|
||||
|
@ -718,3 +734,28 @@ type EventBase struct {
|
|||
Event string `json:"e"` // event
|
||||
Time int64 `json:"E"`
|
||||
}
|
||||
|
||||
type BookTickerEvent struct {
|
||||
EventBase
|
||||
Symbol string `json:"s"`
|
||||
Buy fixedpoint.Value `json:"b"`
|
||||
BuySize fixedpoint.Value `json:"B"`
|
||||
Sell fixedpoint.Value `json:"a"`
|
||||
SellSize fixedpoint.Value `json:"A"`
|
||||
//"u":400900217, // order book updateId
|
||||
//"s":"BNBUSDT", // symbol
|
||||
//"b":"25.35190000", // best bid price
|
||||
//"B":"31.21000000", // best bid qty
|
||||
//"a":"25.36520000", // best ask price
|
||||
//"A":"40.66000000" // best ask qty
|
||||
}
|
||||
|
||||
func (k *BookTickerEvent) BookTicker() types.BookTicker {
|
||||
return types.BookTicker{
|
||||
Symbol: k.Symbol,
|
||||
Buy: k.Buy,
|
||||
BuySize: k.BuySize,
|
||||
Sell: k.Sell,
|
||||
SellSize: k.SellSize,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,6 +88,7 @@ type Stream struct {
|
|||
outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent)
|
||||
outboundAccountPositionEventCallbacks []func(event *OutboundAccountPositionEvent)
|
||||
executionReportEventCallbacks []func(event *ExecutionReportEvent)
|
||||
bookTickerEventCallbacks []func(event *BookTickerEvent)
|
||||
|
||||
orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent)
|
||||
|
||||
|
@ -182,6 +183,10 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
|
|||
}
|
||||
})
|
||||
|
||||
stream.OnBookTickerEvent(func(e *BookTickerEvent) {
|
||||
stream.EmitBookTickerUpdate(e.BookTicker())
|
||||
})
|
||||
|
||||
stream.OnExecutionReportEvent(func(e *ExecutionReportEvent) {
|
||||
switch e.CurrentExecutionType {
|
||||
|
||||
|
@ -608,6 +613,9 @@ func (s *Stream) read(ctx context.Context) {
|
|||
case *KLineEvent:
|
||||
s.EmitKLineEvent(e)
|
||||
|
||||
case *BookTickerEvent:
|
||||
s.EmitBookTickerEvent(e)
|
||||
|
||||
case *DepthEvent:
|
||||
s.EmitDepthEvent(e)
|
||||
|
||||
|
|
|
@ -104,6 +104,16 @@ func (s *Stream) EmitExecutionReportEvent(event *ExecutionReportEvent) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Stream) OnBookTickerEvent(cb func(event *BookTickerEvent)) {
|
||||
s.bookTickerEventCallbacks = append(s.bookTickerEventCallbacks, cb)
|
||||
}
|
||||
|
||||
func (s *Stream) EmitBookTickerEvent(event *BookTickerEvent) {
|
||||
for _, cb := range s.bookTickerEventCallbacks {
|
||||
cb(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stream) OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent)) {
|
||||
s.orderTradeUpdateEventCallbacks = append(s.orderTradeUpdateEventCallbacks, cb)
|
||||
}
|
||||
|
@ -135,5 +145,7 @@ type StreamEventHub interface {
|
|||
|
||||
OnExecutionReportEvent(cb func(event *ExecutionReportEvent))
|
||||
|
||||
OnBookTickerEvent(cb func(event *BookTickerEvent))
|
||||
|
||||
OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent))
|
||||
}
|
||||
|
|
|
@ -123,7 +123,12 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su
|
|||
Channel: orderBookChannel,
|
||||
Market: toLocalSymbol(TrimUpperString(symbol)),
|
||||
})
|
||||
|
||||
} else if channel == types.BookTickerChannel {
|
||||
s.addSubscription(websocketRequest{
|
||||
Operation: subscribe,
|
||||
Channel: bookTickerChannel,
|
||||
Market: toLocalSymbol(TrimUpperString(symbol)),
|
||||
})
|
||||
} else if channel == types.KLineChannel {
|
||||
// FTX does not support kline channel, do polling
|
||||
interval := types.Interval(option.Interval)
|
||||
|
|
|
@ -29,6 +29,8 @@ func (h *messageHandler) handleMessage(message []byte) {
|
|||
switch r.Channel {
|
||||
case orderBookChannel:
|
||||
h.handleOrderBook(r)
|
||||
case bookTickerChannel:
|
||||
h.handleBookTicker(r)
|
||||
case privateOrdersChannel:
|
||||
h.handlePrivateOrders(r)
|
||||
case privateTradesChannel:
|
||||
|
@ -81,6 +83,34 @@ func (h *messageHandler) handleOrderBook(response websocketResponse) {
|
|||
}
|
||||
}
|
||||
|
||||
func (h *messageHandler) handleBookTicker(response websocketResponse) {
|
||||
if response.Type == subscribedRespType {
|
||||
h.handleSubscribedMessage(response)
|
||||
return
|
||||
}
|
||||
|
||||
r, err := response.toBookTickerResponse()
|
||||
if err != nil {
|
||||
logger.WithError(err).Errorf("failed to convert the book ticker")
|
||||
return
|
||||
}
|
||||
|
||||
globalBookTicker, err := toGlobalBookTicker(r)
|
||||
if err != nil {
|
||||
logger.WithError(err).Errorf("failed to generate book ticker")
|
||||
return
|
||||
}
|
||||
|
||||
switch r.Type {
|
||||
case updateRespType:
|
||||
// emit updates, not the whole orderbook
|
||||
h.EmitBookTickerUpdate(globalBookTicker)
|
||||
default:
|
||||
logger.Errorf("unsupported book ticker data type %s", r.Type)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *messageHandler) handlePrivateOrders(response websocketResponse) {
|
||||
if response.Type == subscribedRespType {
|
||||
h.handleSubscribedMessage(response)
|
||||
|
|
|
@ -23,6 +23,7 @@ const unsubscribe operation = "unsubscribe"
|
|||
type channel string
|
||||
|
||||
const orderBookChannel channel = "orderbook"
|
||||
const bookTickerChannel channel = "ticker"
|
||||
const privateOrdersChannel channel = "orders"
|
||||
const privateTradesChannel channel = "fills"
|
||||
|
||||
|
@ -194,6 +195,24 @@ func (r websocketResponse) toErrResponse() errResponse {
|
|||
}
|
||||
}
|
||||
|
||||
//sample :{"bid": 49194.0, "ask": 49195.0, "bidSize": 0.0775, "askSize": 0.0247, "last": 49200.0, "time": 1640171788.9339821}
|
||||
func (r websocketResponse) toBookTickerResponse() (bookTickerResponse, error) {
|
||||
if r.Channel != bookTickerChannel {
|
||||
return bookTickerResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion)
|
||||
}
|
||||
|
||||
var o bookTickerResponse
|
||||
if err := json.Unmarshal(r.Data, &o); err != nil {
|
||||
return bookTickerResponse{}, err
|
||||
}
|
||||
|
||||
o.mandatoryFields = r.mandatoryFields
|
||||
o.Market = r.Market
|
||||
o.Timestamp = nanoToTime(o.Time)
|
||||
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (r websocketResponse) toPublicOrderBookResponse() (orderBookResponse, error) {
|
||||
if r.Channel != orderBookChannel {
|
||||
return orderBookResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion)
|
||||
|
@ -236,6 +255,18 @@ type orderBookResponse struct {
|
|||
Asks [][]json.Number `json:"asks"`
|
||||
}
|
||||
|
||||
type bookTickerResponse struct {
|
||||
mandatoryFields
|
||||
Market string `json:"market"`
|
||||
Bid fixedpoint.Value `json:"bid"`
|
||||
Ask fixedpoint.Value `json:"ask"`
|
||||
BidSize fixedpoint.Value `json:"bidSize"`
|
||||
AskSize fixedpoint.Value `json:"askSize"`
|
||||
Last fixedpoint.Value `json:"last"`
|
||||
Time float64 `json:"time"`
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// only 100 orders so we use linear search here
|
||||
func (r *orderBookResponse) update(orderUpdates orderBookResponse) {
|
||||
r.Checksum = orderUpdates.Checksum
|
||||
|
@ -369,6 +400,19 @@ func toGlobalOrderBook(r orderBookResponse) (types.SliceOrderBook, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func toGlobalBookTicker(r bookTickerResponse) (types.BookTicker, error) {
|
||||
return types.BookTicker{
|
||||
// ex. BTC/USDT
|
||||
Symbol: toGlobalSymbol(strings.ToUpper(r.Market)),
|
||||
//Time: r.Timestamp,
|
||||
Buy: r.Bid,
|
||||
BuySize: r.BidSize,
|
||||
Sell: r.Ask,
|
||||
SellSize: r.AskSize,
|
||||
//Last: r.Last,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func toPriceVolumeSlice(orders [][]json.Number) (types.PriceVolumeSlice, error) {
|
||||
var pv types.PriceVolumeSlice
|
||||
for _, o := range orders {
|
||||
|
|
22
pkg/types/bookticker.go
Normal file
22
pkg/types/bookticker.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
)
|
||||
|
||||
// BookTicker time exists in ftx, not in binance
|
||||
// last exists in ftx, not in binance
|
||||
type BookTicker struct {
|
||||
//Time time.Time
|
||||
Symbol string
|
||||
Buy fixedpoint.Value // `buy` from Max, `bidPrice` from binance
|
||||
BuySize fixedpoint.Value
|
||||
Sell fixedpoint.Value // `sell` from Max, `askPrice` from binance
|
||||
SellSize fixedpoint.Value
|
||||
//Last fixedpoint.Value
|
||||
}
|
||||
|
||||
func (b BookTicker) String() string {
|
||||
return fmt.Sprintf("BookTicker { Symbol: %s,Buy: %f , BuySize: %f, Sell: %f, SellSize :%f } ", b.Symbol, b.Buy.Float64(), b.BuySize.Float64(), b.Sell.Float64(), b.SellSize.Float64())
|
||||
}
|
|
@ -104,6 +104,16 @@ func (stream *StandardStream) EmitBookUpdate(book SliceOrderBook) {
|
|||
}
|
||||
}
|
||||
|
||||
func (stream *StandardStream) OnBookTickerUpdate(cb func(bookTicker BookTicker)) {
|
||||
stream.bookTickerUpdateCallbacks = append(stream.bookTickerUpdateCallbacks, cb)
|
||||
}
|
||||
|
||||
func (stream *StandardStream) EmitBookTickerUpdate(bookTicker BookTicker) {
|
||||
for _, cb := range stream.bookTickerUpdateCallbacks {
|
||||
cb(bookTicker)
|
||||
}
|
||||
}
|
||||
|
||||
func (stream *StandardStream) OnBookSnapshot(cb func(book SliceOrderBook)) {
|
||||
stream.bookSnapshotCallbacks = append(stream.bookSnapshotCallbacks, cb)
|
||||
}
|
||||
|
@ -155,6 +165,8 @@ type StandardStreamEventHub interface {
|
|||
|
||||
OnBookUpdate(cb func(book SliceOrderBook))
|
||||
|
||||
OnBookTickerUpdate(cb func(bookTicker BookTicker))
|
||||
|
||||
OnBookSnapshot(cb func(book SliceOrderBook))
|
||||
|
||||
OnPositionUpdate(cb func(position PositionMap))
|
||||
|
|
|
@ -20,6 +20,7 @@ type Channel string
|
|||
var BookChannel = Channel("book")
|
||||
|
||||
var KLineChannel = Channel("kline")
|
||||
var BookTickerChannel = Channel("bookticker")
|
||||
|
||||
//go:generate callbackgen -type StandardStream -interface
|
||||
type StandardStream struct {
|
||||
|
@ -50,6 +51,8 @@ type StandardStream struct {
|
|||
|
||||
bookUpdateCallbacks []func(book SliceOrderBook)
|
||||
|
||||
bookTickerUpdateCallbacks []func(bookTicker BookTicker)
|
||||
|
||||
bookSnapshotCallbacks []func(book SliceOrderBook)
|
||||
|
||||
// Futures
|
||||
|
|
Loading…
Reference in New Issue
Block a user