convert: fix pending quantity collector with trade query

This commit is contained in:
c9s 2023-08-05 02:37:53 +08:00
parent bc8fe22e70
commit 4d293121d7
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
2 changed files with 66 additions and 32 deletions

View File

@ -51,7 +51,7 @@ type Strategy struct {
session *bbgo.ExchangeSession
orderExecutor *bbgo.SimpleOrderExecutor
pendingQuantity map[string]fixedpoint.Value
pendingQuantity map[string]fixedpoint.Value `persistence:"pendingQuantities"`
pendingQuantityLock sync.Mutex
}
@ -109,12 +109,6 @@ func (s *Strategy) handleOrderFilled(ctx context.Context, order types.Order) {
nextMarket := s.indirectMarkets[i+1]
ticker, err := s.session.Exchange.QueryTicker(ctx, nextMarket.Symbol)
if err != nil {
log.WithError(err).Errorf("unable to query ticker")
return
}
quantity := order.Quantity
quoteQuantity := quantity.Mul(order.Price)
@ -125,7 +119,7 @@ func (s *Strategy) handleOrderFilled(ctx context.Context, order types.Order) {
quoteQuantity = quoteQuantity.Sub(quoteFee)
}
if err := s.convertBalance(ctx, market.QuoteCurrency, quoteQuantity, nextMarket, ticker); err != nil {
if err := s.convertBalance(ctx, market.QuoteCurrency, quoteQuantity, nextMarket); err != nil {
log.WithError(err).Errorf("unable to convert balance")
}
@ -134,7 +128,7 @@ func (s *Strategy) handleOrderFilled(ctx context.Context, order types.Order) {
quantity = quantity.Sub(baseFee)
}
if err := s.convertBalance(ctx, market.BaseCurrency, quantity, nextMarket, ticker); err != nil {
if err := s.convertBalance(ctx, market.BaseCurrency, quantity, nextMarket); err != nil {
log.WithError(err).Errorf("unable to convert balance")
}
}
@ -166,6 +160,12 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
})
}
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
s.collectPendingQuantity(ctx)
_ = s.orderExecutor.GracefulCancel(ctx)
})
return nil
}
@ -202,11 +202,7 @@ func (s *Strategy) getSourceMarket() (types.Market, bool) {
// convert triggers a convert order
func (s *Strategy) convert(ctx context.Context) error {
s.collectPendingQuantity()
if err := s.orderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Warn("unable to cancel orders")
}
s.collectPendingQuantity(ctx)
// sleep one second for exchange to unlock the balance
time.Sleep(time.Second)
@ -220,11 +216,6 @@ func (s *Strategy) convert(ctx context.Context) error {
log.Debugf("converting %s to %s, current balance: %+v", s.From, s.To, fromAsset)
if sourceMarket, ok := s.getSourceMarket(); ok {
ticker, err := s.session.Exchange.QueryTicker(ctx, sourceMarket.Symbol)
if err != nil {
return err
}
quantity := fromAsset.Available
if !s.MinBalance.IsZero() {
@ -238,7 +229,7 @@ func (s *Strategy) convert(ctx context.Context) error {
quantity = fixedpoint.Min(s.MaxQuantity, quantity)
}
if err := s.convertBalance(ctx, fromAsset.Currency, quantity, sourceMarket, ticker); err != nil {
if err := s.convertBalance(ctx, fromAsset.Currency, quantity, sourceMarket); err != nil {
return err
}
}
@ -246,39 +237,70 @@ func (s *Strategy) convert(ctx context.Context) error {
return nil
}
func (s *Strategy) collectPendingQuantity() {
func (s *Strategy) addPendingQuantity(asset string, q fixedpoint.Value) {
if q2, ok := s.pendingQuantity[asset]; ok {
s.pendingQuantity[asset] = q2.Add(q)
} else {
s.pendingQuantity[asset] = q
}
}
func (s *Strategy) collectPendingQuantity(ctx context.Context) {
log.Infof("collecting pending quantity...")
s.pendingQuantityLock.Lock()
defer s.pendingQuantityLock.Unlock()
activeOrders := s.orderExecutor.ActiveMakerOrders().Orders()
log.Infof("found %d active orders", len(activeOrders))
if err := s.orderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Warn("unable to cancel orders")
}
for _, o := range activeOrders {
log.Infof("checking order: %+v", o)
if service, ok := s.session.Exchange.(types.ExchangeOrderQueryService); ok {
trades, err := service.QueryOrderTrades(ctx, types.OrderQuery{
Symbol: o.Symbol,
OrderID: strconv.FormatUint(o.OrderID, 10),
})
if err != nil {
return
}
o.ExecutedQuantity = tradingutil.AggregateTradesQuantity(trades)
log.Infof("updated executed quantity to %s", o.ExecutedQuantity)
}
if m, ok := s.markets[o.Symbol]; ok {
switch o.Side {
case types.SideTypeBuy:
if !o.ExecutedQuantity.IsZero() {
s.addPendingQuantity(m.BaseCurrency, o.ExecutedQuantity)
}
if m.QuoteCurrency == s.From {
continue
}
qq := o.Quantity.Sub(o.ExecutedQuantity).Mul(o.Price)
if q2, ok := s.pendingQuantity[m.QuoteCurrency]; ok {
s.pendingQuantity[m.QuoteCurrency] = q2.Add(qq)
} else {
s.pendingQuantity[m.QuoteCurrency] = qq
s.addPendingQuantity(m.QuoteCurrency, qq)
case types.SideTypeSell:
if !o.ExecutedQuantity.IsZero() {
s.addPendingQuantity(m.QuoteCurrency, o.ExecutedQuantity.Mul(o.Price))
}
case types.SideTypeSell:
if m.BaseCurrency == s.From {
continue
}
q := o.Quantity.Sub(o.ExecutedQuantity)
if q2, ok := s.pendingQuantity[m.BaseCurrency]; ok {
s.pendingQuantity[m.BaseCurrency] = q2.Add(q)
} else {
s.pendingQuantity[m.BaseCurrency] = q
}
s.addPendingQuantity(m.BaseCurrency, q)
}
}
}
@ -286,7 +308,11 @@ func (s *Strategy) collectPendingQuantity() {
log.Infof("collected pending quantity: %+v", s.pendingQuantity)
}
func (s *Strategy) convertBalance(ctx context.Context, fromAsset string, available fixedpoint.Value, market types.Market, ticker *types.Ticker) error {
func (s *Strategy) convertBalance(ctx context.Context, fromAsset string, available fixedpoint.Value, market types.Market) error {
ticker, err2 := s.session.Exchange.QueryTicker(ctx, market.Symbol)
if err2 != nil {
return err2
}
s.pendingQuantityLock.Lock()
if pendingQ, ok := s.pendingQuantity[fromAsset]; ok {

View File

@ -18,3 +18,11 @@ func CollectTradeFee(trades []types.Trade) map[string]fixedpoint.Value {
return fees
}
func AggregateTradesQuantity(trades []types.Trade) fixedpoint.Value {
tq := fixedpoint.Zero
for _, t := range trades {
tq = tq.Add(t.Quantity)
}
return tq
}