Merge pull request #652 from c9s/refactor/sync

refactor/fix: withdraw sync
This commit is contained in:
Yo-An Lin 2022-06-02 14:03:54 +08:00 committed by GitHub
commit 38a6d8c813
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 689 additions and 234 deletions

View File

@ -23,7 +23,7 @@ sync:
filledOrders: true
# since is the start date of your trading data
since: 2022-01-01
since: 2019-01-01
# sessions is the list of session names you want to sync
# by default, BBGO sync all your available sessions.
@ -37,6 +37,7 @@ sync:
symbols:
- BTCUSDT
- ETHUSDT
- DOTUSDT
# marginHistory enables the margin history sync
marginHistory: true
@ -47,5 +48,5 @@ sync:
- USDT
# depositHistory: true
rewardHistory: true
# withdrawHistory: true
# rewardHistory: true
withdrawHistory: true

View File

@ -587,6 +587,8 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig *
sessions = environ.SelectSessions(selectedSessions...)
}
since := userConfig.Sync.Since.Time()
for _, session := range sessions {
if err := environ.syncSession(ctx, session, syncSymbols...); err != nil {
return err
@ -599,7 +601,7 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig *
}
if userConfig.Sync.WithdrawHistory {
if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil {
if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange, since); err != nil {
return err
}
}
@ -612,7 +614,7 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig *
if userConfig.Sync.MarginHistory {
if err := environ.SyncService.SyncMarginHistory(ctx, session.Exchange,
userConfig.Sync.Since.Time(),
since,
userConfig.Sync.MarginAssets...); err != nil {
return err
}
@ -644,6 +646,8 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err
return environ.syncWithUserConfig(ctx, userConfig[0])
}
since := time.Now().AddDate(0, -6, 0)
// the default sync logics
for _, session := range environ.sessions {
if err := environ.syncSession(ctx, session); err != nil {
@ -661,7 +665,7 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err
}
if userConfig[0].Sync.WithdrawHistory {
if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil {
if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange, since); err != nil {
return err
}
}

View File

@ -61,12 +61,17 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s
listRef := reflect.ValueOf(sliceInf)
listLen := listRef.Len()
log.Debugf("batch querying %T: %d remote records", q.Type, listLen)
if listLen == 0 {
if q.JumpIfEmpty > 0 {
startTime = startTime.Add(q.JumpIfEmpty)
log.Debugf("batch querying %T: empty records jump to %s", q.Type, startTime)
continue
}
log.Debugf("batch querying %T: empty records, query is completed", q.Type)
return
}
@ -94,7 +99,7 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s
obj := item.Interface()
id := q.ID(obj)
if _, exists := idMap[id]; exists {
log.Debugf("batch querying %T: duplicated id %s", q.Type, id)
log.Debugf("batch querying %T: ignore duplicated record, id = %s", q.Type, id)
continue
}
@ -102,10 +107,11 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s
cRef.Send(item)
sentAny = true
startTime = entryTime
startTime = entryTime.Add(time.Millisecond)
}
if !sentAny {
log.Debugf("batch querying %T: %d/%d records are not sent", q.Type, listLen, listLen)
return
}
}

View File

@ -0,0 +1,36 @@
package batch
import (
"context"
"time"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/types"
)
type WithdrawBatchQuery struct {
types.ExchangeTransferService
}
func (e *WithdrawBatchQuery) Query(ctx context.Context, asset string, startTime, endTime time.Time) (c chan types.Withdraw, errC chan error) {
query := &AsyncTimeRangedBatchQuery{
Type: types.Withdraw{},
Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
JumpIfEmpty: time.Hour * 24 * 80,
Q: func(startTime, endTime time.Time) (interface{}, error) {
return e.ExchangeTransferService.QueryWithdrawHistory(ctx, asset, startTime, endTime)
},
T: func(obj interface{}) time.Time {
return time.Time(obj.(types.Withdraw).ApplyTime)
},
ID: func(obj interface{}) string {
withdraw := obj.(types.Withdraw)
return withdraw.TransactionID
},
}
c = make(chan types.Withdraw, 100)
errC = query.Query(ctx, c, startTime, endTime)
return c, errC
}

