fix: cache params and kline until next kline 1m appears

This commit is contained in:
zenix 2022-08-29 19:46:58 +09:00
parent 1eb03c3dba
commit ecc959835a
3 changed files with 48 additions and 33 deletions

View File

@ -167,7 +167,7 @@ func (e *Exchange) QueryOrder(ctx context.Context, q types.OrderQuery) (*types.O
func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) {
for _, order := range orders {
symbol := order.Symbol
matching, ok := e.matchingBook(symbol)
matching, ok := e.MatchingBook(symbol)
if !ok {
return nil, fmt.Errorf("matching engine is not initialized for symbol %s", symbol)
}
@ -192,7 +192,7 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder
}
func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) {
matching, ok := e.matchingBook(symbol)
matching, ok := e.MatchingBook(symbol)
if !ok {
return nil, fmt.Errorf("matching engine is not initialized for symbol %s", symbol)
}
@ -211,7 +211,7 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error {
for _, order := range orders {
matching, ok := e.matchingBook(order.Symbol)
matching, ok := e.MatchingBook(order.Symbol)
if !ok {
return fmt.Errorf("matching engine is not initialized for symbol %s", order.Symbol)
}
@ -250,7 +250,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
}
func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticker, error) {
matching, ok := e.matchingBook(symbol)
matching, ok := e.MatchingBook(symbol)
if !ok {
return nil, fmt.Errorf("matching engine is not initialized for symbol %s", symbol)
}
@ -293,7 +293,7 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since
return nil, nil
}
func (e *Exchange) matchingBook(symbol string) (*SimplePriceMatching, bool) {
func (e *Exchange) MatchingBook(symbol string) (*SimplePriceMatching, bool) {
e.matchingBooksMutex.Lock()
m, ok := e.matchingBooks[symbol]
e.matchingBooksMutex.Unlock()
@ -362,21 +362,36 @@ func (e *Exchange) SubscribeMarketData(startTime, endTime time.Time, extraInterv
return klineC, nil
}
func (e *Exchange) ConsumeKLine(k types.KLine) {
if k.Interval == types.Interval1m {
e.currentTime = k.EndTime.Time()
matching, ok := e.matchingBook(k.Symbol)
if !ok {
log.Errorf("matching book of %s is not initialized", k.Symbol)
return
}
// here we generate trades and order updates
matching.processKLine(k)
func (e *Exchange) ConsumeKLine(k types.KLine, handlers []func(types.KLine, *ExchangeDataSource), src *ExchangeDataSource) {
matching, ok := e.MatchingBook(k.Symbol)
if !ok {
log.Errorf("matching book of %s is not initialized", k.Symbol)
return
}
if matching.ParamCache == nil {
matching.ParamCache = make(map[types.Interval]Param)
}
_, ok = matching.ParamCache[k.Interval]
if ok { // pop out all the old
for _, param := range matching.ParamCache {
if param.kline.Interval == types.Interval1m {
// here we generate trades and order updates
matching.processKLine(param.kline)
matching.NextKLine = &k
}
// log.Errorf("kline %v, next %v", param.kline, matching.NextKLine)
e.MarketDataStream.EmitKLineClosed(param.kline)
for _, h := range param.callback {
h(param.kline, param.src)
}
}
matching.ParamCache = make(map[types.Interval]Param)
}
matching.ParamCache[k.Interval] = Param{
callback: handlers,
src: src,
kline: k,
}
e.MarketDataStream.EmitKLineClosed(k)
}
func (e *Exchange) CloseMarketData() error {

View File

@ -47,6 +47,12 @@ func init() {
}
}
type Param struct {
callback []func(types.KLine, *ExchangeDataSource)
kline types.KLine
src *ExchangeDataSource
}
// SimplePriceMatching implements a simple kline data driven matching engine for backtest
//go:generate callbackgen -type SimplePriceMatching
type SimplePriceMatching struct {
@ -58,8 +64,10 @@ type SimplePriceMatching struct {
askOrders []types.Order
closedOrders map[uint64]types.Order
ParamCache map[types.Interval]Param
LastPrice fixedpoint.Value
LastKLine types.KLine
NextKLine *types.KLine
CurrentTime time.Time
Account *types.Account
@ -185,9 +193,9 @@ func (m *SimplePriceMatching) PlaceOrder(o types.SubmitOrder) (*types.Order, *ty
order.Price = m.Market.TruncatePrice(m.LastPrice)
price = order.Price
} else if order.Type == types.OrderTypeLimit {
if m.LastKLine.High.Compare(order.Price) > 0 && order.Side == types.SideTypeBuy {
if m.NextKLine.High.Compare(order.Price) > 0 && order.Side == types.SideTypeBuy {
order.AveragePrice = order.Price
} else if m.LastKLine.Low.Compare(order.Price) < 0 && order.Side == types.SideTypeSell {
} else if m.NextKLine.Low.Compare(order.Price) < 0 && order.Side == types.SideTypeSell {
order.AveragePrice = order.Price
} else {
@ -637,7 +645,6 @@ func (m *SimplePriceMatching) processKLine(kline types.KLine) {
m.buyToPrice(kline.Open)
}
}
m.LastKLine = kline
switch kline.Direction() {
case types.DirectionDown:
@ -669,6 +676,8 @@ func (m *SimplePriceMatching) processKLine(kline types.KLine) {
m.buyToPrice(kline.Close)
}
}
m.LastKLine = kline
}
func (m *SimplePriceMatching) newOrder(o types.SubmitOrder, orderID uint64) types.Order {

View File

@ -446,12 +446,7 @@ var BacktestCmd = &cobra.Command{
if numOfExchangeSources == 1 {
exSource := exchangeSources[0]
for k := range exSource.C {
exSource.Exchange.ConsumeKLine(k)
for _, h := range kLineHandlers {
h(k, &exSource)
}
exSource.Exchange.ConsumeKLine(k, kLineHandlers, &exSource)
}
if err := exSource.Exchange.CloseMarketData(); err != nil {
@ -472,11 +467,7 @@ var BacktestCmd = &cobra.Command{
break RunMultiExchangeData
}
exK.Exchange.ConsumeKLine(k)
for _, h := range kLineHandlers {
h(k, &exK)
}
exK.Exchange.ConsumeKLine(k, kLineHandlers, &exK)
}
}
}()