refactor trade command

This commit is contained in:
c9s 2020-09-07 14:20:03 +08:00
parent a90184a464
commit ec839d0dc9
5 changed files with 185 additions and 56 deletions

59
bbgo/account.go Normal file
View File

@ -0,0 +1,59 @@
package bbgo
import (
"context"
"sync"
"github.com/c9s/bbgo/pkg/bbgo/exchange/binance"
"github.com/c9s/bbgo/pkg/bbgo/types"
"github.com/c9s/bbgo/pkg/util"
log "github.com/sirupsen/logrus"
)
type Account struct {
mu sync.Mutex
Balances map[string]types.Balance
}
func LoadAccount(ctx context.Context, exchange *binance.Exchange) (*Account, error) {
balances, err := exchange.QueryAccountBalances(ctx)
return &Account{
Balances: balances,
}, err
}
func (a *Account) BindPrivateStream(stream *binance.PrivateStream) {
stream.OnBalanceSnapshot(func(snapshot map[string]types.Balance) {
a.mu.Lock()
defer a.mu.Unlock()
for _, balance := range snapshot {
a.Balances[balance.Currency] = balance
}
})
stream.OnOutboundAccountInfoEvent(func(e *binance.OutboundAccountInfoEvent) {
})
stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) {
a.mu.Lock()
defer a.mu.Unlock()
delta := util.MustParseFloat(e.Delta)
if balance, ok := a.Balances[e.Asset]; ok {
balance.Available += delta
a.Balances[e.Asset] = balance
}
})
}
func (a *Account) Print() {
for _, balance := range a.Balances {
if util.NotZero(balance.Available) {
log.Infof("[trader] balance %s %f", balance.Currency, balance.Available)
}
}
}

View File

