diff --git a/pkg/strategy/grid/strategy.go b/pkg/strategy/grid/strategy.go index ec8357d80..db8c8533d 100644 --- a/pkg/strategy/grid/strategy.go +++ b/pkg/strategy/grid/strategy.go @@ -2,14 +2,19 @@ package grid import ( "context" + "sync" "time" + "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/indicator" "github.com/c9s/bbgo/pkg/types" ) +var log = logrus.WithField("strategy", "grid") + // The indicators (SMA and EWMA) that we want to use are returning float64 data. type Float64Indicator interface { Last() float64 @@ -56,7 +61,7 @@ type Strategy struct { GridPips fixedpoint.Value `json:"gridPips"` // GridNum is the grid number (order numbers) - GridNum int `json:"gridNum"` + GridNum int `json:"gridNumber"` BaseQuantity float64 `json:"baseQuantity"` @@ -64,6 +69,7 @@ type Strategy struct { activeAskOrders map[uint64]types.Order boll *indicator.BOLL + mu sync.Mutex } func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { @@ -85,19 +91,19 @@ func (s *Strategy) updateBidOrders(orderExecutor bbgo.OrderExecutor, session *bb return } - var upBand = s.boll.LastUpBand() - var startPrice = upBand + var downBand = s.boll.LastDownBand() + var startPrice = downBand var submitOrders []types.SubmitOrder - for i := 0 ; i < numOrders ; i++ { + for i := 0; i < numOrders; i++ { submitOrders = append(submitOrders, types.SubmitOrder{ - Symbol: s.Symbol, - Side: types.SideTypeBuy, - Type: types.OrderTypeLimit, - Market: s.Market, - Quantity: s.BaseQuantity, - Price: startPrice, - TimeInForce: "GTC", + Symbol: s.Symbol, + Side: types.SideTypeBuy, + Type: types.OrderTypeLimit, + Market: s.Market, + Quantity: s.BaseQuantity, + Price: startPrice, + TimeInForce: "GTC", }) startPrice -= s.GridPips.Float64() @@ -105,12 +111,17 @@ func (s *Strategy) updateBidOrders(orderExecutor bbgo.OrderExecutor, session *bb orders, err := orderExecutor.SubmitOrders(context.Background(), submitOrders...) if err != nil { + log.WithError(err).Error("submit bid order error") return } - for _, order := range orders { + s.mu.Lock() + for i := range orders { + var order = orders[i] + log.Infof("adding order %d to the active bid order pool...", order.OrderID) s.activeBidOrders[order.OrderID] = order } + s.mu.Unlock() } func (s *Strategy) updateAskOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) { @@ -127,19 +138,19 @@ func (s *Strategy) updateAskOrders(orderExecutor bbgo.OrderExecutor, session *bb return } - var downBand = s.boll.LastDownBand() - var startPrice = downBand + var upBand = s.boll.LastUpBand() + var startPrice = upBand var submitOrders []types.SubmitOrder - for i := 0 ; i < numOrders ; i++ { + for i := 0; i < numOrders; i++ { submitOrders = append(submitOrders, types.SubmitOrder{ - Symbol: s.Symbol, - Side: types.SideTypeSell, - Type: types.OrderTypeLimit, - Market: s.Market, - Quantity: s.BaseQuantity, - Price: startPrice, - TimeInForce: "GTC", + Symbol: s.Symbol, + Side: types.SideTypeSell, + Type: types.OrderTypeLimit, + Market: s.Market, + Quantity: s.BaseQuantity, + Price: startPrice, + TimeInForce: "GTC", }) startPrice += s.GridPips.Float64() @@ -147,43 +158,106 @@ func (s *Strategy) updateAskOrders(orderExecutor bbgo.OrderExecutor, session *bb orders, err := orderExecutor.SubmitOrders(context.Background(), submitOrders...) if err != nil { + log.WithError(err).Error("submit ask order error") return } - for _, order := range orders { + s.mu.Lock() + for i := range orders { + var order = orders[i] + log.Infof("adding order %d to the active ask order pool...", order.OrderID) s.activeAskOrders[order.OrderID] = order } + s.mu.Unlock() } func (s *Strategy) updateOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) { + log.Infof("checking grid orders, bids=%d asks=%d", len(s.activeBidOrders), len(s.activeAskOrders)) + + for _, o := range s.activeBidOrders { + log.Infof("bid order: %d -> %s", o.OrderID, o.Status) + } + + for _, o := range s.activeAskOrders { + log.Infof("ask order: %d -> %s", o.OrderID, o.Status) + } + if len(s.activeBidOrders) < s.GridNum { + log.Infof("active bid orders not enough: %d < %d, updating...", len(s.activeBidOrders), s.GridNum) s.updateBidOrders(orderExecutor, session) } if len(s.activeAskOrders) < s.GridNum { + log.Infof("active ask orders not enough: %d < %d, updating...", len(s.activeAskOrders), s.GridNum) s.updateAskOrders(orderExecutor, session) } } func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { - // 1. we don't persist orders so that we can not clear the previous orders for now. just need time to support this. if s.GridNum == 0 { s.GridNum = 2 } - s.activeBidOrders = make(map[uint64]types.Order) - s.activeAskOrders = make(map[uint64]types.Order) s.boll = s.StandardIndicatorSet.GetBOLL(types.IntervalWindow{ Interval: s.Interval, Window: 21, }) - // session.Stream.OnOrderUpdate(func) + // we don't persist orders so that we can not clear the previous orders for now. just need time to support this. + // TODO: pull this map out and add mutex lock + s.activeBidOrders = make(map[uint64]types.Order) + s.activeAskOrders = make(map[uint64]types.Order) + + session.Stream.OnOrderUpdate(func(order types.Order) { + log.Infof("received order update: %+v", order) + + if order.Symbol != s.Symbol { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + switch order.Status { + + case types.OrderStatusCanceled, types.OrderStatusRejected: + log.Infof("order status %s, removing %d from the active order pool...", order.Status, order.OrderID) + + switch order.Side { + case types.SideTypeSell: + delete(s.activeAskOrders, order.OrderID) + case types.SideTypeBuy: + delete(s.activeBidOrders, order.OrderID) + + } + + default: + log.Infof("order status %s, updating %d to the active order pool...", order.Status, order.OrderID) + switch order.Side { + case types.SideTypeSell: + s.activeAskOrders[order.OrderID] = order + case types.SideTypeBuy: + s.activeBidOrders[order.OrderID] = order + } + } + }) go func() { - ticker := time.NewTicker(1 * time.Minute) + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() + s.updateOrders(orderExecutor, session) + + defer func() { + for _, o := range s.activeBidOrders { + _ = session.Exchange.CancelOrders(context.Background(), o) + } + + for _, o := range s.activeAskOrders { + _ = session.Exchange.CancelOrders(context.Background(), o) + } + }() + for { select { case <-ctx.Done():