diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index 1dd5d8c9d..6d7ae236a 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -256,7 +256,7 @@ var executeOrderCmd = &cobra.Command{ executionCtx, cancelExecution := context.WithCancel(ctx) defer cancelExecution() - execution := &twap.Execution{ + execution := &twap.StreamExecutor{ Session: session, Symbol: symbol, Side: side, diff --git a/pkg/twap/stream_executor.go b/pkg/twap/stream_executor.go index 6ac4a809b..6d0e32b70 100644 --- a/pkg/twap/stream_executor.go +++ b/pkg/twap/stream_executor.go @@ -16,7 +16,8 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -type Execution struct { +// StreamExecutor is a TWAP execution that places orders on the best price by connecting to the real time market data stream. +type StreamExecutor struct { Session *bbgo.ExchangeSession Symbol string Side types.SideType @@ -52,21 +53,21 @@ type Execution struct { mu sync.Mutex } -func (e *Execution) connectMarketData(ctx context.Context) { +func (e *StreamExecutor) connectMarketData(ctx context.Context) { logrus.Infof("connecting market data stream...") if err := e.marketDataStream.Connect(ctx); err != nil { logrus.WithError(err).Errorf("market data stream connect error") } } -func (e *Execution) connectUserData(ctx context.Context) { +func (e *StreamExecutor) connectUserData(ctx context.Context) { logrus.Infof("connecting user data stream...") if err := e.userDataStream.Connect(ctx); err != nil { logrus.WithError(err).Errorf("user data stream connect error") } } -func (e *Execution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { +func (e *StreamExecutor) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -201,7 +202,7 @@ func (e *Execution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) return orderForm, err } -func (e *Execution) updateOrder(ctx context.Context) error { +func (e *StreamExecutor) updateOrder(ctx context.Context) error { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -284,13 +285,13 @@ func (e *Execution) updateOrder(ctx context.Context) error { return nil } -func (e *Execution) cancelActiveOrders() { +func (e *StreamExecutor) cancelActiveOrders() { gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second) defer gracefulCancel() e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange) } -func (e *Execution) orderUpdater(ctx context.Context) { +func (e *StreamExecutor) orderUpdater(ctx context.Context) { updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) ticker := time.NewTimer(e.UpdateInterval) defer ticker.Stop() @@ -340,7 +341,7 @@ func (e *Execution) orderUpdater(ctx context.Context) { } } -func (e *Execution) cancelContextIfTargetQuantityFilled() bool { +func (e *StreamExecutor) cancelContextIfTargetQuantityFilled() bool { base := e.position.GetBase() if base.Abs().Compare(e.TargetQuantity) >= 0 { @@ -351,7 +352,7 @@ func (e *Execution) cancelContextIfTargetQuantityFilled() bool { return false } -func (e *Execution) handleTradeUpdate(trade types.Trade) { +func (e *StreamExecutor) handleTradeUpdate(trade types.Trade) { // ignore trades that are not in the symbol we interested if trade.Symbol != e.Symbol { return @@ -367,7 +368,7 @@ func (e *Execution) handleTradeUpdate(trade types.Trade) { logrus.Infof("position updated: %+v", e.position) } -func (e *Execution) handleFilledOrder(order types.Order) { +func (e *StreamExecutor) handleFilledOrder(order types.Order) { logrus.Info(order.String()) // filled event triggers the order removal from the active order store @@ -375,7 +376,7 @@ func (e *Execution) handleFilledOrder(order types.Order) { e.cancelContextIfTargetQuantityFilled() } -func (e *Execution) Run(parentCtx context.Context) error { +func (e *StreamExecutor) Run(parentCtx context.Context) error { e.mu.Lock() e.stoppedC = make(chan struct{}) e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx) @@ -419,7 +420,7 @@ func (e *Execution) Run(parentCtx context.Context) error { return nil } -func (e *Execution) emitDone() { +func (e *StreamExecutor) emitDone() { e.mu.Lock() if e.stoppedC == nil { e.stoppedC = make(chan struct{}) @@ -428,7 +429,7 @@ func (e *Execution) emitDone() { e.mu.Unlock() } -func (e *Execution) Done() (c <-chan struct{}) { +func (e *StreamExecutor) Done() (c <-chan struct{}) { e.mu.Lock() // if the channel is not allocated, it means it's not started yet, we need to return a closed channel if e.stoppedC == nil { @@ -448,7 +449,7 @@ func (e *Execution) Done() (c <-chan struct{}) { // We need to: // 1. stop the order updater (by using the execution context) // 2. the order updater cancels all open orders and close the user data stream -func (e *Execution) Shutdown(shutdownCtx context.Context) { +func (e *StreamExecutor) Shutdown(shutdownCtx context.Context) { e.mu.Lock() if e.cancelExecution != nil { e.cancelExecution()