bbgo_origin/pkg/strategy/deposit2transfer/strategy.go

270 lines
7.1 KiB
Go
Raw Permalink Normal View History

2023-08-05 11:03:14 +00:00
package deposit2transfer
import (
"context"
"errors"
"fmt"
2023-08-07 01:51:07 +00:00
"sort"
"sync"
"time"
2023-08-05 11:03:14 +00:00
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
2023-08-05 11:03:14 +00:00
"github.com/c9s/bbgo/pkg/bbgo"
2023-08-07 01:51:07 +00:00
"github.com/c9s/bbgo/pkg/fixedpoint"
2023-08-05 11:03:14 +00:00
"github.com/c9s/bbgo/pkg/types"
)
2023-08-07 01:51:07 +00:00
// marginTransferService is the part of an exchange client this strategy needs
// in order to move an asset to or from the margin account. Run obtains it by
// type-asserting session.Exchange.
type marginTransferService interface {
	// TransferMarginAccountAsset transfers amount of asset in the given direction.
	TransferMarginAccountAsset(ctx context.Context, asset string, amount fixedpoint.Value, direction types.TransferDirection) error
}
// spotAccountQueryService is an optional capability of an exchange client:
// when the session's exchange implements it, checkDeposits uses the returned
// account to look up the spot balance of a deposited asset.
type spotAccountQueryService interface {
	// QuerySpotAccount returns the spot account, or an error when the query fails.
	QuerySpotAccount(ctx context.Context) (account *types.Account, err error)
}
2023-08-05 11:03:14 +00:00
const ID = "deposit2transfer"
var log = logrus.WithField("strategy", ID)
2023-08-07 01:51:07 +00:00
var errMarginTransferNotSupport = errors.New("exchange session does not support margin transfer")
var errDepositHistoryNotSupport = errors.New("exchange session does not support deposit history query")
2023-08-05 11:03:14 +00:00
// init registers the strategy with bbgo under ID.
func init() {
	// Register a pointer to the strategy struct so that bbgo knows which
	// struct to unmarshal the configuration (YAML or JSON) into.
	// Note: built-in strategies need to be imported manually in the bbgo cmd package.
	bbgo.RegisterStrategy(ID, &Strategy{})
}
type Strategy struct {
Environment *bbgo.Environment
Assets []string `json:"assets"`
2023-08-07 01:51:07 +00:00
2023-08-08 04:38:23 +00:00
Interval types.Duration `json:"interval"`
2023-08-05 11:03:14 +00:00
2023-08-07 01:51:07 +00:00
marginTransferService marginTransferService
depositHistoryService types.ExchangeTransferService
2023-08-05 11:03:14 +00:00
session *bbgo.ExchangeSession
2023-08-07 01:51:07 +00:00
watchingDeposits map[string]types.Deposit
mu sync.Mutex
logger logrus.FieldLogger
lastAssetDepositTimes map[string]time.Time
2023-08-05 11:03:14 +00:00
}
// ID returns the strategy identifier, i.e. the package-level ID constant.
func (s *Strategy) ID() string {
	return ID
}
// Subscribe is intentionally empty: this strategy registers no subscriptions
// on the session.
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {}
func (s *Strategy) Defaults() error {
2023-08-08 04:38:23 +00:00
if s.Interval == 0 {
s.Interval = types.Duration(5 * time.Minute)
2023-08-05 11:03:14 +00:00
}
if s.logger == nil {
s.logger = log.Dup()
}
2023-08-05 11:03:14 +00:00
return nil
}
// Validate checks the user-supplied configuration.
//
// A negative Interval is rejected here because tickWatcher hands the interval
// to time.NewTicker, which panics on a non-positive duration; failing at
// validation time gives a readable configuration error instead of a panic in a
// background goroutine. A zero Interval is accepted because Defaults replaces
// it with the default polling period.
func (s *Strategy) Validate() error {
	if s.Interval < 0 {
		return fmt.Errorf("interval must not be negative, got %s", time.Duration(s.Interval))
	}

	return nil
}
func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s-%s", ID, s.Assets)
2023-08-05 11:03:14 +00:00
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
s.session = session
2023-08-07 01:51:07 +00:00
s.watchingDeposits = make(map[string]types.Deposit)
s.lastAssetDepositTimes = make(map[string]time.Time)
s.logger = s.logger.WithField("exchange", session.ExchangeName)
2023-08-07 01:51:07 +00:00
2023-08-05 11:03:14 +00:00
var ok bool
2023-08-07 01:51:07 +00:00
s.marginTransferService, ok = session.Exchange.(marginTransferService)
if !ok {
return errMarginTransferNotSupport
}
2023-08-05 11:03:14 +00:00
2023-08-07 01:51:07 +00:00
s.depositHistoryService, ok = session.Exchange.(types.ExchangeTransferService)
if !ok {
return errDepositHistoryNotSupport
}
2023-08-05 11:03:14 +00:00
2023-08-08 04:38:23 +00:00
session.UserDataStream.OnStart(func() {
go s.tickWatcher(ctx, s.Interval.Duration())
})
2023-08-07 01:51:07 +00:00
return nil
}
2023-08-05 11:03:14 +00:00
2023-08-07 01:51:07 +00:00
func (s *Strategy) tickWatcher(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
s.checkDeposits(ctx)
2023-08-08 04:38:23 +00:00
2023-08-07 01:51:07 +00:00
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.checkDeposits(ctx)
}
}
}
func (s *Strategy) checkDeposits(ctx context.Context) {
accountLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1)
for _, asset := range s.Assets {
logger := s.logger.WithField("asset", asset)
logger.Debugf("checking %s deposits...", asset)
succeededDeposits, err := s.scanDepositHistory(ctx, asset, 4*time.Hour)
if err != nil {
logger.WithError(err).Errorf("unable to scan deposit history")
return
}
if len(succeededDeposits) == 0 {
logger.Debugf("no %s deposit found", asset)
2023-08-08 04:38:23 +00:00
continue
}
for _, d := range succeededDeposits {
logger.Infof("found succeeded %s deposit: %+v", asset, d)
if err2 := accountLimiter.Wait(ctx); err2 != nil {
logger.WithError(err2).Errorf("rate limiter error")
return
}
// we can't use the account from margin
amount := d.Amount
if service, ok := s.session.Exchange.(spotAccountQueryService); ok {
account, err2 := service.QuerySpotAccount(ctx)
if err2 != nil {
logger.WithError(err2).Errorf("unable to query spot account")
continue
}
if bal, ok := account.Balance(d.Asset); ok {
logger.Infof("spot account balance %s: %+v", d.Asset, bal)
amount = fixedpoint.Min(bal.Available, amount)
} else {
logger.Errorf("unexpected error: %s balance not found", d.Asset)
}
}
bbgo.Notify("Found succeeded deposit %s %s, transferring %s %s into the margin account",
d.Amount.String(), d.Asset,
amount.String(), d.Asset)
if err2 := s.marginTransferService.TransferMarginAccountAsset(ctx, d.Asset, amount, types.TransferIn); err2 != nil {
logger.WithError(err2).Errorf("unable to transfer deposit asset into the margin account")
2023-08-07 01:51:07 +00:00
}
2023-08-05 11:03:14 +00:00
}
2023-08-07 01:51:07 +00:00
}
}
2023-08-05 11:03:14 +00:00
func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duration time.Duration) ([]types.Deposit, error) {
logger := s.logger.WithField("asset", asset)
logger.Debugf("scanning %s deposit history...", asset)
2023-08-07 01:51:07 +00:00
now := time.Now()
since := now.Add(-duration)
deposits, err := s.depositHistoryService.QueryDepositHistory(ctx, asset, since, now)
2023-08-07 01:51:07 +00:00
if err != nil {
return nil, err
2023-08-05 11:03:14 +00:00
}
// sort the recent deposit records in ascending order
2023-08-07 01:51:07 +00:00
sort.Slice(deposits, func(i, j int) bool {
return deposits[i].Time.Time().Before(deposits[j].Time.Time())
})
s.mu.Lock()
2023-08-08 04:38:23 +00:00
defer s.mu.Unlock()
2023-08-07 01:51:07 +00:00
for _, deposit := range deposits {
logger.Debugf("checking deposit: %+v", deposit)
if deposit.Asset != asset {
continue
}
2023-08-07 01:51:07 +00:00
if _, ok := s.watchingDeposits[deposit.TransactionID]; ok {
// if the deposit record is in the watch list, update it
s.watchingDeposits[deposit.TransactionID] = deposit
} else {
switch deposit.Status {
2023-08-08 04:38:23 +00:00
2023-08-07 01:51:07 +00:00
case types.DepositSuccess:
if depositTime, ok := s.lastAssetDepositTimes[asset]; ok {
// if it's newer than the latest deposit time, then we just add it the monitoring list
if deposit.Time.After(depositTime) {
logger.Infof("adding new success deposit: %s", deposit.TransactionID)
s.watchingDeposits[deposit.TransactionID] = deposit
}
} else {
// ignore all initial deposit history that are already success
logger.Infof("ignored succeess deposit: %s %+v", deposit.TransactionID, deposit)
}
2023-08-07 01:51:07 +00:00
case types.DepositCredited, types.DepositPending:
logger.Infof("adding pending deposit: %s", deposit.TransactionID)
2023-08-07 01:51:07 +00:00
s.watchingDeposits[deposit.TransactionID] = deposit
}
}
}
2023-08-05 11:03:14 +00:00
if len(deposits) > 0 {
lastDeposit := deposits[len(deposits)-1]
if lastTime, ok := s.lastAssetDepositTimes[asset]; ok {
s.lastAssetDepositTimes[asset] = later(lastDeposit.Time.Time(), lastTime)
} else {
s.lastAssetDepositTimes[asset] = lastDeposit.Time.Time()
}
}
2023-08-07 01:51:07 +00:00
var succeededDeposits []types.Deposit
for _, deposit := range s.watchingDeposits {
2023-08-07 01:51:07 +00:00
if deposit.Status == types.DepositSuccess {
logger.Infof("found pending -> success deposit: %+v", deposit)
current, required := deposit.GetCurrentConfirmation()
if required > 0 && deposit.UnlockConfirm > 0 && current < deposit.UnlockConfirm {
logger.Infof("deposit %s unlock confirm %d is not reached, current: %d, required: %d, skip this round", deposit.TransactionID, deposit.UnlockConfirm, current, required)
continue
}
2023-08-07 01:51:07 +00:00
succeededDeposits = append(succeededDeposits, deposit)
delete(s.watchingDeposits, deposit.TransactionID)
}
2023-08-05 11:03:14 +00:00
}
2023-08-07 01:51:07 +00:00
return succeededDeposits, nil
2023-08-05 11:03:14 +00:00
}
// later returns whichever of a and b is the more recent instant.
// When neither is after the other, b is returned.
func later(a, b time.Time) time.Time {
	if !a.After(b) {
		return b
	}

	return a
}