fix: Position not synchronized in drift. add DisableNotify for GeneralOrderExecutor

This commit is contained in:
zenix 2022-11-16 18:10:58 +09:00
parent 27800e95bd
commit 109f4d0e3e
5 changed files with 167 additions and 85 deletions

View File

@ -38,7 +38,8 @@ type GeneralOrderExecutor struct {
marginBaseMaxBorrowable, marginQuoteMaxBorrowable fixedpoint.Value
closing int64
disableNotify bool
closing int64
}
func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strategyInstanceID string, position *types.Position) *GeneralOrderExecutor {
@ -66,6 +67,10 @@ func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strateg
return executor
}
func (e *GeneralOrderExecutor) DisableNotify() {
e.disableNotify = true
}
func (e *GeneralOrderExecutor) startMarginAssetUpdater(ctx context.Context) {
marginService, ok := e.session.Exchange.(types.MarginBorrowRepayService)
if !ok {
@ -110,6 +115,10 @@ func (e *GeneralOrderExecutor) marginAssetMaxBorrowableUpdater(ctx context.Conte
}
}
func (e *GeneralOrderExecutor) OrderStore() *OrderStore {
return e.orderStore
}
func (e *GeneralOrderExecutor) ActiveMakerOrders() *ActiveOrderBook {
return e.activeMakerOrders
}
@ -144,11 +153,11 @@ func (e *GeneralOrderExecutor) BindProfitStats(profitStats *types.ProfitStats) {
})
}
func (e *GeneralOrderExecutor) Bind(notify ...bool) {
func (e *GeneralOrderExecutor) Bind() {
e.activeMakerOrders.BindStream(e.session.UserDataStream)
e.orderStore.BindStream(e.session.UserDataStream)
if len(notify) > 0 && notify[0] {
if !e.disableNotify {
// trade notify
e.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
Notify(trade)

View File

@ -89,6 +89,10 @@ func (s *OrderStore) Add(orders ...types.Order) {
defer s.mu.Unlock()
for _, o := range orders {
old, ok := s.orders[o.OrderID]
if ok && o.Tag == "" && old.Tag != "" {
o.Tag = old.Tag
}
s.orders[o.OrderID] = o
}
}
@ -120,11 +124,11 @@ func (s *OrderStore) BindStream(stream types.Stream) {
return
}
s.handleOrderUpdate(order)
s.HandleOrderUpdate(order)
})
}
func (s *OrderStore) handleOrderUpdate(order types.Order) {
func (s *OrderStore) HandleOrderUpdate(order types.Order) {
switch order.Status {
case types.OrderStatusNew, types.OrderStatusPartiallyFilled, types.OrderStatusFilled:

View File

@ -56,6 +56,10 @@ func (c *TradeCollector) Position() *types.Position {
return c.position
}
func (c *TradeCollector) TradeStore() *TradeStore {
return c.tradeStore
}
func (c *TradeCollector) SetPosition(position *types.Position) {
c.position = position
}

View File

@ -26,14 +26,14 @@ const RestBaseURL = "https://api.binance.com"
const SandboxRestBaseURL = "https://testnet.binance.vision"
const DebugRequestResponse = false
var Dialer = &net.Dialer{
var dialer = &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}
var DefaultTransport = &http.Transport{
var defaultTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: Dialer.DialContext,
DialContext: dialer.DialContext,
MaxIdleConns: 100,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
@ -45,7 +45,7 @@ var DefaultTransport = &http.Transport{
var DefaultHttpClient = &http.Client{
Timeout: defaultHTTPTimeout,
Transport: DefaultTransport,
Transport: defaultTransport,
}
type RestClient struct {

View File

@ -64,7 +64,6 @@ type Strategy struct {
*types.ProfitStats `persistence:"profit_stats"`
*types.TradeStats `persistence:"trade_stats"`
p *types.Position
MinInterval types.Interval `json:"MinInterval"` // minimum interval referred for doing stoploss/trailing exists and updating highest/lowest
elapsed *types.Queue
@ -173,10 +172,22 @@ func (s *Strategy) CurrentPosition() *types.Position {
return s.Position
}
func (s *Strategy) SubmitOrder(ctx context.Context, submitOrder types.SubmitOrder) (*types.Order, error) {
formattedOrder, err := s.Session.FormatOrder(submitOrder)
if err != nil {
return nil, err
}
createdOrders, errIdx, err := bbgo.BatchPlaceOrder(ctx, s.Session.Exchange, formattedOrder)
if len(errIdx) > 0 {
return nil, err
}
return &createdOrders[0], err
}
const closeOrderRetryLimit = 5
func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Value) error {
order := s.p.NewMarketCloseOrder(percentage)
order := s.Position.NewMarketCloseOrder(percentage)
if order == nil {
return nil
}
@ -199,17 +210,15 @@ func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Valu
if s.Market.IsDustQuantity(order.Quantity, price) {
return nil
}
createdOrders, err := s.GeneralOrderExecutor.FastSubmitOrders(ctx, *order)
o, err := s.SubmitOrder(ctx, *order)
if err != nil {
order.Quantity = order.Quantity.Mul(fixedpoint.One.Sub(Delta))
continue
}
if createdOrders != nil {
for _, o := range createdOrders {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
log.Errorf("created Order when Close: %v", o)
}
if o != nil {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
log.Errorf("created Order when Close: %v", o)
}
}
return nil
@ -420,47 +429,47 @@ func (s *Strategy) Rebalance(ctx context.Context) {
quoteBalance := balances[s.Market.QuoteCurrency].Total()
total := baseBalance.Add(quoteBalance.Div(price))
percentage := fixedpoint.One.Sub(Delta)
log.Infof("rebalance beta %f %v", beta, s.p)
log.Infof("rebalance beta %f %v", beta, s.Position)
if beta > s.RebalanceFilter {
if total.Mul(percentage).Compare(baseBalance) > 0 {
q := total.Mul(percentage).Sub(baseBalance)
s.p.Lock()
defer s.p.Unlock()
s.p.Base = q.Neg()
s.p.Quote = q.Mul(price)
s.p.AverageCost = price
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Base = q.Neg()
s.Position.Quote = q.Mul(price)
s.Position.AverageCost = price
}
} else if beta <= -s.RebalanceFilter {
if total.Mul(percentage).Compare(quoteBalance.Div(price)) > 0 {
q := total.Mul(percentage).Sub(quoteBalance.Div(price))
s.p.Lock()
defer s.p.Unlock()
s.p.Base = q
s.p.Quote = q.Mul(price).Neg()
s.p.AverageCost = price
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Base = q
s.Position.Quote = q.Mul(price).Neg()
s.Position.AverageCost = price
}
} else {
if total.Div(Two).Compare(quoteBalance.Div(price)) > 0 {
q := total.Div(Two).Sub(quoteBalance.Div(price))
s.p.Lock()
defer s.p.Unlock()
s.p.Base = q
s.p.Quote = q.Mul(price).Neg()
s.p.AverageCost = price
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Base = q
s.Position.Quote = q.Mul(price).Neg()
s.Position.AverageCost = price
} else if total.Div(Two).Compare(baseBalance) > 0 {
q := total.Div(Two).Sub(baseBalance)
s.p.Lock()
defer s.p.Unlock()
s.p.Base = q.Neg()
s.p.Quote = q.Mul(price)
s.p.AverageCost = price
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Base = q.Neg()
s.Position.Quote = q.Mul(price)
s.Position.AverageCost = price
} else {
s.p.Lock()
defer s.p.Unlock()
s.p.Reset()
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Reset()
}
}
log.Infof("rebalanceafter %v %v %v", baseBalance, quoteBalance, s.p)
log.Infof("rebalanceafter %v %v %v", baseBalance, quoteBalance, s.Position)
s.beta = beta
}
@ -646,18 +655,20 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
log.Infof("source in long %v %v %f", source, price, s.stdevLow.Last())
createdOrders, err := s.GeneralOrderExecutor.FastSubmitOrders(ctx, *submitOrder)
o, err := s.SubmitOrder(ctx, *submitOrder)
if err != nil {
log.WithError(err).Errorf("cannot place buy order")
return
}
log.Infof("orders %v", createdOrders)
if createdOrders != nil {
for _, o := range createdOrders {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
log.Infof("order %v", o)
if o != nil {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
s.pendingLock.Lock()
if _, ok := s.orderPendingCounter[o.OrderID]; !ok {
s.orderPendingCounter[o.OrderID] = counter
}
s.pendingLock.Unlock()
}
}
return
@ -690,19 +701,19 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
if submitOrder == nil {
return
}
createdOrders, err := s.GeneralOrderExecutor.FastSubmitOrders(ctx, *submitOrder)
o, err := s.SubmitOrder(ctx, *submitOrder)
if err != nil {
log.WithError(err).Errorf("cannot place sell order")
return
}
log.Infof("orders %v", createdOrders)
if createdOrders != nil {
for _, o := range createdOrders {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
s.pendingLock.Lock()
log.Infof("order %v", o)
if o != nil {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
s.pendingLock.Lock()
if _, ok := s.orderPendingCounter[o.OrderID]; !ok {
s.orderPendingCounter[o.OrderID] = counter
s.pendingLock.Unlock()
}
s.pendingLock.Unlock()
}
}
return
@ -717,12 +728,12 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Will be set by persistence if there's any from DB
if s.Position == nil {
s.Position = types.NewPositionFromMarket(s.Market)
s.p = types.NewPositionFromMarket(s.Market)
} else {
s.p = types.NewPositionFromMarket(s.Market)
s.p.Base = s.Position.Base
s.p.Quote = s.Position.Quote
s.p.AverageCost = s.Position.AverageCost
}
if s.Session.MakerFeeRate.Sign() > 0 || s.Session.TakerFeeRate.Sign() > 0 {
s.Position.SetExchangeFeeRate(s.Session.ExchangeName, types.ExchangeFee{
MakerFeeRate: s.Session.MakerFeeRate,
TakerFeeRate: s.Session.TakerFeeRate,
})
}
if s.ProfitStats == nil {
s.ProfitStats = types.NewProfitStats(s.Market)
@ -742,7 +753,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
_ = s.ClosePosition(ctx, fixedpoint.One)
})
profit := floats.Slice{1., 1.}
profitChart := floats.Slice{1., 1.}
price, _ := s.Session.LastPrice(s.Symbol)
initAsset := s.CalcAssetValue(price).Float64()
cumProfit := floats.Slice{initAsset, initAsset}
@ -754,36 +765,100 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return p * (1. - Fee)
}
}
s.GeneralOrderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.GeneralOrderExecutor.DisableNotify()
orderStore := s.GeneralOrderExecutor.OrderStore()
orderStore.AddOrderUpdate = true
orderStore.RemoveCancelled = true
orderStore.RemoveFilled = true
activeOrders := s.GeneralOrderExecutor.ActiveMakerOrders()
tradeCollector := s.GeneralOrderExecutor.TradeCollector()
tradeStore := tradeCollector.TradeStore()
syscounter := 0
// Modify activeOrders to force write order updates
s.Session.UserDataStream.OnOrderUpdate(func(order types.Order) {
hasSymbol := len(activeOrders.Symbol) > 0
if hasSymbol && order.Symbol != activeOrders.Symbol {
return
}
switch order.Status {
case types.OrderStatusFilled:
s.pendingLock.Lock()
s.orderPendingCounter = make(map[uint64]int)
s.pendingLock.Unlock()
// make sure we have the order and we remove it
activeOrders.Remove(order)
case types.OrderStatusPartiallyFilled:
s.pendingLock.Lock()
if _, ok := s.orderPendingCounter[order.OrderID]; !ok {
s.orderPendingCounter[order.OrderID] = syscounter
}
s.pendingLock.Unlock()
activeOrders.Add(order)
case types.OrderStatusNew:
s.pendingLock.Lock()
if _, ok := s.orderPendingCounter[order.OrderID]; !ok {
s.orderPendingCounter[order.OrderID] = syscounter
}
s.pendingLock.Unlock()
activeOrders.Add(order)
case types.OrderStatusCanceled, types.OrderStatusRejected:
log.Debugf("[ActiveOrderBook] order status %s, removing order %s", order.Status, order)
s.pendingLock.Lock()
s.orderPendingCounter = make(map[uint64]int)
s.pendingLock.Unlock()
activeOrders.Remove(order)
default:
log.Errorf("unhandled order status: %s", order.Status)
}
orderStore.HandleOrderUpdate(order)
})
s.Session.UserDataStream.OnTradeUpdate(func(trade types.Trade) {
if trade.Symbol != s.Symbol {
return
}
profit, netProfit, madeProfit := s.Position.AddTrade(trade)
tradeStore.Add(trade)
if madeProfit {
p := s.Position.NewProfit(trade, profit, netProfit)
s.Environment.RecordPosition(s.Position, trade, &p)
s.TradeStats.Add(&p)
s.ProfitStats.AddTrade(trade)
s.ProfitStats.AddProfit(p)
bbgo.Notify(&p)
bbgo.Notify(s.ProfitStats)
}
price := trade.Price.Float64()
s.p.AddTrade(trade)
s.pendingLock.Lock()
delete(s.orderPendingCounter, trade.OrderID)
s.pendingLock.Unlock()
if s.buyPrice > 0 {
profit.Update(modify(price / s.buyPrice))
profitChart.Update(modify(price / s.buyPrice))
cumProfit.Update(s.CalcAssetValue(trade.Price).Float64())
} else if s.sellPrice > 0 {
profit.Update(modify(s.sellPrice / price))
profitChart.Update(modify(s.sellPrice / price))
cumProfit.Update(s.CalcAssetValue(trade.Price).Float64())
}
s.positionLock.Lock()
if s.p.IsDust(trade.Price) {
if s.Position.IsDust(trade.Price) {
s.buyPrice = 0
s.sellPrice = 0
s.highestPrice = 0
s.lowestPrice = 0
} else if s.p.IsLong() {
s.buyPrice = s.p.ApproximateAverageCost.Float64()
} else if s.Position.IsLong() {
s.buyPrice = s.Position.ApproximateAverageCost.Float64()
s.sellPrice = 0
s.highestPrice = math.Max(s.buyPrice, s.highestPrice)
s.lowestPrice = s.buyPrice
} else if s.p.IsShort() {
s.sellPrice = s.p.ApproximateAverageCost.Float64()
} else if s.Position.IsShort() {
s.sellPrice = s.Position.ApproximateAverageCost.Float64()
s.buyPrice = 0
s.highestPrice = s.sellPrice
if s.lowestPrice == 0 {
@ -795,12 +870,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.positionLock.Unlock()
})
s.GeneralOrderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.GeneralOrderExecutor.BindEnvironment(s.Environment)
s.GeneralOrderExecutor.BindProfitStats(s.ProfitStats)
s.GeneralOrderExecutor.BindTradeStats(s.TradeStats)
s.GeneralOrderExecutor.Bind()
s.orderPendingCounter = make(map[uint64]int)
// Exit methods from config
@ -818,7 +887,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, s.startTime))
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1w, s.startTime))
s.InitDrawCommands(&profit, &cumProfit)
s.InitDrawCommands(&profitChart, &cumProfit)
bbgo.RegisterCommand("/config", "Show latest config", func(reply interact.Reply) {
var buffer bytes.Buffer
@ -826,10 +895,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
reply.Message(buffer.String())
})
bbgo.RegisterCommand("/pos", "Show internal position", func(reply interact.Reply) {
reply.Message(s.p.String())
})
bbgo.RegisterCommand("/dump", "Dump internal params", func(reply interact.Reply) {
reply.Message("Please enter series output length:")
}).Next(func(length string, reply interact.Reply) {
@ -855,9 +920,9 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return nil
}
// var lastK types.KLine
store.OnKLineClosed(func(kline types.KLine) {
counter := int(kline.StartTime.Time().Add(kline.Interval.Duration()).Sub(s.startTime).Milliseconds()) / s.MinInterval.Milliseconds()
syscounter = counter
if kline.Interval == s.Interval {
s.klineHandler(ctx, kline, counter)
} else if kline.Interval == s.MinInterval {
@ -886,7 +951,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
os.Stdout.Write(buffer.Bytes())
if s.GenerateGraph {
s.Draw(s.frameKLine.StartTime, &profit, &cumProfit)
s.Draw(s.frameKLine.StartTime, &profitChart, &cumProfit)
}
wg.Done()
})