service/sync: rewrite trade sync with syncTask

This commit is contained in:
c9s 2022-06-01 18:30:24 +08:00
parent 415450acb7
commit b070952b32
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 112 additions and 139 deletions

View File

@ -4,6 +4,13 @@ sessions:
exchange: binance
envVarPrefix: binance
binance_margin_dotusdt:
exchange: binance
envVarPrefix: binance
margin: true
isolatedMargin: true
isolatedMarginSymbol: DOTUSDT
max:
exchange: max
envVarPrefix: max
@ -16,12 +23,13 @@ sync:
filledOrders: true
# since is the start date of your trading data
since: 2019-11-01
since: 2022-01-01
# sessions is the list of session names you want to sync
# by default, BBGO sync all your available sessions.
sessions:
- binance
- binance_margin_dotusdt
- max
# symbols is the list of symbols you want to sync
@ -29,8 +37,15 @@ sync:
symbols:
- BTCUSDT
- ETHUSDT
- LINKUSDT
depositHistory: true
# marginHistory enables the margin history sync
marginHistory: true
# marginAssets lists the assets that are used in the margin.
# including loan, repay, interest and liquidation
marginAssets:
- USDT
# depositHistory: true
rewardHistory: true
withdrawHistory: true
# withdrawHistory: true

View File

@ -19,26 +19,10 @@ type OrderService struct {
}
func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
isMargin := false
isFutures := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
if futuresExchange, ok := exchange.(types.FuturesExchange); ok {
futuresSettings := futuresExchange.GetFuturesSettings()
isFutures = futuresSettings.IsFutures
isIsolated = futuresSettings.IsIsolatedFutures
if futuresSettings.IsIsolatedFutures {
symbol = futuresSettings.IsolatedFuturesSymbol
}
isMargin, isFutures, isIsolated, isolatedSymbol := getExchangeAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
symbol = isolatedSymbol
}
records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 50)
@ -99,7 +83,6 @@ func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol
return <-errC
}
// QueryLast queries the last order from the database
func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit int) ([]types.Order, error) {
log.Infof("querying last order exchange = %s AND symbol = %s AND is_margin = %v AND is_futures = %v AND is_isolated = %v", ex, symbol, isMargin, isFutures, isIsolated)

View File

@ -7,6 +7,7 @@ import (
"strings"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@ -51,86 +52,58 @@ func NewTradeService(db *sqlx.DB) *TradeService {
}
func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
isMargin := false
isFutures := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
isMargin, isFutures, isIsolated, isolatedSymbol := getExchangeAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
symbol = isolatedSymbol
}
if futuresExchange, ok := exchange.(types.FuturesExchange); ok {
futuresSettings := futuresExchange.GetFuturesSettings()
isFutures = futuresSettings.IsFutures
isIsolated = futuresSettings.IsIsolatedFutures
if futuresSettings.IsIsolatedFutures {
symbol = futuresSettings.IsolatedFuturesSymbol
}
}
// buffer 50 trades and use the trades ID to scan if the new trades are duplicated
records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 100)
if err != nil {
return err
}
var tradeKeys = map[types.TradeKey]struct{}{}
// for exchange supports trade id query, we should always try to query from the first trade.
// 0 means disable.
var lastTradeID uint64 = 1
var now = time.Now()
if len(records) > 0 {
for _, record := range records {
tradeKeys[record.Key()] = struct{}{}
}
end := len(records) - 1
last := records[end]
lastTradeID = last.ID
startTime = last.Time.Time()
}
exchangeTradeHistoryService, ok := exchange.(types.ExchangeTradeHistoryService)
api, ok := exchange.(types.ExchangeTradeHistoryService)
if !ok {
return nil
}
b := &batch.TradeBatchQuery{
ExchangeTradeHistoryService: exchangeTradeHistoryService,
lastTradeID := uint64(1)
tasks := []SyncTask{
{
Type: types.Trade{},
Select: SelectLastTrades(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 100),
OnLoad: func(objs interface{}) {
// update last trade ID
trades := objs.([]types.Trade)
if len(trades) > 0 {
end := len(trades) - 1
last := trades[end]
lastTradeID = last.ID
}
},
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
query := &batch.TradeBatchQuery{
ExchangeTradeHistoryService: api,
}
return query.Query(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
EndTime: &endTime,
LastTradeID: lastTradeID,
})
},
Time: func(obj interface{}) time.Time {
return obj.(types.Trade).Time.Time()
},
ID: func(obj interface{}) string {
trade := obj.(types.Trade)
return strconv.FormatUint(trade.ID, 10) + trade.Side.String()
},
},
}
tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{
LastTradeID: lastTradeID,
StartTime: &startTime,
EndTime: &now,
})
for trade := range tradeC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
return err
}
}
key := trade.Key()
if _, exists := tradeKeys[key]; exists {
continue
}
tradeKeys[key] = struct{}{}
/*
log.Infof("inserting trade: %s %d %s %-4s price: %-13v volume: %-11v %5s %s",
trade.Exchange,
trade.ID,
@ -140,13 +113,8 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
trade.Quantity,
trade.Liquidity(),
trade.Time.String())
if err := s.Insert(trade); err != nil {
return err
}
}
return <-errC
*/
return nil
}
func (s *TradeService) QueryTradingVolume(startTime time.Time, options TradingVolumeQueryOptions) ([]TradingVolume, error) {
@ -472,43 +440,8 @@ func (s *TradeService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err erro
}
func (s *TradeService) Insert(trade types.Trade) error {
_, err := s.DB.NamedExec(`
INSERT INTO trades (
id,
exchange,
order_id,
symbol,
price,
quantity,
quote_quantity,
side,
is_buyer,
is_maker,
fee,
fee_currency,
traded_at,
is_margin,
is_futures,
is_isolated)
VALUES (
:id,
:exchange,
:order_id,
:symbol,
:price,
:quantity,
:quote_quantity,
:side,
:is_buyer,
:is_maker,
:fee,
:fee_currency,
:traded_at,
:is_margin,
:is_futures,
:is_isolated
)`,
trade)
sql := dbCache.InsertSqlOf(trade)
_, err := s.DB.NamedExec(sql, trade)
return err
}
@ -516,3 +449,45 @@ func (s *TradeService) DeleteAll() error {
_, err := s.DB.Exec(`DELETE FROM trades`)
return err
}
func SelectLastTrades(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit uint64) sq.SelectBuilder {
return sq.Select("*").
From("trades").
Where(sq.And{
sq.Eq{"symbol": symbol},
sq.Eq{"exchange": ex},
sq.Eq{"is_margin": isMargin},
sq.Eq{"is_futures": isFutures},
sq.Eq{"is_isolated": isIsolated},
}).
OrderBy("traded_at DESC").
Limit(limit)
}
func getExchangeAttributes(exchange types.Exchange) (isMargin, isFutures, isIsolated bool, isolatedSymbol string) {
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
if isMargin {
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
isolatedSymbol = marginSettings.IsolatedMarginSymbol
}
}
}
if futuresExchange, ok := exchange.(types.FuturesExchange); ok {
futuresSettings := futuresExchange.GetFuturesSettings()
isFutures = futuresSettings.IsFutures
if isFutures {
isIsolated = futuresSettings.IsIsolatedFutures
if futuresSettings.IsIsolatedFutures {
isolatedSymbol = futuresSettings.IsolatedFuturesSymbol
}
}
}
return isMargin, isFutures, isIsolated, isolatedSymbol
}