refactor sync service

This commit is contained in:
c9s 2021-03-14 11:04:56 +08:00
parent 3c90aa515d
commit 4b49fda463
6 changed files with 128 additions and 234 deletions

View File

@ -341,6 +341,7 @@ for lorca
make embed && go run -tags web ./cmd/bbgo-lorca make embed && go run -tags web ./cmd/bbgo-lorca
``` ```
## Support ## Support
### By contributing pull requests ### By contributing pull requests

View File

@ -161,9 +161,11 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver
environ.RewardService = &service.RewardService{DB: db} environ.RewardService = &service.RewardService{DB: db}
environ.SyncService = &service.SyncService{ environ.SyncService = &service.SyncService{
TradeService: environ.TradeService, TradeService: environ.TradeService,
OrderService: environ.OrderService, OrderService: environ.OrderService,
RewardService: environ.RewardService, RewardService: environ.RewardService,
WithdrawService: &service.WithdrawService{DB: db},
DepositService: &service.DepositService{DB: db},
} }
return nil return nil

View File

@ -2,7 +2,10 @@ package cmd
import ( import (
"context" "context"
"fmt"
"os"
"strings" "strings"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -11,14 +14,14 @@ import (
"github.com/c9s/bbgo/pkg/accounting" "github.com/c9s/bbgo/pkg/accounting"
"github.com/c9s/bbgo/pkg/accounting/pnl" "github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
func init() { func init() {
PnLCmd.Flags().String("exchange", "", "target exchange") PnLCmd.Flags().String("session", "", "target exchange")
PnLCmd.Flags().String("symbol", "BTCUSDT", "trading symbol") PnLCmd.Flags().String("symbol", "", "trading symbol")
PnLCmd.Flags().Bool("include-transfer", false, "convert transfer records into trades")
PnLCmd.Flags().Int("limit", 500, "number of trades") PnLCmd.Flags().Int("limit", 500, "number of trades")
RootCmd.AddCommand(PnLCmd) RootCmd.AddCommand(PnLCmd)
} }
@ -30,12 +33,26 @@ var PnLCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background() ctx := context.Background()
exchangeNameStr, err := cmd.Flags().GetString("exchange") configFile, err := cmd.Flags().GetString("config")
if err != nil { if err != nil {
return err return err
} }
exchangeName, err := types.ValidExchangeName(exchangeNameStr) if len(configFile) == 0 {
return errors.New("--config option is required")
}
if _, err := os.Stat(configFile); os.IsNotExist(err) {
return err
}
userConfig, err := bbgo.Load(configFile, false)
if err != nil {
return err
}
sessionName, err := cmd.Flags().GetString("session")
if err != nil { if err != nil {
return err return err
} }
@ -45,23 +62,73 @@ var PnLCmd = &cobra.Command{
return err return err
} }
if len(symbol) == 0 {
return errors.New("--symbol [SYMBOL] is required")
}
limit, err := cmd.Flags().GetInt("limit") limit, err := cmd.Flags().GetInt("limit")
if err != nil { if err != nil {
return err return err
} }
exchange, err := cmdutil.NewExchange(exchangeName) environ := bbgo.NewEnvironment()
if err := environ.ConfigureDatabase(ctx); err != nil {
return err
}
if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err
}
session, ok := environ.Session(sessionName)
if !ok {
return fmt.Errorf("session %s not found", sessionName)
}
if err := environ.Sync(ctx) ; err != nil {
return err
}
exchange := session.Exchange
market, ok := session.Market(symbol)
if !ok {
return fmt.Errorf("market config %s not found", symbol)
}
since := time.Now().AddDate(-1, 0, 0)
until := time.Now()
includeTransfer, err := cmd.Flags().GetBool("include-transfer")
if err != nil { if err != nil {
return err return err
} }
if includeTransfer {
transferService, ok := exchange.(types.ExchangeTransferService)
if !ok {
return fmt.Errorf("session exchange %s does not implement transfer service", sessionName)
}
environ := bbgo.NewEnvironment() deposits, err := transferService.QueryDepositHistory(ctx, market.BaseCurrency, since, until)
if err := environ.ConfigureDatabase(ctx) ; err != nil { if err != nil {
return err return err
}
_ = deposits
withdrawals, err := transferService.QueryWithdrawHistory(ctx, market.BaseCurrency, since, until)
if err != nil {
return err
}
_ = withdrawals
// we need the backtest klines for the daily prices
backtestService := &service.BacktestService{DB: environ.DatabaseService.DB}
if err := backtestService.SyncKLineByInterval(ctx, exchange, symbol, types.Interval1d, since, until); err != nil {
return err
}
} }
var trades []types.Trade var trades []types.Trade
tradingFeeCurrency := exchange.PlatformFeeCurrency() tradingFeeCurrency := exchange.PlatformFeeCurrency()
if strings.HasPrefix(symbol, tradingFeeCurrency) { if strings.HasPrefix(symbol, tradingFeeCurrency) {
@ -71,7 +138,7 @@ var PnLCmd = &cobra.Command{
trades, err = environ.TradeService.Query(service.QueryTradesOptions{ trades, err = environ.TradeService.Query(service.QueryTradesOptions{
Exchange: exchange.Name(), Exchange: exchange.Name(),
Symbol: symbol, Symbol: symbol,
Limit: limit, Limit: limit,
}) })
} }

View File

@ -101,7 +101,11 @@ var TransferHistoryCmd = &cobra.Command{
var records timeSlice var records timeSlice
exchange := session.Exchange exchange, ok := session.Exchange.(types.ExchangeTransferService)
if !ok {
return fmt.Errorf("exchange session %s does not implement transfer service", sessionName)
}
deposits, err := exchange.QueryDepositHistory(ctx, asset, since, until) deposits, err := exchange.QueryDepositHistory(ctx, asset, since, until)
if err != nil { if err != nil {
return err return err

View File

@ -17,33 +17,41 @@ type BacktestService struct {
DB *sqlx.DB DB *sqlx.DB
} }
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
now := time.Now() log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())
for interval := range types.SupportedIntervals {
log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())
lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval) lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval)
if err != nil { if err != nil {
return err
}
if lastKLine != nil {
log.Infof("found last checkpoint %s", lastKLine.EndTime)
startTime = lastKLine.StartTime.Add(time.Minute)
}
batch := &batch2.KLineBatchQuery{Exchange: exchange}
// should use channel here
klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime)
// var previousKLine types.KLine
for k := range klineC {
if err := s.Insert(k); err != nil {
return err return err
} }
}
if lastKLine != nil { if err := <-errC; err != nil {
log.Infof("found last checkpoint %s", lastKLine.EndTime) return err
startTime = lastKLine.StartTime.Add(time.Minute) }
}
batch := &batch2.KLineBatchQuery{Exchange: exchange} return nil
}
// should use channel here func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
klineC, errC := batch.Query(ctx, symbol, interval, startTime, now) endTime := time.Now()
// var previousKLine types.KLine for interval := range types.SupportedIntervals {
for k := range klineC { if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime) ; err != nil {
if err := s.Insert(k); err != nil {
return err
}
}
if err := <-errC; err != nil {
return err return err
} }
} }

