support: use trade collector

This commit is contained in:
c9s 2021-06-24 15:38:55 +08:00
parent aab0c377d7
commit 3165d10986

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/service"
@ -38,7 +37,7 @@ type Target struct {
type Strategy struct {
*bbgo.Notifiability `json:"-"`
*bbgo.Persistence
*bbgo.Graceful
*bbgo.Graceful `json:"-"`
Symbol string `json:"symbol"`
Market types.Market `json:"-"`
@ -72,9 +71,9 @@ type Strategy struct {
ScaleQuantity *bbgo.PriceVolumeScale `json:"scaleQuantity"`
tradeCollector *bbgo.TradeCollector
orderStore *bbgo.OrderStore
tradeStore *bbgo.TradeStore
tradeC chan types.Trade
state *State
triggerEMA *indicator.EWMA
@ -101,23 +100,6 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: string(s.Interval)})
}
func (s *Strategy) handleTradeUpdate(trade types.Trade) {
s.tradeC <- trade
}
func (s *Strategy) tradeCollector(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case trade := <-s.tradeC:
s.tradeStore.Add(trade)
}
}
}
func (s *Strategy) SaveState() error {
if err := s.Persistence.Save(s.state, ID, s.Symbol, stateKey); err != nil {
return err
@ -142,6 +124,10 @@ func (s *Strategy) LoadState() error {
log.Infof("state is restored: %+v", s.state)
}
if s.state.Position == nil {
s.state.Position = bbgo.NewPositionFromMarket(s.Market)
}
return nil
}
@ -250,9 +236,6 @@ func (s *Strategy) detectResistance(kline types.KLine) (confidence fixedpoint.Va
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
// buffer 100 trades in the channel
s.tradeC = make(chan types.Trade, 100)
s.tradeStore = bbgo.NewTradeStore(s.Symbol)
// set default values
if s.Interval == "" {
@ -314,18 +297,10 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.Notify("%s state is restored => %+v", s.Symbol, s.state)
}
// init state
if s.state.Position == nil {
s.state.Position = &bbgo.Position{
Symbol: s.Symbol,
BaseCurrency: market.BaseCurrency,
QuoteCurrency: market.QuoteCurrency,
}
}
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.state.Position, s.orderStore)
s.tradeCollector.BindStream(session.UserDataStream)
go s.tradeCollector.Run(ctx)
go s.tradeCollector(ctx)
session.UserDataStream.OnTradeUpdate(s.handleTradeUpdate)
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
// skip k-lines from other symbols
if kline.Symbol != s.Symbol {
@ -387,14 +362,17 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// check taker buy ratio, we need strong buy taker
if s.TakerBuyRatio > 0 {
takerBuyRatio := kline.TakerBuyBaseAssetVolume / kline.Volume
takerBuyBaseVolumeThreshold := kline.Volume * s.TakerBuyRatio.Float64()
if kline.TakerBuyBaseAssetVolume < takerBuyBaseVolumeThreshold {
s.Notify("%s: taker buy base volume %f is less than threshold %f (volume %f volume ratio %f), skipping this support",
if takerBuyRatio < s.TakerBuyRatio.Float64() {
s.Notify("%s: taker buy base volume %f (volume ratio %f) is less than %f (volume ratio %f)",
s.Symbol,
kline.TakerBuyBaseAssetVolume,
takerBuyRatio,
takerBuyBaseVolumeThreshold,
kline.Volume,
s.TakerBuyRatio.Float64(),
kline,
)
return
}
@ -443,17 +421,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return
}
time.Sleep(500 * time.Millisecond)
trades := s.tradeStore.GetAndClear()
for _, trade := range trades {
if s.orderStore.Exists(trade.OrderID) {
s.Notify(trade)
s.state.Position.AddTrade(trade)
}
}
s.Notify(s.state.Position)
// submit target orders
var targetOrders []types.SubmitOrder
for _, target := range s.Targets {