// 2024-08-30 14:55:17 +00:00

package deposit2transfer
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"git.qtrade.icu/lychiyu/bbgo/pkg/bbgo"
"git.qtrade.icu/lychiyu/bbgo/pkg/exchange/retry"
"git.qtrade.icu/lychiyu/bbgo/pkg/fixedpoint"
"git.qtrade.icu/lychiyu/bbgo/pkg/types"
)
// marginTransferService is the capability this strategy needs from an
// exchange in order to move an asset in or out of the margin account.
// The strategy only ever calls it with types.TransferIn.
type marginTransferService interface {
	TransferMarginAccountAsset(ctx context.Context, asset string, amount fixedpoint.Value, direction types.TransferDirection) error
}
// spotAccountQueryService is an optional exchange capability: when the
// session's exchange implements it, the strategy reads the spot account
// balance before transferring a deposit.
type spotAccountQueryService interface {
	QuerySpotAccount(ctx context.Context) (*types.Account, error)
}
// ID is the name this strategy is registered under.
const ID = "deposit2transfer"

var (
	// log is the package-level logger; every entry carries the strategy ID.
	log = logrus.WithField("strategy", ID)

	// errMarginTransferNotSupport is returned by Run when the exchange does
	// not implement marginTransferService.
	errMarginTransferNotSupport = errors.New("exchange session does not support margin transfer")

	// errDepositHistoryNotSupport is returned by Run when the exchange does
	// not implement types.ExchangeTransferService.
	errDepositHistoryNotSupport = errors.New("exchange session does not support deposit history query")
)
// init registers a pointer to an empty Strategy under ID, so that bbgo knows
// which struct to unmarshal the strategy configuration (YAML or JSON) into.
//
// Note: built-in strategies need to be imported manually in the bbgo cmd package.
func init() {
	bbgo.RegisterStrategy(ID, new(Strategy))
}
// Strategy watches the deposit history of the configured assets and moves
// every deposit that reaches the success state into the margin account.
type Strategy struct {
// Environment is not referenced by the code in this file.
// NOTE(review): presumably injected by bbgo — confirm.
Environment * bbgo . Environment
// Assets lists the asset names whose deposits are checked on every round.
Assets [ ] string ` json:"assets" `
// Interval is the period between two deposit checks; Defaults sets it to
// 5 minutes when it is zero.
Interval types . Duration ` json:"interval" `
// marginTransferService is taken from the session exchange in Run and used
// to transfer deposits into the margin account.
marginTransferService marginTransferService
// depositHistoryService is taken from the session exchange in Run and used
// to query the deposit history.
depositHistoryService types . ExchangeTransferService
// session is the exchange session passed to Run.
session * bbgo . ExchangeSession
// watchingDeposits holds the deposits being tracked, keyed by transaction ID.
// An entry is removed once the deposit is reported as succeeded.
watchingDeposits map [ string ] types . Deposit
// mu is held by scanDepositHistory while it reads and writes
// watchingDeposits and lastAssetDepositTimes.
mu sync . Mutex
// logger is initialised by Defaults when nil and tagged with the exchange
// name in Run.
logger logrus . FieldLogger
// lastAssetDepositTimes records, per asset, the latest deposit time seen in
// a scan of that asset.
lastAssetDepositTimes map [ string ] time . Time
}
// ID returns the package-level strategy identifier.
func (s *Strategy) ID() string {
	return ID
}
// Subscribe is a no-op: this strategy registers no subscriptions on the session.
func (s *Strategy) Subscribe(_ *bbgo.ExchangeSession) {}
// Defaults fills in the settings that were left unset: the logger becomes a
// copy of the package logger and the check interval becomes 5 minutes.
// It always returns nil.
func (s *Strategy) Defaults() error {
	if s.logger == nil {
		s.logger = log.Dup()
	}

	if s.Interval == 0 {
		s.Interval = types.Duration(5 * time.Minute)
	}

	return nil
}
// Validate performs no checks on the configuration and always returns nil.
func (s *Strategy) Validate() error {
	return nil
}
// InstanceID returns the strategy ID joined by a dash with the configured
// assets. The asset slice is formatted with %s, so it renders as a bracketed,
// space-separated list.
func (s *Strategy) InstanceID() string {
	return fmt.Sprintf("%s-%s", ID, s.Assets)
}
// Run prepares the strategy state for the given session and schedules the
// deposit watcher to start when the user data stream starts.
//
// It returns errMarginTransferNotSupport or errDepositHistoryNotSupport when
// the session's exchange lacks the corresponding capability, and nil otherwise.
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
	s.session = session
	s.watchingDeposits = make(map[string]types.Deposit)
	s.lastAssetDepositTimes = make(map[string]time.Time)
	s.logger = s.logger.WithField("exchange", session.ExchangeName)

	// The field is assigned before the capability check, so it ends up nil
	// when the exchange does not implement the interface.
	transferSvc, canTransfer := session.Exchange.(marginTransferService)
	s.marginTransferService = transferSvc
	if !canTransfer {
		return errMarginTransferNotSupport
	}

	historySvc, canQueryHistory := session.Exchange.(types.ExchangeTransferService)
	s.depositHistoryService = historySvc
	if !canQueryHistory {
		return errDepositHistoryNotSupport
	}

	// The watcher goroutine ends when ctx is cancelled (see tickWatcher).
	session.UserDataStream.OnStart(func() {
		go s.tickWatcher(ctx, s.Interval.Duration())
	})

	return nil
}
// tickWatcher checks the deposits once immediately and then once per
// interval, until ctx is cancelled.
func (s *Strategy) tickWatcher(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	done := ctx.Done()

	// first round runs right away instead of waiting for the first tick
	s.checkDeposits(ctx)

	for {
		select {
		case <-ticker.C:
			s.checkDeposits(ctx)
		case <-done:
			return
		}
	}
}
// checkDeposits runs one round over the configured assets: it scans the last
// 4 hours of deposit history of each asset and transfers every deposit that
// scanDepositHistory reports as succeeded into the margin account.
//
// When the exchange can report the spot account, the transferred amount is
// capped at the available spot balance of the deposited asset.
//
// A deposit-history scan error or a rate limiter error ends the whole round.
// A failed spot account query only skips the deposit at hand, and a failed
// transfer is logged.
func (s *Strategy) checkDeposits(ctx context.Context) {
	// the limiter is shared by all assets of this round: one deposit is
	// processed per 3 seconds
	limiter := rate.NewLimiter(rate.Every(3*time.Second), 1)

	for _, asset := range s.Assets {
		assetLog := s.logger.WithField("asset", asset)
		assetLog.Debugf("checking %s deposits...", asset)

		arrived, scanErr := s.scanDepositHistory(ctx, asset, 4*time.Hour)
		if scanErr != nil {
			assetLog.WithError(scanErr).Errorf("unable to scan deposit history")
			return
		}

		if len(arrived) == 0 {
			assetLog.Debugf("no %s deposit found", asset)
			continue
		}

		for _, dep := range arrived {
			assetLog.Infof("found succeeded %s deposit: %+v", asset, dep)

			if waitErr := limiter.Wait(ctx); waitErr != nil {
				assetLog.WithError(waitErr).Errorf("rate limiter error")
				return
			}

			// the account from margin can't be used here, so the spot
			// account is queried explicitly when the exchange supports it
			transferAmount := dep.Amount
			if querier, supported := s.session.Exchange.(spotAccountQueryService); supported {
				var spot *types.Account
				queryErr := retry.GeneralBackoff(ctx, func() error {
					var err error
					spot, err = querier.QuerySpotAccount(ctx)
					return err
				})
				if queryErr != nil || spot == nil {
					assetLog.WithError(queryErr).Errorf("unable to query spot account")
					continue
				}

				balance, found := spot.Balance(dep.Asset)
				if !found {
					// the deposit amount is kept as-is in this case
					assetLog.Errorf("unexpected error: %s balance not found", dep.Asset)
				} else {
					assetLog.Infof("spot account balance %s: %+v", dep.Asset, balance)
					transferAmount = fixedpoint.Min(balance.Available, transferAmount)
				}
			}

			bbgo.Notify("Found succeeded deposit %s %s, transferring %s %s into the margin account",
				dep.Amount.String(), dep.Asset,
				transferAmount.String(), dep.Asset)

			transferErr := retry.GeneralBackoff(ctx, func() error {
				return s.marginTransferService.TransferMarginAccountAsset(ctx, dep.Asset, transferAmount, types.TransferIn)
			})
			if transferErr != nil {
				assetLog.WithError(transferErr).Errorf("unable to transfer deposit asset into the margin account")
			}
		}
	}
}
func ( s * Strategy ) scanDepositHistory ( ctx context . Context , asset string , duration time . Duration ) ( [ ] types . Deposit , error ) {
logger := s . logger . WithField ( "asset" , asset )
logger . Debugf ( "scanning %s deposit history..." , asset )
now := time . Now ( )
since := now . Add ( - duration )
var deposits [ ] types . Deposit
err := retry . GeneralBackoff ( ctx , func ( ) ( err error ) {
deposits , err = s . depositHistoryService . QueryDepositHistory ( ctx , asset , since , now )
return err
} )
if err != nil {
return nil , err
}
// sort the recent deposit records in ascending order
sort . Slice ( deposits , func ( i , j int ) bool {
return deposits [ i ] . Time . Time ( ) . Before ( deposits [ j ] . Time . Time ( ) )
} )
s . mu . Lock ( )
defer s . mu . Unlock ( )
for _ , deposit := range deposits {
logger . Debugf ( "checking deposit: %+v" , deposit )
if deposit . Asset != asset {
continue
}
if _ , ok := s . watchingDeposits [ deposit . TransactionID ] ; ok {
// if the deposit record is in the watch list, update it
s . watchingDeposits [ deposit . TransactionID ] = deposit
} else {
switch deposit . Status {
case types . DepositSuccess :
if depositTime , ok := s . lastAssetDepositTimes [ asset ] ; ok {
// if it's newer than the latest deposit time, then we just add it the monitoring list
if deposit . Time . After ( depositTime ) {
logger . Infof ( "adding new success deposit: %s" , deposit . TransactionID )
s . watchingDeposits [ deposit . TransactionID ] = deposit
}
} else {
2024-09-04 03:19:43 +00:00
// ignore all initial deposits that are already in success status
2024-08-30 14:55:17 +00:00
logger . Infof ( "ignored succeess deposit: %s %+v" , deposit . TransactionID , deposit )
}
case types . DepositCredited , types . DepositPending :
logger . Infof ( "adding pending deposit: %s" , deposit . TransactionID )
s . watchingDeposits [ deposit . TransactionID ] = deposit
}
}
}
if len ( deposits ) > 0 {
lastDeposit := deposits [ len ( deposits ) - 1 ]
if lastTime , ok := s . lastAssetDepositTimes [ asset ] ; ok {
s . lastAssetDepositTimes [ asset ] = later ( lastDeposit . Time . Time ( ) , lastTime )
} else {
s . lastAssetDepositTimes [ asset ] = lastDeposit . Time . Time ( )
}
}
var succeededDeposits [ ] types . Deposit
for _ , deposit := range s . watchingDeposits {
if deposit . Status == types . DepositSuccess {
logger . Infof ( "found pending -> success deposit: %+v" , deposit )
current , required := deposit . GetCurrentConfirmation ( )
if required > 0 && deposit . UnlockConfirm > 0 && current < deposit . UnlockConfirm {
logger . Infof ( "deposit %s unlock confirm %d is not reached, current: %d, required: %d, skip this round" , deposit . TransactionID , deposit . UnlockConfirm , current , required )
continue
}
succeededDeposits = append ( succeededDeposits , deposit )
delete ( s . watchingDeposits , deposit . TransactionID )
}
}
return succeededDeposits , nil
}
// later returns whichever of a and b is the later instant.
// When neither is after the other, b is returned.
func later(a, b time.Time) time.Time {
	if !a.After(b) {
		return b
	}
	return a
}