From 6d249cf83c25124b3944bcc682f754c785f484f4 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 10:27:01 +0800 Subject: [PATCH 01/18] bypass disconnect event --- pkg/exchange/max/maxapi/websocket.go | 3 ++- pkg/exchange/max/maxapi/websocketservice_callbacks.go | 6 +++--- pkg/exchange/max/stream.go | 2 ++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/exchange/max/maxapi/websocket.go b/pkg/exchange/max/maxapi/websocket.go index f005416f2..9177a4824 100644 --- a/pkg/exchange/max/maxapi/websocket.go +++ b/pkg/exchange/max/maxapi/websocket.go @@ -53,7 +53,7 @@ type WebSocketService struct { Subscriptions []Subscription connectCallbacks []func(conn *websocket.Conn) - disconnectCallbacks []func(conn *websocket.Conn) + disconnectCallbacks []func() errorCallbacks []func(err error) messageCallbacks []func(message []byte) @@ -163,6 +163,7 @@ func (s *WebSocketService) read(ctx context.Context) { if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + s.EmitDisconnect() // emit reconnect to start a new connection s.emitReconnect() return diff --git a/pkg/exchange/max/maxapi/websocketservice_callbacks.go b/pkg/exchange/max/maxapi/websocketservice_callbacks.go index e32d2b5db..aec6a66b4 100644 --- a/pkg/exchange/max/maxapi/websocketservice_callbacks.go +++ b/pkg/exchange/max/maxapi/websocketservice_callbacks.go @@ -16,13 +16,13 @@ func (s *WebSocketService) EmitConnect(conn *websocket.Conn) { } } -func (s *WebSocketService) OnDisconnect(cb func(conn *websocket.Conn)) { +func (s *WebSocketService) OnDisconnect(cb func()) { s.disconnectCallbacks = append(s.disconnectCallbacks, cb) } -func (s *WebSocketService) EmitDisconnect(conn *websocket.Conn) { +func (s *WebSocketService) EmitDisconnect() { for _, cb := range s.disconnectCallbacks { - cb(conn) + cb() } } diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index e5d06797c..c179f37c7 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -42,6 +42,8 @@ func NewStream(key, secret string) *Stream { } }) + wss.OnDisconnect(stream.EmitDisconnect) + wss.OnMessage(func(message []byte) { logger.Debugf("M: %s", message) }) From c95e71242046856e65172b5e3fbafeb98d2ea08b Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 13:27:18 +0800 Subject: [PATCH 02/18] binance: emit disconnect --- pkg/exchange/binance/stream.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 40e97c291..1d32ce6e8 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -366,6 +366,8 @@ func (s *Stream) read(ctx context.Context) { log.Info("websocket connection closed, going away") } + s.EmitDisconnect() + // reconnect for err != nil { select { From 9f7af3ce824b983135467d12c1736ab648c12653 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 17:51:17 +0800 Subject: [PATCH 03/18] assign SubAccount name to the new exchange session --- pkg/bbgo/environment.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index 1da074846..d41262a5b 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -246,6 +246,7 @@ func NewExchangeSessionFromConfig(name string, sessionConfig *ExchangeSession) ( session.EnvVarPrefix = sessionConfig.EnvVarPrefix session.Key = sessionConfig.Key session.Secret = sessionConfig.Secret + session.SubAccount = sessionConfig.SubAccount session.PublicOnly = sessionConfig.PublicOnly session.Margin = sessionConfig.Margin session.IsolatedMargin = sessionConfig.IsolatedMargin From 3ab349f38af60de7b181b49ed8ce331e845d01ab Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 17:57:03 +0800 Subject: [PATCH 04/18] add more details to the readme --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index c61911f0b..e3ea250f3 100644 --- a/README.md +++ b/README.md @@ -53,22 +53,28 @@ go get -u github.com/c9s/bbgo/cmd/bbgo Add your dotenv file: ``` +# optional SLACK_TOKEN= +# optional TELEGRAM_BOT_TOKEN= TELEGRAM_BOT_AUTH_TOKEN= +# if you have one BINANCE_API_KEY= BINANCE_API_SECRET= +# if you have one MAX_API_KEY= MAX_API_SECRET= +# if you have one FTX_API_KEY= FTX_API_SECRET= # specify it if credentials are for subaccount FTX_SUBACCOUNT_NAME= +# optional, if you have the db setup MYSQL_URL=root@tcp(127.0.0.1:3306)/bbgo?parseTime=true ``` @@ -220,6 +226,16 @@ then the following types could be injected automatically: - `*bbgo.ExchangeSession` - `types.Market` + +## Strategy Execution Phases + +1. Load config from the config file. +2. Allocate and initialize exchange sessions. +3. Add exchange sessions to the environment (the data layer). +4. Use the given environment to initialize the trader object. +5. The trader initializes the environment and start the exchange connections. +6. Call strategy.Run() method sequentially. + ## Exchange API Examples Please check out the example directory: [examples](examples) From 9b172e88be8791579d3887491942783988fc965c Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 17:57:42 +0800 Subject: [PATCH 05/18] add How to add a new exchange session --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index e3ea250f3..923eb51e5 100644 --- a/README.md +++ b/README.md @@ -260,6 +260,10 @@ streambook := types.NewStreamBook(symbol) streambook.BindStream(stream) ``` +## How To Add A New Exchange + +(TBD) + ## Telegram Integration - In telegram: @botFather From 4061f2aef046b15409f60f8e0f99e93ff2f4f0e3 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 17:58:10 +0800 Subject: [PATCH 06/18] add logic layer comment --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 923eb51e5..de84f75e1 100644 --- a/README.md +++ b/README.md @@ -232,7 +232,7 @@ then the following types could be injected automatically: 1. Load config from the config file. 2. Allocate and initialize exchange sessions. 3. Add exchange sessions to the environment (the data layer). -4. Use the given environment to initialize the trader object. +4. Use the given environment to initialize the trader object (the logic layer). 5. The trader initializes the environment and start the exchange connections. 6. Call strategy.Run() method sequentially. From e311a182faf1bc2b193b2d3099f442145d1cb090 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 18:04:03 +0800 Subject: [PATCH 07/18] add onStart callbacks --- pkg/bbgo/environment.go | 2 ++ pkg/types/standardstream_callbacks.go | 12 ++++++++++++ pkg/types/stream.go | 2 ++ 3 files changed, 16 insertions(+) diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index d41262a5b..dae4686d1 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -489,6 +489,8 @@ func (environ *Environment) Connect(ctx context.Context) error { if err := session.Stream.Connect(ctx); err != nil { return err } + + session.Stream.EmitStart() } return nil diff --git a/pkg/types/standardstream_callbacks.go b/pkg/types/standardstream_callbacks.go index 7777aba72..1d779564d 100644 --- a/pkg/types/standardstream_callbacks.go +++ b/pkg/types/standardstream_callbacks.go @@ -4,6 +4,16 @@ package types import () +func (stream *StandardStream) OnStart(cb func()) { + stream.startCallbacks = append(stream.startCallbacks, cb) +} + +func (stream *StandardStream) EmitStart() { + for _, cb := range stream.startCallbacks { + cb() + } +} + func (stream *StandardStream) OnConnect(cb func()) { stream.connectCallbacks = append(stream.connectCallbacks, cb) } @@ -105,6 +115,8 @@ func (stream *StandardStream) EmitBookSnapshot(book OrderBook) { } type StandardStreamEventHub interface { + OnStart(cb func()) + OnConnect(cb func()) OnDisconnect(cb func()) diff --git a/pkg/types/stream.go b/pkg/types/stream.go index 5a4f35423..c6a727c6e 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -23,6 +23,8 @@ var KLineChannel = Channel("kline") type StandardStream struct { Subscriptions []Subscription + startCallbacks []func() + connectCallbacks []func() disconnectCallbacks []func() From 2bf4a555ec6199122723254b69a7d50397df2472 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 18:04:55 +0800 Subject: [PATCH 08/18] use OnStart instead of OnConnect this is for avoiding re-connect issue --- pkg/strategy/grid/strategy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/strategy/grid/strategy.go b/pkg/strategy/grid/strategy.go index dc4964d59..5a811c7dc 100644 --- a/pkg/strategy/grid/strategy.go +++ b/pkg/strategy/grid/strategy.go @@ -367,7 +367,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se }) session.Stream.OnTradeUpdate(s.tradeUpdateHandler) - session.Stream.OnConnect(func() { + session.Stream.OnStart(func() { s.placeGridOrders(orderExecutor, session) }) From 46c59f500915d40452fd1fdb7d7e9040828e1e76 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 18:09:55 +0800 Subject: [PATCH 09/18] add both side and support json unmarshalling --- pkg/types/side.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pkg/types/side.go b/pkg/types/side.go index a24a6d508..dcf2e4983 100644 --- a/pkg/types/side.go +++ b/pkg/types/side.go @@ -1,5 +1,10 @@ package types +import ( + "encoding/json" + "strings" +) + // SideType define side type of order type SideType string @@ -7,8 +12,33 @@ const ( SideTypeBuy = SideType("BUY") SideTypeSell = SideType("SELL") SideTypeSelf = SideType("SELF") + + // SideTypeBoth is only used for the configuration context + SideTypeBoth = SideType("BOTH") ) +func (side *SideType) UnmarshalJSON(data []byte) (err error) { + var s string + err = json.Unmarshal(data, &s) + if err != nil { + return err + } + + switch strings.ToLower(s) { + case "buy": + *side = SideTypeBuy + + case "sell": + *side = SideTypeSell + + case "both": + *side = SideTypeBoth + + } + + return err +} + func (side SideType) Reverse() SideType { switch side { case SideTypeBuy: From c3996aee2b265f29d47926a8c257ef0934606650 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 15 Mar 2021 18:25:36 +0800 Subject: [PATCH 10/18] add Backup method to the local active order book --- pkg/bbgo/active_book.go | 4 ++++ pkg/types/order.go | 8 ++++++++ pkg/types/ordermap.go | 14 ++++++++++++++ 3 files changed, 26 insertions(+) diff --git a/pkg/bbgo/active_book.go b/pkg/bbgo/active_book.go index b0fd94104..662b60eb5 100644 --- a/pkg/bbgo/active_book.go +++ b/pkg/bbgo/active_book.go @@ -22,6 +22,10 @@ func NewLocalActiveOrderBook() *LocalActiveOrderBook { } } +func (b *LocalActiveOrderBook) Backup() []types.SubmitOrder { + return append(b.Bids.Backup(), b.Asks.Backup()...) +} + func (b *LocalActiveOrderBook) BindStream(stream types.Stream) { stream.OnOrderUpdate(b.orderUpdateHandler) } diff --git a/pkg/types/order.go b/pkg/types/order.go index b5f1e3c2c..6abf47b4a 100644 --- a/pkg/types/order.go +++ b/pkg/types/order.go @@ -155,6 +155,14 @@ type Order struct { IsIsolated bool `json:"isIsolated" db:"is_isolated"` } +// Backup backs up the current order quantity to a SubmitOrder object +// so that we can post the order later when we want to restore the orders. +func (o Order) Backup() SubmitOrder { + so := o.SubmitOrder + so.Quantity = o.Quantity - o.ExecutedQuantity + return so +} + func (o Order) String() string { return fmt.Sprintf("order %s %f/%f at %f -> %s", o.Side, o.ExecutedQuantity, o.Quantity, o.Price, o.Status) } diff --git a/pkg/types/ordermap.go b/pkg/types/ordermap.go index d021f2652..401bf482b 100644 --- a/pkg/types/ordermap.go +++ b/pkg/types/ordermap.go @@ -5,6 +5,14 @@ import "sync" // OrderMap is used for storing orders by their order id type OrderMap map[uint64]Order +func (m OrderMap) Backup() (orderForms []SubmitOrder) { + for _, order := range m { + orderForms = append(orderForms, order.Backup()) + } + + return orderForms +} + func (m OrderMap) Add(o Order) { m[o.OrderID] = o } @@ -70,6 +78,12 @@ func NewSyncOrderMap() *SyncOrderMap { } } +func (m *SyncOrderMap) Backup() []SubmitOrder { + m.Lock() + defer m.Unlock() + return m.orders.Backup() +} + func (m *SyncOrderMap) Remove(orderID uint64) (exists bool) { m.Lock() defer m.Unlock() From 7951c38edc94768853a0298d8c4ef55e00d2cd7b Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 16 Mar 2021 01:31:56 +0800 Subject: [PATCH 11/18] skip connection if there is no subscription --- pkg/bbgo/environment.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index dae4686d1..c1f83f7aa 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -476,7 +476,8 @@ func (environ *Environment) Connect(ctx context.Context) error { var logger = log.WithField("session", n) if len(session.Subscriptions) == 0 { - logger.Warnf("exchange session %s has no subscriptions", session.Name) + logger.Warnf("exchange session %s has no subscriptions, skipping", session.Name) + continue } else { // add the subscribe requests to the stream for _, s := range session.Subscriptions { From 2f7c7d344b6e1ef6f138adf31cfa447b431bcb09 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 16 Mar 2021 01:32:27 +0800 Subject: [PATCH 12/18] move emitStart method call into the stream Connect method --- pkg/bbgo/environment.go | 2 -- pkg/exchange/binance/stream.go | 2 ++ pkg/exchange/max/stream.go | 8 +++++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index c1f83f7aa..60fe162bc 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -490,8 +490,6 @@ func (environ *Environment) Connect(ctx context.Context) error { if err := session.Stream.Connect(ctx); err != nil { return err } - - session.Stream.EmitStart() } return nil diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 1d32ce6e8..b7bb15b5a 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -310,6 +310,8 @@ func (s *Stream) Connect(ctx context.Context) error { } go s.read(ctx) + + s.EmitStart() return nil } diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index c179f37c7..212c71c29 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -171,7 +171,13 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.S } func (s *Stream) Connect(ctx context.Context) error { - return s.websocketService.Connect(ctx) + err := s.websocketService.Connect(ctx) + if err != nil { + return err + } + + s.EmitStart() + return nil } func (s *Stream) Close() error { From c5eb6483a5cdddb9ddb24631ccdfcb1f188df672 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 16 Mar 2021 02:13:52 +0800 Subject: [PATCH 13/18] integrate QueryTicker for backtesting --- pkg/backtest/exchange.go | 18 ++++++++++++++++-- pkg/backtest/matching.go | 2 ++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/backtest/exchange.go b/pkg/backtest/exchange.go index 6689f9fef..80e8e87f1 100644 --- a/pkg/backtest/exchange.go +++ b/pkg/backtest/exchange.go @@ -211,8 +211,22 @@ func (e Exchange) QueryTrades(ctx context.Context, symbol string, options *types } func (e Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticker, error) { - // Not using Tickers in back test (yet) - return nil, ErrUnimplemented + matching, ok := e.matchingBooks[symbol] + if !ok { + return nil, fmt.Errorf("matching engine is not initialized for symbol %s", symbol) + } + + kline := matching.LastKLine + return &types.Ticker{ + Time: kline.EndTime, + Volume: kline.Volume, + Last: kline.Close, + Open: kline.Open, + High: kline.High, + Low: kline.Low, + Buy: kline.Close, + Sell: kline.Close, + }, nil } func (e Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) { diff --git a/pkg/backtest/matching.go b/pkg/backtest/matching.go index fb963d806..0ad6adb83 100644 --- a/pkg/backtest/matching.go +++ b/pkg/backtest/matching.go @@ -42,6 +42,7 @@ type SimplePriceMatching struct { askOrders []types.Order LastPrice fixedpoint.Value + LastKLine types.KLine CurrentTime time.Time Account *types.Account @@ -400,6 +401,7 @@ func (m *SimplePriceMatching) SellToPrice(price fixedpoint.Value) (closedOrders func (m *SimplePriceMatching) processKLine(kline types.KLine) { m.CurrentTime = kline.EndTime + m.LastKLine = kline switch kline.Direction() { case types.DirectionDown: From 40fded70b240156dc33715884d697121cabb5041 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 16 Mar 2021 02:14:10 +0800 Subject: [PATCH 14/18] reformat scale.go --- pkg/bbgo/scale.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/bbgo/scale.go b/pkg/bbgo/scale.go index 694b01af8..3d014e659 100644 --- a/pkg/bbgo/scale.go +++ b/pkg/bbgo/scale.go @@ -238,7 +238,6 @@ func (rule *SlideRule) Scale() (Scale, error) { return nil, errors.New("no any scale is defined") } - // PriceVolumeScale defines the scale DSL for strategy, e.g., // // scaleQuantity: @@ -282,7 +281,7 @@ func (q *PriceVolumeScale) ScaleByPrice(price float64) (float64, error) { return 0, err } - if err := scale.Solve() ; err != nil { + if err := scale.Solve(); err != nil { return 0, err } @@ -300,7 +299,7 @@ func (q *PriceVolumeScale) ScaleByVolume(volume float64) (float64, error) { return 0, err } - if err := scale.Solve() ; err != nil { + if err := scale.Solve(); err != nil { return 0, err } From 60aa7df69ac430cf260238aac359340225e0cdd4 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 16 Mar 2021 02:14:24 +0800 Subject: [PATCH 15/18] adjust withdraw/deposit query limit since there are no many in most cases --- pkg/service/deposit.go | 3 +-- pkg/service/withdraw.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/service/deposit.go b/pkg/service/deposit.go index 2a066daf0..aff2bf77d 100644 --- a/pkg/service/deposit.go +++ b/pkg/service/deposit.go @@ -18,7 +18,7 @@ func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error { txnIDs := map[string]struct{}{} // query descending - records, err := s.QueryLast(ex.Name(), 100) + records, err := s.QueryLast(ex.Name(), 10) if err != nil { return err } @@ -56,7 +56,6 @@ func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error { return nil } - func (s *DepositService) QueryLast(ex types.ExchangeName, limit int) ([]types.Deposit, error) { sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit" rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ diff --git a/pkg/service/withdraw.go b/pkg/service/withdraw.go index 4a84a80dc..bcf8ec5b3 100644 --- a/pkg/service/withdraw.go +++ b/pkg/service/withdraw.go @@ -18,7 +18,7 @@ func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error { txnIDs := map[string]struct{}{} // query descending - records, err := s.QueryLast(ex.Name(), 100) + records, err := s.QueryLast(ex.Name(), 10) if err != nil { return err } From f56df038aa4325129b79850c03cfef0b0ddbe3ce Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 16 Mar 2021 02:18:17 +0800 Subject: [PATCH 16/18] fix position and add catchup mode for grid strategy --- pkg/strategy/grid/strategy.go | 145 ++++++++++++++++++++++++++-------- 1 file changed, 114 insertions(+), 31 deletions(-) diff --git a/pkg/strategy/grid/strategy.go b/pkg/strategy/grid/strategy.go index 5a811c7dc..5ffda93a1 100644 --- a/pkg/strategy/grid/strategy.go +++ b/pkg/strategy/grid/strategy.go @@ -56,25 +56,38 @@ type Strategy struct { LowerPrice fixedpoint.Value `json:"lowerPrice" yaml:"lowerPrice"` // Quantity is the quantity you want to submit for each order. - Quantity fixedpoint.Value `json:"quantity,omitempty"` + Quantity fixedpoint.Value `json:"quantity,omitempty"` + + // ScaleQuantity helps user to define the quantity by price scale or volume scale ScaleQuantity *bbgo.PriceVolumeScale `json:"scaleQuantity,omitempty"` // FixedAmount is used for fixed amount (dynamic quantity) if you don't want to use fixed quantity. FixedAmount fixedpoint.Value `json:"amount,omitempty" yaml:"amount"` + // Side is the initial maker orders side. defaults to "both" + Side types.SideType `json:"side" yaml:"side"` + + // CatchUp let the maker grid catch up with the price change. + CatchUp bool `json:"catchUp" yaml:"catchUp"` + // Long means you want to hold more base asset than the quote asset. Long bool `json:"long,omitempty" yaml:"long,omitempty"` + filledBuyGrids map[fixedpoint.Value]struct{} + filledSellGrids map[fixedpoint.Value]struct{} + + // orderStore is used to store all the created orders, so that we can filter the trades. orderStore *bbgo.OrderStore // activeOrders is the locally maintained active order book of the maker orders. activeOrders *bbgo.LocalActiveOrderBook - position fixedpoint.Value + position bbgo.Position // any created orders for tracking trades orders map[uint64]types.Order + // groupID is the group ID used for the strategy instance for canceling orders groupID int64 } @@ -141,6 +154,11 @@ func (s *Strategy) generateGridSellOrders(session *bbgo.ExchangeSession) ([]type baseBalance.Available.Float64()) } + if _, filled := s.filledSellGrids[price]; filled { + log.Infof("sell grid at price %f is already filled, skipping", price.Float64()) + continue + } + orders = append(orders, types.SubmitOrder{ Symbol: s.Symbol, Side: types.SideTypeSell, @@ -152,12 +170,15 @@ func (s *Strategy) generateGridSellOrders(session *bbgo.ExchangeSession) ([]type GroupID: s.groupID, }) baseBalance.Available -= quantity + + s.filledSellGrids[price] = struct{}{} } return orders, nil } func (s *Strategy) generateGridBuyOrders(session *bbgo.ExchangeSession) ([]types.SubmitOrder, error) { + // session.Exchange.QueryTicker() currentPriceFloat, ok := session.LastPrice(s.Symbol) if !ok { return nil, fmt.Errorf("%s last price not found, skipping", s.Symbol) @@ -217,6 +238,11 @@ func (s *Strategy) generateGridBuyOrders(session *bbgo.ExchangeSession) ([]types quoteQuantity.Float64()) } + if _, filled := s.filledBuyGrids[price]; filled { + log.Infof("buy grid at price %f is already filled, skipping", price.Float64()) + continue + } + orders = append(orders, types.SubmitOrder{ Symbol: s.Symbol, Side: types.SideTypeBuy, @@ -228,40 +254,74 @@ func (s *Strategy) generateGridBuyOrders(session *bbgo.ExchangeSession) ([]types GroupID: s.groupID, }) balance.Available -= quoteQuantity + + s.filledBuyGrids[price] = struct{}{} } return orders, nil } +func (s *Strategy) placeGridSellOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { + orderForms, err := s.generateGridSellOrders(session) + if err != nil { + return err + } + + if len(orderForms) > 0 { + createdOrders, err := orderExecutor.SubmitOrders(context.Background(), orderForms...) + if err != nil { + return err + } + + s.activeOrders.Add(createdOrders...) + } + + return nil +} + +func (s *Strategy) placeGridBuyOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { + orderForms, err := s.generateGridBuyOrders(session) + if err != nil { + return err + } + + if len(orderForms) > 0 { + createdOrders, err := orderExecutor.SubmitOrders(context.Background(), orderForms...) + if err != nil { + return err + } else { + s.activeOrders.Add(createdOrders...) + } + } + + return nil +} + func (s *Strategy) placeGridOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) { log.Infof("placing grid orders...") - sellOrders, err := s.generateGridSellOrders(session) - if err != nil { - log.Warn(err.Error()) - } - if len(sellOrders) > 0 { - createdSellOrders, err := orderExecutor.SubmitOrders(context.Background(), sellOrders...) - if err != nil { - log.WithError(err).Error(err.Error()) - } else { - s.activeOrders.Add(createdSellOrders...) + switch s.Side { + + case types.SideTypeBuy: + if err := s.placeGridBuyOrders(orderExecutor, session); err != nil { + log.Warn(err.Error()) + } + + case types.SideTypeSell: + if err := s.placeGridSellOrders(orderExecutor, session); err != nil { + log.Warn(err.Error()) + } + + case types.SideTypeBoth: + if err := s.placeGridSellOrders(orderExecutor, session); err != nil { + log.Warn(err.Error()) + } + + if err := s.placeGridBuyOrders(orderExecutor, session); err != nil { + log.Warn(err.Error()) } } - buyOrders, err := s.generateGridBuyOrders(session) - if err != nil { - log.Warn(err.Error()) - } - - if len(buyOrders) > 0 { - createdBuyOrders, err := orderExecutor.SubmitOrders(context.Background(), buyOrders...) - if err != nil { - log.WithError(err).Error(err.Error()) - } else { - s.activeOrders.Add(createdBuyOrders...) - } - } } func (s *Strategy) tradeUpdateHandler(trade types.Trade) { @@ -271,11 +331,14 @@ func (s *Strategy) tradeUpdateHandler(trade types.Trade) { if s.orderStore.Exists(trade.OrderID) { log.Infof("received trade update of order %d: %+v", trade.OrderID, trade) - switch trade.Side { - case types.SideTypeBuy: - s.position.AtomicAdd(fixedpoint.NewFromFloat(trade.Quantity)) - case types.SideTypeSell: - s.position.AtomicAdd(-fixedpoint.NewFromFloat(trade.Quantity)) + + if trade.Side == types.SideTypeSelf { + return + } + + profit, madeProfit := s.position.AddTrade(trade) + if madeProfit { + s.Notify("profit: %f", profit.Float64()) } } } @@ -333,10 +396,17 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.GridNum = 10 } + if s.Side == "" { + s.Side = types.SideTypeBoth + } + if s.UpperPrice <= s.LowerPrice { return fmt.Errorf("upper price (%f) should not be less than lower price (%f)", s.UpperPrice.Float64(), s.LowerPrice.Float64()) } + s.filledBuyGrids = make(map[fixedpoint.Value]struct{}) + s.filledSellGrids = make(map[fixedpoint.Value]struct{}) + position, ok := session.Position(s.Symbol) if !ok { return fmt.Errorf("position not found") @@ -344,6 +414,12 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se log.Infof("position: %+v", position) + s.position = bbgo.Position{ + Symbol: s.Symbol, + BaseCurrency: s.Market.BaseCurrency, + QuoteCurrency: s.Market.QuoteCurrency, + } + instanceID := fmt.Sprintf("grid-%s-%d", s.Symbol, s.GridNum) s.groupID = generateGroupID(instanceID) log.Infof("using group id %d from fnv(%s)", s.groupID, instanceID) @@ -360,12 +436,19 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se defer wg.Done() log.Infof("canceling active orders...") - if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil { log.WithError(err).Errorf("cancel order error") } }) + if s.CatchUp { + session.Stream.OnKLineClosed(func(kline types.KLine) { + log.Infof("catchUp mode is enabled, updating grid orders...") + // update grid + s.placeGridOrders(orderExecutor, session) + }) + } + session.Stream.OnTradeUpdate(s.tradeUpdateHandler) session.Stream.OnStart(func() { s.placeGridOrders(orderExecutor, session) From 98995bc75c8089efc0e02f389c0efa0023e5e796 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 16 Mar 2021 02:21:46 +0800 Subject: [PATCH 17/18] use debug log for skipping filled grid --- pkg/strategy/grid/strategy.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/strategy/grid/strategy.go b/pkg/strategy/grid/strategy.go index 5ffda93a1..96f69f93c 100644 --- a/pkg/strategy/grid/strategy.go +++ b/pkg/strategy/grid/strategy.go @@ -155,7 +155,7 @@ func (s *Strategy) generateGridSellOrders(session *bbgo.ExchangeSession) ([]type } if _, filled := s.filledSellGrids[price]; filled { - log.Infof("sell grid at price %f is already filled, skipping", price.Float64()) + log.Debugf("sell grid at price %f is already filled, skipping", price.Float64()) continue } @@ -239,7 +239,7 @@ func (s *Strategy) generateGridBuyOrders(session *bbgo.ExchangeSession) ([]types } if _, filled := s.filledBuyGrids[price]; filled { - log.Infof("buy grid at price %f is already filled, skipping", price.Float64()) + log.Debugf("buy grid at price %f is already filled, skipping", price.Float64()) continue } From 478bef526d9216c706a758fef6cf7ff015f10709 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 16 Mar 2021 02:22:00 +0800 Subject: [PATCH 18/18] copy the position object and send notification --- pkg/strategy/grid/strategy.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/strategy/grid/strategy.go b/pkg/strategy/grid/strategy.go index 96f69f93c..07d233df2 100644 --- a/pkg/strategy/grid/strategy.go +++ b/pkg/strategy/grid/strategy.go @@ -412,13 +412,9 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se return fmt.Errorf("position not found") } - log.Infof("position: %+v", position) + s.position = *position - s.position = bbgo.Position{ - Symbol: s.Symbol, - BaseCurrency: s.Market.BaseCurrency, - QuoteCurrency: s.Market.QuoteCurrency, - } + s.Notify("current position %+v", position) instanceID := fmt.Sprintf("grid-%s-%d", s.Symbol, s.GridNum) s.groupID = generateGroupID(instanceID)