all: pull out the graceful cancel process to the local active book

This commit is contained in:
c9s 2022-01-07 00:10:40 +08:00
parent 47e23fda90
commit 259771b0b0
7 changed files with 77 additions and 55 deletions

View File

@ -1,7 +1,9 @@
package bbgo
import (
"context"
"encoding/json"
"time"
log "github.com/sirupsen/logrus"
@ -11,15 +13,16 @@ import (
// LocalActiveOrderBook manages the local active order books.
//go:generate callbackgen -type LocalActiveOrderBook
type LocalActiveOrderBook struct {
Asks, Bids *types.SyncOrderMap
Symbol string
Asks, Bids *types.SyncOrderMap
filledCallbacks []func(o types.Order)
}
func NewLocalActiveOrderBook() *LocalActiveOrderBook {
func NewLocalActiveOrderBook(symbol string) *LocalActiveOrderBook {
return &LocalActiveOrderBook{
Bids: types.NewSyncOrderMap(),
Asks: types.NewSyncOrderMap(),
Symbol: symbol,
Bids: types.NewSyncOrderMap(),
Asks: types.NewSyncOrderMap(),
}
}
@ -36,6 +39,59 @@ func (b *LocalActiveOrderBook) BindStream(stream types.Stream) {
stream.OnOrderUpdate(b.orderUpdateHandler)
}
// GracefulCancel cancels the active orders gracefully
func (b *LocalActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange) error {
if err := ex.CancelOrders(ctx, b.Orders()...); err != nil {
log.WithError(err).Error("order cancel error")
}
// ensure every order is cancelled
for b.NumOfOrders() > 0 {
orders := b.Orders()
log.Warnf("%d orders are not cancelled yet:", len(orders))
b.Print()
if err := ex.CancelOrders(ctx, b.Orders()...); err != nil {
log.WithError(err).Errorf("can not cancel %s orders", b.Symbol)
continue
}
log.Infof("waiting for orders to be cancelled...")
// wait for 3 seconds to get the order updates
select {
case <-time.After(3 * time.Second):
case <-ctx.Done():
break
}
// verify the current open orders via the RESTful API
if b.NumOfOrders() > 0 {
log.Warnf("there are orders not cancelled, using REStful API to verify...")
openOrders, err := ex.QueryOpenOrders(ctx, b.Symbol)
if err != nil {
log.WithError(err).Errorf("can not query %s open orders", b.Symbol)
continue
}
openOrderStore := NewOrderStore(b.Symbol)
openOrderStore.Add(openOrders...)
for _, o := range b.Orders() {
// if it does not exist, we should remove it
if !openOrderStore.Exists(o.OrderID) {
b.Remove(o)
}
}
}
}
log.Info("all orders are cancelled successfully")
return nil
}
func (b *LocalActiveOrderBook) orderUpdateHandler(order types.Order) {
log.Debugf("[LocalActiveOrderBook] received order update: %+v", order)

View File

@ -6,11 +6,12 @@ import (
"sync"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type TwapExecution struct {
@ -452,7 +453,7 @@ func (e *TwapExecution) Run(parentCtx context.Context) error {
e.orderStore = NewOrderStore(e.Symbol)
e.orderStore.BindStream(e.userDataStream)
e.activeMakerOrders = NewLocalActiveOrderBook()
e.activeMakerOrders = NewLocalActiveOrderBook(e.Symbol)
e.activeMakerOrders.OnFilled(e.handleFilledOrder)
e.activeMakerOrders.BindStream(e.userDataStream)

View File

@ -338,13 +338,13 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orders.BindStream(session.UserDataStream)
// we don't persist orders so that we can not clear the previous orders for now. just need time to support this.
s.activeOrders = bbgo.NewLocalActiveOrderBook()
s.activeOrders = bbgo.NewLocalActiveOrderBook(s.Symbol)
s.activeOrders.OnFilled(func(o types.Order) {
s.submitReverseOrder(o, session)
})
s.activeOrders.BindStream(session.UserDataStream)
s.profitOrders = bbgo.NewLocalActiveOrderBook()
s.profitOrders = bbgo.NewLocalActiveOrderBook(s.Symbol)
s.profitOrders.OnFilled(func(o types.Order) {
// we made profit here!
})

View File

@ -3,10 +3,11 @@ package bollpp
import (
"context"
"fmt"
"github.com/c9s/bbgo/pkg/indicator"
"sync"
"time"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -268,7 +269,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.stopC = make(chan struct{})
s.activeMakerOrders = bbgo.NewLocalActiveOrderBook()
s.activeMakerOrders = bbgo.NewLocalActiveOrderBook(s.Symbol)
s.activeMakerOrders.BindStream(session.UserDataStream)
s.orderStore = bbgo.NewOrderStore(s.Symbol)

View File

@ -110,7 +110,7 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
// we don't persist orders so that we can not clear the previous orders for now. just need time to support this.
s.activeOrders = bbgo.NewLocalActiveOrderBook()
s.activeOrders = bbgo.NewLocalActiveOrderBook(s.Symbol)
s.activeOrders.BindStream(session.UserDataStream)
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {

View File

@ -579,7 +579,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderStore.BindStream(session.UserDataStream)
// we don't persist orders so that we can not clear the previous orders for now. just need time to support this.
s.activeOrders = bbgo.NewLocalActiveOrderBook()
s.activeOrders = bbgo.NewLocalActiveOrderBook(s.Symbol)
s.activeOrders.OnFilled(s.handleFilledOrder)
s.activeOrders.BindStream(session.UserDataStream)

View File

@ -733,7 +733,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
s.book = types.NewStreamBook(s.Symbol)
s.book.BindStream(s.sourceSession.MarketDataStream)
s.activeMakerOrders = bbgo.NewLocalActiveOrderBook()
s.activeMakerOrders = bbgo.NewLocalActiveOrderBook(s.Symbol)
s.activeMakerOrders.BindStream(s.makerSession.UserDataStream)
s.orderStore = bbgo.NewOrderStore(s.Symbol)
@ -857,48 +857,12 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
// wait for the quoter to stop
time.Sleep(s.UpdateInterval.Duration())
// ensure every order is cancelled
for s.activeMakerOrders.NumOfOrders() > 0 {
orders := s.activeMakerOrders.Orders()
log.Warnf("%d orders are not cancelled yet:", len(orders))
s.activeMakerOrders.Print()
shutdownCtx, cancelShutdown := context.WithTimeout(context.TODO(), time.Minute)
defer cancelShutdown()
if err := s.makerSession.Exchange.CancelOrders(ctx, s.activeMakerOrders.Orders()...); err != nil {
log.WithError(err).Errorf("can not cancel %s orders", s.Symbol)
continue
}
log.Infof("waiting for orders to be cancelled...")
select {
case <-time.After(3 * time.Second):
case <-ctx.Done():
break
}
// verify the current open orders via the RESTful API
if s.activeMakerOrders.NumOfOrders() > 0 {
log.Warnf("there are orders not cancelled, using REStful API to verify...")
openOrders, err := s.makerSession.Exchange.QueryOpenOrders(ctx, s.Symbol)
if err != nil {
log.WithError(err).Errorf("can not query %s open orders", s.Symbol)
continue
}
openOrderStore := bbgo.NewOrderStore(s.Symbol)
openOrderStore.Add(openOrders...)
for _, o := range s.activeMakerOrders.Orders() {
// if it does not exist, we should remove it
if !openOrderStore.Exists(o.OrderID) {
s.activeMakerOrders.Remove(o)
}
}
}
if err := s.activeMakerOrders.GracefulCancel(shutdownCtx, s.makerSession.Exchange); err != nil {
log.WithError(err).Errorf("graceful cancel error")
}
log.Info("all orders are cancelled successfully")
if err := s.SaveState(); err != nil {
log.WithError(err).Errorf("can not save state: %+v", s.state)