core: fix trade collector dead lock

This commit is contained in:
c9s 2023-08-01 20:11:33 +08:00
parent 74aae687be
commit c0e315fafe
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
2 changed files with 8 additions and 6 deletions

View File

@ -127,7 +127,6 @@ func (c *TradeCollector) RecoverTrade(td types.Trade) bool {
// if we have the order in the order store, then the trade will be considered for the position. // if we have the order in the order store, then the trade will be considered for the position.
// profit will also be calculated. // profit will also be calculated.
func (c *TradeCollector) Process() bool { func (c *TradeCollector) Process() bool {
logrus.Debugf("TradeCollector.Process()")
positionChanged := false positionChanged := false
var trades []types.Trade var trades []types.Trade
@ -184,20 +183,24 @@ func (c *TradeCollector) Process() bool {
// return true when the given trade is added // return true when the given trade is added
// return false when the given trade is not added // return false when the given trade is not added
func (c *TradeCollector) processTrade(trade types.Trade) bool { func (c *TradeCollector) processTrade(trade types.Trade) bool {
c.mu.Lock()
defer c.mu.Unlock()
key := trade.Key() key := trade.Key()
c.mu.Lock()
// if it's already done, remove the trade from the trade store // if it's already done, remove the trade from the trade store
if _, done := c.doneTrades[key]; done { if _, done := c.doneTrades[key]; done {
c.mu.Unlock()
return false return false
} }
if !c.orderStore.Exists(trade.OrderID) { if !c.orderStore.Exists(trade.OrderID) {
c.mu.Unlock()
return false return false
} }
c.doneTrades[key] = struct{}{}
c.mu.Unlock()
if c.position != nil { if c.position != nil {
profit, netProfit, madeProfit := c.position.AddTrade(trade) profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit { if madeProfit {
@ -213,7 +216,6 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
} }
c.doneTrades[key] = struct{}{}
return true return true
} }

View File

@ -44,7 +44,7 @@ func NewPositionRiskControl(orderExecutor bbgo.OrderExecutorExtended, hardLimit,
Quantity: quantity, Quantity: quantity,
} }
log.Infof("submitting order: %+v", submitOrder) log.Infof("RiskControl: position limit exceeded, submitting order to reduce position: %+v", submitOrder)
createdOrders, err := orderExecutor.SubmitOrders(context.Background(), submitOrder) createdOrders, err := orderExecutor.SubmitOrders(context.Background(), submitOrder)
if err != nil { if err != nil {
log.WithError(err).Errorf("failed to submit orders") log.WithError(err).Errorf("failed to submit orders")