mirror of
https://github.com/c9s/bbgo.git
synced 2024-09-20 08:11:08 +00:00
rename twap.Execution to twap.StreamExecutor
This commit is contained in:
parent
9dd85623b9
commit
1294cd95be
|
@ -256,7 +256,7 @@ var executeOrderCmd = &cobra.Command{
|
||||||
executionCtx, cancelExecution := context.WithCancel(ctx)
|
executionCtx, cancelExecution := context.WithCancel(ctx)
|
||||||
defer cancelExecution()
|
defer cancelExecution()
|
||||||
|
|
||||||
execution := &twap.Execution{
|
execution := &twap.StreamExecutor{
|
||||||
Session: session,
|
Session: session,
|
||||||
Symbol: symbol,
|
Symbol: symbol,
|
||||||
Side: side,
|
Side: side,
|
||||||
|
|
|
@ -16,7 +16,8 @@ import (
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Execution struct {
|
// StreamExecutor is a TWAP execution that places orders on the best price by connecting to the real time market data stream.
|
||||||
|
type StreamExecutor struct {
|
||||||
Session *bbgo.ExchangeSession
|
Session *bbgo.ExchangeSession
|
||||||
Symbol string
|
Symbol string
|
||||||
Side types.SideType
|
Side types.SideType
|
||||||
|
@ -52,21 +53,21 @@ type Execution struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) connectMarketData(ctx context.Context) {
|
func (e *StreamExecutor) connectMarketData(ctx context.Context) {
|
||||||
logrus.Infof("connecting market data stream...")
|
logrus.Infof("connecting market data stream...")
|
||||||
if err := e.marketDataStream.Connect(ctx); err != nil {
|
if err := e.marketDataStream.Connect(ctx); err != nil {
|
||||||
logrus.WithError(err).Errorf("market data stream connect error")
|
logrus.WithError(err).Errorf("market data stream connect error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) connectUserData(ctx context.Context) {
|
func (e *StreamExecutor) connectUserData(ctx context.Context) {
|
||||||
logrus.Infof("connecting user data stream...")
|
logrus.Infof("connecting user data stream...")
|
||||||
if err := e.userDataStream.Connect(ctx); err != nil {
|
if err := e.userDataStream.Connect(ctx); err != nil {
|
||||||
logrus.WithError(err).Errorf("user data stream connect error")
|
logrus.WithError(err).Errorf("user data stream connect error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) {
|
func (e *StreamExecutor) newBestPriceOrder() (orderForm types.SubmitOrder, err error) {
|
||||||
book := e.orderBook.Copy()
|
book := e.orderBook.Copy()
|
||||||
sideBook := book.SideBook(e.Side)
|
sideBook := book.SideBook(e.Side)
|
||||||
|
|
||||||
|
@ -201,7 +202,7 @@ func (e *Execution) newBestPriceOrder() (orderForm types.SubmitOrder, err error)
|
||||||
return orderForm, err
|
return orderForm, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) updateOrder(ctx context.Context) error {
|
func (e *StreamExecutor) updateOrder(ctx context.Context) error {
|
||||||
book := e.orderBook.Copy()
|
book := e.orderBook.Copy()
|
||||||
sideBook := book.SideBook(e.Side)
|
sideBook := book.SideBook(e.Side)
|
||||||
|
|
||||||
|
@ -284,13 +285,13 @@ func (e *Execution) updateOrder(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) cancelActiveOrders() {
|
func (e *StreamExecutor) cancelActiveOrders() {
|
||||||
gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second)
|
gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second)
|
||||||
defer gracefulCancel()
|
defer gracefulCancel()
|
||||||
e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange)
|
e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) orderUpdater(ctx context.Context) {
|
func (e *StreamExecutor) orderUpdater(ctx context.Context) {
|
||||||
updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1)
|
updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1)
|
||||||
ticker := time.NewTimer(e.UpdateInterval)
|
ticker := time.NewTimer(e.UpdateInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
@ -340,7 +341,7 @@ func (e *Execution) orderUpdater(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) cancelContextIfTargetQuantityFilled() bool {
|
func (e *StreamExecutor) cancelContextIfTargetQuantityFilled() bool {
|
||||||
base := e.position.GetBase()
|
base := e.position.GetBase()
|
||||||
|
|
||||||
if base.Abs().Compare(e.TargetQuantity) >= 0 {
|
if base.Abs().Compare(e.TargetQuantity) >= 0 {
|
||||||
|
@ -351,7 +352,7 @@ func (e *Execution) cancelContextIfTargetQuantityFilled() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) handleTradeUpdate(trade types.Trade) {
|
func (e *StreamExecutor) handleTradeUpdate(trade types.Trade) {
|
||||||
// ignore trades that are not in the symbol we interested
|
// ignore trades that are not in the symbol we interested
|
||||||
if trade.Symbol != e.Symbol {
|
if trade.Symbol != e.Symbol {
|
||||||
return
|
return
|
||||||
|
@ -367,7 +368,7 @@ func (e *Execution) handleTradeUpdate(trade types.Trade) {
|
||||||
logrus.Infof("position updated: %+v", e.position)
|
logrus.Infof("position updated: %+v", e.position)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) handleFilledOrder(order types.Order) {
|
func (e *StreamExecutor) handleFilledOrder(order types.Order) {
|
||||||
logrus.Info(order.String())
|
logrus.Info(order.String())
|
||||||
|
|
||||||
// filled event triggers the order removal from the active order store
|
// filled event triggers the order removal from the active order store
|
||||||
|
@ -375,7 +376,7 @@ func (e *Execution) handleFilledOrder(order types.Order) {
|
||||||
e.cancelContextIfTargetQuantityFilled()
|
e.cancelContextIfTargetQuantityFilled()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) Run(parentCtx context.Context) error {
|
func (e *StreamExecutor) Run(parentCtx context.Context) error {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
e.stoppedC = make(chan struct{})
|
e.stoppedC = make(chan struct{})
|
||||||
e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx)
|
e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx)
|
||||||
|
@ -419,7 +420,7 @@ func (e *Execution) Run(parentCtx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) emitDone() {
|
func (e *StreamExecutor) emitDone() {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
if e.stoppedC == nil {
|
if e.stoppedC == nil {
|
||||||
e.stoppedC = make(chan struct{})
|
e.stoppedC = make(chan struct{})
|
||||||
|
@ -428,7 +429,7 @@ func (e *Execution) emitDone() {
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execution) Done() (c <-chan struct{}) {
|
func (e *StreamExecutor) Done() (c <-chan struct{}) {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
|
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
|
||||||
if e.stoppedC == nil {
|
if e.stoppedC == nil {
|
||||||
|
@ -448,7 +449,7 @@ func (e *Execution) Done() (c <-chan struct{}) {
|
||||||
// We need to:
|
// We need to:
|
||||||
// 1. stop the order updater (by using the execution context)
|
// 1. stop the order updater (by using the execution context)
|
||||||
// 2. the order updater cancels all open orders and close the user data stream
|
// 2. the order updater cancels all open orders and close the user data stream
|
||||||
func (e *Execution) Shutdown(shutdownCtx context.Context) {
|
func (e *StreamExecutor) Shutdown(shutdownCtx context.Context) {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
if e.cancelExecution != nil {
|
if e.cancelExecution != nil {
|
||||||
e.cancelExecution()
|
e.cancelExecution()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user