bbgo: add mutex lock to ActiveOrderBook

This commit is contained in:
c9s 2023-08-17 17:16:27 +08:00
parent 2669c3a5db
commit 1cadaf9265
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
2 changed files with 26 additions and 12 deletions

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"sort" "sort"
"sync"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -32,6 +33,8 @@ type ActiveOrderBook struct {
// sig is the order update signal // sig is the order update signal
// this signal will be emitted when a new order is added or removed. // this signal will be emitted when a new order is added or removed.
C sigchan.Chan C sigchan.Chan
mu sync.Mutex
} }
func NewActiveOrderBook(symbol string) *ActiveOrderBook { func NewActiveOrderBook(symbol string) *ActiveOrderBook {
@ -57,7 +60,7 @@ func (b *ActiveOrderBook) BindStream(stream types.Stream) {
} }
func (b *ActiveOrderBook) waitClear(ctx context.Context, order types.Order, waitTime, timeout time.Duration) (bool, error) { func (b *ActiveOrderBook) waitClear(ctx context.Context, order types.Order, waitTime, timeout time.Duration) (bool, error) {
if !b.Exists(order) { if !b.orders.Exists(order.OrderID) {
return true, nil return true, nil
} }
@ -68,7 +71,7 @@ func (b *ActiveOrderBook) waitClear(ctx context.Context, order types.Order, wait
case <-b.C: case <-b.C:
} }
clear := !b.Exists(order) clear := !b.orders.Exists(order.OrderID)
select { select {
case <-timeoutC: case <-timeoutC:
@ -146,7 +149,7 @@ func (b *ActiveOrderBook) FastCancel(ctx context.Context, ex types.Exchange, ord
} }
for _, o := range orders { for _, o := range orders {
b.Remove(o) b.orders.Remove(o.OrderID)
} }
return nil return nil
} }
@ -269,45 +272,53 @@ func (b *ActiveOrderBook) update(orders ...types.Order) {
// When the order is cancelled, it will be removed from the internal order storage. // When the order is cancelled, it will be removed from the internal order storage.
func (b *ActiveOrderBook) Update(order types.Order) { func (b *ActiveOrderBook) Update(order types.Order) {
hasSymbol := len(b.Symbol) > 0 hasSymbol := len(b.Symbol) > 0
if hasSymbol && order.Symbol != b.Symbol { if hasSymbol && order.Symbol != b.Symbol {
return return
} }
b.mu.Lock()
if !b.orders.Exists(order.OrderID) { if !b.orders.Exists(order.OrderID) {
b.pendingOrderUpdates.Add(order) b.pendingOrderUpdates.Add(order)
b.mu.Unlock()
return return
} }
switch order.Status { switch order.Status {
case types.OrderStatusFilled: case types.OrderStatusFilled:
// make sure we have the order and we remove it // make sure we have the order and we remove it
if b.Remove(order) { removed := b.orders.Remove(order.OrderID)
b.mu.Unlock()
if removed {
b.EmitFilled(order) b.EmitFilled(order)
} }
b.C.Emit() b.C.Emit()
case types.OrderStatusPartiallyFilled: case types.OrderStatusPartiallyFilled:
b.update(order) b.update(order)
b.mu.Unlock()
case types.OrderStatusNew: case types.OrderStatusNew:
b.update(order) b.update(order)
b.mu.Unlock()
b.C.Emit() b.C.Emit()
case types.OrderStatusCanceled, types.OrderStatusRejected: case types.OrderStatusCanceled, types.OrderStatusRejected:
// TODO: note that orders transit to "canceled" may have partially filled // TODO: note that orders transit to "canceled" may have partially filled
log.Debugf("[ActiveOrderBook] order is %s, removing order %s", order.Status, order)
b.orders.Remove(order.OrderID)
b.mu.Unlock()
if order.Status == types.OrderStatusCanceled { if order.Status == types.OrderStatusCanceled {
b.EmitCanceled(order) b.EmitCanceled(order)
} }
log.Debugf("[ActiveOrderBook] order is %s, removing order %s", order.Status, order)
b.Remove(order)
b.C.Emit() b.C.Emit()
default: default:
b.mu.Unlock()
log.Warnf("[ActiveOrderBook] unhandled order status: %s", order.Status) log.Warnf("[ActiveOrderBook] unhandled order status: %s", order.Status)
} }
} }
func (b *ActiveOrderBook) Add(orders ...types.Order) { func (b *ActiveOrderBook) Add(orders ...types.Order) {
@ -339,7 +350,7 @@ func (b *ActiveOrderBook) add(order types.Order) {
// so, when it's not status=new, we should trigger order update handler // so, when it's not status=new, we should trigger order update handler
if order.Status != types.OrderStatusNew { if order.Status != types.OrderStatusNew {
// emit the order update handle function to trigger callback // emit the order update handle function to trigger callback
b.orderUpdateHandler(order) b.Update(order)
} }
} else { } else {
@ -348,6 +359,8 @@ func (b *ActiveOrderBook) add(order types.Order) {
} }
func (b *ActiveOrderBook) Exists(order types.Order) bool { func (b *ActiveOrderBook) Exists(order types.Order) bool {
b.mu.Lock()
defer b.mu.Unlock()
return b.orders.Exists(order.OrderID) return b.orders.Exists(order.OrderID)
} }
@ -356,6 +369,8 @@ func (b *ActiveOrderBook) Get(orderID uint64) (types.Order, bool) {
} }
func (b *ActiveOrderBook) Remove(order types.Order) bool { func (b *ActiveOrderBook) Remove(order types.Order) bool {
b.mu.Lock()
defer b.mu.Unlock()
return b.orders.Remove(order.OrderID) return b.orders.Remove(order.OrderID)
} }

View File

@ -22,7 +22,7 @@ func TestActiveOrderBook_pendingOrders(t *testing.T) {
// if we received filled order first // if we received filled order first
// should be added to pending orders // should be added to pending orders
ob.orderUpdateHandler(types.Order{ ob.Update(types.Order{
OrderID: 99, OrderID: 99,
SubmitOrder: types.SubmitOrder{ SubmitOrder: types.SubmitOrder{
Symbol: "BTCUSDT", Symbol: "BTCUSDT",
@ -63,5 +63,4 @@ func TestActiveOrderBook_pendingOrders(t *testing.T) {
}) })
assert.True(t, filled, "filled event should be fired") assert.True(t, filled, "filled event should be fired")
} }