mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-14 11:03:53 +00:00
WIP: use depth to build orderbook
This commit is contained in:
parent
0a6dc7f59f
commit
bd83832d2d
|
@ -2,6 +2,7 @@ package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
@ -14,6 +15,12 @@ import (
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type PV struct {
|
||||||
|
bids types.PriceVolumeSlice
|
||||||
|
asks types.PriceVolumeSlice
|
||||||
|
t int64
|
||||||
|
}
|
||||||
|
|
||||||
// go run ./cmd/bbgo orderbook --session=binance --symbol=BTCUSDT
|
// go run ./cmd/bbgo orderbook --session=binance --symbol=BTCUSDT
|
||||||
var orderbookCmd = &cobra.Command{
|
var orderbookCmd = &cobra.Command{
|
||||||
Use: "orderbook --session=[exchange_name] --symbol=[pair_name]",
|
Use: "orderbook --session=[exchange_name] --symbol=[pair_name]",
|
||||||
|
@ -56,9 +63,16 @@ var orderbookCmd = &cobra.Command{
|
||||||
|
|
||||||
orderBook := types.NewMutexOrderBook(symbol, session.Exchange.Name())
|
orderBook := types.NewMutexOrderBook(symbol, session.Exchange.Name())
|
||||||
|
|
||||||
|
pv1c := make(chan PV, 5000)
|
||||||
|
pv2c := make(chan PV, 5000)
|
||||||
|
|
||||||
s := session.Exchange.NewStream()
|
s := session.Exchange.NewStream()
|
||||||
|
session.Key = "new"
|
||||||
|
s2 := session.Exchange.NewStream()
|
||||||
s.SetPublicOnly()
|
s.SetPublicOnly()
|
||||||
|
s2.SetPublicOnly()
|
||||||
s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
|
s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
|
||||||
|
s2.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
|
||||||
s.OnBookSnapshot(func(book types.SliceOrderBook) {
|
s.OnBookSnapshot(func(book types.SliceOrderBook) {
|
||||||
if dumpDepthUpdate {
|
if dumpDepthUpdate {
|
||||||
log.Infof("orderbook snapshot: %s", book.String())
|
log.Infof("orderbook snapshot: %s", book.String())
|
||||||
|
@ -70,11 +84,19 @@ var orderbookCmd = &cobra.Command{
|
||||||
log.WithError(err).Panicf("invalid error book snapshot")
|
log.WithError(err).Panicf("invalid error book snapshot")
|
||||||
}
|
}
|
||||||
|
|
||||||
if bid, ask, ok := orderBook.BestBidAndAsk(); ok {
|
/*
|
||||||
log.Infof("ASK | %f x %f / %f x %f | BID | %s",
|
if bid, ask, ok := orderBook.BestBidAndAsk(); ok {
|
||||||
ask.Volume.Float64(), ask.Price.Float64(),
|
log.Infof("1) ASK | %f x %f / %f x %f | BID | %s",
|
||||||
bid.Price.Float64(), bid.Volume.Float64(),
|
ask.Volume.Float64(), ask.Price.Float64(),
|
||||||
book.Time.String())
|
bid.Price.Float64(), bid.Volume.Float64(),
|
||||||
|
book.Time.String())
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
pv1c <- PV{
|
||||||
|
asks: orderBook.SideBook(types.SideTypeSell),
|
||||||
|
bids: orderBook.SideBook(types.SideTypeBuy),
|
||||||
|
t: book.LastUpdateId,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -84,11 +106,64 @@ var orderbookCmd = &cobra.Command{
|
||||||
}
|
}
|
||||||
orderBook.Update(book)
|
orderBook.Update(book)
|
||||||
|
|
||||||
if bid, ask, ok := orderBook.BestBidAndAsk(); ok {
|
/*
|
||||||
log.Infof("ASK | %f x %f / %f x %f | BID | %s",
|
if bid, ask, ok := orderBook.BestBidAndAsk(); ok {
|
||||||
ask.Volume.Float64(), ask.Price.Float64(),
|
log.Infof("1) ASK | %f x %f / %f x %f | BID | %s",
|
||||||
bid.Price.Float64(), bid.Volume.Float64(),
|
ask.Volume.Float64(), ask.Price.Float64(),
|
||||||
book.Time.String())
|
bid.Price.Float64(), bid.Volume.Float64(),
|
||||||
|
book.Time.String())
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
pv1c <- PV{
|
||||||
|
asks: orderBook.SideBook(types.SideTypeSell),
|
||||||
|
bids: orderBook.SideBook(types.SideTypeBuy),
|
||||||
|
t: book.LastUpdateId,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
s2.OnBookSnapshot(func(book types.SliceOrderBook) {
|
||||||
|
if dumpDepthUpdate {
|
||||||
|
log.Infof("orderbook snapshot: %s", book.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
orderBook.Load(book)
|
||||||
|
|
||||||
|
if ok, err := orderBook.IsValid(); !ok {
|
||||||
|
log.WithError(err).Panicf("invalid error book snapshot")
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
if bid, ask, ok := orderBook.BestBidAndAsk(); ok {
|
||||||
|
log.Infof("2) ASK | %f x %f / %f x %f | BID | %s",
|
||||||
|
ask.Volume.Float64(), ask.Price.Float64(),
|
||||||
|
bid.Price.Float64(), bid.Volume.Float64(),
|
||||||
|
book.Time.String())
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
pv2c <- PV{
|
||||||
|
asks: orderBook.SideBook(types.SideTypeSell),
|
||||||
|
bids: orderBook.SideBook(types.SideTypeBuy),
|
||||||
|
t: book.LastUpdateId,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
s2.OnBookUpdate(func(book types.SliceOrderBook) {
|
||||||
|
if dumpDepthUpdate {
|
||||||
|
log.Infof("orderbook update: %s", book.String())
|
||||||
|
}
|
||||||
|
orderBook.Update(book)
|
||||||
|
|
||||||
|
/*
|
||||||
|
if bid, ask, ok := orderBook.BestBidAndAsk(); ok {
|
||||||
|
log.Infof("2) ASK | %f x %f / %f x %f | BID | %s",
|
||||||
|
ask.Volume.Float64(), ask.Price.Float64(),
|
||||||
|
bid.Price.Float64(), bid.Volume.Float64(),
|
||||||
|
book.Time.String())
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
pv2c <- PV{
|
||||||
|
asks: orderBook.SideBook(types.SideTypeSell),
|
||||||
|
bids: orderBook.SideBook(types.SideTypeBuy),
|
||||||
|
t: book.LastUpdateId,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -96,6 +171,9 @@ var orderbookCmd = &cobra.Command{
|
||||||
if err := s.Connect(ctx); err != nil {
|
if err := s.Connect(ctx); err != nil {
|
||||||
return fmt.Errorf("failed to connect to %s", sessionName)
|
return fmt.Errorf("failed to connect to %s", sessionName)
|
||||||
}
|
}
|
||||||
|
if err := s2.Connect(ctx); err != nil {
|
||||||
|
return fmt.Errorf("failed to connect to %s", sessionName)
|
||||||
|
}
|
||||||
|
|
||||||
log.Infof("connected")
|
log.Infof("connected")
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -103,9 +181,86 @@ var orderbookCmd = &cobra.Command{
|
||||||
if err := s.Close(); err != nil {
|
if err := s.Close(); err != nil {
|
||||||
log.WithError(err).Errorf("connection close error")
|
log.WithError(err).Errorf("connection close error")
|
||||||
}
|
}
|
||||||
|
if err := s2.Close(); err != nil {
|
||||||
|
log.WithError(err).Errorf("connection close error")
|
||||||
|
}
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
flag := false
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case pv1 := <-pv1c:
|
||||||
|
pv2 := <-pv2c
|
||||||
|
|
||||||
|
if flag && pv1.t != pv2.t {
|
||||||
|
log.Info("skip")
|
||||||
|
}
|
||||||
|
|
||||||
|
for pv1.t != pv2.t {
|
||||||
|
if pv1.t < pv2.t {
|
||||||
|
pv1 = <-pv1c
|
||||||
|
} else {
|
||||||
|
pv2 = <-pv2c
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("version of 1: %d, version of 2: %d", pv1.t, pv2.t)
|
||||||
|
|
||||||
|
flag = true
|
||||||
|
|
||||||
|
if len(pv1.asks) != len(pv2.asks) {
|
||||||
|
log.Info("not equal for length")
|
||||||
|
log.Infof("pv1 asks: %+v", pv1.asks)
|
||||||
|
log.Infof("pv2 asks: %+v", pv2.asks)
|
||||||
|
x, _ := json.Marshal(pv1.asks)
|
||||||
|
log.Infof("pv1 bids: %s", string(x))
|
||||||
|
y, _ := json.Marshal(pv2.asks)
|
||||||
|
log.Infof("pv2 bids: %s", string(y))
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range pv1.asks {
|
||||||
|
if !pv1.asks[i].Equals(pv2.asks[i]) {
|
||||||
|
log.Info("not euqal for value")
|
||||||
|
log.Infof("pv1 asks: %+v", pv1.asks)
|
||||||
|
log.Infof("pv2 asks: %+v", pv2.asks)
|
||||||
|
x, _ := json.Marshal(pv1.asks)
|
||||||
|
log.Infof("pv1 bids: %s", string(x))
|
||||||
|
y, _ := json.Marshal(pv2.asks)
|
||||||
|
log.Infof("pv2 bids: %s", string(y))
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pv1.bids) != len(pv2.bids) {
|
||||||
|
log.Info("not equal for length")
|
||||||
|
x, _ := json.Marshal(pv1.bids)
|
||||||
|
log.Infof("pv1 bids: %s", string(x))
|
||||||
|
y, _ := json.Marshal(pv2.bids)
|
||||||
|
log.Infof("pv2 bids: %s", string(y))
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range pv1.bids {
|
||||||
|
if !pv1.bids[i].Equals(pv2.bids[i]) {
|
||||||
|
log.Info("not euqal for value")
|
||||||
|
x, _ := json.Marshal(pv1.bids)
|
||||||
|
log.Infof("pv1 bids: %s", string(x))
|
||||||
|
y, _ := json.Marshal(pv2.bids)
|
||||||
|
log.Infof("pv2 bids: %s", string(y))
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
|
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
|
|
@ -178,7 +178,7 @@ func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Exchange) NewStream() types.Stream {
|
func (e *Exchange) NewStream() types.Stream {
|
||||||
stream := NewStream(e.key, e.secret)
|
stream := NewStream(e, e.key, e.secret)
|
||||||
stream.MarginSettings = e.MarginSettings
|
stream.MarginSettings = e.MarginSettings
|
||||||
return stream
|
return stream
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,12 +164,15 @@ func parsePublicTradeEvent(val *fastjson.Value) (*PublicTradeEvent, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type BookEvent struct {
|
type BookEvent struct {
|
||||||
Event string `json:"e"`
|
Event string `json:"e"`
|
||||||
Market string `json:"M"`
|
Market string `json:"M"`
|
||||||
Channel string `json:"c"`
|
Channel string `json:"c"`
|
||||||
Timestamp int64 `json:"t"` // Millisecond timestamp
|
Timestamp int64 `json:"t"` // Millisecond timestamp
|
||||||
Bids types.PriceVolumeSlice
|
Bids types.PriceVolumeSlice
|
||||||
Asks types.PriceVolumeSlice
|
Asks types.PriceVolumeSlice
|
||||||
|
FirstUpdateID int64 `json:"fi"`
|
||||||
|
LastUpdateID int64 `json:"li"`
|
||||||
|
Version int64 `json:"v"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *BookEvent) Time() time.Time {
|
func (e *BookEvent) Time() time.Time {
|
||||||
|
@ -181,6 +184,7 @@ func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook, err error) {
|
||||||
snapshot.Time = e.Time()
|
snapshot.Time = e.Time()
|
||||||
snapshot.Bids = e.Bids
|
snapshot.Bids = e.Bids
|
||||||
snapshot.Asks = e.Asks
|
snapshot.Asks = e.Asks
|
||||||
|
snapshot.LastUpdateId = e.LastUpdateID
|
||||||
return snapshot, nil
|
return snapshot, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,10 +216,13 @@ func parseKLineEvent(val *fastjson.Value) (*KLineEvent, error) {
|
||||||
|
|
||||||
func parseBookEvent(val *fastjson.Value) (event *BookEvent, err error) {
|
func parseBookEvent(val *fastjson.Value) (event *BookEvent, err error) {
|
||||||
event = &BookEvent{
|
event = &BookEvent{
|
||||||
Event: string(val.GetStringBytes("e")),
|
Event: string(val.GetStringBytes("e")),
|
||||||
Market: string(val.GetStringBytes("M")),
|
Market: string(val.GetStringBytes("M")),
|
||||||
Channel: string(val.GetStringBytes("c")),
|
Channel: string(val.GetStringBytes("c")),
|
||||||
Timestamp: val.GetInt64("T"),
|
Timestamp: val.GetInt64("T"),
|
||||||
|
FirstUpdateID: val.GetInt64("fi"),
|
||||||
|
LastUpdateID: val.GetInt64("li"),
|
||||||
|
Version: val.GetInt64("v"),
|
||||||
}
|
}
|
||||||
|
|
||||||
// t := time.Unix(0, event.Timestamp*int64(time.Millisecond))
|
// t := time.Unix(0, event.Timestamp*int64(time.Millisecond))
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/depth"
|
||||||
max "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
|
max "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
@ -40,19 +41,24 @@ type Stream struct {
|
||||||
|
|
||||||
accountSnapshotEventCallbacks []func(e max.AccountSnapshotEvent)
|
accountSnapshotEventCallbacks []func(e max.AccountSnapshotEvent)
|
||||||
accountUpdateEventCallbacks []func(e max.AccountUpdateEvent)
|
accountUpdateEventCallbacks []func(e max.AccountUpdateEvent)
|
||||||
|
|
||||||
|
// depthBuffers is used for storing the depth info
|
||||||
|
depthBuffers map[string]*depth.Buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStream(key, secret string) *Stream {
|
func NewStream(ex *Exchange, key, secret string) *Stream {
|
||||||
stream := &Stream{
|
stream := &Stream{
|
||||||
StandardStream: types.NewStandardStream(),
|
StandardStream: types.NewStandardStream(),
|
||||||
key: key,
|
key: key,
|
||||||
// pragma: allowlist nextline secret
|
// pragma: allowlist nextline secret
|
||||||
secret: secret,
|
secret: secret,
|
||||||
|
depthBuffers: make(map[string]*depth.Buffer),
|
||||||
}
|
}
|
||||||
stream.SetEndpointCreator(stream.getEndpoint)
|
stream.SetEndpointCreator(stream.getEndpoint)
|
||||||
stream.SetParser(max.ParseMessage)
|
stream.SetParser(max.ParseMessage)
|
||||||
stream.SetDispatcher(stream.dispatchEvent)
|
stream.SetDispatcher(stream.dispatchEvent)
|
||||||
stream.OnConnect(stream.handleConnect)
|
stream.OnConnect(stream.handleConnect)
|
||||||
|
stream.OnDisconnect(stream.handleDisconnect)
|
||||||
stream.OnAuthEvent(func(e max.AuthEvent) {
|
stream.OnAuthEvent(func(e max.AuthEvent) {
|
||||||
log.Infof("max websocket connection authenticated: %+v", e)
|
log.Infof("max websocket connection authenticated: %+v", e)
|
||||||
stream.EmitAuth()
|
stream.EmitAuth()
|
||||||
|
@ -62,9 +68,13 @@ func NewStream(key, secret string) *Stream {
|
||||||
stream.OnOrderSnapshotEvent(stream.handleOrderSnapshotEvent)
|
stream.OnOrderSnapshotEvent(stream.handleOrderSnapshotEvent)
|
||||||
stream.OnOrderUpdateEvent(stream.handleOrderUpdateEvent)
|
stream.OnOrderUpdateEvent(stream.handleOrderUpdateEvent)
|
||||||
stream.OnTradeUpdateEvent(stream.handleTradeEvent)
|
stream.OnTradeUpdateEvent(stream.handleTradeEvent)
|
||||||
stream.OnBookEvent(stream.handleBookEvent)
|
|
||||||
stream.OnAccountSnapshotEvent(stream.handleAccountSnapshotEvent)
|
stream.OnAccountSnapshotEvent(stream.handleAccountSnapshotEvent)
|
||||||
stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent)
|
stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent)
|
||||||
|
if key == "new" {
|
||||||
|
stream.OnBookEvent(stream.handleBookEventNew(ex))
|
||||||
|
} else {
|
||||||
|
stream.OnBookEvent(stream.handleBookEvent)
|
||||||
|
}
|
||||||
return stream
|
return stream
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,6 +166,13 @@ func (s *Stream) handleConnect() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Stream) handleDisconnect() {
|
||||||
|
log.Debugf("resetting depth snapshots...")
|
||||||
|
for _, f := range s.depthBuffers {
|
||||||
|
f.Reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Stream) handleKLineEvent(e max.KLineEvent) {
|
func (s *Stream) handleKLineEvent(e max.KLineEvent) {
|
||||||
kline := e.KLine.KLine()
|
kline := e.KLine.KLine()
|
||||||
s.EmitKLine(kline)
|
s.EmitKLine(kline)
|
||||||
|
@ -218,6 +235,41 @@ func (s *Stream) handleBookEvent(e max.BookEvent) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Stream) handleBookEventNew(ex *Exchange) func(e max.BookEvent) {
|
||||||
|
return func(e max.BookEvent) {
|
||||||
|
symbol := toGlobalSymbol(e.Market)
|
||||||
|
f, ok := s.depthBuffers[symbol]
|
||||||
|
if ok {
|
||||||
|
err := f.AddUpdate(types.SliceOrderBook{
|
||||||
|
Symbol: toGlobalSymbol(e.Market),
|
||||||
|
Time: e.Time(),
|
||||||
|
Bids: e.Bids,
|
||||||
|
Asks: e.Asks,
|
||||||
|
}, e.FirstUpdateID, e.LastUpdateID)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Errorf("found missing %s update event", e.Market)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
|
||||||
|
log.Infof("fetching %s depth...", e.Market)
|
||||||
|
// the depth of websocket orderbook event is 50 by default, so we use 50 as limit here
|
||||||
|
return ex.QueryDepth(context.Background(), e.Market, 50)
|
||||||
|
})
|
||||||
|
f.SetBufferingPeriod(time.Second)
|
||||||
|
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
|
||||||
|
s.EmitBookSnapshot(snapshot)
|
||||||
|
for _, u := range updates {
|
||||||
|
s.EmitBookUpdate(u.Object)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
f.OnPush(func(update depth.Update) {
|
||||||
|
s.EmitBookUpdate(update.Object)
|
||||||
|
})
|
||||||
|
s.depthBuffers[symbol] = f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Stream) handleAccountSnapshotEvent(e max.AccountSnapshotEvent) {
|
func (s *Stream) handleAccountSnapshotEvent(e max.AccountSnapshotEvent) {
|
||||||
snapshot := map[string]types.Balance{}
|
snapshot := map[string]types.Balance{}
|
||||||
for _, bm := range e.Balances {
|
for _, bm := range e.Balances {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user