View File

@ -0,0 +1,38 @@
package batch
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/exchange/binance"
"github.com/c9s/bbgo/pkg/testutil"
)
func TestWithdrawBatchQuery(t *testing.T) {
key, secret, ok := testutil.IntegrationTestConfigured(t, "BINANCE")
if !ok {
t.Skip("binance api is not set")
}
ex := binance.New(key, secret)
q := WithdrawBatchQuery{
ExchangeTransferService: ex,
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
now := time.Now()
startTime := now.AddDate(0, -6, 0)
endTime := now
dataC, errC := q.Query(ctx, "", startTime, endTime)
for withdraw := range dataC {
t.Logf("%+v", withdraw)
}
err := <-errC
assert.NoError(t, err)
}

View File

@ -0,0 +1,67 @@
package binanceapi
import (
"time"
"github.com/c9s/requestgen"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
//go:generate stringer -type=TransferType
// 1 for internal transfer, 0 for external transfer
type TransferType int
const (
TransferTypeInternal TransferType = 0
TransferTypeExternal TransferType = 0
)
type WithdrawRecord struct {
Id string `json:"id"`
Address string `json:"address"`
Amount fixedpoint.Value `json:"amount"`
ApplyTime string `json:"applyTime"`
Coin string `json:"coin"`
WithdrawOrderID string `json:"withdrawOrderId"`
Network string `json:"network"`
TransferType TransferType `json:"transferType"`
Status WithdrawStatus `json:"status"`
TransactionFee fixedpoint.Value `json:"transactionFee"`
ConfirmNo int `json:"confirmNo"`
Info string `json:"info"`
TxID string `json:"txId"`
}
//go:generate stringer -type=WithdrawStatus
type WithdrawStatus int
// WithdrawStatus: 0(0:Email Sent,1:Cancelled 2:Awaiting Approval 3:Rejected 4:Processing 5:Failure 6:Completed)
const (
WithdrawStatusEmailSent WithdrawStatus = iota
WithdrawStatusCancelled
WithdrawStatusAwaitingApproval
WithdrawStatusRejected
WithdrawStatusProcessing
WithdrawStatusFailure
WithdrawStatusCompleted
)
//go:generate requestgen -method GET -url "/sapi/v1/capital/withdraw/history" -type GetWithdrawHistoryRequest -responseType []WithdrawRecord
type GetWithdrawHistoryRequest struct {
client requestgen.AuthenticatedAPIClient
coin string `param:"coin"`
withdrawOrderId *string `param:"withdrawOrderId"`
status *WithdrawStatus `param:"status"`
startTime *time.Time `param:"startTime,milliseconds"`
endTime *time.Time `param:"endTime,milliseconds"`
limit *uint64 `param:"limit"`
offset *uint64 `param:"offset"`
}
func (c *RestClient) NewGetWithdrawHistoryRequest() *GetWithdrawHistoryRequest {
return &GetWithdrawHistoryRequest{client: c}
}

View File

@ -0,0 +1,241 @@
// Code generated by "requestgen -method GET -url /sapi/v1/capital/withdraw/history -type GetWithdrawHistoryRequest -responseType []WithdrawRecord"; DO NOT EDIT.
package binanceapi
import (
"context"
"encoding/json"
"fmt"
"net/url"
"reflect"
"regexp"
"strconv"
"time"
)
func (g *GetWithdrawHistoryRequest) Coin(coin string) *GetWithdrawHistoryRequest {
g.coin = coin
return g
}
func (g *GetWithdrawHistoryRequest) WithdrawOrderId(withdrawOrderId string) *GetWithdrawHistoryRequest {
g.withdrawOrderId = &withdrawOrderId
return g
}
func (g *GetWithdrawHistoryRequest) Status(status WithdrawStatus) *GetWithdrawHistoryRequest {
g.status = &status
return g
}
func (g *GetWithdrawHistoryRequest) StartTime(startTime time.Time) *GetWithdrawHistoryRequest {
g.startTime = &startTime
return g
}
func (g *GetWithdrawHistoryRequest) EndTime(endTime time.Time) *GetWithdrawHistoryRequest {
g.endTime = &endTime
return g
}
func (g *GetWithdrawHistoryRequest) Limit(limit uint64) *GetWithdrawHistoryRequest {
g.limit = &limit
return g
}
func (g *GetWithdrawHistoryRequest) Offset(offset uint64) *GetWithdrawHistoryRequest {
g.offset = &offset
return g
}
// GetQueryParameters builds and checks the query parameters and returns url.Values
func (g *GetWithdrawHistoryRequest) GetQueryParameters() (url.Values, error) {
var params = map[string]interface{}{}
query := url.Values{}
for _k, _v := range params {
query.Add(_k, fmt.Sprintf("%v", _v))
}
return query, nil
}
// GetParameters builds and checks the parameters and return the result in a map object
func (g *GetWithdrawHistoryRequest) GetParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
// check coin field -> json key coin
coin := g.coin
// assign parameter of coin
params["coin"] = coin
// check withdrawOrderId field -> json key withdrawOrderId
if g.withdrawOrderId != nil {
withdrawOrderId := *g.withdrawOrderId
// assign parameter of withdrawOrderId
params["withdrawOrderId"] = withdrawOrderId
} else {
}
// check status field -> json key status
if g.status != nil {
status := *g.status
// TEMPLATE check-valid-values
switch status {
case WithdrawStatusEmailSent:
params["status"] = status
default:
return nil, fmt.Errorf("status value %v is invalid", status)
}
// END TEMPLATE check-valid-values
// assign parameter of status
params["status"] = status
} else {
}
// check startTime field -> json key startTime
if g.startTime != nil {
startTime := *g.startTime
// assign parameter of startTime
// convert time.Time to milliseconds time stamp
params["startTime"] = strconv.FormatInt(startTime.UnixNano()/int64(time.Millisecond), 10)
} else {
}
// check endTime field -> json key endTime
if g.endTime != nil {
endTime := *g.endTime
// assign parameter of endTime
// convert time.Time to milliseconds time stamp
params["endTime"] = strconv.FormatInt(endTime.UnixNano()/int64(time.Millisecond), 10)
} else {
}
// check limit field -> json key limit
if g.limit != nil {
limit := *g.limit
// assign parameter of limit
params["limit"] = limit
} else {
}
// check offset field -> json key offset
if g.offset != nil {
offset := *g.offset
// assign parameter of offset
params["offset"] = offset
} else {
}
return params, nil
}
// GetParametersQuery converts the parameters from GetParameters into the url.Values format
func (g *GetWithdrawHistoryRequest) GetParametersQuery() (url.Values, error) {
query := url.Values{}
params, err := g.GetParameters()
if err != nil {
return query, err
}
for _k, _v := range params {
if g.isVarSlice(_v) {
g.iterateSlice(_v, func(it interface{}) {
query.Add(_k+"[]", fmt.Sprintf("%v", it))
})
} else {
query.Add(_k, fmt.Sprintf("%v", _v))
}
}
return query, nil
}
// GetParametersJSON converts the parameters from GetParameters into the JSON format
func (g *GetWithdrawHistoryRequest) GetParametersJSON() ([]byte, error) {
params, err := g.GetParameters()
if err != nil {
return nil, err
}
return json.Marshal(params)
}
// GetSlugParameters builds and checks the slug parameters and return the result in a map object
func (g *GetWithdrawHistoryRequest) GetSlugParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
return params, nil
}
func (g *GetWithdrawHistoryRequest) applySlugsToUrl(url string, slugs map[string]string) string {
for _k, _v := range slugs {
needleRE := regexp.MustCompile(":" + _k + "\\b")
url = needleRE.ReplaceAllString(url, _v)
}
return url
}
func (g *GetWithdrawHistoryRequest) iterateSlice(slice interface{}, _f func(it interface{})) {
sliceValue := reflect.ValueOf(slice)
for _i := 0; _i < sliceValue.Len(); _i++ {
it := sliceValue.Index(_i).Interface()
_f(it)
}
}
func (g *GetWithdrawHistoryRequest) isVarSlice(_v interface{}) bool {
rt := reflect.TypeOf(_v)
switch rt.Kind() {
case reflect.Slice:
return true
}
return false
}
func (g *GetWithdrawHistoryRequest) GetSlugsMap() (map[string]string, error) {
slugs := map[string]string{}
params, err := g.GetSlugParameters()
if err != nil {
return slugs, nil
}
for _k, _v := range params {
slugs[_k] = fmt.Sprintf("%v", _v)
}
return slugs, nil
}
func (g *GetWithdrawHistoryRequest) Do(ctx context.Context) ([]WithdrawRecord, error) {
// empty params for GET operation
var params interface{}
query, err := g.GetParametersQuery()
if err != nil {
return nil, err
}
apiURL := "/sapi/v1/capital/withdraw/history"
req, err := g.client.NewAuthenticatedRequest(ctx, "GET", apiURL, query, params)
if err != nil {
return nil, err
}
response, err := g.client.SendRequest(req)
if err != nil {
return nil, err
}
var apiResponse []WithdrawRecord
if err := response.DecodeJSON(&apiResponse); err != nil {
return nil, err
}
return apiResponse, nil
}

