// File: bbgo_origin/pkg/strategy/xalign/strategy.go (373 lines, 9.9 KiB, Go)

package xalign
import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/sirupsen/logrus"

	"github.com/c9s/bbgo/pkg/bbgo"
	"github.com/c9s/bbgo/pkg/core"
	"github.com/c9s/bbgo/pkg/fixedpoint"
	"github.com/c9s/bbgo/pkg/types"
)
const ID = "xalign"
2023-06-21 09:36:09 +00:00
var log = logrus.WithField("strategy", ID)
2023-06-08 07:54:32 +00:00
func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
2023-06-21 07:56:59 +00:00
// TimeBalance is a balance snapshot together with the time it was recorded.
// recordBalance appends one per currency whose balance is outside the
// configured tolerance.
type TimeBalance struct {
types.Balance
Time time.Time
}
2023-06-08 07:54:32 +00:00
// QuoteCurrencyPreference lists the quote currencies that may be paired with
// the currency being aligned. selectSessionForCurrency tries them in slice
// order: Buy when the balance has to grow, Sell otherwise.
type QuoteCurrencyPreference struct {
Buy []string `json:"buy"`
Sell []string `json:"sell"`
}
type Strategy struct {
*bbgo.Environment
Interval types.Interval `json:"interval"`
PreferredSessions []string `json:"sessions"`
PreferredQuoteCurrencies *QuoteCurrencyPreference `json:"quoteCurrencies"`
ExpectedBalances map[string]fixedpoint.Value `json:"expectedBalances"`
UseTakerOrder bool `json:"useTakerOrder"`
DryRun bool `json:"dryRun"`
2023-06-21 07:59:15 +00:00
BalanceToleranceRange fixedpoint.Value `json:"balanceToleranceRange"`
2023-06-21 07:56:59 +00:00
Duration types.Duration `json:"for"`
faultBalanceRecords map[string][]TimeBalance
2023-06-08 07:54:32 +00:00
2023-06-13 04:27:38 +00:00
sessions map[string]*bbgo.ExchangeSession
2023-06-13 04:22:43 +00:00
orderBooks map[string]*bbgo.ActiveOrderBook
2023-06-13 15:23:41 +00:00
orderStore *core.OrderStore
2023-06-08 07:54:32 +00:00
}
// ID returns the strategy identifier, the package-level constant ID.
func (s *Strategy) ID() string {
return ID
}
func (s *Strategy) InstanceID() string {
2023-06-08 10:05:58 +00:00
var cs []string
for cur := range s.ExpectedBalances {
cs = append(cs, cur)
}
return ID + strings.Join(s.PreferredSessions, "-") + strings.Join(cs, "-")
2023-06-08 07:54:32 +00:00
}
// Subscribe does nothing: the only statement in its body, a k-line
// subscription, is commented out.
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
// session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
}
// CrossSubscribe has an empty body; no subscription is made here.
func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
}
2023-06-21 07:56:59 +00:00
func (s *Strategy) Defaults() error {
2023-06-21 07:59:15 +00:00
s.BalanceToleranceRange = fixedpoint.NewFromFloat(0.01)
2023-06-21 07:56:59 +00:00
return nil
}
2023-06-08 07:54:32 +00:00
// Validate reports an error when the quoteCurrencies setting is missing and
// returns nil otherwise.
func (s *Strategy) Validate() error {
	if s.PreferredQuoteCurrencies != nil {
		return nil
	}

	return errors.New("quoteCurrencies is not defined")
}
// aggregateBalances refreshes the account of every given session and returns
// the sum of all balances together with the balances of each session keyed by
// session name.
//
// When updating an account fails, the error is logged and the function returns
// what it has collected so far, so the result may cover only part of the
// sessions.
func (s *Strategy) aggregateBalances(ctx context.Context, sessions map[string]*bbgo.ExchangeSession) (totalBalances types.BalanceMap, sessionBalances map[string]types.BalanceMap) {
	totalBalances = types.BalanceMap{}
	sessionBalances = map[string]types.BalanceMap{}

	for name, sess := range sessions {
		// update the account balances and the margin information
		_, err := sess.UpdateAccount(ctx)
		if err != nil {
			log.WithError(err).Errorf("can not update account")
			return totalBalances, sessionBalances
		}

		current := sess.GetAccount().Balances()
		sessionBalances[name] = current
		totalBalances = totalBalances.Add(current)
	}

	return totalBalances, sessionBalances
}
func (s *Strategy) selectSessionForCurrency(ctx context.Context, sessions map[string]*bbgo.ExchangeSession, currency string, changeQuantity fixedpoint.Value) (*bbgo.ExchangeSession, *types.SubmitOrder) {
for _, sessionName := range s.PreferredSessions {
session := sessions[sessionName]
var taker = s.UseTakerOrder
2023-06-08 07:54:32 +00:00
var side types.SideType
var quoteCurrencies []string
if changeQuantity.Sign() > 0 {
quoteCurrencies = s.PreferredQuoteCurrencies.Buy
side = types.SideTypeBuy
} else {
quoteCurrencies = s.PreferredQuoteCurrencies.Sell
side = types.SideTypeSell
}
for _, quoteCurrency := range quoteCurrencies {
symbol := currency + quoteCurrency
market, ok := session.Market(symbol)
if !ok {
continue
}
ticker, err := session.Exchange.QueryTicker(ctx, symbol)
if err != nil {
log.WithError(err).Errorf("unable to query ticker on %s", symbol)
continue
}
2023-06-13 05:44:31 +00:00
spread := ticker.Sell.Sub(ticker.Buy)
2023-06-08 07:54:32 +00:00
// changeQuantity > 0 = buy
// changeQuantity < 0 = sell
q := changeQuantity.Abs()
// a fast filtering
2023-06-13 05:49:22 +00:00
if q.Compare(market.MinQuantity) < 0 {
log.Debugf("skip dust quantity: %f", q.Float64())
2023-06-13 05:49:22 +00:00
continue
}
2023-06-13 05:40:39 +00:00
log.Infof("%s changeQuantity: %f ticker: %+v market: %+v", symbol, changeQuantity.Float64(), ticker, market)
2023-06-08 07:54:32 +00:00
switch side {
case types.SideTypeBuy:
price := ticker.Sell
if taker {
price = ticker.Sell
2023-06-13 05:44:31 +00:00
} else if spread.Compare(market.TickSize) > 0 {
price = ticker.Sell.Sub(market.TickSize)
2023-06-08 07:54:32 +00:00
} else {
price = ticker.Buy
}
quoteBalance, ok := session.Account.Balance(quoteCurrency)
if !ok {
continue
}
requiredQuoteAmount := q.Mul(price)
2023-06-09 03:04:31 +00:00
requiredQuoteAmount = requiredQuoteAmount.Round(market.PricePrecision, fixedpoint.Up)
2023-06-13 04:42:07 +00:00
if requiredQuoteAmount.Compare(quoteBalance.Available) > 0 {
log.Warnf("required quote amount %f > quote balance %v, skip", requiredQuoteAmount.Float64(), quoteBalance)
2023-06-08 07:54:32 +00:00
continue
}
if quantity, ok := market.GreaterThanMinimalOrderQuantity(side, price, requiredQuoteAmount); ok {
return session, &types.SubmitOrder{
Symbol: symbol,
Side: side,
Type: types.OrderTypeLimit,
Quantity: quantity,
Price: price,
Market: market,
TimeInForce: types.TimeInForceGTC,
}
2023-06-08 07:54:32 +00:00
}
case types.SideTypeSell:
price := ticker.Buy
if taker {
price = ticker.Buy
2023-06-13 05:44:31 +00:00
} else if spread.Compare(market.TickSize) > 0 {
price = ticker.Buy.Add(market.TickSize)
2023-06-08 07:54:32 +00:00
} else {
price = ticker.Sell
}
baseBalance, ok := session.Account.Balance(currency)
if !ok {
continue
}
if q.Compare(baseBalance.Available) > 0 {
log.Warnf("required base amount %f < available base balance %v, skip", q.Float64(), baseBalance)
continue
2023-06-08 07:54:32 +00:00
}
if quantity, ok := market.GreaterThanMinimalOrderQuantity(side, price, q); ok {
return session, &types.SubmitOrder{
Symbol: symbol,
Side: side,
Type: types.OrderTypeLimit,
Quantity: quantity,
Price: price,
Market: market,
TimeInForce: types.TimeInForceGTC,
}
2023-06-08 07:54:32 +00:00
}
}
}
}
return nil, nil
}
func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
instanceID := s.InstanceID()
_ = instanceID
2023-06-21 07:56:59 +00:00
s.faultBalanceRecords = make(map[string][]TimeBalance)
2023-06-13 04:27:38 +00:00
s.sessions = make(map[string]*bbgo.ExchangeSession)
2023-06-13 04:22:43 +00:00
s.orderBooks = make(map[string]*bbgo.ActiveOrderBook)
2023-06-08 07:54:32 +00:00
s.orderStore = core.NewOrderStore("")
2023-06-13 15:23:41 +00:00
2023-06-08 07:54:32 +00:00
for _, sessionName := range s.PreferredSessions {
session, ok := sessions[sessionName]
if !ok {
return fmt.Errorf("incorrect preferred session name: %s is not defined", sessionName)
}
2023-06-13 15:23:41 +00:00
s.orderStore.BindStream(session.UserDataStream)
2023-06-08 07:54:32 +00:00
orderBook := bbgo.NewActiveOrderBook("")
orderBook.BindStream(session.UserDataStream)
2023-06-13 04:22:43 +00:00
s.orderBooks[sessionName] = orderBook
2023-06-13 04:27:38 +00:00
s.sessions[sessionName] = session
2023-06-08 07:54:32 +00:00
}
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for n, session := range s.sessions {
if ob, ok := s.orderBooks[n]; ok {
_ = ob.GracefulCancel(ctx, session.Exchange)
}
}
})
2023-06-08 07:54:32 +00:00
go func() {
2023-06-13 04:27:38 +00:00
s.align(ctx, s.sessions)
2023-06-08 07:54:32 +00:00
2023-06-08 08:00:43 +00:00
ticker := time.NewTicker(s.Interval.Duration())
2023-06-08 07:54:32 +00:00
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
2023-06-13 04:27:38 +00:00
s.align(ctx, s.sessions)
2023-06-08 07:54:32 +00:00
}
}
}()
return nil
}
2023-06-21 07:56:59 +00:00
func (s *Strategy) recordBalance(totalBalances types.BalanceMap) {
now := time.Now()
for currency, expectedBalance := range s.ExpectedBalances {
q := s.calculateRefillQuantity(totalBalances, currency, expectedBalance)
rf := q.Div(expectedBalance).Abs().Float64()
2023-06-21 07:59:15 +00:00
tr := s.BalanceToleranceRange.Float64()
if rf > tr {
2023-06-21 07:56:59 +00:00
balance := totalBalances[currency]
s.faultBalanceRecords[currency] = append(s.faultBalanceRecords[currency], TimeBalance{
Time: now,
Balance: balance,
})
} else {
// reset counter
s.faultBalanceRecords[currency] = nil
}
}
}
2023-06-08 07:54:32 +00:00
func (s *Strategy) align(ctx context.Context, sessions map[string]*bbgo.ExchangeSession) {
for sessionName, session := range sessions {
2023-06-13 04:22:43 +00:00
ob, ok := s.orderBooks[sessionName]
if !ok {
log.Errorf("orderbook on session %s not found", sessionName)
return
}
if ok {
if err := ob.GracefulCancel(ctx, session.Exchange); err != nil {
log.WithError(err).Errorf("can not cancel order")
}
2023-06-08 07:54:32 +00:00
}
}
2023-06-21 07:56:59 +00:00
totalBalances, sessionBalances := s.aggregateBalances(ctx, sessions)
_ = sessionBalances
s.recordBalance(totalBalances)
2023-06-08 07:54:32 +00:00
for currency, expectedBalance := range s.ExpectedBalances {
q := s.calculateRefillQuantity(totalBalances, currency, expectedBalance)
2023-06-21 07:56:59 +00:00
if s.Duration > 0 {
log.Infof("checking fault balance records...")
if faultBalance, ok := s.faultBalanceRecords[currency]; ok && len(faultBalance) > 0 {
if time.Since(faultBalance[0].Time) < s.Duration.Duration() {
log.Infof("%s fault record since: %s < persistence period %s", currency, faultBalance[0].Time, s.Duration.Duration())
continue
}
}
}
2023-06-08 07:54:32 +00:00
selectedSession, submitOrder := s.selectSessionForCurrency(ctx, sessions, currency, q)
if selectedSession != nil && submitOrder != nil {
log.Infof("placing order on %s: %+v", selectedSession.Name, submitOrder)
2023-06-08 07:54:32 +00:00
2023-06-13 15:21:07 +00:00
bbgo.Notify("Aligning position on exchange session %s, delta: %f", selectedSession.Name, q.Float64(), submitOrder)
2023-06-13 05:47:01 +00:00
if s.DryRun {
return
}
2023-06-08 07:54:32 +00:00
createdOrder, err := selectedSession.Exchange.SubmitOrder(ctx, *submitOrder)
if err != nil {
log.WithError(err).Errorf("can not place order")
return
}
if createdOrder != nil {
2023-06-13 04:24:25 +00:00
if ob, ok := s.orderBooks[selectedSession.Name]; ok {
ob.Add(*createdOrder)
} else {
log.Errorf("orderbook %s not found", selectedSession.Name)
}
2023-06-13 04:22:43 +00:00
s.orderBooks[selectedSession.Name].Add(*createdOrder)
2023-06-08 07:54:32 +00:00
}
}
}
}
// calculateRefillQuantity returns how much of currency is missing to reach
// expectedBalance: the expected balance minus the net balance found in
// totalBalances. The result is negative when the net balance exceeds the
// expectation. When the currency is absent from totalBalances the whole
// expectedBalance is returned.
func (s *Strategy) calculateRefillQuantity(totalBalances types.BalanceMap, currency string, expectedBalance fixedpoint.Value) fixedpoint.Value {
	balance, found := totalBalances[currency]
	if !found {
		return expectedBalance
	}

	return expectedBalance.Sub(balance.Net())
}