Merge pull request #1765 from c9s/chiahung/max/use-depth-to-build-orderbook

FEATURE: [max] use QueryDepth to build orderbook
This commit is contained in:
kbearXD 2024-10-16 14:13:53 +08:00 committed by GitHub
commit cf8a321aed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 66 additions and 29 deletions

View File

@ -178,7 +178,7 @@ func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
}
func (e *Exchange) NewStream() types.Stream {
stream := NewStream(e.key, e.secret)
stream := NewStream(e, e.key, e.secret)
stream.MarginSettings = e.MarginSettings
return stream
}

View File

@ -164,12 +164,15 @@ func parsePublicTradeEvent(val *fastjson.Value) (*PublicTradeEvent, error) {
}
type BookEvent struct {
Event string `json:"e"`
Market string `json:"M"`
Channel string `json:"c"`
Timestamp int64 `json:"t"` // Millisecond timestamp
Bids types.PriceVolumeSlice
Asks types.PriceVolumeSlice
Event string `json:"e"`
Market string `json:"M"`
Channel string `json:"c"`
Timestamp int64 `json:"t"` // Millisecond timestamp
Bids types.PriceVolumeSlice
Asks types.PriceVolumeSlice
FirstUpdateID int64 `json:"fi"`
LastUpdateID int64 `json:"li"`
Version int64 `json:"v"`
}
func (e *BookEvent) Time() time.Time {
@ -181,6 +184,7 @@ func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook, err error) {
snapshot.Time = e.Time()
snapshot.Bids = e.Bids
snapshot.Asks = e.Asks
snapshot.LastUpdateId = e.LastUpdateID
return snapshot, nil
}
@ -212,10 +216,13 @@ func parseKLineEvent(val *fastjson.Value) (*KLineEvent, error) {
func parseBookEvent(val *fastjson.Value) (event *BookEvent, err error) {
event = &BookEvent{
Event: string(val.GetStringBytes("e")),
Market: string(val.GetStringBytes("M")),
Channel: string(val.GetStringBytes("c")),
Timestamp: val.GetInt64("T"),
Event: string(val.GetStringBytes("e")),
Market: string(val.GetStringBytes("M")),
Channel: string(val.GetStringBytes("c")),
Timestamp: val.GetInt64("T"),
FirstUpdateID: val.GetInt64("fi"),
LastUpdateID: val.GetInt64("li"),
Version: val.GetInt64("v"),
}
// t := time.Unix(0, event.Timestamp*int64(time.Millisecond))

View File

@ -11,6 +11,7 @@ import (
"github.com/google/uuid"
"github.com/c9s/bbgo/pkg/depth"
max "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/types"
)
@ -40,19 +41,24 @@ type Stream struct {
accountSnapshotEventCallbacks []func(e max.AccountSnapshotEvent)
accountUpdateEventCallbacks []func(e max.AccountUpdateEvent)
// depthBuffers is used for storing the depth info
depthBuffers map[string]*depth.Buffer
}
func NewStream(key, secret string) *Stream {
func NewStream(ex *Exchange, key, secret string) *Stream {
stream := &Stream{
StandardStream: types.NewStandardStream(),
key: key,
// pragma: allowlist nextline secret
secret: secret,
secret: secret,
depthBuffers: make(map[string]*depth.Buffer),
}
stream.SetEndpointCreator(stream.getEndpoint)
stream.SetParser(max.ParseMessage)
stream.SetDispatcher(stream.dispatchEvent)
stream.OnConnect(stream.handleConnect)
stream.OnDisconnect(stream.handleDisconnect)
stream.OnAuthEvent(func(e max.AuthEvent) {
log.Infof("max websocket connection authenticated: %+v", e)
stream.EmitAuth()
@ -62,9 +68,9 @@ func NewStream(key, secret string) *Stream {
stream.OnOrderSnapshotEvent(stream.handleOrderSnapshotEvent)
stream.OnOrderUpdateEvent(stream.handleOrderUpdateEvent)
stream.OnTradeUpdateEvent(stream.handleTradeEvent)
stream.OnBookEvent(stream.handleBookEvent)
stream.OnAccountSnapshotEvent(stream.handleAccountSnapshotEvent)
stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent)
stream.OnBookEvent(stream.handleBookEvent(ex))
return stream
}
@ -156,6 +162,13 @@ func (s *Stream) handleConnect() {
}
}
func (s *Stream) handleDisconnect() {
log.Debugf("resetting depth snapshots...")
for _, f := range s.depthBuffers {
f.Reset()
}
}
func (s *Stream) handleKLineEvent(e max.KLineEvent) {
kline := e.KLine.KLine()
s.EmitKLine(kline)
@ -200,21 +213,38 @@ func (s *Stream) handleTradeEvent(e max.TradeUpdateEvent) {
}
}
func (s *Stream) handleBookEvent(e max.BookEvent) {
newBook, err := e.OrderBook()
if err != nil {
log.WithError(err).Error("book convert error")
return
}
newBook.Symbol = toGlobalSymbol(e.Market)
newBook.Time = e.Time()
switch e.Event {
case "snapshot":
s.EmitBookSnapshot(newBook)
case "update":
s.EmitBookUpdate(newBook)
func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
return func(e max.BookEvent) {
symbol := toGlobalSymbol(e.Market)
f, ok := s.depthBuffers[symbol]
if ok {
err := f.AddUpdate(types.SliceOrderBook{
Symbol: toGlobalSymbol(e.Market),
Time: e.Time(),
Bids: e.Bids,
Asks: e.Asks,
}, e.FirstUpdateID, e.LastUpdateID)
if err != nil {
log.WithError(err).Errorf("found missing %s update event", e.Market)
}
} else {
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
log.Infof("fetching %s depth...", e.Market)
// the depth of websocket orderbook event is 50 by default, so we use 50 as limit here
return ex.QueryDepth(context.Background(), e.Market, 50)
})
f.SetBufferingPeriod(time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
s.EmitBookSnapshot(snapshot)
for _, u := range updates {
s.EmitBookUpdate(u.Object)
}
})
f.OnPush(func(update depth.Update) {
s.EmitBookUpdate(update.Object)
})
s.depthBuffers[symbol] = f
}
}
}