refactor session sync

This commit is contained in:
c9s 2021-02-19 10:42:24 +08:00
parent 390c9b1a4b
commit 44fa74a4c9
7 changed files with 98 additions and 86 deletions

View File

@ -1,13 +1,17 @@
package bbgo
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/types"
)
type DataFetcher func() (interface{}, error)
@ -56,3 +60,11 @@ func WithCache(key string, obj interface{}, fetcher DataFetcher) error {
return nil
}
func LoadExchangeMarketsWithCache(ctx context.Context, ex types.Exchange) (markets types.MarketMap, err error) {
err = WithCache(fmt.Sprintf("%s-markets", ex.Name()), &markets, func() (interface{}, error) {
return ex.QueryMarkets(ctx)
})
return markets, err
}

View File

@ -441,9 +441,24 @@ func (environ *Environment) Connect(ctx context.Context) error {
return nil
}
func LoadExchangeMarketsWithCache(ctx context.Context, ex types.Exchange) (markets types.MarketMap, err error) {
err = WithCache(fmt.Sprintf("%s-markets", ex.Name()), &markets, func() (interface{}, error) {
return ex.QueryMarkets(ctx)
})
return markets, err
func (environ *Environment) SyncSession(ctx context.Context, session *ExchangeSession, startTime time.Time, defaultSymbols ...string) error {
symbols, err := getSessionSymbols(session, defaultSymbols...)
if err != nil {
return err
}
return environ.TradeSync.SyncSessionSymbols(ctx, session.Exchange, startTime, symbols...)
}
func getSessionSymbols(session *ExchangeSession, defaultSymbols ...string) ([]string, error) {
if session.IsolatedMargin {
return []string{session.IsolatedMarginSymbol}, nil
}
if len(defaultSymbols) > 0 {
return defaultSymbols, nil
}
return session.FindPossibleSymbols()
}

View File

@ -11,6 +11,7 @@ import (
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
type StandardIndicatorSet struct {
@ -522,3 +523,46 @@ func (session *ExchangeSession) UpdatePrices(ctx context.Context) (err error) {
session.lastPriceUpdatedAt = time.Now()
return err
}
func (session *ExchangeSession) FindPossibleSymbols() (symbols []string, err error) {
// If the session is an isolated margin session, there will be only the isolated margin symbol
if session.IsolatedMargin {
return []string{
session.IsolatedMarginSymbol,
}, nil
}
var balances = session.Account.Balances()
var fiatCurrencies = []string{"USDC", "USDT", "USD", "TWD", "EUR", "GBP"}
var fiatAssets []string
for _, currency := range fiatCurrencies {
if balance, ok := balances[currency]; ok && balance.Total() > 0 {
fiatAssets = append(fiatAssets, currency)
}
}
var symbolMap = map[string]struct{}{}
for _, market := range session.Markets() {
// ignore the markets that are not fiat currency markets
if !util.StringSliceContains(fiatAssets, market.QuoteCurrency) {
continue
}
// ignore the asset that we don't have in the balance sheet
balance, hasAsset := balances[market.BaseCurrency]
if !hasAsset || balance.Total() == 0 {
continue
}
symbolMap[market.Symbol] = struct{}{}
}
for s := range symbolMap {
symbols = append(symbols, s)
}
return symbols, nil
}

2
pkg/bbgo/sync.go Normal file
View File

@ -0,0 +1,2 @@
package bbgo

View File

@ -102,7 +102,7 @@ var SyncCmd = &cobra.Command{
return err
}
if err := environ.TradeSync.SyncSession(ctx, session, startTime, defaultSymbols...) ; err != nil {
if err := environ.SyncSession(ctx, session, startTime, defaultSymbols...) ; err != nil {
return err
}

View File

@ -6,8 +6,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
@ -41,8 +40,8 @@ func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, s
logrus.Infof("found last order, start from lastID = %d since %s", lastID, startTime)
}
batch := &batch2.ExchangeBatchProcessor{Exchange: exchange}
ordersC, errC := batch.BatchQueryClosedOrders(ctx, symbol, startTime, time.Now(), lastID)
b := &batch.ExchangeBatchProcessor{Exchange: exchange}
ordersC, errC := b.BatchQueryClosedOrders(ctx, symbol, startTime, time.Now(), lastID)
for order := range ordersC {
select {
@ -97,8 +96,8 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
logrus.Debugf("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime)
}
batch := &batch2.ExchangeBatchProcessor{Exchange: exchange}
tradeC, errC := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{
b := &batch.ExchangeBatchProcessor{Exchange: exchange}
tradeC, errC := b.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
LastTradeID: lastTradeID,
})
@ -133,88 +132,17 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
return <-errC
}
// SyncSessionSymbols syncs the trades from the given exchange session
func (s *SyncService) SyncSessionSymbols(ctx context.Context, session *bbgo.ExchangeSession, startTime time.Time, symbols ...string) error {
func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error {
for _, symbol := range symbols {
logrus.Debugf("syncing trades from exchange session %s...", session.Name)
if err := s.SyncTrades(ctx, session.Exchange, symbol, startTime); err != nil {
if err := s.SyncTrades(ctx, exchange, symbol, startTime); err != nil {
return err
}
logrus.Debugf("syncing orders from exchange session %s...", session.Name)
if err := s.SyncOrders(ctx, session.Exchange, symbol, startTime); err != nil {
if err := s.SyncOrders(ctx, exchange, symbol, startTime); err != nil {
return err
}
}
return nil
}
func (s *SyncService) SyncSession(ctx context.Context, session *bbgo.ExchangeSession, startTime time.Time, defaultSymbols ...string) error {
symbols, err := getSessionSymbols(session, defaultSymbols...)
if err != nil {
return err
}
return s.SyncSessionSymbols(ctx, session, startTime, symbols...)
}
func getSessionSymbols(session *bbgo.ExchangeSession, defaultSymbols ...string) ([]string, error) {
if session.IsolatedMargin {
return []string{session.IsolatedMarginSymbol}, nil
}
if len(defaultSymbols) > 0 {
return defaultSymbols, nil
}
return FindPossibleSymbols(session)
}
func FindPossibleSymbols(session *bbgo.ExchangeSession) (symbols []string, err error) {
var balances = session.Account.Balances()
var fiatCurrencies = []string{"USDC", "USDT", "USD", "TWD", "EUR", "GBP"}
var fiatAssets []string
for _, currency := range fiatCurrencies {
if balance, ok := balances[currency]; ok && balance.Total() > 0 {
fiatAssets = append(fiatAssets, currency)
}
}
var symbolMap = map[string]struct{}{}
for _, market := range session.Markets() {
// ignore the markets that are not fiat currency markets
if !stringSliceContains(fiatAssets, market.QuoteCurrency) {
continue
}
// ignore the asset that we don't have in the balance sheet
balance, hasAsset := balances[market.BaseCurrency]
if !hasAsset || balance.Total() == 0 {
continue
}
symbolMap[market.Symbol] = struct{}{}
}
for s := range symbolMap {
symbols = append(symbols, s)
}
return symbols, nil
}
func stringSliceContains(slice []string, needle string) bool {
for _, s := range slice {
if s == needle {
return true
}
}
return false
}

11
pkg/util/string.go Normal file
View File

@ -0,0 +1,11 @@
package util
func StringSliceContains(slice []string, needle string) bool {
for _, s := range slice {
if s == needle {
return true
}
}
return false
}