2023-08-05 11:03:14 +00:00
package deposit2transfer
import (
"context"
"errors"
"fmt"
2023-08-07 01:51:07 +00:00
"sort"
"sync"
"time"
2023-08-05 11:03:14 +00:00
"github.com/sirupsen/logrus"
2023-08-11 11:03:16 +00:00
"golang.org/x/time/rate"
2023-08-05 11:03:14 +00:00
"github.com/c9s/bbgo/pkg/bbgo"
2023-08-07 01:51:07 +00:00
"github.com/c9s/bbgo/pkg/fixedpoint"
2023-08-05 11:03:14 +00:00
"github.com/c9s/bbgo/pkg/types"
)
2023-08-07 01:51:07 +00:00
type marginTransferService interface {
TransferMarginAccountAsset ( ctx context . Context , asset string , amount fixedpoint . Value , io types . TransferDirection ) error
}
2023-08-16 04:02:18 +00:00
type spotAccountQueryService interface {
QuerySpotAccount ( ctx context . Context ) ( * types . Account , error )
}
2023-08-05 11:03:14 +00:00
const ID = "deposit2transfer"
var log = logrus . WithField ( "strategy" , ID )
2023-08-07 01:51:07 +00:00
var errMarginTransferNotSupport = errors . New ( "exchange session does not support margin transfer" )
var errDepositHistoryNotSupport = errors . New ( "exchange session does not support deposit history query" )
2023-08-05 11:03:14 +00:00
func init ( ) {
// Register the pointer of the strategy struct,
// so that bbgo knows what struct to be used to unmarshal the configs (YAML or JSON)
// Note: built-in strategies need to imported manually in the bbgo cmd package.
bbgo . RegisterStrategy ( ID , & Strategy { } )
}
type Strategy struct {
Environment * bbgo . Environment
2023-08-08 03:07:21 +00:00
Assets [ ] string ` json:"assets" `
2023-08-07 01:51:07 +00:00
2023-08-08 04:38:23 +00:00
Interval types . Duration ` json:"interval" `
2023-08-05 11:03:14 +00:00
2023-08-07 01:51:07 +00:00
marginTransferService marginTransferService
depositHistoryService types . ExchangeTransferService
2023-08-05 11:03:14 +00:00
2023-08-08 04:08:14 +00:00
session * bbgo . ExchangeSession
2023-08-07 01:51:07 +00:00
watchingDeposits map [ string ] types . Deposit
mu sync . Mutex
2023-08-09 07:54:28 +00:00
lastAssetDepositTimes map [ string ] time . Time
2023-08-05 11:03:14 +00:00
}
func ( s * Strategy ) ID ( ) string {
return ID
}
func ( s * Strategy ) Subscribe ( session * bbgo . ExchangeSession ) { }
func ( s * Strategy ) Defaults ( ) error {
2023-08-08 04:38:23 +00:00
if s . Interval == 0 {
s . Interval = types . Duration ( 5 * time . Minute )
2023-08-05 11:03:14 +00:00
}
return nil
}
func ( s * Strategy ) Validate ( ) error {
return nil
}
func ( s * Strategy ) InstanceID ( ) string {
2023-08-08 03:58:36 +00:00
return fmt . Sprintf ( "%s-%s" , ID , s . Assets )
2023-08-05 11:03:14 +00:00
}
func ( s * Strategy ) Run ( ctx context . Context , orderExecutor bbgo . OrderExecutor , session * bbgo . ExchangeSession ) error {
2023-08-08 04:08:14 +00:00
s . session = session
2023-08-07 01:51:07 +00:00
s . watchingDeposits = make ( map [ string ] types . Deposit )
2023-08-09 07:54:28 +00:00
s . lastAssetDepositTimes = make ( map [ string ] time . Time )
2023-08-07 01:51:07 +00:00
2023-08-05 11:03:14 +00:00
var ok bool
2023-08-07 01:51:07 +00:00
s . marginTransferService , ok = session . Exchange . ( marginTransferService )
if ! ok {
return errMarginTransferNotSupport
}
2023-08-05 11:03:14 +00:00
2023-08-07 01:51:07 +00:00
s . depositHistoryService , ok = session . Exchange . ( types . ExchangeTransferService )
if ! ok {
return errDepositHistoryNotSupport
}
2023-08-05 11:03:14 +00:00
2023-08-08 04:38:23 +00:00
session . UserDataStream . OnStart ( func ( ) {
go s . tickWatcher ( ctx , s . Interval . Duration ( ) )
} )
2023-08-08 04:23:17 +00:00
2023-08-07 01:51:07 +00:00
return nil
}
2023-08-05 11:03:14 +00:00
2023-08-07 01:51:07 +00:00
func ( s * Strategy ) tickWatcher ( ctx context . Context , interval time . Duration ) {
ticker := time . NewTicker ( interval )
defer ticker . Stop ( )
2023-08-08 04:23:17 +00:00
s . checkDeposits ( ctx )
2023-08-08 04:38:23 +00:00
2023-08-07 01:51:07 +00:00
for {
select {
case <- ctx . Done ( ) :
return
case <- ticker . C :
2023-08-08 04:23:17 +00:00
s . checkDeposits ( ctx )
}
}
}
func ( s * Strategy ) checkDeposits ( ctx context . Context ) {
2023-08-11 11:03:16 +00:00
accountLimiter := rate . NewLimiter ( rate . Every ( 3 * time . Second ) , 1 )
2023-08-08 04:23:17 +00:00
for _ , asset := range s . Assets {
log . Infof ( "checking %s deposits..." , asset )
2023-08-08 04:08:14 +00:00
2023-08-08 04:23:17 +00:00
succeededDeposits , err := s . scanDepositHistory ( ctx , asset , 4 * time . Hour )
if err != nil {
log . WithError ( err ) . Errorf ( "unable to scan deposit history" )
return
}
2023-08-08 03:07:21 +00:00
2023-08-08 04:23:17 +00:00
if len ( succeededDeposits ) == 0 {
log . Infof ( "no %s deposit found" , asset )
2023-08-08 04:38:23 +00:00
continue
2023-08-08 04:23:17 +00:00
}
2023-08-08 03:07:21 +00:00
2023-08-08 04:23:17 +00:00
for _ , d := range succeededDeposits {
log . Infof ( "found succeeded deposit: %+v" , d )
2023-08-11 11:03:16 +00:00
if err2 := accountLimiter . Wait ( ctx ) ; err2 != nil {
log . WithError ( err2 ) . Errorf ( "rate limiter error" )
return
}
2023-08-16 04:02:18 +00:00
// we can't use the account from margin
amount := d . Amount
if service , ok := s . session . Exchange . ( spotAccountQueryService ) ; ok {
account , err2 := service . QuerySpotAccount ( ctx )
if err2 != nil {
2023-08-16 04:51:15 +00:00
log . WithError ( err2 ) . Errorf ( "unable to query spot account" )
2023-08-16 04:02:18 +00:00
continue
}
if bal , ok := account . Balance ( d . Asset ) ; ok {
2023-08-16 04:26:01 +00:00
log . Infof ( "spot account balance %s: %+v" , d . Asset , bal )
2023-08-16 04:02:18 +00:00
amount = fixedpoint . Min ( bal . Available , amount )
} else {
log . Errorf ( "unexpected error: %s balance not found" , d . Asset )
}
2023-08-11 11:03:16 +00:00
}
2023-08-08 04:23:17 +00:00
bbgo . Notify ( "Found succeeded deposit %s %s, transferring %s %s into the margin account" ,
d . Amount . String ( ) , d . Asset ,
amount . String ( ) , d . Asset )
2023-08-08 04:08:14 +00:00
2023-08-08 04:23:17 +00:00
if err2 := s . marginTransferService . TransferMarginAccountAsset ( ctx , d . Asset , amount , types . TransferIn ) ; err2 != nil {
log . WithError ( err2 ) . Errorf ( "unable to transfer deposit asset into the margin account" )
2023-08-07 01:51:07 +00:00
}
2023-08-05 11:03:14 +00:00
}
2023-08-07 01:51:07 +00:00
}
}
2023-08-05 11:03:14 +00:00
2023-08-08 03:07:21 +00:00
func ( s * Strategy ) scanDepositHistory ( ctx context . Context , asset string , duration time . Duration ) ( [ ] types . Deposit , error ) {
log . Infof ( "scanning %s deposit history..." , asset )
2023-08-07 01:51:07 +00:00
now := time . Now ( )
2023-08-08 03:07:21 +00:00
since := now . Add ( - duration )
deposits , err := s . depositHistoryService . QueryDepositHistory ( ctx , asset , since , now )
2023-08-07 01:51:07 +00:00
if err != nil {
return nil , err
2023-08-05 11:03:14 +00:00
}
2023-08-09 07:54:28 +00:00
// sort the recent deposit records in ascending order
2023-08-07 01:51:07 +00:00
sort . Slice ( deposits , func ( i , j int ) bool {
return deposits [ i ] . Time . Time ( ) . Before ( deposits [ j ] . Time . Time ( ) )
} )
s . mu . Lock ( )
2023-08-08 04:38:23 +00:00
defer s . mu . Unlock ( )
2023-08-07 01:51:07 +00:00
for _ , deposit := range deposits {
2023-08-08 04:38:23 +00:00
log . Infof ( "checking deposit: %+v" , deposit )
2023-08-08 04:08:14 +00:00
2023-08-08 03:07:21 +00:00
if deposit . Asset != asset {
continue
}
2023-08-07 01:51:07 +00:00
if _ , ok := s . watchingDeposits [ deposit . TransactionID ] ; ok {
// if the deposit record is in the watch list, update it
s . watchingDeposits [ deposit . TransactionID ] = deposit
} else {
switch deposit . Status {
2023-08-08 04:38:23 +00:00
2023-08-07 01:51:07 +00:00
case types . DepositSuccess :
2023-08-09 07:54:28 +00:00
if depositTime , ok := s . lastAssetDepositTimes [ asset ] ; ok {
// if it's newer than the latest deposit time, then we just add it the monitoring list
if deposit . Time . After ( depositTime ) {
log . Infof ( "adding new success deposit: %s" , deposit . TransactionID )
s . watchingDeposits [ deposit . TransactionID ] = deposit
}
} else {
// ignore all initial deposit history that are already success
log . Infof ( "ignored succeess deposit: %s %+v" , deposit . TransactionID , deposit )
}
2023-08-07 01:51:07 +00:00
case types . DepositCredited , types . DepositPending :
2023-08-08 04:08:14 +00:00
log . Infof ( "adding pending deposit: %s" , deposit . TransactionID )
2023-08-07 01:51:07 +00:00
s . watchingDeposits [ deposit . TransactionID ] = deposit
}
}
}
2023-08-05 11:03:14 +00:00
2023-08-09 07:54:28 +00:00
if len ( deposits ) > 0 {
lastDeposit := deposits [ len ( deposits ) - 1 ]
if lastTime , ok := s . lastAssetDepositTimes [ asset ] ; ok {
s . lastAssetDepositTimes [ asset ] = later ( lastDeposit . Time . Time ( ) , lastTime )
} else {
s . lastAssetDepositTimes [ asset ] = lastDeposit . Time . Time ( )
}
}
2023-08-07 01:51:07 +00:00
var succeededDeposits [ ] types . Deposit
2023-08-08 04:38:59 +00:00
for _ , deposit := range s . watchingDeposits {
2023-08-07 01:51:07 +00:00
if deposit . Status == types . DepositSuccess {
2023-08-08 04:08:14 +00:00
log . Infof ( "found pending -> success deposit: %+v" , deposit )
2023-08-08 03:07:21 +00:00
current , required := deposit . GetCurrentConfirmation ( )
if required > 0 && deposit . UnlockConfirm > 0 && current < deposit . UnlockConfirm {
2023-08-08 04:08:14 +00:00
log . Infof ( "deposit %s unlock confirm %d is not reached, current: %d, required: %d, skip this round" , deposit . TransactionID , deposit . UnlockConfirm , current , required )
2023-08-08 03:07:21 +00:00
continue
}
2023-08-07 01:51:07 +00:00
succeededDeposits = append ( succeededDeposits , deposit )
delete ( s . watchingDeposits , deposit . TransactionID )
}
2023-08-05 11:03:14 +00:00
}
2023-08-07 01:51:07 +00:00
return succeededDeposits , nil
2023-08-05 11:03:14 +00:00
}
2023-08-09 07:54:28 +00:00
func later ( a , b time . Time ) time . Time {
if a . After ( b ) {
return a
}
return b
}