View File

@ -5,221 +5,33 @@ import (
"errors" "errors"
"time" "time"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
var ErrNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface") var ErrNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface")
type SyncService struct { type SyncService struct {
TradeService *TradeService TradeService *TradeService
OrderService *OrderService OrderService *OrderService
RewardService *RewardService RewardService *RewardService
} WithdrawService *WithdrawService
DepositService *DepositService
func (s *SyncService) SyncRewards(ctx context.Context, exchange types.Exchange) error {
service, ok := exchange.(types.ExchangeRewardService)
if !ok {
return ErrNotImplemented
}
var rewardKeys = map[string]struct{}{}
var startTime time.Time
records, err := s.RewardService.QueryLast(exchange.Name(), 50)
if err != nil {
return err
}
if len(records) > 0 {
lastRecord := records[0]
startTime = lastRecord.CreatedAt.Time()
for _, record := range records {
rewardKeys[record.UUID] = struct{}{}
}
}
batchQuery := &batch.RewardBatchQuery{Service: service}
rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now())
for reward := range rewardsC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
if _, ok := rewardKeys[reward.UUID]; ok {
continue
}
logrus.Infof("inserting reward: %s %s %s %f %s", reward.Exchange, reward.Type, reward.Currency, reward.Quantity.Float64(), reward.CreatedAt)
if err := s.RewardService.Insert(reward); err != nil {
return err
}
}
return <-errC
}
func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
isMargin := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
records, err := s.OrderService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50)
if err != nil {
return err
}
orderKeys := make(map[uint64]struct{})
var lastID uint64 = 0
if len(records) > 0 {
for _, record := range records {
orderKeys[record.OrderID] = struct{}{}
}
lastID = records[0].OrderID
startTime = records[0].CreationTime.Time()
}
b := &batch.ClosedOrderBatchQuery{Exchange: exchange}
ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID)
for order := range ordersC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
if _, exists := orderKeys[order.OrderID]; exists {
continue
}
if err := s.OrderService.Insert(order); err != nil {
return err
}
}
return <-errC
}
func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string) error {
isMargin := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
// records descending ordered
records, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50)
if err != nil {
return err
}
var tradeKeys = map[types.TradeKey]struct{}{}
var lastTradeID int64 = 1
if len(records) > 0 {
for _, record := range records {
tradeKeys[record.Key()] = struct{}{}
}
lastTradeID = records[0].ID
}
b := &batch.TradeBatchQuery{Exchange: exchange}
tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{
LastTradeID: lastTradeID,
})
for trade := range tradeC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
key := trade.Key()
if _, exists := tradeKeys[key]; exists {
continue
}
tradeKeys[key] = struct{}{}
logrus.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s",
trade.Exchange,
trade.ID,
trade.Symbol,
trade.Side,
trade.Price,
trade.Quantity,
trade.MakerOrTakerLabel(),
trade.Time.String())
if err := s.TradeService.Insert(trade); err != nil {
return err
}
}
return <-errC
} }
// SyncSessionSymbols syncs the trades from the given exchange session // SyncSessionSymbols syncs the trades from the given exchange session
func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error { func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error {
for _, symbol := range symbols { for _, symbol := range symbols {
if err := s.SyncTrades(ctx, exchange, symbol); err != nil { if err := s.TradeService.Sync(ctx, exchange, symbol); err != nil {
return err return err
} }
if err := s.SyncOrders(ctx, exchange, symbol, startTime); err != nil { if err := s.OrderService.Sync(ctx, exchange, symbol, startTime); err != nil {
return err return err
} }
}
if err := s.SyncRewards(ctx, exchange); err != nil { if err := s.RewardService.Sync(ctx, exchange); err != nil {
if err == ErrNotImplemented { if err != ErrNotImplemented {
continue
}
return err return err
} }
} }