add trade update callbacks and order update callbacks to order executor

This commit is contained in:
c9s 2020-12-21 13:40:23 +08:00
parent ab25bbaf74
commit 3eae58322a
4 changed files with 53 additions and 3 deletions

View File

@ -0,0 +1,27 @@
// Code generated by "callbackgen -type ExchangeOrderExecutor"; DO NOT EDIT.
package bbgo
import (
"github.com/c9s/bbgo/pkg/types"
)
func (e *ExchangeOrderExecutor) OnTradeUpdate(cb func(trade types.Trade)) {
e.tradeUpdateCallbacks = append(e.tradeUpdateCallbacks, cb)
}
func (e *ExchangeOrderExecutor) EmitTradeUpdate(trade types.Trade) {
for _, cb := range e.tradeUpdateCallbacks {
cb(trade)
}
}
func (e *ExchangeOrderExecutor) OnOrderUpdate(cb func(order types.Order)) {
e.orderUpdateCallbacks = append(e.orderUpdateCallbacks, cb)
}
func (e *ExchangeOrderExecutor) EmitOrderUpdate(order types.Order) {
for _, cb := range e.orderUpdateCallbacks {
cb(order)
}
}

View File

@ -33,10 +33,17 @@ func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, sessi
}
// ExchangeOrderExecutor is an order executor wrapper for single exchange instance.
//go:generate callbackgen -type ExchangeOrderExecutor
type ExchangeOrderExecutor struct {
Notifiability `json:"-"`
Session *ExchangeSession
// private trade update callbacks
tradeUpdateCallbacks []func(trade types.Trade)
// private order update callbacks
orderUpdateCallbacks []func(order types.Order)
}
func (e *ExchangeOrderExecutor) notifySubmitOrders(orders ...types.SubmitOrder) {

View File

@ -149,6 +149,11 @@ func (trader *Trader) Run(ctx context.Context) error {
Session: session,
}
// forward trade updates and order updates to the order executor
session.Stream.OnTradeUpdate(baseOrderExecutor.EmitTradeUpdate)
session.Stream.OnOrderUpdate(baseOrderExecutor.EmitOrderUpdate)
// default to base order executor
var orderExecutor OrderExecutor = baseOrderExecutor
@ -252,7 +257,7 @@ func (trader *Trader) Run(ctx context.Context) error {
StoreID: "default",
Type: "memory",
},
Facade: trader.environment.PersistenceServiceFacade,
Facade: trader.environment.PersistenceServiceFacade,
}))
} else {
elem := field.Elem()
@ -300,6 +305,9 @@ func (trader *Trader) ReportPnL() *PnLReporterManager {
type OrderExecutor interface {
SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
OnTradeUpdate(cb func(trade types.Trade))
OnOrderUpdate(cb func(order types.Order))
}
type OrderExecutionRouter interface {

View File

@ -71,10 +71,14 @@ type Strategy struct {
order types.Order
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval.String()})
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.MovingAverageInterval.String()})
}
func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
sourceSession := sessions[s.SourceExchangeName]
sourceSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval.String()})
sourceSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.MovingAverageInterval.String()})
s.Subscribe(sourceSession)
// make sure we have the connection alive
targetSession := sessions[s.TargetExchangeName]
@ -170,6 +174,10 @@ func (s *Strategy) handleOrderUpdate(order types.Order) {
}
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
return nil
}
func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
// source session
sourceSession := sessions[s.SourceExchangeName]