notify trades and update position

This commit is contained in:
c9s 2021-05-31 01:02:35 +08:00
parent 41daea9e05
commit e5db780be8
2 changed files with 85 additions and 1 deletions

View File

@ -56,6 +56,17 @@ func (s *TradeStore) Clear() {
s.mu.Unlock() s.mu.Unlock()
} }
func (s *TradeStore) GetAndClear() (trades []types.Trade) {
s.mu.Lock()
for _, o := range s.trades {
trades = append(trades, o)
}
s.trades = make(map[int64]types.Trade)
s.mu.Unlock()
return trades
}
func (s *TradeStore) Add(trades ...types.Trade) { func (s *TradeStore) Add(trades ...types.Trade) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()

View File

@ -3,7 +3,9 @@ package support
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"github.com/c9s/bbgo/pkg/service"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
@ -13,12 +15,18 @@ import (
const ID = "support" const ID = "support"
const stateKey = "state-v1"
var log = logrus.WithField("strategy", ID) var log = logrus.WithField("strategy", ID)
func init() { func init() {
bbgo.RegisterStrategy(ID, &Strategy{}) bbgo.RegisterStrategy(ID, &Strategy{})
} }
type State struct {
Position *bbgo.Position `json:"position,omitempty"`
}
type Target struct { type Target struct {
ProfitPercentage float64 `json:"profitPercentage"` ProfitPercentage float64 `json:"profitPercentage"`
QuantityPercentage float64 `json:"quantityPercentage"` QuantityPercentage float64 `json:"quantityPercentage"`
@ -27,6 +35,8 @@ type Target struct {
type Strategy struct { type Strategy struct {
*bbgo.Notifiability *bbgo.Notifiability
*bbgo.Persistence
*bbgo.Graceful
Symbol string `json:"symbol"` Symbol string `json:"symbol"`
Interval types.Interval `json:"interval"` Interval types.Interval `json:"interval"`
@ -46,6 +56,7 @@ type Strategy struct {
orderStore *bbgo.OrderStore orderStore *bbgo.OrderStore
tradeStore *bbgo.TradeStore tradeStore *bbgo.TradeStore
tradeC chan types.Trade tradeC chan types.Trade
state *State
} }
func (s *Strategy) ID() string { func (s *Strategy) ID() string {
@ -85,6 +96,33 @@ func (s *Strategy) tradeCollector(ctx context.Context) {
} }
} }
func (s *Strategy) SaveState() error {
if err := s.Persistence.Save(s.state, ID, s.Symbol, stateKey); err != nil {
return err
} else {
log.Infof("state is saved => %+v", s.state)
}
return nil
}
func (s *Strategy) LoadState() error {
var state State
// load position
if err := s.Persistence.Load(&state, ID, s.Symbol, stateKey); err != nil {
if err != service.ErrPersistenceNotExists {
return err
}
s.state = &State{}
} else {
s.state = &state
log.Infof("state is restored: %+v", s.state)
}
return nil
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
// buffer 100 trades in the channel // buffer 100 trades in the channel
s.tradeC = make(chan types.Trade, 100) s.tradeC = make(chan types.Trade, 100)
@ -115,6 +153,21 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
var iw = types.IntervalWindow{Interval: s.Interval, Window: s.MovingAverageWindow} var iw = types.IntervalWindow{Interval: s.Interval, Window: s.MovingAverageWindow}
var ema = standardIndicatorSet.EWMA(iw) var ema = standardIndicatorSet.EWMA(iw)
if err := s.LoadState(); err != nil {
return err
} else {
s.Notify("%s state is restored => %+v", s.Symbol, s.state)
}
// init state
if s.state.Position == nil {
s.state.Position = &bbgo.Position{
Symbol: s.Symbol,
BaseCurrency: market.BaseCurrency,
QuoteCurrency: market.QuoteCurrency,
}
}
go s.tradeCollector(ctx) go s.tradeCollector(ctx)
session.UserDataStream.OnTradeUpdate(s.handleTradeUpdate) session.UserDataStream.OnTradeUpdate(s.handleTradeUpdate)
@ -152,7 +205,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
closePrice.Float64(), closePrice.Float64(),
ema.Last(), ema.Last(),
kline.Volume, kline.Volume,
s.MinVolume.Float64(), kline) s.MinVolume.Float64(),
kline)
var quantity fixedpoint.Value var quantity fixedpoint.Value
if s.Quantity > 0 { if s.Quantity > 0 {
@ -221,6 +275,15 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
} }
s.orderStore.Add(createdOrders...) s.orderStore.Add(createdOrders...)
trades := s.tradeStore.GetAndClear()
for _, trade := range trades {
if s.orderStore.Exists(trade.OrderID) {
s.Notify(trade)
s.state.Position.AddTrade(trade)
}
}
s.Notify(s.state.Position)
// submit target orders // submit target orders
var targetOrders []types.SubmitOrder var targetOrders []types.SubmitOrder
for _, target := range s.Targets { for _, target := range s.Targets {
@ -255,5 +318,15 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
} }
}) })
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
if err := s.SaveState(); err != nil {
log.WithError(err).Errorf("can not save state: %+v", s.state)
} else {
s.Notify("%s position is saved", s.Symbol, s.state.Position)
}
})
return nil return nil
} }