adjust state and reset per day

This commit is contained in:
c9s 2021-03-22 18:48:18 +08:00
parent 6c8babfb27
commit 5de221524f
2 changed files with 70 additions and 16 deletions

View File

@ -55,7 +55,9 @@ crossExchangeStrategies:
sourceExchange: binance
tradingExchange: max
updateInterval: 5s
dailyFeeBudget: 100
dailyMaxVolume: 100
dailyFeeBudgets:
MAX: 100
persistence:
type: redis

View File

@ -31,7 +31,24 @@ func (s *Strategy) ID() string {
}
type State struct {
AccumulativeFees map[string]fixedpoint.Value
AccumulatedFeeStartedAt time.Time `json:"accumulatedFeeStartedAt,omitempty"`
AccumulatedFees map[string]fixedpoint.Value `json:"accumulatedFees,omitempty"`
AccumulatedVolume fixedpoint.Value `json:"accumulatedVolume,omitempty"`
}
func (s *State) IsOver24Hours() bool {
return time.Now().Sub(s.AccumulatedFeeStartedAt) >= 24*time.Hour
}
func (s *State) Reset() {
t := time.Now()
dateTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
log.Infof("resetting accumulated started time to: %s", dateTime)
s.AccumulatedFeeStartedAt = dateTime
s.AccumulatedFees = make(map[string]fixedpoint.Value)
s.AccumulatedVolume = 0
}
type Strategy struct {
@ -43,8 +60,9 @@ type Strategy struct {
SourceExchange string `json:"sourceExchange"`
TradingExchange string `json:"tradingExchange"`
DailyFeeBudget fixedpoint.Value `json:"dailyFeeBudget"`
UpdateInterval types.Duration `json:"updateInterval"`
DailyFeeBudgets map[string]fixedpoint.Value `json:"dailyFeeBudgets,omitempty"`
DailyMaxVolume fixedpoint.Value `json:"dailyMaxVolume,omitempty"`
UpdateInterval types.Duration `json:"updateInterval"`
sourceSession, tradingSession *bbgo.ExchangeSession
sourceMarket, tradingMarket types.Market
@ -59,6 +77,27 @@ type Strategy struct {
stopC chan struct{}
}
func (s *Strategy) isBudgetAllowed() bool {
if s.DailyFeeBudgets == nil {
return true
}
if s.state.AccumulatedFees == nil {
return true
}
for asset, budget := range s.DailyFeeBudgets {
if fee, ok := s.state.AccumulatedFees[asset]; ok {
if fee >= budget {
log.Warnf("accumulative fee %f exceeded the fee budget %f, skipping...", fee.Float64(), budget.Float64())
return false
}
}
}
return true
}
func (s *Strategy) handleTradeUpdate(trade types.Trade) {
log.Infof("received trade %+v", trade)
@ -66,13 +105,18 @@ func (s *Strategy) handleTradeUpdate(trade types.Trade) {
return
}
if s.state.AccumulativeFees == nil {
s.state.AccumulativeFees = make(map[string]fixedpoint.Value)
if s.state.IsOver24Hours() {
s.state.Reset()
}
s.state.AccumulativeFees[trade.FeeCurrency] += fixedpoint.NewFromFloat(trade.Fee)
// safe check
if s.state.AccumulatedFees == nil {
s.state.AccumulatedFees = make(map[string]fixedpoint.Value)
}
log.Infof("accumulative fee: %f %s", s.state.AccumulativeFees[trade.FeeCurrency].Float64(), trade.FeeCurrency)
s.state.AccumulatedFees[trade.FeeCurrency] += fixedpoint.NewFromFloat(trade.Fee)
s.state.AccumulatedVolume += fixedpoint.NewFromFloat(trade.Quantity)
log.Infof("accumulated fee: %f %s", s.state.AccumulatedFees[trade.FeeCurrency].Float64(), trade.FeeCurrency)
}
func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
@ -92,7 +136,7 @@ func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
tradingSession.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
}
func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
if s.UpdateInterval == 0 {
s.UpdateInterval = types.Duration(time.Second)
}
@ -123,18 +167,22 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
var state State
// load position
if err := s.Persistence.Load(&state, stateKey); err != nil {
if err := s.Persistence.Load(&state, ID, stateKey); err != nil {
if err != service.ErrPersistenceNotExists {
return err
}
s.state = &State{
AccumulativeFees: make(map[string]fixedpoint.Value),
}
s.state = &State{}
s.state.Reset()
} else {
// loaded successfully
s.state = &state
log.Infof("state is restored: %+v", s.state)
if s.state.IsOver24Hours() {
log.Warn("state is over 24 hours, resetting to zero")
s.state.Reset()
}
}
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
@ -142,7 +190,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
close(s.stopC)
if err := s.Persistence.Save(&s.state, stateKey); err != nil {
if err := s.Persistence.Save(&s.state, ID, stateKey); err != nil {
log.WithError(err).Errorf("can not save state: %+v", s.state)
} else {
log.Infof("state is saved => %+v", s.state)
@ -182,6 +230,10 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
return
case <-ticker.C:
if !s.isBudgetAllowed() {
continue
}
sourceBook := s.sourceBook.Get()
book := s.tradingBook.Get()
bestBid, hasBid := book.BestBid()
@ -200,8 +252,8 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
bestAsk, hasAsk = sourceBook.BestAsk()
}
// if the spread is less than 10 ticks (10 pips), skip
if spread.Float64() < 10*s.tradingMarket.TickSize {
// if the spread is less than 100 ticks (100 pips), skip
if spread.Float64() < 100*s.tradingMarket.TickSize {
log.Warnf("spread too small, we can't place orders: spread=%f bid=%f ask=%f", spread.Float64(), bestBid.Price.Float64(), bestAsk.Price.Float64())
continue
}