package deposit2transfer import ( "context" "errors" "fmt" "sort" "sync" "time" "github.com/sirupsen/logrus" "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/livenote" "github.com/c9s/bbgo/pkg/types" ) type marginTransferService interface { TransferMarginAccountAsset(ctx context.Context, asset string, amount fixedpoint.Value, io types.TransferDirection) error } type spotAccountQueryService interface { QuerySpotAccount(ctx context.Context) (*types.Account, error) } const ID = "deposit2transfer" var log = logrus.WithField("strategy", ID) var errMarginTransferNotSupport = errors.New("exchange session does not support margin transfer") var errDepositHistoryNotSupport = errors.New("exchange session does not support deposit history query") func init() { // Register the pointer of the strategy struct, // so that bbgo knows what struct to be used to unmarshal the configs (YAML or JSON) // Note: built-in strategies need to imported manually in the bbgo cmd package. bbgo.RegisterStrategy(ID, &Strategy{}) } type SlackAlert struct { Channel string `json:"channel"` Mentions []string `json:"mentions"` } type Strategy struct { Environment *bbgo.Environment Assets []string `json:"assets"` Interval types.Duration `json:"interval"` TransferDelay types.Duration `json:"transferDelay"` SlackAlert *SlackAlert `json:"slackAlert"` marginTransferService marginTransferService depositHistoryService types.ExchangeTransferService session *bbgo.ExchangeSession watchingDeposits map[string]types.Deposit mu sync.Mutex logger logrus.FieldLogger lastAssetDepositTimes map[string]time.Time } func (s *Strategy) ID() string { return ID } func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {} func (s *Strategy) Defaults() error { if s.Interval == 0 { s.Interval = types.Duration(5 * time.Minute) } if s.TransferDelay == 0 { s.TransferDelay = types.Duration(3 * time.Second) } return nil } func (s *Strategy) Initialize() error { if s.logger == nil { s.logger = log.Dup() } return nil } func (s *Strategy) Validate() error { return nil } func (s *Strategy) InstanceID() string { return fmt.Sprintf("%s-%s", ID, s.Assets) } func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { s.session = session s.watchingDeposits = make(map[string]types.Deposit) s.lastAssetDepositTimes = make(map[string]time.Time) s.logger = s.logger.WithField("exchange", session.ExchangeName) var ok bool s.marginTransferService, ok = session.Exchange.(marginTransferService) if !ok { return errMarginTransferNotSupport } s.depositHistoryService, ok = session.Exchange.(types.ExchangeTransferService) if !ok { return errDepositHistoryNotSupport } session.UserDataStream.OnStart(func() { go s.tickWatcher(ctx, s.Interval.Duration()) }) return nil } func (s *Strategy) tickWatcher(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() s.checkDeposits(ctx) for { select { case <-ctx.Done(): return case <-ticker.C: s.checkDeposits(ctx) } } } func (s *Strategy) checkDeposits(ctx context.Context) { accountLimiter := rate.NewLimiter(rate.Every(5*time.Second), 1) for _, asset := range s.Assets { logger := s.logger.WithField("asset", asset) logger.Debugf("checking %s deposits...", asset) succeededDeposits, err := s.scanDepositHistory(ctx, asset, 4*time.Hour) if err != nil { logger.WithError(err).Errorf("unable to scan deposit history") return } if len(succeededDeposits) == 0 { logger.Debugf("no %s deposit found", asset) continue } else { logger.Infof("found %d %s deposits", len(succeededDeposits), asset) } if s.TransferDelay > 0 { logger.Infof("delaying transfer for %s...", s.TransferDelay.Duration()) time.Sleep(s.TransferDelay.Duration()) } for _, d := range succeededDeposits { logger.Infof("found succeeded %s deposit: %+v", asset, d) if err2 := accountLimiter.Wait(ctx); err2 != nil { logger.WithError(err2).Errorf("rate limiter error") return } // we can't use the account from margin amount := d.Amount if service, ok := s.session.Exchange.(spotAccountQueryService); ok { var account *types.Account err = retry.GeneralBackoff(ctx, func() (err error) { account, err = service.QuerySpotAccount(ctx) return err }) if err != nil || account == nil { logger.WithError(err).Errorf("unable to query spot account") continue } bal, ok := account.Balance(d.Asset) if ok { logger.Infof("spot account balance %s: %+v", d.Asset, bal) amount = fixedpoint.Min(bal.Available, amount) } else { logger.Errorf("unexpected error: %s balance not found", d.Asset) } if amount.IsZero() || amount.Sign() < 0 { bbgo.Notify("Found succeeded deposit %s %s, but the balance %s %s is insufficient, skip transferring", d.Amount.String(), d.Asset, bal.Available.String(), bal.Currency) continue } } bbgo.Notify("Found succeeded deposit %s %s, transferring %s %s into the margin account", d.Amount.String(), d.Asset, amount.String(), d.Asset) if s.SlackAlert != nil { bbgo.PostLiveNote(&d, livenote.Channel(s.SlackAlert.Channel), livenote.Comment(fmt.Sprintf("Transferring deposit asset %s %s into the margin account", amount.String(), d.Asset)), ) } err2 := retry.GeneralBackoff(ctx, func() error { return s.marginTransferService.TransferMarginAccountAsset(ctx, d.Asset, amount, types.TransferIn) }) if err2 != nil { logger.WithError(err2).Errorf("unable to transfer deposit asset into the margin account") if s.SlackAlert != nil { bbgo.PostLiveNote(&d, livenote.Channel(s.SlackAlert.Channel), livenote.Comment(fmt.Sprintf("Margin account transfer error: %+v", err2)), ) } } } } } func (s *Strategy) addWatchingDeposit(deposit types.Deposit) { s.watchingDeposits[deposit.TransactionID] = deposit if s.SlackAlert != nil { bbgo.PostLiveNote(&deposit, livenote.Channel(s.SlackAlert.Channel), livenote.CompareObject(true), livenote.OneTimeMention(s.SlackAlert.Mentions...), ) } } func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duration time.Duration) ([]types.Deposit, error) { logger := s.logger.WithField("asset", asset) logger.Debugf("scanning %s deposit history...", asset) now := time.Now() since := now.Add(-duration) var deposits []types.Deposit err := retry.GeneralBackoff(ctx, func() (err error) { deposits, err = s.depositHistoryService.QueryDepositHistory(ctx, asset, since, now) return err }) if err != nil { return nil, err } // sort the recent deposit records in ascending order sort.Slice(deposits, func(i, j int) bool { return deposits[i].Time.Time().Before(deposits[j].Time.Time()) }) s.mu.Lock() defer s.mu.Unlock() // update the watching deposits for _, deposit := range deposits { logger.Debugf("checking deposit: %+v", deposit) if deposit.Asset != asset { continue } // if the deposit record is already in the watch list, update it if _, ok := s.watchingDeposits[deposit.TransactionID]; ok { s.addWatchingDeposit(deposit) } else { // if the deposit record is not in the watch list, we need to check the status // here the deposit is outside the watching list switch deposit.Status { case types.DepositSuccess: // if the deposit is in success status, we need to check if it's newer than the latest deposit time // this usually happens when the deposit is credited to the account very quickly if depositTime, ok := s.lastAssetDepositTimes[asset]; ok { // if it's newer than the latest deposit time, then we just add it the monitoring list if deposit.Time.After(depositTime) { logger.Infof("adding new success deposit: %s", deposit.TransactionID) s.addWatchingDeposit(deposit) } } else { // ignore all initial deposits that are already in success status logger.Infof("ignored succeess deposit: %s %+v", deposit.TransactionID, deposit) } case types.DepositCredited, types.DepositPending: logger.Infof("adding pending deposit: %s", deposit.TransactionID) s.addWatchingDeposit(deposit) } } } if len(deposits) > 0 { lastDeposit := deposits[len(deposits)-1] if lastTime, ok := s.lastAssetDepositTimes[asset]; ok { s.lastAssetDepositTimes[asset] = later(lastDeposit.Time.Time(), lastTime) } else { s.lastAssetDepositTimes[asset] = lastDeposit.Time.Time() } } var succeededDeposits []types.Deposit // find the succeeded deposits for _, deposit := range s.watchingDeposits { switch deposit.Status { case types.DepositSuccess: logger.Infof("found pending -> success deposit: %+v", deposit) current, required := deposit.GetCurrentConfirmation() if required > 0 && deposit.UnlockConfirm > 0 && current < deposit.UnlockConfirm { logger.Infof("deposit %s unlock confirm %d is not reached, current: %d, required: %d, skip this round", deposit.TransactionID, deposit.UnlockConfirm, current, required) continue } succeededDeposits = append(succeededDeposits, deposit) delete(s.watchingDeposits, deposit.TransactionID) } } return succeededDeposits, nil } func later(a, b time.Time) time.Time { if a.After(b) { return a } return b }