Merge pull request #1755 from c9s/c9s/xdepthmaker/refactor

IMPROVE: [xdepthmaker] improve price solver integration and hedge method
This commit is contained in:
c9s 2024-09-27 20:24:08 +08:00 committed by GitHub
commit 12455604a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 185 additions and 42 deletions

View File

@ -1,8 +1,9 @@
package twap
package bbgo
import (
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
@ -14,6 +15,8 @@ type BboMonitor struct {
Ask types.PriceVolume
UpdatedTime time.Time
priceImpactRatio fixedpoint.Value
updateCallbacks []func(bid, ask types.PriceVolume)
}
@ -21,6 +24,10 @@ func NewBboMonitor() *BboMonitor {
return &BboMonitor{}
}
func (m *BboMonitor) SetPriceImpactRatio(ratio fixedpoint.Value) {
m.priceImpactRatio = ratio
}
func (m *BboMonitor) UpdateFromBook(book *types.StreamOrderBook) bool {
bestBid, ok1 := book.BestBid()
bestAsk, ok2 := book.BestAsk()
@ -34,11 +41,23 @@ func (m *BboMonitor) UpdateFromBook(book *types.StreamOrderBook) bool {
func (m *BboMonitor) Update(bid, ask types.PriceVolume, t time.Time) bool {
changed := false
if m.Bid.Price.Compare(bid.Price) != 0 || m.Bid.Volume.Compare(bid.Volume) != 0 {
changed = true
if m.priceImpactRatio.IsZero() {
changed = true
} else {
if bid.Price.Sub(m.Bid.Price).Abs().Div(m.Bid.Price).Compare(m.priceImpactRatio) >= 0 {
changed = true
}
}
}
if m.Ask.Price.Compare(ask.Price) != 0 || m.Ask.Volume.Compare(ask.Volume) != 0 {
changed = true
if m.priceImpactRatio.IsZero() {
changed = true
} else {
if ask.Price.Sub(m.Ask.Price).Abs().Div(m.Ask.Price).Compare(m.priceImpactRatio) >= 0 {
changed = true
}
}
}
m.Bid = bid

View File

@ -1,6 +1,6 @@
// Code generated by "callbackgen -type BboMonitor"; DO NOT EDIT.
package twap
package bbgo
import (
"github.com/c9s/bbgo/pkg/types"

View File

@ -1,6 +1,7 @@
package pricesolver
import (
"context"
"sync"
log "github.com/sirupsen/logrus"
@ -69,6 +70,28 @@ func (m *SimplePriceSolver) UpdateFromTrade(trade types.Trade) {
m.Update(trade.Symbol, trade.Price)
}
func (m *SimplePriceSolver) BindStream(stream types.Stream) {
stream.OnKLineClosed(func(k types.KLine) {
m.Update(k.Symbol, k.Close)
})
}
func (m *SimplePriceSolver) UpdateFromTickers(ctx context.Context, ex types.Exchange, symbols ...string) error {
for _, symbol := range symbols {
ticker, err := ex.QueryTicker(ctx, symbol)
if err != nil {
return err
}
price := ticker.GetValidPrice()
if !price.IsZero() {
m.Update(symbol, price)
}
}
return nil
}
func (m *SimplePriceSolver) inferencePrice(asset string, assetPrice fixedpoint.Value, preferredFiats ...string) (fixedpoint.Value, bool) {
// log.Infof("inferencePrice %s = %f", asset, assetPrice.Float64())
quotePrices, ok := m.pricesByBase[asset]

View File

@ -16,6 +16,8 @@ import (
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/pricesolver"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/strategy/xmaker"
"github.com/c9s/bbgo/pkg/types"
@ -185,6 +187,8 @@ type HedgeStrategy string
const (
HedgeStrategyMarket HedgeStrategy = "market"
HedgeStrategyBboCounterParty1 HedgeStrategy = "bbo-counter-party-1"
HedgeStrategyBboCounterParty3 HedgeStrategy = "bbo-counter-party-3"
HedgeStrategyBboCounterParty5 HedgeStrategy = "bbo-counter-party-5"
HedgeStrategyBboQueue1 HedgeStrategy = "bbo-queue-1"
)
@ -206,7 +210,8 @@ type Strategy struct {
// HedgeExchange session name
HedgeExchange string `json:"hedgeExchange"`
UpdateInterval types.Duration `json:"updateInterval"`
FastLayerUpdateInterval types.Duration `json:"fastLayerUpdateInterval"`
NumOfFastLayers int `json:"numOfFastLayers"`
HedgeInterval types.Duration `json:"hedgeInterval"`
@ -244,6 +249,8 @@ type Strategy struct {
// RecoverTrade tries to find the missing trades via the REStful API
RecoverTrade bool `json:"recoverTrade"`
PriceImpactRatio fixedpoint.Value `json:"priceImpactRatio"`
RecoverTradeScanPeriod types.Duration `json:"recoverTradeScanPeriod"`
NumLayers int `json:"numLayers"`
@ -267,11 +274,16 @@ type Strategy struct {
lastSourcePrice fixedpoint.MutexValue
stopC chan struct{}
stopC chan struct{}
fullReplenishTriggerC sigchan.Chan
logger logrus.FieldLogger
connectivityGroup *types.ConnectivityGroup
makerConnectivity, hedgerConnectivity *types.Connectivity
connectivityGroup *types.ConnectivityGroup
priceSolver *pricesolver.SimplePriceSolver
bboMonitor *bbgo.BboMonitor
}
func (s *Strategy) ID() string {
@ -317,8 +329,10 @@ func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
})
hedgeSession.Subscribe(types.KLineChannel, s.HedgeSymbol, types.SubscribeOptions{Interval: "1m"})
hedgeSession.Subscribe(types.KLineChannel, hedgeSession.Exchange.PlatformFeeCurrency()+"USDT", types.SubscribeOptions{Interval: "1m"})
makerSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
makerSession.Subscribe(types.KLineChannel, makerSession.Exchange.PlatformFeeCurrency()+"USDT", types.SubscribeOptions{Interval: "1m"})
}
func (s *Strategy) Validate() error {
@ -342,8 +356,12 @@ func (s *Strategy) Validate() error {
}
func (s *Strategy) Defaults() error {
if s.UpdateInterval == 0 {
s.UpdateInterval = types.Duration(5 * time.Second)
if s.FastLayerUpdateInterval == 0 {
s.FastLayerUpdateInterval = types.Duration(5 * time.Second)
}
if s.NumOfFastLayers == 0 {
s.NumOfFastLayers = 5
}
if s.FullReplenishInterval == 0 {
@ -391,7 +409,7 @@ func (s *Strategy) Defaults() error {
}
func (s *Strategy) quoteWorker(ctx context.Context) {
updateTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200))
updateTicker := time.NewTicker(util.MillisecondsJitter(s.FastLayerUpdateInterval.Duration(), 200))
defer updateTicker.Stop()
fullReplenishTicker := time.NewTicker(util.MillisecondsJitter(s.FullReplenishInterval.Duration(), 200))
@ -405,8 +423,6 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
s.updateQuote(ctx, 0)
lastOrderReplenishTime := time.Now()
for {
select {
case <-ctx.Done():
@ -416,9 +432,15 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol)
return
case <-s.fullReplenishTriggerC:
// force trigger full replenish
s.updateQuote(ctx, 0)
case <-fullReplenishTicker.C:
s.updateQuote(ctx, 0)
lastOrderReplenishTime = time.Now()
case <-updateTicker.C:
s.updateQuote(ctx, s.NumOfFastLayers)
case sig, ok := <-s.sourceBook.C:
// when any book change event happened
@ -426,19 +448,10 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
return
}
if time.Since(lastOrderReplenishTime) < 10*time.Second {
continue
}
switch sig.Type {
case types.BookSignalSnapshot:
changed := s.bboMonitor.UpdateFromBook(s.sourceBook)
if changed || sig.Type == types.BookSignalSnapshot {
s.updateQuote(ctx, 0)
case types.BookSignalUpdate:
s.updateQuote(ctx, 5)
}
lastOrderReplenishTime = time.Now()
}
}
}
@ -560,15 +573,42 @@ func (s *Strategy) CrossRun(
s.sourceBook = types.NewStreamBook(s.HedgeSymbol, s.hedgeSession.ExchangeName)
s.sourceBook.BindStream(s.hedgeSession.MarketDataStream)
s.priceSolver = pricesolver.NewSimplePriceResolver(s.makerSession.Markets())
s.priceSolver.BindStream(s.hedgeSession.MarketDataStream)
s.priceSolver.BindStream(s.makerSession.MarketDataStream)
s.bboMonitor = bbgo.NewBboMonitor()
if !s.PriceImpactRatio.IsZero() {
s.bboMonitor.SetPriceImpactRatio(s.PriceImpactRatio)
}
if err := s.priceSolver.UpdateFromTickers(ctx, s.makerSession.Exchange,
s.Symbol, s.makerSession.Exchange.PlatformFeeCurrency()+"USDT"); err != nil {
return err
}
if err := s.priceSolver.UpdateFromTickers(ctx, s.hedgeSession.Exchange, s.HedgeSymbol); err != nil {
return err
}
s.makerSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(k types.KLine) {
s.priceSolver.Update(k.Symbol, k.Close)
feeToken := s.makerSession.Exchange.PlatformFeeCurrency()
if feePrice, ok := s.priceSolver.ResolvePrice(feeToken, "USDT"); ok {
s.Position.SetFeeAverageCost(feeToken, feePrice)
}
}))
s.stopC = make(chan struct{})
s.fullReplenishTriggerC = sigchan.New(1)
makerConnectivity := types.NewConnectivity()
makerConnectivity.Bind(s.makerSession.UserDataStream)
s.makerConnectivity = types.NewConnectivity()
s.makerConnectivity.Bind(s.makerSession.UserDataStream)
hedgerConnectivity := types.NewConnectivity()
hedgerConnectivity.Bind(s.hedgeSession.UserDataStream)
s.hedgerConnectivity = types.NewConnectivity()
s.hedgerConnectivity.Bind(s.hedgeSession.UserDataStream)
connGroup := types.NewConnectivityGroup(makerConnectivity, hedgerConnectivity)
connGroup := types.NewConnectivityGroup(s.makerConnectivity, s.hedgerConnectivity)
s.connectivityGroup = connGroup
if s.RecoverTrade {
@ -593,10 +633,12 @@ func (s *Strategy) CrossRun(
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
bbgo.Notify("Shutting down %s: %s", ID, s.Symbol)
close(s.stopC)
// wait for the quoter to stop
time.Sleep(s.UpdateInterval.Duration())
time.Sleep(s.FastLayerUpdateInterval.Duration())
if err := s.MakerOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("graceful cancel %s order error", s.Symbol)
@ -610,8 +652,13 @@ func (s *Strategy) CrossRun(
log.WithError(err).Errorf("unable to cancel all orders")
}
// process collected trades
s.HedgeOrderExecutor.TradeCollector().Process()
s.MakerOrderExecutor.TradeCollector().Process()
bbgo.Sync(ctx, s)
bbgo.Notify("%s: %s position", ID, s.Symbol, s.Position)
bbgo.Notify("Shutdown %s: %s position", ID, s.Symbol, s.Position)
})
return nil
@ -635,11 +682,19 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) error {
quantity = fixedpoint.Min(s.HedgeMaxOrderQuantity, quantity)
}
defer func() {
s.fullReplenishTriggerC.Emit()
}()
switch s.HedgeStrategy {
case HedgeStrategyMarket:
return s.executeHedgeMarket(ctx, side, quantity)
case HedgeStrategyBboCounterParty1:
return s.executeHedgeBboCounterParty1(ctx, side, quantity)
return s.executeHedgeBboCounterPartyWithIndex(ctx, side, 1, quantity)
case HedgeStrategyBboCounterParty3:
return s.executeHedgeBboCounterPartyWithIndex(ctx, side, 3, quantity)
case HedgeStrategyBboCounterParty5:
return s.executeHedgeBboCounterPartyWithIndex(ctx, side, 5, quantity)
case HedgeStrategyBboQueue1:
return s.executeHedgeBboQueue1(ctx, side, quantity)
default:
@ -647,14 +702,17 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) error {
}
}
func (s *Strategy) executeHedgeBboCounterParty1(
func (s *Strategy) executeHedgeBboCounterPartyWithIndex(
ctx context.Context,
side types.SideType,
idx int,
quantity fixedpoint.Value,
) error {
price := s.lastSourcePrice.Get()
if sourcePrice := s.getSourceBboPrice(side.Reverse()); sourcePrice.Sign() > 0 {
price = sourcePrice
sideBook := s.sourceBook.SideBook(side.Reverse())
if pv, ok := sideBook.ElemOrLast(idx); ok {
price = pv.Price
}
if price.IsZero() {
@ -1068,6 +1126,15 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
return
}
// if it's disconnected or context is canceled, then return
select {
case <-ctx.Done():
return
case <-s.makerConnectivity.DisconnectedC():
return
default:
}
bestBid, bestAsk, hasPrice := s.sourceBook.BestBidAndAsk()
if !hasPrice {
return
@ -1126,7 +1193,7 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
_, err = s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
if err != nil {
s.logger.WithError(err).Errorf("order error: %s", err.Error())
s.logger.WithError(err).Errorf("submit order error: %s", err.Error())
return
}
}
@ -1141,10 +1208,7 @@ func (s *Strategy) cleanUpOpenOrders(ctx context.Context, session *bbgo.Exchange
return nil
}
log.Infof("found existing open orders:")
types.OrderSlice(openOrders).Print()
return session.Exchange.CancelOrders(ctx, openOrders...)
return tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, s.Symbol, openOrders)
}
func selectSessions2(

View File

@ -1398,7 +1398,6 @@ func (s *Strategy) CrossRun(
// initialize the price resolver
sourceMarkets := s.sourceSession.Markets()
s.priceSolver = pricesolver.NewSimplePriceResolver(sourceMarkets)
makerSession, ok := sessions[s.MakerExchange]
if !ok {
@ -1475,6 +1474,9 @@ func (s *Strategy) CrossRun(
})
}
s.priceSolver = pricesolver.NewSimplePriceResolver(sourceMarkets)
s.priceSolver.BindStream(s.sourceSession.MarketDataStream)
s.sourceSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(k types.KLine) {
s.priceSolver.Update(k.Symbol, k.Close)
feeToken := s.sourceSession.Exchange.PlatformFeeCurrency()

View File

@ -231,7 +231,7 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
ticker := time.NewTimer(e.updateInterval)
defer ticker.Stop()
monitor := NewBboMonitor()
monitor := bbgo.NewBboMonitor()
for {
select {

View File

@ -166,6 +166,12 @@ func (c *Connectivity) ConnectedC() chan struct{} {
return c.connectedC
}
func (c *Connectivity) DisconnectedC() chan struct{} {
c.mu.Lock()
defer c.mu.Unlock()
return c.disconnectedC
}
func (c *Connectivity) Bind(stream Stream) {
stream.OnConnect(c.handleConnect)
stream.OnDisconnect(c.handleDisconnect)

View File

@ -79,6 +79,19 @@ func (slice PriceVolumeSlice) First() (PriceVolume, bool) {
return PriceVolume{}, false
}
// ElemOrLast returns the element on the index i, if i is out of range, it will return the last element
func (slice PriceVolumeSlice) ElemOrLast(i int) (PriceVolume, bool) {
if len(slice) == 0 {
return PriceVolume{}, false
}
if i > len(slice)-1 {
return slice[len(slice)-1], true
}
return slice[i], true
}
func (slice PriceVolumeSlice) IndexByQuoteVolumeDepth(requiredQuoteVolume fixedpoint.Value) int {
var totalQuoteVolume = fixedpoint.Zero
for x, pv := range slice {

View File

@ -18,6 +18,22 @@ type Ticker struct {
Sell fixedpoint.Value // `sell` from Max, `askPrice` from binance
}
func (t *Ticker) GetValidPrice() fixedpoint.Value {
if !t.Last.IsZero() {
return t.Last
}
if !t.Buy.IsZero() {
return t.Buy
}
if !t.Sell.IsZero() {
return t.Sell
}
return t.Open
}
func (t *Ticker) String() string {
return fmt.Sprintf("O:%s H:%s L:%s LAST:%s BID/ASK:%s/%s TIME:%s", t.Open, t.High, t.Low, t.Last, t.Buy, t.Sell, t.Time.String())
}