mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
refactor and fix backtest for user data stream and market data stream
This commit is contained in:
parent
38fd5422ab
commit
3aa36b5989
|
@ -25,7 +25,7 @@ type Exchange struct {
|
|||
account *types.Account
|
||||
config *bbgo.Backtest
|
||||
|
||||
stream *Stream
|
||||
userDataStream *Stream
|
||||
|
||||
trades map[string][]types.Trade
|
||||
closedOrders map[string][]types.Order
|
||||
|
@ -91,31 +91,7 @@ func (e *Exchange) Done() chan struct{} {
|
|||
}
|
||||
|
||||
func (e *Exchange) NewStream() types.Stream {
|
||||
if e.stream != nil {
|
||||
panic("backtest stream can not be allocated twice")
|
||||
}
|
||||
|
||||
e.stream = &Stream{exchange: e}
|
||||
|
||||
e.stream.OnTradeUpdate(func(trade types.Trade) {
|
||||
e.trades[trade.Symbol] = append(e.trades[trade.Symbol], trade)
|
||||
})
|
||||
|
||||
for symbol, market := range e.markets {
|
||||
matching := &SimplePriceMatching{
|
||||
CurrentTime: e.startTime,
|
||||
Account: e.account,
|
||||
Market: market,
|
||||
MakerCommission: e.config.Account.MakerCommission,
|
||||
TakerCommission: e.config.Account.TakerCommission,
|
||||
}
|
||||
matching.OnTradeUpdate(e.stream.EmitTradeUpdate)
|
||||
matching.OnOrderUpdate(e.stream.EmitOrderUpdate)
|
||||
matching.OnBalanceUpdate(e.stream.EmitBalanceUpdate)
|
||||
e.matchingBooks[symbol] = matching
|
||||
}
|
||||
|
||||
return e.stream
|
||||
return &Stream{exchange: e}
|
||||
}
|
||||
|
||||
func (e Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) {
|
||||
|
@ -140,11 +116,11 @@ func (e Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder)
|
|||
e.closedOrders[symbol] = append(e.closedOrders[symbol], *createdOrder)
|
||||
}
|
||||
|
||||
e.stream.EmitOrderUpdate(*createdOrder)
|
||||
e.userDataStream.EmitOrderUpdate(*createdOrder)
|
||||
}
|
||||
|
||||
if trade != nil {
|
||||
e.stream.EmitTradeUpdate(*trade)
|
||||
e.userDataStream.EmitTradeUpdate(*trade)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -180,7 +156,7 @@ func (e Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error
|
|||
return err
|
||||
}
|
||||
|
||||
e.stream.EmitOrderUpdate(canceledOrder)
|
||||
e.userDataStream.EmitOrderUpdate(canceledOrder)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -15,6 +15,8 @@ type Stream struct {
|
|||
types.StandardStream
|
||||
|
||||
exchange *Exchange
|
||||
|
||||
publicOnly bool
|
||||
}
|
||||
|
||||
func (s *Stream) Connect(ctx context.Context) error {
|
||||
|
@ -51,13 +53,35 @@ func (s *Stream) Connect(ctx context.Context) error {
|
|||
|
||||
log.Infof("used symbols: %v and intervals: %v", symbols, intervals)
|
||||
|
||||
go func() {
|
||||
log.Infof("emitting connect callbacks...")
|
||||
s.EmitConnect()
|
||||
if !s.publicOnly {
|
||||
// user data stream
|
||||
s.OnTradeUpdate(func(trade types.Trade) {
|
||||
s.exchange.trades[trade.Symbol] = append(s.exchange.trades[trade.Symbol], trade)
|
||||
})
|
||||
|
||||
log.Infof("emitting start callbacks...")
|
||||
for symbol, market := range s.exchange.markets {
|
||||
matching := &SimplePriceMatching{
|
||||
CurrentTime: s.exchange.startTime,
|
||||
Account: s.exchange.account,
|
||||
Market: market,
|
||||
MakerCommission: s.exchange.config.Account.MakerCommission,
|
||||
TakerCommission: s.exchange.config.Account.TakerCommission,
|
||||
}
|
||||
matching.OnTradeUpdate(s.EmitTradeUpdate)
|
||||
matching.OnOrderUpdate(s.EmitOrderUpdate)
|
||||
matching.OnBalanceUpdate(s.EmitBalanceUpdate)
|
||||
s.exchange.matchingBooks[symbol] = matching
|
||||
}
|
||||
|
||||
// assign user data stream back
|
||||
s.exchange.userDataStream = s
|
||||
}
|
||||
|
||||
s.EmitConnect()
|
||||
s.EmitStart()
|
||||
|
||||
if s.publicOnly {
|
||||
go func() {
|
||||
log.Infof("querying klines from database...")
|
||||
klineC, errC := s.exchange.srv.QueryKLinesCh(s.exchange.startTime, s.exchange.endTime, s.exchange, symbols, intervals)
|
||||
numKlines := 0
|
||||
|
@ -88,11 +112,13 @@ func (s *Stream) Connect(ctx context.Context) error {
|
|||
log.WithError(err).Error("stream close error")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Stream) SetPublicOnly() {
|
||||
s.publicOnly = true
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -211,11 +211,8 @@ func (environ *Environment) AddExchangesByViperKeys() error {
|
|||
}
|
||||
|
||||
func InitExchangeSession(name string, session *ExchangeSession) error {
|
||||
exchangeName, err := types.ValidExchangeName(session.ExchangeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var err error
|
||||
var exchangeName = session.ExchangeName
|
||||
var exchange types.Exchange
|
||||
if session.Key != "" && session.Secret != "" {
|
||||
if !session.PublicOnly {
|
||||
|
|
|
@ -133,7 +133,7 @@ func (s *Server) newEngine() *gin.Engine {
|
|||
return
|
||||
}
|
||||
|
||||
err := bbgo.InitExchangeSession(session.ExchangeName, &session)
|
||||
err := bbgo.InitExchangeSession(session.ExchangeName.String(), &session)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{
|
||||
"error": err.Error(),
|
||||
|
@ -182,7 +182,7 @@ func (s *Server) newEngine() *gin.Engine {
|
|||
return
|
||||
}
|
||||
|
||||
if err := bbgo.InitExchangeSession(session.ExchangeName, &session); err != nil {
|
||||
if err := bbgo.InitExchangeSession(session.ExchangeName.String(), &session); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{
|
||||
"error": err.Error(),
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue
Block a user