mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-25 16:25:16 +00:00
Merge pull request #1290 from c9s/c9s/grid-disconnect-recover
FEATURE: [grid2] update local active orders after re-connected
This commit is contained in:
commit
20bdf191c3
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -32,6 +33,8 @@ type ActiveOrderBook struct {
|
|||
// sig is the order update signal
|
||||
// this signal will be emitted when a new order is added or removed.
|
||||
C sigchan.Chan
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewActiveOrderBook(symbol string) *ActiveOrderBook {
|
||||
|
@ -57,7 +60,7 @@ func (b *ActiveOrderBook) BindStream(stream types.Stream) {
|
|||
}
|
||||
|
||||
func (b *ActiveOrderBook) waitClear(ctx context.Context, order types.Order, waitTime, timeout time.Duration) (bool, error) {
|
||||
if !b.Exists(order) {
|
||||
if !b.orders.Exists(order.OrderID) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
|
@ -68,7 +71,7 @@ func (b *ActiveOrderBook) waitClear(ctx context.Context, order types.Order, wait
|
|||
case <-b.C:
|
||||
}
|
||||
|
||||
clear := !b.Exists(order)
|
||||
clear := !b.orders.Exists(order.OrderID)
|
||||
|
||||
select {
|
||||
case <-timeoutC:
|
||||
|
@ -146,7 +149,7 @@ func (b *ActiveOrderBook) FastCancel(ctx context.Context, ex types.Exchange, ord
|
|||
}
|
||||
|
||||
for _, o := range orders {
|
||||
b.Remove(o)
|
||||
b.orders.Remove(o.OrderID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -233,44 +236,7 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
|
|||
}
|
||||
|
||||
func (b *ActiveOrderBook) orderUpdateHandler(order types.Order) {
|
||||
hasSymbol := len(b.Symbol) > 0
|
||||
|
||||
if hasSymbol && order.Symbol != b.Symbol {
|
||||
return
|
||||
}
|
||||
|
||||
if !b.orders.Exists(order.OrderID) {
|
||||
b.pendingOrderUpdates.Add(order)
|
||||
return
|
||||
}
|
||||
|
||||
switch order.Status {
|
||||
case types.OrderStatusFilled:
|
||||
// make sure we have the order and we remove it
|
||||
if b.Remove(order) {
|
||||
b.EmitFilled(order)
|
||||
}
|
||||
b.C.Emit()
|
||||
|
||||
case types.OrderStatusPartiallyFilled:
|
||||
b.Update(order)
|
||||
|
||||
case types.OrderStatusNew:
|
||||
b.Update(order)
|
||||
b.C.Emit()
|
||||
|
||||
case types.OrderStatusCanceled, types.OrderStatusRejected:
|
||||
if order.Status == types.OrderStatusCanceled {
|
||||
b.EmitCanceled(order)
|
||||
}
|
||||
|
||||
log.Debugf("[ActiveOrderBook] order is %s, removing order %s", order.Status, order)
|
||||
b.Remove(order)
|
||||
b.C.Emit()
|
||||
|
||||
default:
|
||||
log.Warnf("unhandled order status: %s", order.Status)
|
||||
}
|
||||
b.Update(order)
|
||||
}
|
||||
|
||||
func (b *ActiveOrderBook) Print() {
|
||||
|
@ -288,14 +254,67 @@ func (b *ActiveOrderBook) Print() {
|
|||
}
|
||||
}
|
||||
|
||||
func (b *ActiveOrderBook) Update(orders ...types.Order) {
|
||||
// Update updates the order by the order status and emit the related events.
|
||||
// When order is filled, the order will be removed from the internal order storage.
|
||||
// When order is New or PartiallyFilled, the internal order will be updated according to the latest order update.
|
||||
// When the order is cancelled, it will be removed from the internal order storage.
|
||||
func (b *ActiveOrderBook) Update(order types.Order) {
|
||||
hasSymbol := len(b.Symbol) > 0
|
||||
for _, order := range orders {
|
||||
if hasSymbol && b.Symbol != order.Symbol {
|
||||
continue
|
||||
}
|
||||
if hasSymbol && order.Symbol != b.Symbol {
|
||||
return
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
if !b.orders.Exists(order.OrderID) {
|
||||
b.pendingOrderUpdates.Add(order)
|
||||
b.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// if order update time is too old, skip it
|
||||
if previousOrder, ok := b.orders.Get(order.OrderID); ok {
|
||||
previousUpdateTime := previousOrder.UpdateTime.Time()
|
||||
if !previousUpdateTime.IsZero() && order.UpdateTime.Before(previousUpdateTime) {
|
||||
b.mu.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
switch order.Status {
|
||||
case types.OrderStatusFilled:
|
||||
// make sure we have the order and we remove it
|
||||
removed := b.orders.Remove(order.OrderID)
|
||||
b.mu.Unlock()
|
||||
|
||||
if removed {
|
||||
b.EmitFilled(order)
|
||||
}
|
||||
b.C.Emit()
|
||||
|
||||
case types.OrderStatusPartiallyFilled:
|
||||
b.orders.Update(order)
|
||||
b.mu.Unlock()
|
||||
|
||||
case types.OrderStatusNew:
|
||||
b.orders.Update(order)
|
||||
b.mu.Unlock()
|
||||
|
||||
b.C.Emit()
|
||||
|
||||
case types.OrderStatusCanceled, types.OrderStatusRejected:
|
||||
// TODO: note that orders transit to "canceled" may have partially filled
|
||||
log.Debugf("[ActiveOrderBook] order is %s, removing order %s", order.Status, order)
|
||||
b.orders.Remove(order.OrderID)
|
||||
b.mu.Unlock()
|
||||
|
||||
if order.Status == types.OrderStatusCanceled {
|
||||
b.EmitCanceled(order)
|
||||
}
|
||||
b.C.Emit()
|
||||
|
||||
default:
|
||||
b.mu.Unlock()
|
||||
log.Warnf("[ActiveOrderBook] unhandled order status: %s", order.Status)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -328,7 +347,7 @@ func (b *ActiveOrderBook) add(order types.Order) {
|
|||
// so, when it's not status=new, we should trigger order update handler
|
||||
if order.Status != types.OrderStatusNew {
|
||||
// emit the order update handle function to trigger callback
|
||||
b.orderUpdateHandler(order)
|
||||
b.Update(order)
|
||||
}
|
||||
|
||||
} else {
|
||||
|
@ -337,6 +356,8 @@ func (b *ActiveOrderBook) add(order types.Order) {
|
|||
}
|
||||
|
||||
func (b *ActiveOrderBook) Exists(order types.Order) bool {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
return b.orders.Exists(order.OrderID)
|
||||
}
|
||||
|
||||
|
@ -345,6 +366,8 @@ func (b *ActiveOrderBook) Get(orderID uint64) (types.Order, bool) {
|
|||
}
|
||||
|
||||
func (b *ActiveOrderBook) Remove(order types.Order) bool {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
return b.orders.Remove(order.OrderID)
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ func TestActiveOrderBook_pendingOrders(t *testing.T) {
|
|||
|
||||
// if we received filled order first
|
||||
// should be added to pending orders
|
||||
ob.orderUpdateHandler(types.Order{
|
||||
ob.Update(types.Order{
|
||||
OrderID: 99,
|
||||
SubmitOrder: types.SubmitOrder{
|
||||
Symbol: "BTCUSDT",
|
||||
|
@ -63,5 +63,4 @@ func TestActiveOrderBook_pendingOrders(t *testing.T) {
|
|||
})
|
||||
|
||||
assert.True(t, filled, "filled event should be fired")
|
||||
|
||||
}
|
||||
|
|
|
@ -1951,6 +1951,10 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
|
|||
}
|
||||
})
|
||||
|
||||
session.UserDataStream.OnConnect(func() {
|
||||
s.handleConnect(ctx, session)
|
||||
})
|
||||
|
||||
// if TriggerPrice is zero, that means we need to open the grid when start up
|
||||
if s.TriggerPrice.IsZero() {
|
||||
// must call the openGrid method inside the OnStart callback because
|
||||
|
@ -2112,3 +2116,32 @@ func (s *Strategy) newClientOrderID() string {
|
|||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (s *Strategy) handleConnect(ctx context.Context, session *bbgo.ExchangeSession) {
|
||||
grid := s.getGrid()
|
||||
if grid == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: move this logics into the active maker orders component, like activeOrders.Sync(ctx)
|
||||
activeOrderBook := s.orderExecutor.ActiveMakerOrders()
|
||||
activeOrders := activeOrderBook.Orders()
|
||||
for _, o := range activeOrders {
|
||||
var updatedOrder *types.Order
|
||||
err := retry.GeneralBackoff(ctx, func() error {
|
||||
var err error
|
||||
updatedOrder, err = s.orderQueryService.QueryOrder(ctx, types.OrderQuery{
|
||||
Symbol: o.Symbol,
|
||||
OrderID: strconv.FormatUint(o.OrderID, 10),
|
||||
})
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
s.logger.WithError(err).Errorf("unable to query order")
|
||||
return
|
||||
}
|
||||
|
||||
activeOrderBook.Update(*updatedOrder)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user