From 50bfd8ee0e866bf4611ed794271ccfe8abf3035d Mon Sep 17 00:00:00 2001 From: Edwin Date: Fri, 1 Sep 2023 17:02:08 +0800 Subject: [PATCH] pkg/exchange: add time to SliceOrderBook --- pkg/cmd/orderbook.go | 10 +++--- pkg/exchange/binance/exchange.go | 2 ++ pkg/exchange/binance/parse.go | 1 + pkg/exchange/binance/stream.go | 1 + pkg/exchange/bybit/stream.go | 1 + pkg/exchange/bybit/stream_test.go | 2 ++ pkg/exchange/bybit/types.go | 7 ++++- pkg/exchange/kucoin/exchange.go | 1 + pkg/exchange/kucoin/stream.go | 1 + pkg/exchange/kucoin/websocket.go | 1 + pkg/exchange/max/maxapi/public_parser.go | 39 ++++++++++++------------ pkg/exchange/max/stream.go | 1 + pkg/exchange/okex/parse.go | 1 + pkg/types/sliceorderbook.go | 7 +++++ 14 files changed, 51 insertions(+), 24 deletions(-) diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index 9eba051d6..87b8b0087 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -71,9 +71,10 @@ var orderbookCmd = &cobra.Command{ } if bid, ask, ok := orderBook.BestBidAndAsk(); ok { - log.Infof("ASK | %f x %f / %f x %f | BID", + log.Infof("ASK | %f x %f / %f x %f | BID | %s", ask.Volume.Float64(), ask.Price.Float64(), - bid.Price.Float64(), bid.Volume.Float64()) + bid.Price.Float64(), bid.Volume.Float64(), + book.Time.String()) } }) @@ -84,9 +85,10 @@ var orderbookCmd = &cobra.Command{ orderBook.Update(book) if bid, ask, ok := orderBook.BestBidAndAsk(); ok { - log.Infof("ASK | %f x %f / %f x %f | BID", + log.Infof("ASK | %f x %f / %f x %f | BID | %s", ask.Volume.Float64(), ask.Price.Float64(), - bid.Price.Float64(), bid.Volume.Float64()) + bid.Price.Float64(), bid.Volume.Float64(), + book.Time.String()) } }) diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 3e62068c6..9286be367 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -1346,6 +1346,8 @@ func (e *Exchange) QueryDepth(ctx context.Context, symbol string) (snapshot type func convertDepth(snapshot types.SliceOrderBook, symbol string, finalUpdateID int64, response *binance.DepthResponse) (types.SliceOrderBook, int64, error) { snapshot.Symbol = symbol + // empty time since the API does not provide time information. + snapshot.Time = time.Time{} finalUpdateID = response.LastUpdateID for _, entry := range response.Bids { // entry.Price, Quantity: entry.Quantity diff --git a/pkg/exchange/binance/parse.go b/pkg/exchange/binance/parse.go index b2990d977..a1350faf4 100644 --- a/pkg/exchange/binance/parse.go +++ b/pkg/exchange/binance/parse.go @@ -461,6 +461,7 @@ func (e *DepthEvent) String() (o string) { func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) { book.Symbol = e.Symbol + book.Time = types.NewMillisecondTimestampFromInt(e.EventBase.Time).Time() // already in descending order book.Bids = e.Bids diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 9cf5d27b2..cc09d8eb7 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -88,6 +88,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie if ok { err := f.AddUpdate(types.SliceOrderBook{ Symbol: e.Symbol, + Time: types.NewMillisecondTimestampFromInt(e.EventBase.Time).Time(), Bids: e.Bids, Asks: e.Asks, }, e.FirstUpdateID, e.FinalUpdateID) diff --git a/pkg/exchange/bybit/stream.go b/pkg/exchange/bybit/stream.go index 2359030f9..fdf284603 100644 --- a/pkg/exchange/bybit/stream.go +++ b/pkg/exchange/bybit/stream.go @@ -131,6 +131,7 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) { } book.Type = e.WebSocketTopicEvent.Type + book.ServerTime = e.WebSocketTopicEvent.Ts.Time() return &book, nil case TopicTypeKLine: diff --git a/pkg/exchange/bybit/stream_test.go b/pkg/exchange/bybit/stream_test.go index 195c6fea2..04de5d373 100644 --- a/pkg/exchange/bybit/stream_test.go +++ b/pkg/exchange/bybit/stream_test.go @@ -35,6 +35,7 @@ func getTestClientOrSkip(t *testing.T) *Stream { } func TestStream(t *testing.T) { + t.Skip() s := getTestClientOrSkip(t) t.Run("Auth test", func(t *testing.T) { @@ -182,6 +183,7 @@ func TestStream_parseWebSocketEvent(t *testing.T) { UpdateId: fixedpoint.NewFromFloat(1854104), SequenceId: fixedpoint.NewFromFloat(10559247733), Type: DataTypeDelta, + ServerTime: types.NewMillisecondTimestampFromInt(1691130685111).Time(), }, *book) }) diff --git a/pkg/exchange/bybit/types.go b/pkg/exchange/bybit/types.go index 6b62f6563..04e63e3ca 100644 --- a/pkg/exchange/bybit/types.go +++ b/pkg/exchange/bybit/types.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi" "github.com/c9s/bbgo/pkg/fixedpoint" @@ -117,14 +118,18 @@ type BookEvent struct { SequenceId fixedpoint.Value `json:"seq"` // internal use - // Type can be one of snapshot or delta. Copied from WebSocketTopicEvent.Type + // Copied from WebSocketTopicEvent.Type, WebSocketTopicEvent.Ts + // Type can be one of snapshot or delta. Type DataType + // ServerTime using the websocket timestamp as server time. Since the event not provide server time information. + ServerTime time.Time } func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook) { snapshot.Symbol = e.Symbol snapshot.Bids = e.Bids snapshot.Asks = e.Asks + snapshot.Time = e.ServerTime return snapshot } diff --git a/pkg/exchange/kucoin/exchange.go b/pkg/exchange/kucoin/exchange.go index be2d252c6..09ad8066f 100644 --- a/pkg/exchange/kucoin/exchange.go +++ b/pkg/exchange/kucoin/exchange.go @@ -438,6 +438,7 @@ func (e *Exchange) QueryDepth(ctx context.Context, symbol string) (types.SliceOr return types.SliceOrderBook{ Symbol: toGlobalSymbol(symbol), + Time: orderBook.Time.Time(), Bids: orderBook.Bids, Asks: orderBook.Asks, }, sequence, nil diff --git a/pkg/exchange/kucoin/stream.go b/pkg/exchange/kucoin/stream.go index f6ad67f97..7b2bfe8b9 100644 --- a/pkg/exchange/kucoin/stream.go +++ b/pkg/exchange/kucoin/stream.go @@ -73,6 +73,7 @@ func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) { if ok { f.AddUpdate(types.SliceOrderBook{ Symbol: toGlobalSymbol(e.Symbol), + Time: e.Time.Time(), Bids: e.Changes.Bids, Asks: e.Changes.Asks, }, e.SequenceStart, e.SequenceEnd) diff --git a/pkg/exchange/kucoin/websocket.go b/pkg/exchange/kucoin/websocket.go index 4ec493aad..b5ddc8d51 100644 --- a/pkg/exchange/kucoin/websocket.go +++ b/pkg/exchange/kucoin/websocket.go @@ -80,6 +80,7 @@ type WebSocketOrderBookL2Event struct { Asks types.PriceVolumeSlice `json:"asks"` Bids types.PriceVolumeSlice `json:"bids"` } `json:"changes"` + Time types.MillisecondTimestamp `json:"time"` } type WebSocketCandleEvent struct { diff --git a/pkg/exchange/max/maxapi/public_parser.go b/pkg/exchange/max/maxapi/public_parser.go index 66f0cf88a..d96799f50 100644 --- a/pkg/exchange/max/maxapi/public_parser.go +++ b/pkg/exchange/max/maxapi/public_parser.go @@ -81,25 +81,25 @@ type KLineEvent struct { } /* -{ - "c": "kline", - "M": "btcusdt", - "e": "update", - "T": 1602999650179, - "k": { - "ST": 1602999900000, - "ET": 1602999900000, - "M": "btcusdt", - "R": "5m", - "O": "11417.21", - "H": "11417.21", - "L": "11417.21", - "C": "11417.21", - "v": "0", - "ti": 0, - "x": false - } -} + { + "c": "kline", + "M": "btcusdt", + "e": "update", + "T": 1602999650179, + "k": { + "ST": 1602999900000, + "ET": 1602999900000, + "M": "btcusdt", + "R": "5m", + "O": "11417.21", + "H": "11417.21", + "L": "11417.21", + "C": "11417.21", + "v": "0", + "ti": 0, + "x": false + } + } */ type KLinePayload struct { StartTime int64 `json:"ST"` @@ -175,6 +175,7 @@ func (e *BookEvent) Time() time.Time { func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook, err error) { snapshot.Symbol = strings.ToUpper(e.Market) + snapshot.Time = e.Time() for _, bid := range e.Bids { pv, err := bid.PriceVolumePair() diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index 89f885d1d..27efbb51a 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -188,6 +188,7 @@ func (s *Stream) handleBookEvent(e max.BookEvent) { } newBook.Symbol = toGlobalSymbol(e.Market) + newBook.Time = e.Time() switch e.Event { case "snapshot": diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index 6b9c019ed..2505f17d6 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -81,6 +81,7 @@ func (data *BookEvent) BookTicker() types.BookTicker { func (data *BookEvent) Book() types.SliceOrderBook { book := types.SliceOrderBook{ Symbol: data.Symbol, + Time: types.NewMillisecondTimestampFromInt(data.MillisecondTimestamp).Time(), } for _, bid := range data.Bids { diff --git a/pkg/types/sliceorderbook.go b/pkg/types/sliceorderbook.go index 777e30333..d48fb528a 100644 --- a/pkg/types/sliceorderbook.go +++ b/pkg/types/sliceorderbook.go @@ -12,11 +12,14 @@ import ( // SliceOrderBook is a general order book structure which could be used // for RESTful responses and websocket stream parsing +// //go:generate callbackgen -type SliceOrderBook type SliceOrderBook struct { Symbol string Bids PriceVolumeSlice Asks PriceVolumeSlice + // Time represents the server time. If empty, it indicates that the server does not provide this information. + Time time.Time lastUpdateTime time.Time @@ -162,6 +165,8 @@ func (b *SliceOrderBook) String() string { sb.WriteString("BOOK ") sb.WriteString(b.Symbol) sb.WriteString("\n") + sb.WriteString(b.Time.Format(time.RFC1123)) + sb.WriteString("\n") if len(b.Asks) > 0 { sb.WriteString("ASKS:\n") @@ -187,6 +192,7 @@ func (b *SliceOrderBook) String() string { func (b *SliceOrderBook) CopyDepth(limit int) OrderBook { var book SliceOrderBook book.Symbol = b.Symbol + book.Time = b.Time book.Bids = b.Bids.CopyDepth(limit) book.Asks = b.Asks.CopyDepth(limit) return &book @@ -195,6 +201,7 @@ func (b *SliceOrderBook) CopyDepth(limit int) OrderBook { func (b *SliceOrderBook) Copy() OrderBook { var book SliceOrderBook book.Symbol = b.Symbol + book.Time = b.Time book.Bids = b.Bids.Copy() book.Asks = b.Asks.Copy() return &book