diff --git a/pkg/bbgo/activeorderbook.go b/pkg/bbgo/activeorderbook.go index 104c9f349..567014665 100644 --- a/pkg/bbgo/activeorderbook.go +++ b/pkg/bbgo/activeorderbook.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "sort" + "sync" "time" "github.com/pkg/errors" @@ -32,6 +33,8 @@ type ActiveOrderBook struct { // sig is the order update signal // this signal will be emitted when a new order is added or removed. C sigchan.Chan + + mu sync.Mutex } func NewActiveOrderBook(symbol string) *ActiveOrderBook { @@ -57,7 +60,7 @@ func (b *ActiveOrderBook) BindStream(stream types.Stream) { } func (b *ActiveOrderBook) waitClear(ctx context.Context, order types.Order, waitTime, timeout time.Duration) (bool, error) { - if !b.Exists(order) { + if !b.orders.Exists(order.OrderID) { return true, nil } @@ -68,7 +71,7 @@ func (b *ActiveOrderBook) waitClear(ctx context.Context, order types.Order, wait case <-b.C: } - clear := !b.Exists(order) + clear := !b.orders.Exists(order.OrderID) select { case <-timeoutC: @@ -146,7 +149,7 @@ func (b *ActiveOrderBook) FastCancel(ctx context.Context, ex types.Exchange, ord } for _, o := range orders { - b.Remove(o) + b.orders.Remove(o.OrderID) } return nil } @@ -269,45 +272,53 @@ func (b *ActiveOrderBook) update(orders ...types.Order) { // When the order is cancelled, it will be removed from the internal order storage. func (b *ActiveOrderBook) Update(order types.Order) { hasSymbol := len(b.Symbol) > 0 - if hasSymbol && order.Symbol != b.Symbol { return } + b.mu.Lock() if !b.orders.Exists(order.OrderID) { b.pendingOrderUpdates.Add(order) + b.mu.Unlock() return } switch order.Status { case types.OrderStatusFilled: // make sure we have the order and we remove it - if b.Remove(order) { + removed := b.orders.Remove(order.OrderID) + b.mu.Unlock() + + if removed { b.EmitFilled(order) } b.C.Emit() case types.OrderStatusPartiallyFilled: b.update(order) + b.mu.Unlock() case types.OrderStatusNew: b.update(order) + b.mu.Unlock() + b.C.Emit() case types.OrderStatusCanceled, types.OrderStatusRejected: // TODO: note that orders transit to "canceled" may have partially filled + log.Debugf("[ActiveOrderBook] order is %s, removing order %s", order.Status, order) + b.orders.Remove(order.OrderID) + b.mu.Unlock() + if order.Status == types.OrderStatusCanceled { b.EmitCanceled(order) } - - log.Debugf("[ActiveOrderBook] order is %s, removing order %s", order.Status, order) - b.Remove(order) b.C.Emit() default: + b.mu.Unlock() log.Warnf("[ActiveOrderBook] unhandled order status: %s", order.Status) } - } func (b *ActiveOrderBook) Add(orders ...types.Order) { @@ -339,7 +350,7 @@ func (b *ActiveOrderBook) add(order types.Order) { // so, when it's not status=new, we should trigger order update handler if order.Status != types.OrderStatusNew { // emit the order update handle function to trigger callback - b.orderUpdateHandler(order) + b.Update(order) } } else { @@ -348,6 +359,8 @@ func (b *ActiveOrderBook) add(order types.Order) { } func (b *ActiveOrderBook) Exists(order types.Order) bool { + b.mu.Lock() + defer b.mu.Unlock() return b.orders.Exists(order.OrderID) } @@ -356,6 +369,8 @@ func (b *ActiveOrderBook) Get(orderID uint64) (types.Order, bool) { } func (b *ActiveOrderBook) Remove(order types.Order) bool { + b.mu.Lock() + defer b.mu.Unlock() return b.orders.Remove(order.OrderID) } diff --git a/pkg/bbgo/activeorderbook_test.go b/pkg/bbgo/activeorderbook_test.go index fdd4ea9e9..848e07bc9 100644 --- a/pkg/bbgo/activeorderbook_test.go +++ b/pkg/bbgo/activeorderbook_test.go @@ -22,7 +22,7 @@ func TestActiveOrderBook_pendingOrders(t *testing.T) { // if we received filled order first // should be added to pending orders - ob.orderUpdateHandler(types.Order{ + ob.Update(types.Order{ OrderID: 99, SubmitOrder: types.SubmitOrder{ Symbol: "BTCUSDT", @@ -63,5 +63,4 @@ func TestActiveOrderBook_pendingOrders(t *testing.T) { }) assert.True(t, filled, "filled event should be fired") - }