From 3437515d6a4dc60b35f6d00ae5542493d57fd6a2 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 12 May 2021 20:08:25 +0800 Subject: [PATCH 01/12] rename placeOrder to submitOrder for making the api consistent --- pkg/cmd/orders.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index 6ab606e35..a730f295c 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -103,9 +103,14 @@ var listOrdersCmd = &cobra.Command{ }, } + + + + + // go run ./cmd/bbgo placeorder --session=ftx --symbol=BTC/USDT --side=buy --price= --quantity= -var placeOrderCmd = &cobra.Command{ - Use: "placeorder", +var submitOrderCmd = &cobra.Command{ + Use: "submit-order", SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() @@ -200,12 +205,12 @@ func init() { listOrdersCmd.Flags().String("session", "", "the exchange session name for sync") listOrdersCmd.Flags().String("symbol", "", "the trading pair, like btcusdt") - placeOrderCmd.Flags().String("session", "", "the exchange session name for sync") - placeOrderCmd.Flags().String("symbol", "", "the trading pair, like btcusdt") - placeOrderCmd.Flags().String("side", "", "the trading side: buy or sell") - placeOrderCmd.Flags().String("price", "", "the trading price") - placeOrderCmd.Flags().String("quantity", "", "the trading quantity") + submitOrderCmd.Flags().String("session", "", "the exchange session name for sync") + submitOrderCmd.Flags().String("symbol", "", "the trading pair, like btcusdt") + submitOrderCmd.Flags().String("side", "", "the trading side: buy or sell") + submitOrderCmd.Flags().String("price", "", "the trading price") + submitOrderCmd.Flags().String("quantity", "", "the trading quantity") RootCmd.AddCommand(listOrdersCmd) - RootCmd.AddCommand(placeOrderCmd) + RootCmd.AddCommand(submitOrderCmd) } From c520cfa5403cdc01b60e52fb7f91736cbc1da2e8 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 13 May 2021 09:55:53 +0800 Subject: [PATCH 02/12] xmaker: fix price calculation --- pkg/strategy/xmaker/strategy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index bdf4aadbd..e7588903b 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -324,7 +324,7 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or } accumulativeAskQuantity += askQuantity - askPrice := aggregatePrice(sourceBook.Asks, accumulativeBidQuantity) + askPrice := aggregatePrice(sourceBook.Asks, accumulativeAskQuantity) askPrice = askPrice.MulFloat64(1.0 + s.AskMargin.Float64()) if i > 0 && s.Pips > 0 { askPrice += fixedpoint.NewFromFloat(s.makerMarket.TickSize * float64(s.Pips)) From c8b97629e071b5ae43eb8e74682d4fedf160381a Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 13 May 2021 19:41:05 +0800 Subject: [PATCH 03/12] add NumOfOrders method on active book --- pkg/bbgo/active_book.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/bbgo/active_book.go b/pkg/bbgo/active_book.go index 53c7bc702..e91edc05b 100644 --- a/pkg/bbgo/active_book.go +++ b/pkg/bbgo/active_book.go @@ -152,6 +152,10 @@ func (b *LocalActiveOrderBook) WriteOff(order types.Order) bool { return false } +func (b *LocalActiveOrderBook) NumOfOrders() int { + return b.Asks.Len() + b.Bids.Len() +} + func (b *LocalActiveOrderBook) Orders() types.OrderSlice { return append(b.Asks.Orders(), b.Bids.Orders()...) } From f69cbe9c316e818f01832280a9c4574fa5bffa4a Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 11:53:07 +0800 Subject: [PATCH 04/12] add basic TwapExecution --- pkg/bbgo/twap_order_executor.go | 290 ++++++++++++++++++++++++++++++++ pkg/cmd/orders.go | 154 +++++++++++++---- pkg/cmd/root.go | 34 ++++ pkg/exchange/max/convert.go | 5 +- pkg/fixedpoint/convert.go | 8 + pkg/strategy/xmaker/strategy.go | 10 +- pkg/types/orderbook.go | 21 +++ pkg/types/side.go | 51 +++--- 8 files changed, 509 insertions(+), 64 deletions(-) create mode 100644 pkg/bbgo/twap_order_executor.go diff --git a/pkg/bbgo/twap_order_executor.go b/pkg/bbgo/twap_order_executor.go new file mode 100644 index 000000000..b597687ff --- /dev/null +++ b/pkg/bbgo/twap_order_executor.go @@ -0,0 +1,290 @@ +package bbgo + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "golang.org/x/time/rate" +) + +const OrderExecutionReady = 1 + +type TwapExecution struct { + Session *ExchangeSession + Symbol string + Side types.SideType + TargetQuantity fixedpoint.Value + SliceQuantity fixedpoint.Value + + market types.Market + marketDataStream types.Stream + userDataStream types.Stream + orderBook *types.StreamOrderBook + currentPrice fixedpoint.Value + activePosition fixedpoint.Value + + activeMakerOrders *LocalActiveOrderBook + orderStore *OrderStore + position *Position + + state int + + mu sync.Mutex +} + +func (e *TwapExecution) connectMarketData(ctx context.Context) { + log.Infof("connecting market data stream...") + if err := e.marketDataStream.Connect(ctx); err != nil { + log.WithError(err).Errorf("market data stream connect error") + } +} + +func (e *TwapExecution) connectUserData(ctx context.Context) { + log.Infof("connecting user data stream...") + if err := e.userDataStream.Connect(ctx); err != nil { + log.WithError(err).Errorf("user data stream connect error") + } +} + +func (e *TwapExecution) getSideBook() (pvs types.PriceVolumeSlice, err error) { + book := e.orderBook.Get() + + switch e.Side { + case types.SideTypeSell: + pvs = book.Asks + + case types.SideTypeBuy: + pvs = book.Bids + + default: + err = fmt.Errorf("invalid side type: %+v", e.Side) + } + + return pvs, err +} + +func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, err error) { + book := e.orderBook.Get() + + sideBook, err := e.getSideBook() + if err != nil { + return orderForm, err + } + + first, ok := sideBook.First() + if !ok { + return orderForm, fmt.Errorf("empty %s %s side book", e.Symbol, e.Side) + } + + newPrice := first.Price + + spread, ok := book.Spread() + if !ok { + return orderForm, errors.New("can not calculate spread, neither bid price or ask price exists") + } + + tickSize := fixedpoint.NewFromFloat(e.market.TickSize) + if spread > tickSize { + switch e.Side { + case types.SideTypeSell: + newPrice -= fixedpoint.NewFromFloat(e.market.TickSize) + case types.SideTypeBuy: + newPrice += fixedpoint.NewFromFloat(e.market.TickSize) + } + } + + orderForm = types.SubmitOrder{ + // ClientOrderID: "", + Symbol: e.Symbol, + Side: e.Side, + Type: types.OrderTypeLimitMaker, + Quantity: e.SliceQuantity.Float64(), + Price: newPrice.Float64(), + Market: e.market, + TimeInForce: "GTC", + } + return orderForm, err +} + +func (e *TwapExecution) updateOrder(ctx context.Context) error { + book := e.orderBook.Get() + bestBid, _ := book.BestBid() + bestAsk, _ := book.BestAsk() + log.Infof("best bid %f, best ask %f", bestBid.Price.Float64(), bestAsk.Price.Float64()) + + sideBook, err := e.getSideBook() + if err != nil { + return err + } + + first, ok := sideBook.First() + if !ok { + return fmt.Errorf("empty %s %s side book", e.Symbol, e.Side) + } + + tickSize := fixedpoint.NewFromFloat(e.market.TickSize) + + // check and see if we need to cancel the existing active orders + for e.activeMakerOrders.NumOfOrders() > 0 { + orders := e.activeMakerOrders.Orders() + + if len(orders) > 1 { + log.Warnf("there are more than 1 open orders in the strategy...") + } + + // get the first order + order := orders[0] + price := fixedpoint.NewFromFloat(order.Price) + quantity := fixedpoint.NewFromFloat(order.Quantity) + + // if the first bid price or first ask price is the same to the current active order + // we should skip updating the order + if first.Price == price { + // there are other orders in the same price, it means if we cancel ours, the price is still the best price. + if first.Volume > quantity { + return nil + } + + // if there is no gap between the first price entry and the second price entry + second, ok := sideBook.Second() + if !ok { + return fmt.Errorf("there is no secoond price on the %s order book %s", e.Symbol, e.Side) + } + + log.Infof("checking second price %f - %f") + // if there is no gap + if fixedpoint.Abs(first.Price-second.Price) == tickSize { + return nil + } + } + + log.Infof("canceling orders...") + if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil { + log.WithError(err).Errorf("can not cancel %s orders", e.Symbol) + } + + time.Sleep(3 * time.Second) + } + + orderForm, err := e.newBestPriceMakerOrder() + if err != nil { + return err + } + + createdOrders, err := e.Session.OrderExecutor.SubmitOrders(ctx, orderForm) + if err != nil { + return err + } + + e.activeMakerOrders.Add(createdOrders...) + e.orderStore.Add(createdOrders...) + return nil +} + +func (e *TwapExecution) orderUpdater(ctx context.Context) { + rateLimiter := rate.NewLimiter(rate.Every(time.Minute), 15) + ticker := time.NewTimer(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + + case <-e.orderBook.C: + if !rateLimiter.Allow() { + break + } + + if err := e.updateOrder(ctx); err != nil { + log.WithError(err).Errorf("order update failed") + } + + case <-ticker.C: + if !rateLimiter.Allow() { + break + } + + if err := e.updateOrder(ctx); err != nil { + log.WithError(err).Errorf("order update failed") + } + + } + } +} + +func (e TwapExecution) tradeUpdateHandler(trade types.Trade) { + // ignore trades that are not in the symbol we interested + if trade.Symbol != e.Symbol { + return + } + + if !e.orderStore.Exists(trade.OrderID) { + return + } + + q := fixedpoint.NewFromFloat(trade.Quantity) + _ = q + + e.position.AddTrade(trade) + + log.Infof("position updated: %+v", e.position) +} + +func (e *TwapExecution) Run(ctx context.Context) error { + var ok bool + e.market, ok = e.Session.Market(e.Symbol) + if !ok { + return fmt.Errorf("market %s not found", e.Symbol) + } + + e.marketDataStream = e.Session.Exchange.NewStream() + e.marketDataStream.SetPublicOnly() + e.marketDataStream.Subscribe(types.BookChannel, e.Symbol, types.SubscribeOptions{}) + + e.orderBook = types.NewStreamBook(e.Symbol) + e.orderBook.BindStream(e.marketDataStream) + go e.connectMarketData(ctx) + + e.userDataStream = e.Session.Exchange.NewStream() + e.userDataStream.OnTradeUpdate(e.tradeUpdateHandler) + e.position = &Position{ + Symbol: e.Symbol, + BaseCurrency: e.market.BaseCurrency, + QuoteCurrency: e.market.QuoteCurrency, + } + + e.orderStore = NewOrderStore(e.Symbol) + e.orderStore.BindStream(e.userDataStream) + e.activeMakerOrders = NewLocalActiveOrderBook() + e.activeMakerOrders.BindStream(e.userDataStream) + + go e.connectUserData(ctx) + go e.orderUpdater(ctx) + return nil +} + +type TwapOrderExecutor struct { + Session *ExchangeSession + + // Execution parameters + // DelayTime is the order update delay time + DelayTime types.Duration +} + +func (e *TwapOrderExecutor) Execute(ctx context.Context, symbol string, side types.SideType, targetQuantity, sliceQuantity fixedpoint.Value) (*TwapExecution, error) { + execution := &TwapExecution{ + Session: e.Session, + Symbol: symbol, + Side: side, + TargetQuantity: targetQuantity, + SliceQuantity: sliceQuantity, + } + err := execution.Run(ctx) + return execution, err +} diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index a730f295c..41a19d1ce 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -4,9 +4,13 @@ import ( "context" "fmt" "os" + "syscall" "time" + "github.com/c9s/bbgo/pkg/cmd/cmdutil" + "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/google/uuid" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -103,47 +107,14 @@ var listOrdersCmd = &cobra.Command{ }, } - - - - - -// go run ./cmd/bbgo placeorder --session=ftx --symbol=BTC/USDT --side=buy --price= --quantity= -var submitOrderCmd = &cobra.Command{ - Use: "submit-order", +var executeOrderCmd = &cobra.Command{ + Use: "execute-order", SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - configFile, err := cmd.Flags().GetString("config") - if err != nil { - return err - } - - if len(configFile) == 0 { - return fmt.Errorf("--config option is required") - } - - // if config file exists, use the config loaded from the config file. - // otherwise, use a empty config object - var userConfig *bbgo.Config - if _, err := os.Stat(configFile); err == nil { - // load successfully - userConfig, err = bbgo.Load(configFile, false) - if err != nil { - return err - } - } else if os.IsNotExist(err) { - // config file doesn't exist - userConfig = &bbgo.Config{} - } else { - // other error - return err - } - environ := bbgo.NewEnvironment() - - if err := environ.ConfigureExchangeSessions(userConfig); err != nil { - return err + if userConfig == nil { + return errors.New("config file is required") } sessionName, err := cmd.Flags().GetString("session") @@ -151,11 +122,99 @@ var submitOrderCmd = &cobra.Command{ return err } + symbol, err := cmd.Flags().GetString("symbol") + if err != nil { + return fmt.Errorf("can not get the symbol from flags: %w", err) + } + + if symbol == "" { + return fmt.Errorf("symbol not found") + } + + sideS, err := cmd.Flags().GetString("side") + if err != nil { + return fmt.Errorf("can't get side: %w", err) + } + + side, err := types.StrToSideType(sideS) + if err != nil { + return err + } + + targetQuantityS, err := cmd.Flags().GetString("target-quantity") + if err != nil { + return err + } + + targetQuantity, err := fixedpoint.NewFromString(targetQuantityS) + if err != nil { + return err + } + + sliceQuantityS, err := cmd.Flags().GetString("slice-quantity") + if err != nil { + return err + } + + sliceQuantity, err := fixedpoint.NewFromString(sliceQuantityS) + if err != nil { + return err + } + + environ := bbgo.NewEnvironment() + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + if err := environ.Init(ctx); err != nil { + return err + } + session, ok := environ.Session(sessionName) if !ok { return fmt.Errorf("session %s not found", sessionName) } + executor := &bbgo.TwapOrderExecutor{ + Session: session, + } + + execCtx, execCancel := context.WithCancel(ctx) + + execution, err := executor.Execute(execCtx, symbol, side, targetQuantity, sliceQuantity) + if err != nil { + execCancel() + _ = execution + return err + } + + // report execution here... + _ = execution + + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + + log.Infof("shutting down order executor...") + execCancel() + return nil + }, +} + +// go run ./cmd/bbgo submit-order --session=ftx --symbol=BTC/USDT --side=buy --price= --quantity= +var submitOrderCmd = &cobra.Command{ + Use: "submit-order", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + + if userConfig == nil { + return errors.New("config file is required") + } + + sessionName, err := cmd.Flags().GetString("session") + if err != nil { + return err + } + symbol, err := cmd.Flags().GetString("symbol") if err != nil { return fmt.Errorf("can't get the symbol from flags: %w", err) @@ -179,6 +238,20 @@ var submitOrderCmd = &cobra.Command{ return fmt.Errorf("can't get quantity: %w", err) } + environ := bbgo.NewEnvironment() + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + if err := environ.Init(ctx); err != nil { + return err + } + + session, ok := environ.Session(sessionName) + if !ok { + return fmt.Errorf("session %s not found", sessionName) + } + so := types.SubmitOrder{ ClientOrderID: uuid.New().String(), Symbol: symbol, @@ -211,6 +284,13 @@ func init() { submitOrderCmd.Flags().String("price", "", "the trading price") submitOrderCmd.Flags().String("quantity", "", "the trading quantity") + executeOrderCmd.Flags().String("session", "", "the exchange session name for sync") + executeOrderCmd.Flags().String("symbol", "", "the trading pair, like btcusdt") + executeOrderCmd.Flags().String("side", "", "the trading side: buy or sell") + executeOrderCmd.Flags().String("target-quantity", "", "target quantity") + executeOrderCmd.Flags().String("slice-quantity", "", "slice quantity") + RootCmd.AddCommand(listOrdersCmd) RootCmd.AddCommand(submitOrderCmd) + RootCmd.AddCommand(executeOrderCmd) } diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go index 87a110206..97fe05839 100644 --- a/pkg/cmd/root.go +++ b/pkg/cmd/root.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/joho/godotenv" "github.com/lestrrat-go/file-rotatelogs" "github.com/pkg/errors" @@ -18,6 +19,8 @@ import ( _ "github.com/go-sql-driver/mysql" ) +var userConfig *bbgo.Config + var RootCmd = &cobra.Command{ Use: "bbgo", Short: "bbgo is a crypto trading bot", @@ -44,6 +47,31 @@ var RootCmd = &cobra.Command{ } } + configFile, err := cmd.Flags().GetString("config") + if err != nil { + return errors.Wrapf(err, "failed to get the config flag") + } + + // load config file nicely + if len(configFile) > 0 { + // if config file exists, use the config loaded from the config file. + // otherwise, use a empty config object + if _, err := os.Stat(configFile); err == nil { + // load successfully + userConfig, err = bbgo.Load(configFile, false) + if err != nil { + return errors.Wrapf(err, "can not load config file: %s", configFile) + } + + } else if os.IsNotExist(err) { + // config file doesn't exist, we should use the empty config + userConfig = &bbgo.Config{} + } else { + // other error + return errors.Wrapf(err, "config file load error: %s", configFile) + } + } + return nil }, @@ -80,6 +108,7 @@ func init() { RootCmd.PersistentFlags().String("ftx-subaccount-name", "", "subaccount name. Specify it if the credential is for subaccount.") } + func Execute() { viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) @@ -105,12 +134,17 @@ func Execute() { // Once the flags are defined, we can bind config keys with flags. if err := viper.BindPFlags(RootCmd.PersistentFlags()); err != nil { log.WithError(err).Errorf("failed to bind persistent flags. please check the flag settings.") + return } if err := viper.BindPFlags(RootCmd.Flags()); err != nil { log.WithError(err).Errorf("failed to bind local flags. please check the flag settings.") + return } + + + log.SetFormatter(&prefixed.TextFormatter{}) logger := log.StandardLogger() diff --git a/pkg/exchange/max/convert.go b/pkg/exchange/max/convert.go index e099126a3..31dcb5cac 100644 --- a/pkg/exchange/max/convert.go +++ b/pkg/exchange/max/convert.go @@ -127,9 +127,12 @@ func toGlobalOrderType(orderType max.OrderType) types.OrderType { case max.OrderTypeIOCLimit: return types.OrderTypeIOCLimit + case max.OrderTypePostOnly: + return types.OrderTypeLimitMaker + } - logger.Errorf("unknown order type: %v", orderType) + logger.Errorf("order convert error, unknown order type: %v", orderType) return types.OrderType(orderType) } diff --git a/pkg/fixedpoint/convert.go b/pkg/fixedpoint/convert.go index e349088ea..77307ca17 100644 --- a/pkg/fixedpoint/convert.go +++ b/pkg/fixedpoint/convert.go @@ -279,3 +279,11 @@ func NumFractionalDigits(a Value) int { } return numPow - numZeros } + + +func Abs(a Value) Value { + if a < 0 { + return -a + } + return a +} diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index e7588903b..4f3750951 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -675,14 +675,9 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order time.Sleep(s.UpdateInterval.Duration()) - for { + for s.activeMakerOrders.NumOfOrders() > 0 { orders := s.activeMakerOrders.Orders() - if len(orders) == 0 { - log.Info("all orders are cancelled successfully") - break - } - - log.Warnf("%d orders are not cancelled yet...", len(orders)) + log.Warnf("%d orders are not cancelled yet:", len(orders)) s.activeMakerOrders.Print() if err := s.makerSession.Exchange.CancelOrders(ctx, s.activeMakerOrders.Orders()...); err != nil { @@ -692,6 +687,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order log.Warnf("waiting for orders to be cancelled...") time.Sleep(3 * time.Second) } + log.Info("all orders are cancelled successfully") if err := s.Persistence.Save(s.state, ID, s.Symbol, stateKey); err != nil { log.WithError(err).Errorf("can not save state: %+v", s.state) diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index 0f89dcd71..a8512128d 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -44,6 +44,13 @@ func (slice PriceVolumeSlice) Copy() PriceVolumeSlice { return s } +func (slice PriceVolumeSlice) Second() (PriceVolume, bool) { + if len(slice) > 1 { + return slice[1], true + } + return PriceVolume{}, false +} + func (slice PriceVolumeSlice) First() (PriceVolume, bool) { if len(slice) > 0 { return slice[0], true @@ -127,6 +134,20 @@ type OrderBook struct { asksChangeCallbacks []func(pvs PriceVolumeSlice) } +func (b *OrderBook) Spread() (fixedpoint.Value, bool) { + bestBid, ok := b.BestBid() + if !ok { + return 0, false + } + + bestAsk, ok := b.BestBid() + if !ok { + return 0, false + } + + return bestAsk.Price - bestBid.Price, true +} + func (b *OrderBook) BestBid() (PriceVolume, bool) { if len(b.Bids) == 0 { return PriceVolume{}, false diff --git a/pkg/types/side.go b/pkg/types/side.go index cc1ac7c30..492ebbdda 100644 --- a/pkg/types/side.go +++ b/pkg/types/side.go @@ -21,30 +21,39 @@ const ( var ErrInvalidSideType = errors.New("invalid side type") -func (side *SideType) UnmarshalJSON(data []byte) (err error) { +func StrToSideType(s string) (side SideType, err error) { + switch strings.ToLower(s) { + case "buy": + side = SideTypeBuy + + case "sell": + side = SideTypeSell + + case "both": + side = SideTypeBoth + + default: + err = ErrInvalidSideType + return side, err + + } + + return side, err +} + +func (side *SideType) UnmarshalJSON(data []byte) error { var s string - err = json.Unmarshal(data, &s) + if err := json.Unmarshal(data, &s); err != nil { + return err + } + + ss, err := StrToSideType(s) if err != nil { return err } - switch strings.ToLower(s) { - case "buy": - *side = SideTypeBuy - - case "sell": - *side = SideTypeSell - - case "both": - *side = SideTypeBoth - - default: - err = ErrInvalidSideType - return err - - } - - return err + *side = ss + return nil } func (side SideType) Reverse() SideType { @@ -59,6 +68,10 @@ func (side SideType) Reverse() SideType { return side } +func (side SideType) String() string { + return string(side) +} + func (side SideType) Color() string { if side == SideTypeBuy { return Green From dc040bb82b28261838f7cabcedda96d0a1dc431c Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 12:23:07 +0800 Subject: [PATCH 05/12] improving logs --- pkg/bbgo/twap_order_executor.go | 59 +++++++++++++++++++++++---------- pkg/types/orderbook.go | 2 +- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/pkg/bbgo/twap_order_executor.go b/pkg/bbgo/twap_order_executor.go index b597687ff..f4dbfa555 100644 --- a/pkg/bbgo/twap_order_executor.go +++ b/pkg/bbgo/twap_order_executor.go @@ -25,9 +25,10 @@ type TwapExecution struct { market types.Market marketDataStream types.Stream userDataStream types.Stream - orderBook *types.StreamOrderBook - currentPrice fixedpoint.Value - activePosition fixedpoint.Value + + orderBook *types.StreamOrderBook + currentPrice fixedpoint.Value + activePosition fixedpoint.Value activeMakerOrders *LocalActiveOrderBook orderStore *OrderStore @@ -91,6 +92,7 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e tickSize := fixedpoint.NewFromFloat(e.market.TickSize) if spread > tickSize { + log.Infof("spread %f is greater than the tick size %f, adding 1 tick to the price...", spread.Float64(), tickSize.Float64()) switch e.Side { case types.SideTypeSell: newPrice -= fixedpoint.NewFromFloat(e.market.TickSize) @@ -113,11 +115,6 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e } func (e *TwapExecution) updateOrder(ctx context.Context) error { - book := e.orderBook.Get() - bestBid, _ := book.BestBid() - bestAsk, _ := book.BestAsk() - log.Infof("best bid %f, best ask %f", bestBid.Price.Float64(), bestAsk.Price.Float64()) - sideBook, err := e.getSideBook() if err != nil { return err @@ -154,22 +151,20 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { // if there is no gap between the first price entry and the second price entry second, ok := sideBook.Second() if !ok { - return fmt.Errorf("there is no secoond price on the %s order book %s", e.Symbol, e.Side) + return fmt.Errorf("there is no secoond price on the %s order book %s, can not update", e.Symbol, e.Side) } - log.Infof("checking second price %f - %f") // if there is no gap if fixedpoint.Abs(first.Price-second.Price) == tickSize { + log.Infof("there is no gap between the second price %f and the first price %f (tick size = %f), skip updating", + first.Price.Float64(), + second.Price.Float64(), + tickSize.Float64()) return nil } } - log.Infof("canceling orders...") - if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil { - log.WithError(err).Errorf("can not cancel %s orders", e.Symbol) - } - - time.Sleep(3 * time.Second) + e.cancelActiveOrders(ctx) } orderForm, err := e.newBestPriceMakerOrder() @@ -187,10 +182,33 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { return nil } +func (e *TwapExecution) cancelActiveOrders(ctx context.Context) { + didCancel := false + for e.activeMakerOrders.NumOfOrders() > 0 { + didCancel = true + + log.Infof("canceling open orders...") + orders := e.activeMakerOrders.Orders() + if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil { + log.WithError(err).Errorf("can not cancel %s orders", e.Symbol) + } + time.Sleep(3 * time.Second) + } + + if didCancel { + log.Infof("orders are canceled successfully") + } +} + func (e *TwapExecution) orderUpdater(ctx context.Context) { rateLimiter := rate.NewLimiter(rate.Every(time.Minute), 15) ticker := time.NewTimer(5 * time.Second) defer ticker.Stop() + + defer func() { + e.cancelActiveOrders(context.Background()) + }() + for { select { case <-ctx.Done(): @@ -218,7 +236,7 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { } } -func (e TwapExecution) tradeUpdateHandler(trade types.Trade) { +func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { // ignore trades that are not in the symbol we interested if trade.Symbol != e.Symbol { return @@ -236,6 +254,10 @@ func (e TwapExecution) tradeUpdateHandler(trade types.Trade) { log.Infof("position updated: %+v", e.position) } +func (e *TwapExecution) handleFilledOrder(order types.Order) { + log.Infof("order is filled: %s", order.String()) +} + func (e *TwapExecution) Run(ctx context.Context) error { var ok bool e.market, ok = e.Session.Market(e.Symbol) @@ -252,7 +274,7 @@ func (e *TwapExecution) Run(ctx context.Context) error { go e.connectMarketData(ctx) e.userDataStream = e.Session.Exchange.NewStream() - e.userDataStream.OnTradeUpdate(e.tradeUpdateHandler) + e.userDataStream.OnTradeUpdate(e.handleTradeUpdate) e.position = &Position{ Symbol: e.Symbol, BaseCurrency: e.market.BaseCurrency, @@ -262,6 +284,7 @@ func (e *TwapExecution) Run(ctx context.Context) error { e.orderStore = NewOrderStore(e.Symbol) e.orderStore.BindStream(e.userDataStream) e.activeMakerOrders = NewLocalActiveOrderBook() + e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) go e.connectUserData(ctx) diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index a8512128d..e8b93fd4a 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -140,7 +140,7 @@ func (b *OrderBook) Spread() (fixedpoint.Value, bool) { return 0, false } - bestAsk, ok := b.BestBid() + bestAsk, ok := b.BestAsk() if !ok { return 0, false } From bb34b1002a6a3f1c1c60fefd3d2f276407b26d2f Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 14:52:19 +0800 Subject: [PATCH 06/12] improve order execution graceful shutdown --- pkg/bbgo/order_execution.go | 10 +- pkg/bbgo/order_processor.go | 17 ++- pkg/bbgo/order_processor_test.go | 2 +- pkg/bbgo/twap_order_executor.go | 191 ++++++++++++++++++++++++++++--- 4 files changed, 194 insertions(+), 26 deletions(-) diff --git a/pkg/bbgo/order_execution.go b/pkg/bbgo/order_execution.go index fc169e7d2..b701272ff 100644 --- a/pkg/bbgo/order_execution.go +++ b/pkg/bbgo/order_execution.go @@ -162,10 +162,10 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... // Increase the quantity if the amount is not enough, // this is the only increase op, later we will decrease the quantity if it meets the criteria - quantity = AdjustQuantityByMinAmount(quantity, price, market.MinAmount*1.01) + quantity = AdjustFloatQuantityByMinAmount(quantity, price, market.MinAmount*1.01) if c.MaxOrderAmount > 0 { - quantity = AdjustQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) + quantity = AdjustFloatQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) } quoteAssetQuota := math.Max(0.0, quoteBalance.Available.Float64()-c.MinQuoteBalance.Float64()) @@ -178,7 +178,7 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... continue } - quantity = AdjustQuantityByMaxAmount(quantity, price, quoteAssetQuota) + quantity = AdjustFloatQuantityByMaxAmount(quantity, price, quoteAssetQuota) // if MaxBaseAssetBalance is enabled, we should check the current base asset balance if baseBalance, hasBaseAsset := balances[market.BaseCurrency]; hasBaseAsset && c.MaxBaseAssetBalance > 0 { @@ -226,7 +226,7 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... } // if the amount is too small, we should increase it. - quantity = AdjustQuantityByMinAmount(quantity, price, market.MinNotional*1.01) + quantity = AdjustFloatQuantityByMinAmount(quantity, price, market.MinNotional*1.01) // we should not SELL too much quantity = math.Min(quantity, baseAssetBalance.Available.Float64()) @@ -253,7 +253,7 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... } if c.MaxOrderAmount > 0 { - quantity = AdjustQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) + quantity = AdjustFloatQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) } notional := quantity * lastPrice diff --git a/pkg/bbgo/order_processor.go b/pkg/bbgo/order_processor.go index b891fd010..4d203fee7 100644 --- a/pkg/bbgo/order_processor.go +++ b/pkg/bbgo/order_processor.go @@ -1,6 +1,7 @@ package bbgo import ( + "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/pkg/errors" ) @@ -14,7 +15,19 @@ var ( ) // AdjustQuantityByMinAmount adjusts the quantity to make the amount greater than the given minAmount -func AdjustQuantityByMinAmount(quantity, currentPrice, minAmount float64) float64 { +func AdjustQuantityByMinAmount(quantity, currentPrice, minAmount fixedpoint.Value) fixedpoint.Value { + // modify quantity for the min amount + amount := currentPrice.Mul(quantity) + if amount < minAmount { + ratio := minAmount.Div(amount) + quantity = quantity.Mul(ratio) + } + + return quantity +} + +// AdjustFloatQuantityByMinAmount adjusts the quantity to make the amount greater than the given minAmount +func AdjustFloatQuantityByMinAmount(quantity, currentPrice, minAmount float64) float64 { // modify quantity for the min amount amount := currentPrice * quantity if amount < minAmount { @@ -25,7 +38,7 @@ func AdjustQuantityByMinAmount(quantity, currentPrice, minAmount float64) float6 return quantity } -func AdjustQuantityByMaxAmount(quantity float64, price float64, maxAmount float64) float64 { +func AdjustFloatQuantityByMaxAmount(quantity float64, price float64, maxAmount float64) float64 { amount := price * quantity if amount > maxAmount { ratio := maxAmount / amount diff --git a/pkg/bbgo/order_processor_test.go b/pkg/bbgo/order_processor_test.go index df1687e96..3e522a9d9 100644 --- a/pkg/bbgo/order_processor_test.go +++ b/pkg/bbgo/order_processor_test.go @@ -36,7 +36,7 @@ func TestAdjustQuantityByMinAmount(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - q := AdjustQuantityByMinAmount(test.args.quantity, test.args.price, test.args.minAmount) + q := AdjustFloatQuantityByMinAmount(test.args.quantity, test.args.price, test.args.minAmount) assert.Equal(t, test.wanted, q) }) } diff --git a/pkg/bbgo/twap_order_executor.go b/pkg/bbgo/twap_order_executor.go index f4dbfa555..15cd2d276 100644 --- a/pkg/bbgo/twap_order_executor.go +++ b/pkg/bbgo/twap_order_executor.go @@ -13,18 +13,20 @@ import ( "golang.org/x/time/rate" ) -const OrderExecutionReady = 1 - type TwapExecution struct { Session *ExchangeSession Symbol string Side types.SideType TargetQuantity fixedpoint.Value SliceQuantity fixedpoint.Value + StopPrice fixedpoint.Value market types.Market marketDataStream types.Stream - userDataStream types.Stream + + userDataStream types.Stream + userDataStreamCtx context.Context + cancelUserDataStream context.CancelFunc orderBook *types.StreamOrderBook currentPrice fixedpoint.Value @@ -34,6 +36,11 @@ type TwapExecution struct { orderStore *OrderStore position *Position + executionCtx context.Context + cancelExecution context.CancelFunc + + stoppedC chan struct{} + state int mu sync.Mutex @@ -84,7 +91,6 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e } newPrice := first.Price - spread, ok := book.Spread() if !ok { return orderForm, errors.New("can not calculate spread, neither bid price or ask price exists") @@ -101,12 +107,55 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e } } + if e.StopPrice > 0 { + switch e.Side { + case types.SideTypeSell: + if newPrice < e.StopPrice { + log.Infof("%s order price %f is lower than the stop sell price %f, setting order price to the stop sell price %f", + e.Symbol, + newPrice.Float64(), + e.StopPrice.Float64(), + e.StopPrice.Float64()) + newPrice = e.StopPrice + } + + case types.SideTypeBuy: + if newPrice > e.StopPrice { + log.Infof("%s order price %f is higher than the stop buy price %f, setting order price to the stop buy price %f", + e.Symbol, + newPrice.Float64(), + e.StopPrice.Float64(), + e.StopPrice.Float64()) + newPrice = e.StopPrice + } + } + } + + minQuantity := fixedpoint.NewFromFloat(e.market.MinQuantity) + restQuantity := e.TargetQuantity - fixedpoint.Abs(e.position.Base) + if restQuantity < minQuantity { + return orderForm, fmt.Errorf("can not continue placing orders, rest quantity %f is less than the min quantity %f", restQuantity.Float64(), minQuantity.Float64()) + } + + // if the rest quantity in the next round is not enough, we should merge the rest quantity into this round + orderQuantity := e.SliceQuantity + nextRestQuantity := restQuantity - e.SliceQuantity + if nextRestQuantity < minQuantity { + orderQuantity = restQuantity + } + + minNotional := fixedpoint.NewFromFloat(e.market.MinNotional) + orderAmount := newPrice.Mul(orderQuantity) + if orderAmount <= minNotional { + orderQuantity = AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) + } + orderForm = types.SubmitOrder{ // ClientOrderID: "", Symbol: e.Symbol, Side: e.Side, Type: types.OrderTypeLimitMaker, - Quantity: e.SliceQuantity.Float64(), + Quantity: orderQuantity.Float64(), Price: newPrice.Float64(), Market: e.market, TimeInForce: "GTC", @@ -115,6 +164,7 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e } func (e *TwapExecution) updateOrder(ctx context.Context) error { + sideBook, err := e.getSideBook() if err != nil { return err @@ -132,7 +182,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { orders := e.activeMakerOrders.Orders() if len(orders) > 1 { - log.Warnf("there are more than 1 open orders in the strategy...") + log.Warnf("more than 1 %s open orders in the strategy...", e.Symbol) } // get the first order @@ -140,6 +190,29 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { price := fixedpoint.NewFromFloat(order.Price) quantity := fixedpoint.NewFromFloat(order.Quantity) + remainingQuantity := order.Quantity - order.ExecutedQuantity + if remainingQuantity <= e.market.MinQuantity { + log.Infof("order remaining quantity %f is less than the market minimal quantity %f, skip updating order", remainingQuantity, e.market.MinQuantity) + return nil + } + + if e.StopPrice > 0 { + switch e.Side { + case types.SideTypeBuy: + if first.Price > e.StopPrice { + log.Infof("%s first bid price %f is higher than the stop price %f, skip updating order", e.Symbol, first.Price.Float64(), e.StopPrice.Float64()) + return nil + } + + case types.SideTypeSell: + if first.Price < e.StopPrice { + log.Infof("%s first ask price %f is lower than the stop price %f, skip updating order", e.Symbol, first.Price.Float64(), e.StopPrice.Float64()) + return nil + } + } + + } + // if the first bid price or first ask price is the same to the current active order // we should skip updating the order if first.Price == price { @@ -151,12 +224,12 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { // if there is no gap between the first price entry and the second price entry second, ok := sideBook.Second() if !ok { - return fmt.Errorf("there is no secoond price on the %s order book %s, can not update", e.Symbol, e.Side) + return fmt.Errorf("no secoond price on the %s order book %s, can not update", e.Symbol, e.Side) } // if there is no gap if fixedpoint.Abs(first.Price-second.Price) == tickSize { - log.Infof("there is no gap between the second price %f and the first price %f (tick size = %f), skip updating", + log.Infof("no gap between the second price %f and the first price %f (tick size = %f), skip updating", first.Price.Float64(), second.Price.Float64(), tickSize.Float64()) @@ -187,8 +260,10 @@ func (e *TwapExecution) cancelActiveOrders(ctx context.Context) { for e.activeMakerOrders.NumOfOrders() > 0 { didCancel = true - log.Infof("canceling open orders...") orders := e.activeMakerOrders.Orders() + log.Infof("canceling %d open orders:", len(orders)) + e.activeMakerOrders.Print() + if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil { log.WithError(err).Errorf("can not cancel %s orders", e.Symbol) } @@ -205,8 +280,13 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { ticker := time.NewTimer(5 * time.Second) defer ticker.Stop() + // we should stop updater and clean up our open orders, if + // 1. the given context is canceled. + // 2. the base quantity equals to or greater than the target quantity defer func() { e.cancelActiveOrders(context.Background()) + e.cancelUserDataStream() + e.emitDone() }() for { @@ -219,6 +299,10 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { break } + if e.cancelContextIfTargetQuantityFilled() { + return + } + if err := e.updateOrder(ctx); err != nil { log.WithError(err).Errorf("order update failed") } @@ -228,6 +312,10 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { break } + if e.cancelContextIfTargetQuantityFilled() { + return + } + if err := e.updateOrder(ctx); err != nil { log.WithError(err).Errorf("order update failed") } @@ -236,6 +324,15 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { } } +func (e *TwapExecution) cancelContextIfTargetQuantityFilled() bool { + if fixedpoint.Abs(e.position.Base) >= e.TargetQuantity { + log.Infof("filled target quantity, canceling the order execution context") + e.cancelExecution() + return true + } + return false +} + func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { // ignore trades that are not in the symbol we interested if trade.Symbol != e.Symbol { @@ -246,19 +343,27 @@ func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { return } - q := fixedpoint.NewFromFloat(trade.Quantity) - _ = q + log.Info(trade.String()) e.position.AddTrade(trade) - log.Infof("position updated: %+v", e.position) } func (e *TwapExecution) handleFilledOrder(order types.Order) { - log.Infof("order is filled: %s", order.String()) + log.Info(order.String()) + + // filled event triggers the order removal from the active order store + // we need to ensure we received every order update event before the execution is done. + e.cancelContextIfTargetQuantityFilled() } -func (e *TwapExecution) Run(ctx context.Context) error { +func (e *TwapExecution) Run(parentCtx context.Context) error { + e.mu.Lock() + e.stoppedC = make(chan struct{}) + e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx) + e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(context.Background()) + e.mu.Unlock() + var ok bool e.market, ok = e.Session.Market(e.Symbol) if !ok { @@ -271,7 +376,7 @@ func (e *TwapExecution) Run(ctx context.Context) error { e.orderBook = types.NewStreamBook(e.Symbol) e.orderBook.BindStream(e.marketDataStream) - go e.connectMarketData(ctx) + go e.connectMarketData(e.executionCtx) e.userDataStream = e.Session.Exchange.NewStream() e.userDataStream.OnTradeUpdate(e.handleTradeUpdate) @@ -287,11 +392,60 @@ func (e *TwapExecution) Run(ctx context.Context) error { e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) - go e.connectUserData(ctx) - go e.orderUpdater(ctx) + go e.connectUserData(e.userDataStreamCtx) + go e.orderUpdater(e.executionCtx) return nil } +func (e *TwapExecution) emitDone() { + e.mu.Lock() + if e.stoppedC == nil { + e.stoppedC = make(chan struct{}) + } + close(e.stoppedC) + e.mu.Unlock() +} + +func (e *TwapExecution) Done() (c <-chan struct{}) { + e.mu.Lock() + // if the channel is not allocated, it means it's not started yet, we need to return a closed channel + if e.stoppedC == nil { + e.stoppedC = make(chan struct{}) + close(e.stoppedC) + c = e.stoppedC + } else { + c = e.stoppedC + } + + e.mu.Unlock() + return c +} + +// Shutdown stops the execution +// If we call this method, it means the execution is still running, +// We need to: +// 1. stop the order updater (by using the execution context) +// 2. the order updater cancels all open orders and close the user data stream +func (e *TwapExecution) Shutdown(shutdownCtx context.Context) { + e.mu.Lock() + if e.cancelExecution != nil { + e.cancelExecution() + } + e.mu.Unlock() + + for { + select { + + case <-shutdownCtx.Done(): + return + + case <-e.Done(): + return + + } + } +} + type TwapOrderExecutor struct { Session *ExchangeSession @@ -300,13 +454,14 @@ type TwapOrderExecutor struct { DelayTime types.Duration } -func (e *TwapOrderExecutor) Execute(ctx context.Context, symbol string, side types.SideType, targetQuantity, sliceQuantity fixedpoint.Value) (*TwapExecution, error) { +func (e *TwapOrderExecutor) Execute(ctx context.Context, symbol string, side types.SideType, targetQuantity, sliceQuantity, stopPrice fixedpoint.Value) (*TwapExecution, error) { execution := &TwapExecution{ Session: e.Session, Symbol: symbol, Side: side, TargetQuantity: targetQuantity, SliceQuantity: sliceQuantity, + StopPrice: stopPrice, } err := execution.Run(ctx) return execution, err From 66bc06bc5f64882311cb7bc7294dcd53a5d48d94 Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 14:52:32 +0800 Subject: [PATCH 07/12] add more order execution parameter checks --- pkg/cmd/orders.go | 48 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index 41a19d1ce..20eca81a9 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -4,10 +4,10 @@ import ( "context" "fmt" "os" + "os/signal" "syscall" "time" - "github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/google/uuid" "github.com/pkg/errors" @@ -145,6 +145,9 @@ var executeOrderCmd = &cobra.Command{ if err != nil { return err } + if len(targetQuantityS) == 0 { + return errors.New("--target-quantity can not be empty") + } targetQuantity, err := fixedpoint.NewFromString(targetQuantityS) if err != nil { @@ -155,12 +158,25 @@ var executeOrderCmd = &cobra.Command{ if err != nil { return err } + if len(sliceQuantityS) == 0 { + return errors.New("--slice-quantity can not be empty") + } sliceQuantity, err := fixedpoint.NewFromString(sliceQuantityS) if err != nil { return err } + stopPriceS, err := cmd.Flags().GetString("stop-price") + if err != nil { + return err + } + + stopPrice, err := fixedpoint.NewFromString(stopPriceS) + if err != nil { + return err + } + environ := bbgo.NewEnvironment() if err := environ.ConfigureExchangeSessions(userConfig); err != nil { return err @@ -179,22 +195,33 @@ var executeOrderCmd = &cobra.Command{ Session: session, } - execCtx, execCancel := context.WithCancel(ctx) + executionCtx, cancelExecution := context.WithCancel(ctx) + defer cancelExecution() - execution, err := executor.Execute(execCtx, symbol, side, targetQuantity, sliceQuantity) + execution, err := executor.Execute(executionCtx, symbol, side, targetQuantity, sliceQuantity, stopPrice) if err != nil { - execCancel() - _ = execution return err } - // report execution here... - _ = execution + var sigC = make(chan os.Signal, 1) + signal.Notify(sigC, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigC) - cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + select { + case sig := <-sigC: + log.Warnf("signal %v", sig) + log.Infof("shutting down order executor...") + shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) + execution.Shutdown(shutdownCtx) + cancelShutdown() + + case <-execution.Done(): + log.Infof("the order execution is completed") + + case <-ctx.Done(): + + } - log.Infof("shutting down order executor...") - execCancel() return nil }, } @@ -289,6 +316,7 @@ func init() { executeOrderCmd.Flags().String("side", "", "the trading side: buy or sell") executeOrderCmd.Flags().String("target-quantity", "", "target quantity") executeOrderCmd.Flags().String("slice-quantity", "", "slice quantity") + executeOrderCmd.Flags().String("stop-price", "0", "stop price") RootCmd.AddCommand(listOrdersCmd) RootCmd.AddCommand(submitOrderCmd) From f1fe49211762f38c7784714b51c6f75252e295f5 Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 14:52:54 +0800 Subject: [PATCH 08/12] improve string format --- pkg/types/order.go | 2 +- pkg/types/trade.go | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/types/order.go b/pkg/types/order.go index 46f1f5e5c..f9f50ee8f 100644 --- a/pkg/types/order.go +++ b/pkg/types/order.go @@ -169,7 +169,7 @@ func (o Order) Backup() SubmitOrder { } func (o Order) String() string { - return fmt.Sprintf("order %s %s %f/%f at %f -> %s", o.Symbol, o.Side, o.ExecutedQuantity, o.Quantity, o.Price, o.Status) + return fmt.Sprintf("ORDER %s %s %s %f/%f @ %f -> %s", o.Exchange, o.Symbol, o.Side, o.ExecutedQuantity, o.Quantity, o.Price, o.Status) } func (o Order) PlainText() string { diff --git a/pkg/types/trade.go b/pkg/types/trade.go index 98c130015..ebc2ec8c7 100644 --- a/pkg/types/trade.go +++ b/pkg/types/trade.go @@ -71,16 +71,20 @@ type Trade struct { PnL sql.NullFloat64 `json:"pnl" db:"pnl"` } -func (trade Trade) PlainText() string { - return fmt.Sprintf("%s Trade %s %s price %s, quantity %s, amount %s", +func (trade Trade) String() string { + return fmt.Sprintf("TRADE %s %s %s %s @ %s, amount %s", trade.Exchange, trade.Symbol, trade.Side, - util.FormatFloat(trade.Price, 2), util.FormatFloat(trade.Quantity, 4), + util.FormatFloat(trade.Price, 3), util.FormatFloat(trade.QuoteQuantity, 2)) } +func (trade Trade) PlainText() string { + return trade.String() +} + var slackTradeTextTemplate = ":handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}" func (trade Trade) SlackAttachment() slack.Attachment { From abd6f4c7eff47729c98697a44206b29ce3775c51 Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 14:53:18 +0800 Subject: [PATCH 09/12] rename bbgo.AdjustQuantityByMaxAmount to bbgo.AdjustFloatQuantityByMaxAmount --- pkg/fixedpoint/convert.go | 26 +++++++++++++------------- pkg/strategy/support/strategy.go | 2 +- pkg/strategy/xmaker/strategy.go | 2 +- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/fixedpoint/convert.go b/pkg/fixedpoint/convert.go index 77307ca17..b3abcf9df 100644 --- a/pkg/fixedpoint/convert.go +++ b/pkg/fixedpoint/convert.go @@ -252,6 +252,19 @@ func NewFromInt64(val int64) Value { return Value(val * DefaultPow) } + +func NumFractionalDigits(a Value) int { + numPow := 0 + for pow := int64(DefaultPow); pow%10 != 1; pow /= 10 { + numPow++ + } + numZeros := 0 + for v := a.Int64(); v%10 == 0; v /= 10 { + numZeros++ + } + return numPow - numZeros +} + func Min(a, b Value) Value { if a < b { return a @@ -268,19 +281,6 @@ func Max(a, b Value) Value { return b } -func NumFractionalDigits(a Value) int { - numPow := 0 - for pow := int64(DefaultPow); pow%10 != 1; pow /= 10 { - numPow++ - } - numZeros := 0 - for v := a.Int64(); v%10 == 0; v /= 10 { - numZeros++ - } - return numPow - numZeros -} - - func Abs(a Value) Value { if a < 0 { return -a diff --git a/pkg/strategy/support/strategy.go b/pkg/strategy/support/strategy.go index 6a6a3865e..a41b5bea5 100644 --- a/pkg/strategy/support/strategy.go +++ b/pkg/strategy/support/strategy.go @@ -124,7 +124,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se if minNotional > b.Available.Float64() { log.Warnf("modifying quantity %f according to the min quote balance %f %s", quantity, b.Available.Float64(), market.QuoteCurrency) - quantity = bbgo.AdjustQuantityByMaxAmount(quantity, closePrice, b.Available.Float64()) + quantity = bbgo.AdjustFloatQuantityByMaxAmount(quantity, closePrice, b.Available.Float64()) } } diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 4f3750951..51cd1b7c7 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -417,7 +417,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { // check quote quantity if quote, ok := account.Balance(s.sourceMarket.QuoteCurrency); ok { if quote.Available < notional { - // qf := bbgo.AdjustQuantityByMaxAmount(quantity.Float64(), lastPrice, quote.Available.Float64()) + // qf := bbgo.AdjustFloatQuantityByMaxAmount(quantity.Float64(), lastPrice, quote.Available.Float64()) // quantity = fixedpoint.NewFromFloat(qf) } } From e3cb2ad86ca2dc8e07ef789f1a8178debbc1c427 Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 14:57:22 +0800 Subject: [PATCH 10/12] fix telegram arguments index update --- pkg/notifier/telegramnotifier/telegram.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/notifier/telegramnotifier/telegram.go b/pkg/notifier/telegramnotifier/telegram.go index 8cf47b72c..1a5dae95f 100644 --- a/pkg/notifier/telegramnotifier/telegram.go +++ b/pkg/notifier/telegramnotifier/telegram.go @@ -12,7 +12,6 @@ type Notifier struct { type NotifyOption func(notifier *Notifier) - // New // TODO: register interaction with channel, so that we can route message to the specific telegram bot func New(interaction *Interaction, options ...NotifyOption) *Notifier { @@ -38,12 +37,15 @@ func filterPlaintextMessages(args []interface{}) (texts []string, pureArgs []int case types.PlainText: texts = append(texts, a.PlainText()) - textArgsOffset = idx + if textArgsOffset == -1 { + textArgsOffset = idx + } case types.Stringer: texts = append(texts, a.String()) - textArgsOffset = idx - + if textArgsOffset == -1 { + textArgsOffset = idx + } } } From a2bcfc86307e87506ebc199a4e64024485abf3e2 Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 15:34:58 +0800 Subject: [PATCH 11/12] fix bollgrid function call --- pkg/strategy/bollgrid/strategy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/strategy/bollgrid/strategy.go b/pkg/strategy/bollgrid/strategy.go index f104e29f3..cd5f00725 100644 --- a/pkg/strategy/bollgrid/strategy.go +++ b/pkg/strategy/bollgrid/strategy.go @@ -157,7 +157,7 @@ func (s *Strategy) generateGridBuyOrders(session *bbgo.ExchangeSession) ([]types continue } // adjust buy quantity using current quote balance - quantity := bbgo.AdjustQuantityByMaxAmount(s.Quantity, price, quoteBalance.Float64()) + quantity := bbgo.AdjustFloatQuantityByMaxAmount(s.Quantity, price, quoteBalance.Float64()) order := types.SubmitOrder{ Symbol: s.Symbol, Side: types.SideTypeBuy, From 445feb016abac6d7ba8087f9e82967038508a1e3 Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 15:35:11 +0800 Subject: [PATCH 12/12] support price ticks option --- pkg/bbgo/twap_order_executor.go | 56 ++++++++++++++++++--------------- pkg/cmd/orders.go | 23 ++++++++++---- pkg/fixedpoint/convert.go | 5 ++- 3 files changed, 51 insertions(+), 33 deletions(-) diff --git a/pkg/bbgo/twap_order_executor.go b/pkg/bbgo/twap_order_executor.go index 15cd2d276..3cb81a937 100644 --- a/pkg/bbgo/twap_order_executor.go +++ b/pkg/bbgo/twap_order_executor.go @@ -20,6 +20,8 @@ type TwapExecution struct { TargetQuantity fixedpoint.Value SliceQuantity fixedpoint.Value StopPrice fixedpoint.Value + NumOfTicks int + UpdateInterval time.Duration market types.Market marketDataStream types.Stream @@ -96,14 +98,30 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e return orderForm, errors.New("can not calculate spread, neither bid price or ask price exists") } + // for example, we have tickSize = 0.01, and spread is 28.02 - 28.00 = 0.02 + // assign tickSpread = min(spread - tickSize, tickSpread) + // + // if number of ticks = 0, than the tickSpread is 0 + // tickSpread = min(0.02 - 0.01, 0) + // price = first bid price 28.00 + tickSpread (0.00) = 28.00 + // + // if number of ticks = 1, than the tickSpread is 0.01 + // tickSpread = min(0.02 - 0.01, 0.01) + // price = first bid price 28.00 + tickSpread (0.01) = 28.01 + // + // if number of ticks = 2, than the tickSpread is 0.02 + // tickSpread = min(0.02 - 0.01, 0.02) + // price = first bid price 28.00 + tickSpread (0.01) = 28.01 tickSize := fixedpoint.NewFromFloat(e.market.TickSize) + tickSpread := tickSize.MulInt(e.NumOfTicks) if spread > tickSize { - log.Infof("spread %f is greater than the tick size %f, adding 1 tick to the price...", spread.Float64(), tickSize.Float64()) + // there is a gap in the spread + tickSpread = fixedpoint.Min(tickSpread, spread-tickSize) switch e.Side { case types.SideTypeSell: - newPrice -= fixedpoint.NewFromFloat(e.market.TickSize) + newPrice -= tickSpread case types.SideTypeBuy: - newPrice += fixedpoint.NewFromFloat(e.market.TickSize) + newPrice += tickSpread } } @@ -228,7 +246,10 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { } // if there is no gap - if fixedpoint.Abs(first.Price-second.Price) == tickSize { + gap := fixedpoint.Abs(first.Price - second.Price) + if gap > tickSize.MulInt(e.NumOfTicks) { + // found gap, we should update our price + } else { log.Infof("no gap between the second price %f and the first price %f (tick size = %f), skip updating", first.Price.Float64(), second.Price.Float64(), @@ -277,7 +298,7 @@ func (e *TwapExecution) cancelActiveOrders(ctx context.Context) { func (e *TwapExecution) orderUpdater(ctx context.Context) { rateLimiter := rate.NewLimiter(rate.Every(time.Minute), 15) - ticker := time.NewTimer(5 * time.Second) + ticker := time.NewTimer(e.UpdateInterval) defer ticker.Stop() // we should stop updater and clean up our open orders, if @@ -364,6 +385,10 @@ func (e *TwapExecution) Run(parentCtx context.Context) error { e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(context.Background()) e.mu.Unlock() + if e.UpdateInterval == 0 { + e.UpdateInterval = 10 * time.Second + } + var ok bool e.market, ok = e.Session.Market(e.Symbol) if !ok { @@ -445,24 +470,3 @@ func (e *TwapExecution) Shutdown(shutdownCtx context.Context) { } } } - -type TwapOrderExecutor struct { - Session *ExchangeSession - - // Execution parameters - // DelayTime is the order update delay time - DelayTime types.Duration -} - -func (e *TwapOrderExecutor) Execute(ctx context.Context, symbol string, side types.SideType, targetQuantity, sliceQuantity, stopPrice fixedpoint.Value) (*TwapExecution, error) { - execution := &TwapExecution{ - Session: e.Session, - Symbol: symbol, - Side: side, - TargetQuantity: targetQuantity, - SliceQuantity: sliceQuantity, - StopPrice: stopPrice, - } - err := execution.Run(ctx) - return execution, err -} diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index 20eca81a9..6c3ccbac1 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -167,6 +167,11 @@ var executeOrderCmd = &cobra.Command{ return err } + numOfPriceTicks, err := cmd.Flags().GetInt("price-ticks") + if err != nil { + return err + } + stopPriceS, err := cmd.Flags().GetString("stop-price") if err != nil { return err @@ -191,15 +196,20 @@ var executeOrderCmd = &cobra.Command{ return fmt.Errorf("session %s not found", sessionName) } - executor := &bbgo.TwapOrderExecutor{ - Session: session, - } - executionCtx, cancelExecution := context.WithCancel(ctx) defer cancelExecution() - execution, err := executor.Execute(executionCtx, symbol, side, targetQuantity, sliceQuantity, stopPrice) - if err != nil { + execution := &bbgo.TwapExecution{ + Session: session, + Symbol: symbol, + Side: side, + TargetQuantity: targetQuantity, + SliceQuantity: sliceQuantity, + StopPrice: stopPrice, + NumOfTicks: numOfPriceTicks, + } + + if err := execution.Run(executionCtx); err != nil { return err } @@ -317,6 +327,7 @@ func init() { executeOrderCmd.Flags().String("target-quantity", "", "target quantity") executeOrderCmd.Flags().String("slice-quantity", "", "slice quantity") executeOrderCmd.Flags().String("stop-price", "0", "stop price") + executeOrderCmd.Flags().Int("price-ticks", 0, "the number of price tick for the jump spread, default to 0") RootCmd.AddCommand(listOrdersCmd) RootCmd.AddCommand(submitOrderCmd) diff --git a/pkg/fixedpoint/convert.go b/pkg/fixedpoint/convert.go index b3abcf9df..7fd20ca5d 100644 --- a/pkg/fixedpoint/convert.go +++ b/pkg/fixedpoint/convert.go @@ -58,6 +58,10 @@ func (v Value) Mul(v2 Value) Value { return NewFromFloat(v.Float64() * v2.Float64()) } +func (v Value) MulInt(v2 int) Value { + return NewFromFloat(v.Float64() * float64(v2)) +} + func (v Value) MulFloat64(v2 float64) Value { return NewFromFloat(v.Float64() * v2) } @@ -252,7 +256,6 @@ func NewFromInt64(val int64) Value { return Value(val * DefaultPow) } - func NumFractionalDigits(a Value) int { numPow := 0 for pow := int64(DefaultPow); pow%10 != 1; pow /= 10 {