diff --git a/pkg/strategy/convert/strategy.go b/pkg/strategy/convert/strategy.go index 0ceaf10e9..5f8618864 100644 --- a/pkg/strategy/convert/strategy.go +++ b/pkg/strategy/convert/strategy.go @@ -51,7 +51,7 @@ type Strategy struct { session *bbgo.ExchangeSession orderExecutor *bbgo.SimpleOrderExecutor - pendingQuantity map[string]fixedpoint.Value + pendingQuantity map[string]fixedpoint.Value `persistence:"pendingQuantities"` pendingQuantityLock sync.Mutex } @@ -109,12 +109,6 @@ func (s *Strategy) handleOrderFilled(ctx context.Context, order types.Order) { nextMarket := s.indirectMarkets[i+1] - ticker, err := s.session.Exchange.QueryTicker(ctx, nextMarket.Symbol) - if err != nil { - log.WithError(err).Errorf("unable to query ticker") - return - } - quantity := order.Quantity quoteQuantity := quantity.Mul(order.Price) @@ -125,7 +119,7 @@ func (s *Strategy) handleOrderFilled(ctx context.Context, order types.Order) { quoteQuantity = quoteQuantity.Sub(quoteFee) } - if err := s.convertBalance(ctx, market.QuoteCurrency, quoteQuantity, nextMarket, ticker); err != nil { + if err := s.convertBalance(ctx, market.QuoteCurrency, quoteQuantity, nextMarket); err != nil { log.WithError(err).Errorf("unable to convert balance") } @@ -134,7 +128,7 @@ func (s *Strategy) handleOrderFilled(ctx context.Context, order types.Order) { quantity = quantity.Sub(baseFee) } - if err := s.convertBalance(ctx, market.BaseCurrency, quantity, nextMarket, ticker); err != nil { + if err := s.convertBalance(ctx, market.BaseCurrency, quantity, nextMarket); err != nil { log.WithError(err).Errorf("unable to convert balance") } } @@ -166,6 +160,12 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se }) } + bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { + s.collectPendingQuantity(ctx) + + _ = s.orderExecutor.GracefulCancel(ctx) + }) + return nil } @@ -202,11 +202,7 @@ func (s *Strategy) getSourceMarket() (types.Market, bool) { // convert triggers a convert order func (s *Strategy) convert(ctx context.Context) error { - s.collectPendingQuantity() - - if err := s.orderExecutor.GracefulCancel(ctx); err != nil { - log.WithError(err).Warn("unable to cancel orders") - } + s.collectPendingQuantity(ctx) // sleep one second for exchange to unlock the balance time.Sleep(time.Second) @@ -220,11 +216,6 @@ func (s *Strategy) convert(ctx context.Context) error { log.Debugf("converting %s to %s, current balance: %+v", s.From, s.To, fromAsset) if sourceMarket, ok := s.getSourceMarket(); ok { - ticker, err := s.session.Exchange.QueryTicker(ctx, sourceMarket.Symbol) - if err != nil { - return err - } - quantity := fromAsset.Available if !s.MinBalance.IsZero() { @@ -238,7 +229,7 @@ func (s *Strategy) convert(ctx context.Context) error { quantity = fixedpoint.Min(s.MaxQuantity, quantity) } - if err := s.convertBalance(ctx, fromAsset.Currency, quantity, sourceMarket, ticker); err != nil { + if err := s.convertBalance(ctx, fromAsset.Currency, quantity, sourceMarket); err != nil { return err } } @@ -246,39 +237,70 @@ func (s *Strategy) convert(ctx context.Context) error { return nil } -func (s *Strategy) collectPendingQuantity() { +func (s *Strategy) addPendingQuantity(asset string, q fixedpoint.Value) { + if q2, ok := s.pendingQuantity[asset]; ok { + s.pendingQuantity[asset] = q2.Add(q) + } else { + s.pendingQuantity[asset] = q + } +} + +func (s *Strategy) collectPendingQuantity(ctx context.Context) { log.Infof("collecting pending quantity...") s.pendingQuantityLock.Lock() defer s.pendingQuantityLock.Unlock() activeOrders := s.orderExecutor.ActiveMakerOrders().Orders() + log.Infof("found %d active orders", len(activeOrders)) + + if err := s.orderExecutor.GracefulCancel(ctx); err != nil { + log.WithError(err).Warn("unable to cancel orders") + } + for _, o := range activeOrders { + log.Infof("checking order: %+v", o) + + if service, ok := s.session.Exchange.(types.ExchangeOrderQueryService); ok { + trades, err := service.QueryOrderTrades(ctx, types.OrderQuery{ + Symbol: o.Symbol, + OrderID: strconv.FormatUint(o.OrderID, 10), + }) + + if err != nil { + return + } + + o.ExecutedQuantity = tradingutil.AggregateTradesQuantity(trades) + + log.Infof("updated executed quantity to %s", o.ExecutedQuantity) + } + if m, ok := s.markets[o.Symbol]; ok { switch o.Side { case types.SideTypeBuy: + if !o.ExecutedQuantity.IsZero() { + s.addPendingQuantity(m.BaseCurrency, o.ExecutedQuantity) + } + if m.QuoteCurrency == s.From { continue } qq := o.Quantity.Sub(o.ExecutedQuantity).Mul(o.Price) - if q2, ok := s.pendingQuantity[m.QuoteCurrency]; ok { - s.pendingQuantity[m.QuoteCurrency] = q2.Add(qq) - } else { - s.pendingQuantity[m.QuoteCurrency] = qq + s.addPendingQuantity(m.QuoteCurrency, qq) + case types.SideTypeSell: + + if !o.ExecutedQuantity.IsZero() { + s.addPendingQuantity(m.QuoteCurrency, o.ExecutedQuantity.Mul(o.Price)) } - case types.SideTypeSell: if m.BaseCurrency == s.From { continue } q := o.Quantity.Sub(o.ExecutedQuantity) - if q2, ok := s.pendingQuantity[m.BaseCurrency]; ok { - s.pendingQuantity[m.BaseCurrency] = q2.Add(q) - } else { - s.pendingQuantity[m.BaseCurrency] = q - } + s.addPendingQuantity(m.BaseCurrency, q) } } } @@ -286,7 +308,11 @@ func (s *Strategy) collectPendingQuantity() { log.Infof("collected pending quantity: %+v", s.pendingQuantity) } -func (s *Strategy) convertBalance(ctx context.Context, fromAsset string, available fixedpoint.Value, market types.Market, ticker *types.Ticker) error { +func (s *Strategy) convertBalance(ctx context.Context, fromAsset string, available fixedpoint.Value, market types.Market) error { + ticker, err2 := s.session.Exchange.QueryTicker(ctx, market.Symbol) + if err2 != nil { + return err2 + } s.pendingQuantityLock.Lock() if pendingQ, ok := s.pendingQuantity[fromAsset]; ok { diff --git a/pkg/util/tradingutil/trades.go b/pkg/util/tradingutil/trades.go index dc8001652..1bedf7f2c 100644 --- a/pkg/util/tradingutil/trades.go +++ b/pkg/util/tradingutil/trades.go @@ -18,3 +18,11 @@ func CollectTradeFee(trades []types.Trade) map[string]fixedpoint.Value { return fees } + +func AggregateTradesQuantity(trades []types.Trade) fixedpoint.Value { + tq := fixedpoint.Zero + for _, t := range trades { + tq = tq.Add(t.Quantity) + } + return tq +}