@ -1,11 +1,12 @@
package bbgo
import (
"github.com/c9s/bbgo/pkg/bbgo/types"
"sync"
"github.com/c9s/bbgo/pkg/bbgo/types"
)
type TradingContext struct {
type Context struct {
sync.Mutex
Symbol string
@ -22,6 +23,6 @@ type TradingContext struct {
StockManager *StockManager
}
func (c *TradingContext) SetCurrentPrice(price float64) {
func (c *Context) SetCurrentPrice(price float64) {
c.CurrentPrice = price
}

View File

@ -13,7 +13,7 @@ import (
type KLineRegressionTrader struct {
// Context is trading Context
Context *TradingContext
Context *Context
SourceKLines []types.KLine
ProfitAndLossCalculator *ProfitAndLossCalculator
@ -39,7 +39,7 @@ func (trader *KLineRegressionTrader) RunStrategy(ctx context.Context, strategy S
done := make(chan struct{})
defer close(done)
if err := strategy.Init(trader.Context, trader); err != nil {
if err := strategy.Load(trader.Context, trader); err != nil {
return nil, err
}

View File

@ -12,19 +12,19 @@ var Interval1h = Interval("1h")
var Interval1d = Interval("1d")
type KLineStore struct {
// MaxKLines stores the max change kline per interval
MaxKLines map[Interval]types.KLine `json:"-"`
// MaxChanges stores the max change kline per interval
MaxChanges map[Interval]types.KLine `json:"-"`
// KLineWindows stores all loaded klines per interval
KLineWindows map[Interval]types.KLineWindow `json:"-"`
// Windows stores all loaded klines per interval
Windows map[Interval]types.KLineWindow `json:"-"`
}
func NewKLineStore() *KLineStore {
return &KLineStore{
MaxKLines: make(map[Interval]types.KLine),
MaxChanges: make(map[Interval]types.KLine),
// KLineWindows stores all loaded klines per interval
KLineWindows: make(map[Interval]types.KLineWindow),
// Windows stores all loaded klines per interval
Windows: make(map[Interval]types.KLineWindow),
}
}
@ -39,10 +39,10 @@ func (store *KLineStore) handleKLineClosed(kline *types.KLine) {
func (store *KLineStore) AddKLine(kline types.KLine) {
var interval = Interval(kline.Interval)
var window = store.KLineWindows[interval]
var window = store.Windows[interval]
window.Add(kline)
if kline.GetMaxChange() > store.MaxKLines[interval].GetMaxChange() {
store.MaxKLines[interval] = kline
if kline.GetMaxChange() > store.MaxChanges[interval].GetMaxChange() {
store.MaxChanges[interval] = kline
}
}

View File

@ -2,27 +2,33 @@ package bbgo
import (
"context"
"fmt"
"strings"
"time"
"github.com/c9s/bbgo/pkg/bbgo/service"
"github.com/c9s/bbgo/pkg/util"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo/service"
"github.com/c9s/bbgo/pkg/bbgo/exchange/binance"
"github.com/c9s/bbgo/pkg/bbgo/types"
)
type Strategy interface {
Init(tradingContext *TradingContext, trader types.Trader) error
Load(tradingContext *Context, trader types.Trader) error
OnNewStream(stream *types.StandardPrivateStream) error
}
type Trader struct {
Symbol string
TradeService *service.TradeService
TradeSync *service.TradeSync
Notifier *SlackNotifier
// Context is trading Context
Context *TradingContext
Context *Context
Exchange *binance.Exchange
@ -30,25 +36,105 @@ type Trader struct {
ProfitAndLossCalculator *ProfitAndLossCalculator
TradeService *service.TradeService
Account *Account
}
func NewTrader(db *sqlx.DB, exchange *binance.Exchange, symbol string) *Trader {
tradeService := &service.TradeService{DB: db}
tradeSync := &service.TradeSync{Service: tradeService, Exchange: exchange}
return &Trader{
Symbol: symbol,
TradeService: tradeService,
TradeSync: tradeSync,
}
}
func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error {
log.Info("syncing trades...")
if err := trader.TradeSync.Sync(ctx, trader.Symbol, startTime); err != nil {
return err
}
var err error
var trades []types.Trade
tradingFeeCurrency := trader.Exchange.TradingFeeCurrency()
if strings.HasPrefix(trader.Symbol, tradingFeeCurrency) {
trades, err = trader.TradeService.QueryForTradingFeeCurrency(trader.Symbol, tradingFeeCurrency)
} else {
trades, err = trader.TradeService.Query(trader.Symbol)
}
if err != nil {
return err
}
log.Infof("%d trades loaded", len(trades))
stockManager := &StockManager{
Symbol: trader.Symbol,
TradingFeeCurrency: tradingFeeCurrency,
}
checkpoints, err := stockManager.AddTrades(trades)
if err != nil {
return err
}
log.Infof("found checkpoints: %+v", checkpoints)
market, ok := types.FindMarket(trader.Symbol)
if !ok {
return fmt.Errorf("%s market not found", trader.Symbol)
}
currentPrice, err := trader.Exchange.QueryAveragePrice(ctx, trader.Symbol)
if err != nil {
return err
}
trader.Context = &Context{
CurrentPrice: currentPrice,
Symbol: trader.Symbol,
Market: market,
StockManager: stockManager,
}
/*
if len(checkpoints) > 0 {
// get the last checkpoint
idx := checkpoints[len(checkpoints)-1]
if idx < len(trades)-1 {
trades = trades[idx:]
firstTrade := trades[0]
pnlStartTime = firstTrade.Time
notifier.Notify("%s Found the latest trade checkpoint %s", firstTrade.Symbol, firstTrade.Time, firstTrade)
}
}
*/
trader.ProfitAndLossCalculator = &ProfitAndLossCalculator{
TradingFeeCurrency: tradingFeeCurrency,
Symbol: trader.Symbol,
StartTime: startTime,
CurrentPrice: currentPrice,
Trades: trades,
}
account, err := LoadAccount(ctx, trader.Exchange)
if err != nil {
return err
}
trader.Account = account
trader.Context.Balances = account.Balances
account.Print()
return nil
}
func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan struct{}, error) {
symbol := trader.Context.Symbol
balances, err := trader.Exchange.QueryAccountBalances(ctx)
if err != nil {
return nil, err
}
trader.Context.Balances = balances
for _, balance := range balances {
if util.NotZero(balance.Available) {
log.Infof("[trader] balance %s %f", balance.Currency, balance.Available)
}
}
if err := strategy.Init(trader.Context, trader); err != nil {
if err := strategy.Load(trader.Context, trader); err != nil {
return nil, err
}
@ -61,6 +147,8 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
klineStore := NewKLineStore()
klineStore.BindPrivateStream(&stream.StandardPrivateStream)
trader.Account.BindPrivateStream(stream)
if err := strategy.OnNewStream(&stream.StandardPrivateStream); err != nil {
return nil, err
}
@ -70,7 +158,7 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
})
stream.OnTrade(func(trade *types.Trade) {
if trade.Symbol != symbol {
if trade.Symbol != trader.Symbol {
return
}
@ -99,25 +187,6 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
trader.Context.SetCurrentPrice(e.KLine.GetClose())
})
stream.OnBalanceSnapshot(func(snapshot map[string]types.Balance) {
trader.Context.Lock()
defer trader.Context.Unlock()
for _, balance := range snapshot {
trader.Context.Balances[balance.Currency] = balance
}
})
// stream.OnOutboundAccountInfoEvent(func(e *binance.OutboundAccountInfoEvent) { })
stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) {
trader.Context.Lock()
defer trader.Context.Unlock()
delta := util.MustParseFloat(e.Delta)
if balance, ok := trader.Context.Balances[e.Asset]; ok {
balance.Available += delta
trader.Context.Balances[e.Asset] = balance
}
})
var eventC = make(chan interface{}, 20)
if err := stream.Connect(ctx, eventC); err != nil {
return nil, err