From a90184a4647dbcbfcdcef3e9a43dace69f320d5a Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 5 Sep 2020 16:22:46 +0800 Subject: [PATCH] add kline store --- bbgo/kline_regression.go | 146 ++++++++++++++++++ bbgo/pnl.go | 10 +- {service => bbgo/service}/trade.go | 8 +- {slack => bbgo/slack}/logrus_look.go | 3 +- {slack => bbgo/slack}/slackstyle/style.go.go | 0 bbgo/store.go | 48 ++++++ bbgo/trader.go | 148 ++----------------- 7 files changed, 216 insertions(+), 147 deletions(-) create mode 100644 bbgo/kline_regression.go rename {service => bbgo/service}/trade.go (99%) rename {slack => bbgo/slack}/logrus_look.go (99%) rename {slack => bbgo/slack}/slackstyle/style.go.go (100%) create mode 100644 bbgo/store.go diff --git a/bbgo/kline_regression.go b/bbgo/kline_regression.go new file mode 100644 index 000000000..55f584f82 --- /dev/null +++ b/bbgo/kline_regression.go @@ -0,0 +1,146 @@ +package bbgo + +import ( + "context" + "fmt" + "time" + + "github.com/sirupsen/logrus" + + "github.com/c9s/bbgo/pkg/bbgo/types" + "github.com/c9s/bbgo/pkg/util" +) + +type KLineRegressionTrader struct { + // Context is trading Context + Context *TradingContext + SourceKLines []types.KLine + ProfitAndLossCalculator *ProfitAndLossCalculator + + doneOrders []*types.SubmitOrder + pendingOrders []*types.SubmitOrder +} + +func (trader *KLineRegressionTrader) SubmitOrder(cxt context.Context, order *types.SubmitOrder) { + trader.pendingOrders = append(trader.pendingOrders, order) +} + +func (trader *KLineRegressionTrader) RunStrategy(ctx context.Context, strategy Strategy) (chan struct{}, error) { + logrus.Infof("[regression] number of kline data: %d", len(trader.SourceKLines)) + + maxExposure := 0.4 + trader.Context.Quota = make(map[string]types.Balance) + for currency, balance := range trader.Context.Balances { + quota := balance + quota.Available *= maxExposure + trader.Context.Quota[currency] = quota + } + + done := make(chan struct{}) + defer close(done) + + if err := strategy.Init(trader.Context, trader); err != nil { + return nil, err + } + + standardStream := types.StandardPrivateStream{} + if err := strategy.OnNewStream(&standardStream); err != nil { + return nil, err + } + + var tradeID int64 = 0 + for _, kline := range trader.SourceKLines { + logrus.Debugf("kline %+v", kline) + + fmt.Print(".") + + standardStream.EmitKLineClosed(&kline) + + for _, order := range trader.pendingOrders { + switch order.Side { + case types.SideTypeBuy: + fmt.Print("B") + case types.SideTypeSell: + fmt.Print("S") + } + + var price float64 + if order.Type == types.OrderTypeLimit { + price = util.MustParseFloat(order.Price) + } else { + price = kline.GetClose() + } + + volume := util.MustParseFloat(order.Quantity) + fee := 0.0 + feeCurrency := "" + + trader.Context.Lock() + if order.Side == types.SideTypeBuy { + fee = price * volume * 0.001 + feeCurrency = "USDT" + + quote := trader.Context.Balances[trader.Context.Market.QuoteCurrency] + + if quote.Available < volume*price { + logrus.Fatalf("quote balance not enough: %+v", quote) + } + quote.Available -= volume * price + trader.Context.Balances[trader.Context.Market.QuoteCurrency] = quote + + base := trader.Context.Balances[trader.Context.Market.BaseCurrency] + base.Available += volume + trader.Context.Balances[trader.Context.Market.BaseCurrency] = base + + } else { + fee = volume * 0.001 + feeCurrency = "BTC" + + base := trader.Context.Balances[trader.Context.Market.BaseCurrency] + if base.Available < volume { + logrus.Fatalf("base balance not enough: %+v", base) + } + + base.Available -= volume + trader.Context.Balances[trader.Context.Market.BaseCurrency] = base + + quote := trader.Context.Balances[trader.Context.Market.QuoteCurrency] + quote.Available += volume * price + trader.Context.Balances[trader.Context.Market.QuoteCurrency] = quote + } + trader.Context.Unlock() + + trade := types.Trade{ + ID: tradeID, + Price: price, + Quantity: volume, + Side: string(order.Side), + IsBuyer: order.Side == types.SideTypeBuy, + IsMaker: false, + Time: time.Unix(0, kline.EndTime*int64(time.Millisecond)), + Symbol: trader.Context.Symbol, + Fee: fee, + FeeCurrency: feeCurrency, + } + + tradeID++ + trader.ProfitAndLossCalculator.AddTrade(trade) + + trader.doneOrders = append(trader.doneOrders, order) + } + + // clear pending orders + trader.pendingOrders = nil + } + + fmt.Print("\n") + report := trader.ProfitAndLossCalculator.Calculate() + report.Print() + + logrus.Infof("wallet balance:") + for _, balance := range trader.Context.Balances { + logrus.Infof(" %s: %f", balance.Currency, balance.Available) + } + + return done, nil +} diff --git a/bbgo/pnl.go b/bbgo/pnl.go index f62e86189..673ff5b7c 100644 --- a/bbgo/pnl.go +++ b/bbgo/pnl.go @@ -1,13 +1,15 @@ package bbgo import ( - "github.com/c9s/bbgo/pkg/bbgo/types" - "github.com/c9s/bbgo/pkg/slack/slackstyle" - log "github.com/sirupsen/logrus" - "github.com/slack-go/slack" "strconv" "strings" "time" + + log "github.com/sirupsen/logrus" + "github.com/slack-go/slack" + + "github.com/c9s/bbgo/pkg/bbgo/slack/slackstyle" + "github.com/c9s/bbgo/pkg/bbgo/types" ) type ProfitAndLossCalculator struct { diff --git a/service/trade.go b/bbgo/service/trade.go similarity index 99% rename from service/trade.go rename to bbgo/service/trade.go index b2a1036ee..a16720c86 100644 --- a/service/trade.go +++ b/bbgo/service/trade.go @@ -2,12 +2,14 @@ package service import ( "context" - "github.com/c9s/bbgo/pkg/bbgo/exchange/binance" - "github.com/c9s/bbgo/pkg/bbgo/types" + "time" + "github.com/jmoiron/sqlx" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "time" + + "github.com/c9s/bbgo/pkg/bbgo/exchange/binance" + "github.com/c9s/bbgo/pkg/bbgo/types" ) type TradeSync struct { diff --git a/slack/logrus_look.go b/bbgo/slack/logrus_look.go similarity index 99% rename from slack/logrus_look.go rename to bbgo/slack/logrus_look.go index be3e48101..25c6efb95 100644 --- a/slack/logrus_look.go +++ b/bbgo/slack/logrus_look.go @@ -3,9 +3,10 @@ package slack import ( "context" "fmt" + "strings" + "github.com/sirupsen/logrus" "github.com/slack-go/slack" - "strings" ) type LogHook struct { diff --git a/slack/slackstyle/style.go.go b/bbgo/slack/slackstyle/style.go.go similarity index 100% rename from slack/slackstyle/style.go.go rename to bbgo/slack/slackstyle/style.go.go diff --git a/bbgo/store.go b/bbgo/store.go new file mode 100644 index 000000000..1e14a4d29 --- /dev/null +++ b/bbgo/store.go @@ -0,0 +1,48 @@ +package bbgo + +import ( + "github.com/c9s/bbgo/pkg/bbgo/types" +) + +type Interval string + +var Interval1m = Interval("1m") +var Interval5m = Interval("5m") +var Interval1h = Interval("1h") +var Interval1d = Interval("1d") + +type KLineStore struct { + // MaxKLines stores the max change kline per interval + MaxKLines map[Interval]types.KLine `json:"-"` + + // KLineWindows stores all loaded klines per interval + KLineWindows map[Interval]types.KLineWindow `json:"-"` +} + +func NewKLineStore() *KLineStore { + return &KLineStore{ + MaxKLines: make(map[Interval]types.KLine), + + // KLineWindows stores all loaded klines per interval + KLineWindows: make(map[Interval]types.KLineWindow), + } +} + +func (store *KLineStore) BindPrivateStream(stream *types.StandardPrivateStream) { + stream.OnKLineClosed(store.handleKLineClosed) +} + +func (store *KLineStore) handleKLineClosed(kline *types.KLine) { + store.AddKLine(*kline) +} + +func (store *KLineStore) AddKLine(kline types.KLine) { + var interval = Interval(kline.Interval) + + var window = store.KLineWindows[interval] + window.Add(kline) + + if kline.GetMaxChange() > store.MaxKLines[interval].GetMaxChange() { + store.MaxKLines[interval] = kline + } +} diff --git a/bbgo/trader.go b/bbgo/trader.go index a317b6ef9..34d057354 100644 --- a/bbgo/trader.go +++ b/bbgo/trader.go @@ -2,11 +2,11 @@ package bbgo import ( "context" - "fmt" - "github.com/c9s/bbgo/pkg/service" - "github.com/c9s/bbgo/pkg/util" "time" + "github.com/c9s/bbgo/pkg/bbgo/service" + "github.com/c9s/bbgo/pkg/util" + log "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/bbgo/exchange/binance" @@ -18,140 +18,6 @@ type Strategy interface { OnNewStream(stream *types.StandardPrivateStream) error } -type KLineRegressionTrader struct { - // Context is trading Context - Context *TradingContext - SourceKLines []types.KLine - ProfitAndLossCalculator *ProfitAndLossCalculator - - doneOrders []*types.SubmitOrder - pendingOrders []*types.SubmitOrder -} - -func (trader *KLineRegressionTrader) SubmitOrder(cxt context.Context, order *types.SubmitOrder) { - trader.pendingOrders = append(trader.pendingOrders, order) -} - -func (trader *KLineRegressionTrader) RunStrategy(ctx context.Context, strategy Strategy) (chan struct{}, error) { - log.Infof("[regression] number of kline data: %d", len(trader.SourceKLines)) - - maxExposure := 0.4 - trader.Context.Quota = make(map[string]types.Balance) - for currency, balance := range trader.Context.Balances { - quota := balance - quota.Available *= maxExposure - trader.Context.Quota[ currency ] = quota - } - - done := make(chan struct{}) - defer close(done) - - if err := strategy.Init(trader.Context, trader); err != nil { - return nil, err - } - - standardStream := types.StandardPrivateStream{} - if err := strategy.OnNewStream(&standardStream); err != nil { - return nil, err - } - - var tradeID int64 = 0 - for _, kline := range trader.SourceKLines { - log.Debugf("kline %+v", kline) - - fmt.Print(".") - - standardStream.EmitKLineClosed(&kline) - - for _, order := range trader.pendingOrders { - switch order.Side { - case types.SideTypeBuy: - fmt.Print("B") - case types.SideTypeSell: - fmt.Print("S") - } - - var price float64 - if order.Type == types.OrderTypeLimit { - price = util.MustParseFloat(order.Price) - } else { - price = kline.GetClose() - } - - volume := util.MustParseFloat(order.Quantity) - fee := 0.0 - feeCurrency := "" - - trader.Context.Lock() - if order.Side == types.SideTypeBuy { - fee = price * volume * 0.001 - feeCurrency = "USDT" - - quote := trader.Context.Balances[trader.Context.Market.QuoteCurrency] - - if quote.Available < volume*price { - log.Fatalf("quote balance not enough: %+v", quote) - } - quote.Available -= volume * price - trader.Context.Balances[trader.Context.Market.QuoteCurrency] = quote - - base := trader.Context.Balances[trader.Context.Market.BaseCurrency] - base.Available += volume - trader.Context.Balances[trader.Context.Market.BaseCurrency] = base - - } else { - fee = volume * 0.001 - feeCurrency = "BTC" - - base := trader.Context.Balances[trader.Context.Market.BaseCurrency] - if base.Available < volume { - log.Fatalf("base balance not enough: %+v", base) - } - - base.Available -= volume - trader.Context.Balances[trader.Context.Market.BaseCurrency] = base - - quote := trader.Context.Balances[trader.Context.Market.QuoteCurrency] - quote.Available += volume * price - trader.Context.Balances[trader.Context.Market.QuoteCurrency] = quote - } - trader.Context.Unlock() - - trade := types.Trade{ - ID: tradeID, - Price: price, - Quantity: volume, - Side: string(order.Side), - IsBuyer: order.Side == types.SideTypeBuy, - IsMaker: false, - Time: time.Unix(0, kline.EndTime*int64(time.Millisecond)), - Symbol: trader.Context.Symbol, - Fee: fee, - FeeCurrency: feeCurrency, - } - - tradeID++ - trader.ProfitAndLossCalculator.AddTrade(trade) - - trader.doneOrders = append(trader.doneOrders, order) - } - - // clear pending orders - trader.pendingOrders = nil - } - - fmt.Print("\n") - report := trader.ProfitAndLossCalculator.Calculate() - report.Print() - - log.Infof("wallet balance:") - for _, balance := range trader.Context.Balances { - log.Infof(" %s: %f", balance.Currency, balance.Available) - } - - return done, nil -} - type Trader struct { Notifier *SlackNotifier @@ -191,6 +57,10 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan return nil, err } + // bind kline store to the stream + klineStore := NewKLineStore() + klineStore.BindPrivateStream(&stream.StandardPrivateStream) + if err := strategy.OnNewStream(&stream.StandardPrivateStream); err != nil { return nil, err } @@ -204,13 +74,13 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan return } - if err := trader.TradeService.Insert(*trade) ; err != nil { + if err := trader.TradeService.Insert(*trade); err != nil { log.WithError(err).Error("trade insert error") } trader.ReportTrade(trade) trader.ProfitAndLossCalculator.AddTrade(*trade) - _ , err := trader.Context.StockManager.AddTrades([]types.Trade{*trade}) + _, err := trader.Context.StockManager.AddTrades([]types.Trade{*trade}) if err != nil { log.WithError(err).Error("stock manager load trades error") }