View File

@ -0,0 +1,24 @@
// Code generated by "stringer -type=TransferType"; DO NOT EDIT.
package binanceapi
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[TransferTypeInternal-0]
_ = x[TransferTypeExternal-0]
}
const _TransferType_name = "TransferTypeInternal"
var _TransferType_index = [...]uint8{0, 20}
func (i TransferType) String() string {
if i < 0 || i >= TransferType(len(_TransferType_index)-1) {
return "TransferType(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _TransferType_name[_TransferType_index[i]:_TransferType_index[i+1]]
}

View File

@ -0,0 +1,29 @@
// Code generated by "stringer -type=WithdrawStatus"; DO NOT EDIT.
package binanceapi
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[WithdrawStatusEmailSent-0]
_ = x[WithdrawStatusCancelled-1]
_ = x[WithdrawStatusAwaitingApproval-2]
_ = x[WithdrawStatusRejected-3]
_ = x[WithdrawStatusProcessing-4]
_ = x[WithdrawStatusFailure-5]
_ = x[WithdrawStatusCompleted-6]
}
const _WithdrawStatus_name = "WithdrawStatusEmailSentWithdrawStatusCancelledWithdrawStatusAwaitingApprovalWithdrawStatusRejectedWithdrawStatusProcessingWithdrawStatusFailureWithdrawStatusCompleted"
var _WithdrawStatus_index = [...]uint8{0, 23, 46, 76, 98, 122, 143, 166}
func (i WithdrawStatus) String() string {
if i < 0 || i >= WithdrawStatus(len(_WithdrawStatus_index)-1) {
return "WithdrawStatus(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _WithdrawStatus_name[_WithdrawStatus_index[i]:_WithdrawStatus_index[i+1]]
}

View File

@ -402,8 +402,8 @@ func (e *Exchange) queryIsolatedMarginAccount(ctx context.Context) (*types.Accou
return a, nil
}
func (e *Exchange) Withdrawal(ctx context.Context, asset string, amount fixedpoint.Value, address string, options *types.WithdrawalOptions) error {
req := e.client.NewCreateWithdrawService()
func (e *Exchange) Withdraw(ctx context.Context, asset string, amount fixedpoint.Value, address string, options *types.WithdrawalOptions) error {
req := e.client2.NewWithdrawRequest()
req.Coin(asset)
req.Address(address)
req.Amount(fmt.Sprintf("%f", amount.Float64()))
@ -426,92 +426,58 @@ func (e *Exchange) Withdrawal(ctx context.Context, asset string, amount fixedpoi
return nil
}
func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) {
startTime := since
func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (withdraws []types.Withdraw, err error) {
var emptyTime = time.Time{}
if startTime == emptyTime {
startTime, err = getLaunchDate()
if since == emptyTime {
since, err = getLaunchDate()
if err != nil {
return nil, err
return withdraws, err
}
}
txIDs := map[string]struct{}{}
for startTime.Before(until) {
// startTime ~ endTime must be in 90 days
endTime := startTime.AddDate(0, 0, 60)
if endTime.After(until) {
endTime = until
historyDayRangeLimit := time.Hour * 24 * 89
if until.Sub(since) >= historyDayRangeLimit {
until = since.Add(historyDayRangeLimit)
}
req := e.client.NewListWithdrawsService()
req := e.client2.NewGetWithdrawHistoryRequest()
if len(asset) > 0 {
req.Coin(asset)
}
withdraws, err := req.
StartTime(startTime.UnixNano() / int64(time.Millisecond)).
EndTime(endTime.UnixNano() / int64(time.Millisecond)).
records, err := req.
StartTime(since).
EndTime(until).
Limit(1000).
Do(ctx)
if err != nil {
return allWithdraws, err
return withdraws, err
}
for _, d := range withdraws {
if _, ok := txIDs[d.TxID]; ok {
continue
}
status := ""
switch d.Status {
case 0:
status = "email_sent"
case 1:
status = "cancelled"
case 2:
status = "awaiting_approval"
case 3:
status = "rejected"
case 4:
status = "processing"
case 5:
status = "failure"
case 6:
status = "completed"
default:
status = fmt.Sprintf("unsupported code: %d", d.Status)
}
txIDs[d.TxID] = struct{}{}
// 2006-01-02 15:04:05
for _, d := range records {
// time format: 2006-01-02 15:04:05
applyTime, err := time.Parse("2006-01-02 15:04:05", d.ApplyTime)
if err != nil {
return nil, err
}
allWithdraws = append(allWithdraws, types.Withdraw{
withdraws = append(withdraws, types.Withdraw{
Exchange: types.ExchangeBinance,
ApplyTime: types.Time(applyTime),
Asset: d.Coin,
Amount: fixedpoint.MustNewFromString(d.Amount),
Amount: d.Amount,
Address: d.Address,
TransactionID: d.TxID,
TransactionFee: fixedpoint.MustNewFromString(d.TransactionFee),
TransactionFee: d.TransactionFee,
WithdrawOrderID: d.WithdrawOrderID,
Network: d.Network,
Status: status,
Status: d.Status.String(),
})
}
startTime = endTime
}
return allWithdraws, nil
return withdraws, nil
}
func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []types.Deposit, err error) {

View File

@ -407,7 +407,7 @@ func toMaxSubmitOrder(o types.SubmitOrder) (*maxapi.SubmitOrder, error) {
return &maxOrder, nil
}
func (e *Exchange) Withdrawal(ctx context.Context, asset string, amount fixedpoint.Value, address string, options *types.WithdrawalOptions) error {
func (e *Exchange) Withdraw(ctx context.Context, asset string, amount fixedpoint.Value, address string, options *types.WithdrawalOptions) error {
asset = toLocalCurrency(asset)
addresses, err := e.client.WithdrawalService.NewGetWithdrawalAddressesRequest().

View File

@ -6,8 +6,8 @@ import (
"strings"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
@ -25,84 +25,76 @@ func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol
symbol = isolatedSymbol
}
records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 50)
if err != nil {
return err
}
orderKeys := make(map[uint64]struct{})
var lastID uint64 = 0
if len(records) > 0 {
for _, record := range records {
orderKeys[record.OrderID] = struct{}{}
}
lastID = records[0].OrderID
startTime = records[0].CreationTime.Time()
}
exchangeTradeHistoryService, ok := exchange.(types.ExchangeTradeHistoryService)
api, ok := exchange.(types.ExchangeTradeHistoryService)
if !ok {
return nil
}
b := &batch.ClosedOrderBatchQuery{
ExchangeTradeHistoryService: exchangeTradeHistoryService,
lastOrderID := uint64(0)
tasks := []SyncTask{
{
Type: types.Order{},
Time: func(obj interface{}) time.Time {
return obj.(types.Order).CreationTime.Time()
},
ID: func(obj interface{}) string {
order := obj.(types.Order)
return strconv.FormatUint(order.OrderID, 10)
},
Select: SelectLastOrders(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 100),
OnLoad: func(objs interface{}) {
// update last order ID
orders := objs.([]types.Order)
if len(orders) > 0 {
end := len(orders) - 1
last := orders[end]
lastOrderID = last.OrderID
}
ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID)
for order := range ordersC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
if _, exists := orderKeys[order.OrderID]; exists {
continue
},
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
query := &batch.ClosedOrderBatchQuery{
ExchangeTradeHistoryService: api,
}
return query.Query(ctx, symbol, startTime, endTime, lastOrderID)
},
Filter: func(obj interface{}) bool {
// skip canceled and not filled orders
order := obj.(types.Order)
if order.Status == types.OrderStatusCanceled && order.ExecutedQuantity.IsZero() {
continue
return false
}
if err := s.Insert(order); err != nil {
return true
},
Insert: func(obj interface{}) error {
order := obj.(types.Order)
return s.Insert(order)
},
},
}
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
return err
}
}
return <-errC
return nil
}
// QueryLast queries the last order from the database
func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit int) ([]types.Order, error) {
log.Infof("querying last order exchange = %s AND symbol = %s AND is_margin = %v AND is_futures = %v AND is_isolated = %v", ex, symbol, isMargin, isFutures, isIsolated)
sql := `SELECT * FROM orders WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_futures = :is_futures AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT :limit`
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
"is_margin": isMargin,
"is_futures": isFutures,
"is_isolated": isIsolated,
"limit": limit,
})
if err != nil {
return nil, errors.Wrap(err, "query last order error")
}
defer rows.Close()
return s.scanRows(rows)
func SelectLastOrders(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit uint64) sq.SelectBuilder {
return sq.Select("*").
From("orders").
Where(sq.And{
sq.Eq{"symbol": symbol},
sq.Eq{"exchange": ex},
sq.Eq{"is_margin": isMargin},
sq.Eq{"is_futures": isFutures},
sq.Eq{"is_isolated": isIsolated},
}).
OrderBy("gid DESC").
Limit(limit)
}
type AggOrder struct {

View File

@ -100,9 +100,9 @@ func (s *SyncService) SyncDepositHistory(ctx context.Context, exchange types.Exc
return nil
}
func (s *SyncService) SyncWithdrawHistory(ctx context.Context, exchange types.Exchange) error {
func (s *SyncService) SyncWithdrawHistory(ctx context.Context, exchange types.Exchange, startTime time.Time) error {
log.Infof("syncing %s withdraw records...", exchange.Name())
if err := s.WithdrawService.Sync(ctx, exchange); err != nil {
if err := s.WithdrawService.Sync(ctx, exchange, startTime); err != nil {
if err != ErrNotImplemented {
log.Warnf("%s withdraw service is not supported", exchange.Name())
return err

View File

@ -18,18 +18,24 @@ type SyncTask struct {
// Since it will create a []Type slice from this type, you should not set pointer to this field
Type interface{}
// Select is the select query builder for querying db records
Select squirrel.SelectBuilder
// OnLoad is called when the records are loaded from the database
OnLoad func(objs interface{})
// ID is a function that returns the unique identity of the object
ID func(obj interface{}) string
// Time is a function that returns the time of the object
Time func(obj interface{}) time.Time
// Select is the select query builder for querying db records
Select squirrel.SelectBuilder
// OnLoad is an optional field, which is called when the records are loaded from the database
OnLoad func(objs interface{})
// Filter is an optional field, which is used for filtering the remote records
Filter func(obj interface{}) bool
// Insert is an option field, which is used for customizing the record insert
Insert func(obj interface{}) error
// BatchQuery is used for querying remote records.
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error)
}
@ -86,13 +92,27 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
continue
}
if sel.Filter != nil {
if !sel.Filter(obj) {
continue
}
}
logrus.Infof("inserting %T: %+v", obj, obj)
if sel.Insert != nil {
// for custom insert
if err := sel.Insert(obj); err != nil {
return err
}
} else {
if err := insertType(db, obj); err != nil {
return err
}
}
}
}
}
func lastRecordTime(sel SyncTask, recordSlice reflect.Value, defaultTime time.Time) time.Time {
since := defaultTime

View File

@ -103,17 +103,6 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
}
}
/*
log.Infof("inserting trade: %s %d %s %-4s price: %-13v volume: %-11v %5s %s",
trade.Exchange,
trade.ID,
trade.Symbol,
trade.Side,
trade.Price,
trade.Quantity,
trade.Liquidity(),
trade.Time.String())
*/
return nil
}
@ -464,8 +453,6 @@ func SelectLastTrades(ex types.ExchangeName, symbol string, isMargin, isFutures,
Limit(limit)
}
func getExchangeAttributes(exchange types.Exchange) (isMargin, isFutures, isIsolated bool, isolatedSymbol string) {
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()

View File

@ -2,12 +2,12 @@ package service
import (
"context"
"fmt"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
@ -15,51 +15,53 @@ type WithdrawService struct {
DB *sqlx.DB
}
// Sync syncs the withdraw records into db
func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error {
txnIDs := map[string]struct{}{}
// query descending
records, err := s.QueryLast(ex.Name(), 10)
if err != nil {
return err
}
for _, record := range records {
txnIDs[record.TransactionID] = struct{}{}
// Sync syncs the withdrawal records into db
func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange, startTime time.Time) error {
isMargin, isFutures, isIsolated, _ := getExchangeAttributes(ex)
if isMargin || isFutures || isIsolated {
// only works in spot
return nil
}
transferApi, ok := ex.(types.ExchangeTransferService)
if !ok {
return ErrNotImplemented
return nil
}
since := time.Time{}
if len(records) > 0 {
since = records[len(records)-1].ApplyTime.Time()
tasks := []SyncTask{
{
Type: types.Withdraw{},
Select: SelectLastWithdraws(ex.Name(), 100),
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
query := &batch.WithdrawBatchQuery{
ExchangeTransferService: transferApi,
}
// asset "" means all assets
withdraws, err := transferApi.QueryWithdrawHistory(ctx, "", since, time.Now())
if err != nil {
return err
}
for _, withdraw := range withdraws {
if _, exists := txnIDs[withdraw.TransactionID]; exists {
continue
}
return query.Query(ctx, "", startTime, endTime)
},
Time: func(obj interface{}) time.Time {
return obj.(types.Withdraw).ApplyTime.Time()
},
ID: func(obj interface{}) string {
withdraw := obj.(types.Withdraw)
return withdraw.TransactionID
},
Filter: func(obj interface{}) bool {
withdraw := obj.(types.Withdraw)
if withdraw.Status == "rejected" {
log.Warnf("skip record, withdraw transaction rejected: %+v", withdraw)
continue
return false
}
if len(withdraw.TransactionID) == 0 {
return fmt.Errorf("empty withdraw transacion ID: %+v", withdraw)
return false
}
if err := s.Insert(withdraw); err != nil {
return true
},
},
}
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
return err
}
}
@ -67,6 +69,16 @@ func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error {
return nil
}
func SelectLastWithdraws(ex types.ExchangeName, limit uint64) sq.SelectBuilder {
return sq.Select("*").
From("withdraws").
Where(sq.And{
sq.Eq{"exchange": ex},
}).
OrderBy("time DESC").
Limit(limit)
}
func (s *WithdrawService) QueryLast(ex types.ExchangeName, limit int) ([]types.Withdraw, error) {
sql := "SELECT * FROM `withdraws` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{

View File

@ -84,7 +84,7 @@ func (r *WithdrawalRequest) String() string {
}
func (r *WithdrawalRequest) PlainText() string {
return fmt.Sprintf("Withdrawal request: sending %s %s from %s -> %s",
return fmt.Sprintf("Withdraw request: sending %s %s from %s -> %s",
r.Amount.FormatString(4),
r.Asset,
r.FromSession,
@ -94,7 +94,7 @@ func (r *WithdrawalRequest) PlainText() string {
func (r *WithdrawalRequest) SlackAttachment() slack.Attachment {
var color = "#DC143C"
title := util.Render(`Withdrawal Request {{ .Asset }}`, r)
title := util.Render(`Withdraw Request {{ .Asset }}`, r)
return slack.Attachment{
// Pretext: "",
// Text: text,
@ -265,7 +265,7 @@ func (s *Strategy) checkBalance(ctx context.Context, sessions map[string]*bbgo.E
Amount: requiredAmount,
})
if err := withdrawalService.Withdrawal(ctx, s.Asset, requiredAmount, toAddress.Address, &types.WithdrawalOptions{
if err := withdrawalService.Withdraw(ctx, s.Asset, requiredAmount, toAddress.Address, &types.WithdrawalOptions{
Network: toAddress.Network,
AddressTag: toAddress.AddressTag,
}); err != nil {

View File

@ -40,15 +40,22 @@ func (n ExchangeName) String() string {
}
const (
ExchangeMax = ExchangeName("max")
ExchangeBinance = ExchangeName("binance")
ExchangeFTX = ExchangeName("ftx")
ExchangeOKEx = ExchangeName("okex")
ExchangeKucoin = ExchangeName("kucoin")
ExchangeBacktest = ExchangeName("backtest")
ExchangeMax ExchangeName = "max"
ExchangeBinance ExchangeName = "binance"
ExchangeFTX ExchangeName = "ftx"
ExchangeOKEx ExchangeName = "okex"
ExchangeKucoin ExchangeName = "kucoin"
ExchangeBacktest ExchangeName = "backtest"
)
var SupportedExchanges = []ExchangeName{"binance", "max", "ftx", "okex", "kucoin"}
var SupportedExchanges = []ExchangeName{
ExchangeMax,
ExchangeBinance,
ExchangeFTX,
ExchangeOKEx,
ExchangeKucoin,
// note: we are not using "backtest"
}
func ValidExchangeName(a string) (ExchangeName, error) {
switch strings.ToLower(a) {
@ -122,7 +129,7 @@ type ExchangeTransferService interface {
}
type ExchangeWithdrawalService interface {
Withdrawal(ctx context.Context, asset string, amount fixedpoint.Value, address string, options *WithdrawalOptions) error
Withdraw(ctx context.Context, asset string, amount fixedpoint.Value, address string, options *WithdrawalOptions) error
}
type ExchangeRewardService interface {

View File

@ -2,8 +2,9 @@ package types
import (
"fmt"
"github.com/c9s/bbgo/pkg/fixedpoint"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
type Withdraw struct {
@ -23,8 +24,32 @@ type Withdraw struct {
Network string `json:"network" db:"network"`
}
func (w Withdraw) String() string {
return fmt.Sprintf("withdraw %s %v to %s at %s", w.Asset, w.Amount, w.Address, w.ApplyTime.Time())
func cutstr(s string, maxLen, head, tail int) string {
if len(s) > maxLen {
l := len(s)
return s[0:head] + "..." + s[l-tail:]
}
return s
}
func (w Withdraw) String() (o string) {
o = fmt.Sprintf("withdraw %s %v -> ", w.Asset, w.Amount)
if len(w.Network) > 0 && w.Network != w.Asset {
o += w.Network + ":"
}
o += fmt.Sprintf("%s at %s", w.Address, w.ApplyTime.Time())
if !w.TransactionFee.IsZero() {
o += fmt.Sprintf("fee %f %s", w.TransactionFee.Float64(), w.TransactionFeeCurrency)
}
if len(w.TransactionID) > 0 {
o += fmt.Sprintf("txID: %s", cutstr(w.TransactionID, 12, 4, 4))
}
return o
}
func (w Withdraw) EffectiveTime() time.Time {