bbgo: process pending order update for active order book

This commit is contained in:
c9s 2023-02-17 19:15:00 +08:00
parent 21cdb7afe8
commit cf1be9fc6f
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 73 additions and 5 deletions

View File

@ -25,6 +25,8 @@ type ActiveOrderBook struct {
filledCallbacks []func(o types.Order)
canceledCallbacks []func(o types.Order)
pendingOrderUpdates *types.SyncOrderMap
// sig is the order update signal
// this signal will be emitted when a new order is added or removed.
C sigchan.Chan
@ -32,9 +34,10 @@ type ActiveOrderBook struct {
func NewActiveOrderBook(symbol string) *ActiveOrderBook {
return &ActiveOrderBook{
Symbol: symbol,
orders: types.NewSyncOrderMap(),
C: sigchan.New(1),
Symbol: symbol,
orders: types.NewSyncOrderMap(),
pendingOrderUpdates: types.NewSyncOrderMap(),
C: sigchan.New(1),
}
}
@ -230,6 +233,11 @@ func (b *ActiveOrderBook) orderUpdateHandler(order types.Order) {
return
}
if !b.orders.Exists(order.OrderID) {
b.pendingOrderUpdates.Add(order)
return
}
switch order.Status {
case types.OrderStatusFilled:
// make sure we have the order and we remove it
@ -277,7 +285,11 @@ func (b *ActiveOrderBook) Print() {
func (b *ActiveOrderBook) Update(orders ...types.Order) {
hasSymbol := len(b.Symbol) > 0
for _, order := range orders {
if hasSymbol && b.Symbol == order.Symbol {
if hasSymbol {
if b.Symbol == order.Symbol {
b.orders.Update(order)
}
} else {
b.orders.Update(order)
}
}
@ -286,9 +298,26 @@ func (b *ActiveOrderBook) Update(orders ...types.Order) {
func (b *ActiveOrderBook) Add(orders ...types.Order) {
hasSymbol := len(b.Symbol) > 0
for _, order := range orders {
if hasSymbol && b.Symbol == order.Symbol {
if hasSymbol {
if b.Symbol == order.Symbol {
b.add(order)
}
} else {
b.add(order)
}
}
}
// add the order to the active order book and check the pending order
func (b *ActiveOrderBook) add(order types.Order) {
if pendingOrder, ok := b.pendingOrderUpdates.Get(order.OrderID); ok {
if pendingOrder.UpdateTime.Time().After(order.UpdateTime.Time()) {
b.orders.Add(pendingOrder)
} else {
b.orders.Add(order)
}
} else {
b.orders.Add(order)
}
}

View File

@ -62,6 +62,33 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
return createdOrders, err
}
// BatchPlaceOrderChan post orders with a channel, the created order will be sent to this channel immediately, so that
// the caller can add the created order to the active order book or the order store to collect trades.
// this method is used when you have large amount of orders to be sent and most of the orders might be filled as taker order.
// channel orderC will be closed when all the submit orders are submitted.
func BatchPlaceOrderChan(ctx context.Context, exchange types.Exchange, orderC chan types.Order, submitOrders ...types.SubmitOrder) (types.OrderSlice, []int, error) {
defer close(orderC)
var createdOrders types.OrderSlice
var err error
var errIndexes []int
for i, submitOrder := range submitOrders {
createdOrder, err2 := exchange.SubmitOrder(ctx, submitOrder)
if err2 != nil {
err = multierr.Append(err, err2)
errIndexes = append(errIndexes, i)
} else if createdOrder != nil {
createdOrder.Tag = submitOrder.Tag
orderC <- *createdOrder
createdOrders = append(createdOrders, *createdOrder)
}
}
return createdOrders, errIndexes, err
}
// BatchPlaceOrder
func BatchPlaceOrder(ctx context.Context, exchange types.Exchange, submitOrders ...types.SubmitOrder) (types.OrderSlice, []int, error) {
var createdOrders types.OrderSlice

View File

@ -56,6 +56,11 @@ func (m OrderMap) Exists(orderID uint64) bool {
return ok
}
func (m OrderMap) Get(orderID uint64) (Order, bool) {
order, ok := m[orderID]
return order, ok
}
func (m OrderMap) FindByStatus(status OrderStatus) (orders OrderSlice) {
for _, o := range m {
if o.Status == status {
@ -165,6 +170,13 @@ func (m *SyncOrderMap) Exists(orderID uint64) (exists bool) {
return exists
}
func (m *SyncOrderMap) Get(orderID uint64) (Order, bool) {
m.Lock()
order, ok := m.orders.Get(orderID)
m.Unlock()
return order, ok
}
func (m *SyncOrderMap) Lookup(f func(o Order) bool) *Order {
m.Lock()
defer m.Unlock()