From efec21ca4babd836917be533baa504a4084c4fae Mon Sep 17 00:00:00 2001 From: zenix Date: Fri, 18 Mar 2022 17:17:04 +0900 Subject: [PATCH 1/2] feature: add market trade subscription in binance --- pkg/bbgo/session.go | 4 ++ pkg/exchange/binance/exchange.go | 8 ++-- pkg/exchange/binance/exchange_test.go | 4 +- pkg/exchange/binance/parse.go | 61 +++++++++++++++++++++++- pkg/exchange/binance/stream.go | 9 ++++ pkg/exchange/binance/stream_callbacks.go | 12 +++++ pkg/types/channel.go | 1 + pkg/types/exchange.go | 2 + pkg/types/standardstream_callbacks.go | 12 +++++ pkg/types/stream.go | 2 + 10 files changed, 108 insertions(+), 7 deletions(-) diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 525664872..f7e67bfda 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -358,6 +358,10 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment) session.lastPrices[kline.Symbol] = kline.Close }) + session.MarketDataStream.OnMarketTrade(func(trade types.Trade) { + session.lastPrices[trade.Symbol] = trade.Price + }) + session.IsInitialized = true return nil } diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 31ead73dc..36bc38c60 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -471,10 +471,10 @@ func (e *Exchange) QuerySpotAccount(ctx context.Context) (*types.Account, error) } a := &types.Account{ - AccountType: types.AccountTypeSpot, - CanDeposit: account.CanDeposit, // if can transfer in asset - CanTrade: account.CanTrade, // if can trade - CanWithdraw: account.CanWithdraw, // if can transfer out asset + AccountType: types.AccountTypeSpot, + CanDeposit: account.CanDeposit, // if can transfer in asset + CanTrade: account.CanTrade, // if can trade + CanWithdraw: account.CanWithdraw, // if can transfer out asset } a.UpdateBalances(balances) return a, nil diff --git a/pkg/exchange/binance/exchange_test.go b/pkg/exchange/binance/exchange_test.go index 9913c6de6..bca405a82 100644 --- a/pkg/exchange/binance/exchange_test.go +++ b/pkg/exchange/binance/exchange_test.go @@ -10,8 +10,8 @@ import ( func Test_newClientOrderID(t *testing.T) { cID := newSpotClientOrderID("") assert.Len(t, cID, 32) - strings.HasPrefix(cID, "x-" + spotBrokerID) + strings.HasPrefix(cID, "x-"+spotBrokerID) cID = newSpotClientOrderID("myid1") - assert.Equal(t, cID, "x-" + spotBrokerID + "myid1") + assert.Equal(t, cID, "x-"+spotBrokerID+"myid1") } diff --git a/pkg/exchange/binance/parse.go b/pkg/exchange/binance/parse.go index 2f88a1db2..c89a3c876 100644 --- a/pkg/exchange/binance/parse.go +++ b/pkg/exchange/binance/parse.go @@ -121,7 +121,7 @@ func (e *ExecutionReportEvent) Order() (*types.Order, error) { Price: e.OrderPrice, TimeInForce: types.TimeInForce(e.TimeInForce), }, - Exchange: types.ExchangeBinance, + Exchange: types.ExchangeBinance, IsWorking: e.IsOnBook, OrderID: uint64(e.OrderID), Status: toGlobalOrderStatus(binance.OrderStatusType(e.CurrentOrderStatus)), @@ -337,6 +337,11 @@ func parseWebSocketEvent(message []byte) (interface{}, error) { err := json.Unmarshal([]byte(message), &event) return &event, err + case "trade": + var event MarketTradeEvent + err := json.Unmarshal([]byte(message), &event) + return &event, err + default: id := val.GetInt("id") if id > 0 { @@ -462,6 +467,60 @@ func parseDepthEvent(val *fastjson.Value) (*DepthEvent, error) { return depth, err } +type MarketTradeEvent struct { + EventBase + Symbol string `json:"s"` + Quantity fixedpoint.Value `json:"q"` + Price fixedpoint.Value `json:"p"` + + BuyerOrderId int64 `json:"b"` + SellerOrderId int64 `json:"a"` + + OrderTradeTime int64 `json:"T"` + TradeId int64 `json:"t"` + + IsMaker bool `json:"m"` +} + +/* + +market trade + +{ + "e": "trade", // Event type + "E": 123456789, // Event time + "s": "BNBBTC", // Symbol + "t": 12345, // Trade ID + "p": "0.001", // Price + "q": "100", // Quantity + "b": 88, // Buyer order ID + "a": 50, // Seller order ID + "T": 123456785, // Trade time + "m": true, // Is the buyer the market maker? + "M": true // Ignore +} + +*/ + +func (e *MarketTradeEvent) Trade() types.Trade { + tt := time.Unix(0, e.OrderTradeTime*int64(time.Millisecond)) + return types.Trade{ + ID: uint64(e.TradeId), + Exchange: types.ExchangeBinance, + Symbol: e.Symbol, + OrderID: uint64(e.BuyerOrderId), + Side: types.SideTypeBoth, + Price: e.Price, + Quantity: e.Quantity, + QuoteQuantity: e.Quantity, + IsBuyer: true, + IsMaker: e.IsMaker, + Time: types.Time(tt), + Fee: fixedpoint.Zero, + FeeCurrency: "", + } +} + type KLine struct { StartTime int64 `json:"t"` EndTime int64 `json:"T"` diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 17769ae5a..54e3754a2 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -47,6 +47,7 @@ type Stream struct { kLineClosedEventCallbacks []func(e *KLineEvent) markPriceUpdateEventCallbacks []func(e *MarkPriceUpdateEvent) + marketTradeEventCallbacks []func(e *MarketTradeEvent) continuousKLineEventCallbacks []func(e *ContinuousKLineEvent) continuousKLineClosedEventCallbacks []func(e *ContinuousKLineEvent) @@ -116,6 +117,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie stream.OnBookTickerEvent(stream.handleBookTickerEvent) stream.OnExecutionReportEvent(stream.handleExecutionReportEvent) stream.OnContinuousKLineEvent(stream.handleContinuousKLineEvent) + stream.OnMarketTradeEvent(stream.handleMarketTradeEvent) // Event type ACCOUNT_UPDATE from user data stream updates Balance and FuturesPosition. stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent) @@ -207,6 +209,10 @@ func (s *Stream) handleBookTickerEvent(e *BookTickerEvent) { s.EmitBookTickerUpdate(e.BookTicker()) } +func (s *Stream) handleMarketTradeEvent(e *MarketTradeEvent) { + s.EmitMarketTrade(e.Trade()) +} + func (s *Stream) handleKLineEvent(e *KLineEvent) { kline := e.KLine.KLine() if e.KLine.Closed { @@ -317,6 +323,9 @@ func (s *Stream) dispatchEvent(e interface{}) { case *BalanceUpdateEvent: s.EmitBalanceUpdateEvent(e) + case *MarketTradeEvent: + s.EmitMarketTradeEvent(e) + case *KLineEvent: s.EmitKLineEvent(e) diff --git a/pkg/exchange/binance/stream_callbacks.go b/pkg/exchange/binance/stream_callbacks.go index d8886e469..9f9d3cea9 100644 --- a/pkg/exchange/binance/stream_callbacks.go +++ b/pkg/exchange/binance/stream_callbacks.go @@ -44,6 +44,16 @@ func (s *Stream) EmitMarkPriceUpdateEvent(e *MarkPriceUpdateEvent) { } } +func (s *Stream) OnMarketTradeEvent(cb func(e *MarketTradeEvent)) { + s.marketTradeEventCallbacks = append(s.marketTradeEventCallbacks, cb) +} + +func (s *Stream) EmitMarketTradeEvent(e *MarketTradeEvent) { + for _, cb := range s.marketTradeEventCallbacks { + cb(e) + } +} + func (s *Stream) OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent)) { s.continuousKLineEventCallbacks = append(s.continuousKLineEventCallbacks, cb) } @@ -153,6 +163,8 @@ type StreamEventHub interface { OnMarkPriceUpdateEvent(cb func(e *MarkPriceUpdateEvent)) + OnMarketTradeEvent(cb func(e *MarketTradeEvent)) + OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent)) OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent)) diff --git a/pkg/types/channel.go b/pkg/types/channel.go index 55aac460d..23a90fc1e 100644 --- a/pkg/types/channel.go +++ b/pkg/types/channel.go @@ -5,3 +5,4 @@ type Channel string var BookChannel = Channel("book") var KLineChannel = Channel("kline") var BookTickerChannel = Channel("bookticker") +var MarketTradeChannel = Channel("trade") diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index 0a91d9861..2abbfec3e 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -60,6 +60,8 @@ func ValidExchangeName(a string) (ExchangeName, error) { return ExchangeFTX, nil case "okex": return ExchangeOKEx, nil + case "kucoin": + return ExchangeKucoin, nil } return "", fmt.Errorf("invalid exchange name: %s", a) diff --git a/pkg/types/standardstream_callbacks.go b/pkg/types/standardstream_callbacks.go index ab35e07e7..19fd47690 100644 --- a/pkg/types/standardstream_callbacks.go +++ b/pkg/types/standardstream_callbacks.go @@ -124,6 +124,16 @@ func (s *StandardStream) EmitBookSnapshot(book SliceOrderBook) { } } +func (s *StandardStream) OnMarketTrade(cb func(trade Trade)) { + s.marketTradeCallbacks = append(s.marketTradeCallbacks, cb) +} + +func (s *StandardStream) EmitMarketTrade(trade Trade) { + for _, cb := range s.marketTradeCallbacks { + cb(trade) + } +} + func (s *StandardStream) OnFuturesPositionUpdate(cb func(futuresPositions FuturesPositionMap)) { s.FuturesPositionUpdateCallbacks = append(s.FuturesPositionUpdateCallbacks, cb) } @@ -169,6 +179,8 @@ type StandardStreamEventHub interface { OnBookSnapshot(cb func(book SliceOrderBook)) + OnMarketTrade(cb func(trade Trade)) + OnFuturesPositionUpdate(cb func(futuresPositions FuturesPositionMap)) OnFuturesPositionSnapshot(cb func(futuresPositions FuturesPositionMap)) diff --git a/pkg/types/stream.go b/pkg/types/stream.go index c31ab510b..8d90cd479 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -96,6 +96,8 @@ type StandardStream struct { bookSnapshotCallbacks []func(book SliceOrderBook) + marketTradeCallbacks []func(trade Trade) + // Futures FuturesPositionUpdateCallbacks []func(futuresPositions FuturesPositionMap) From abbe04fae99e820b24617e36827ebf854119d161 Mon Sep 17 00:00:00 2001 From: zenix Date: Fri, 18 Mar 2022 18:49:59 +0900 Subject: [PATCH 2/2] fix: parse market trade as taker trade --- pkg/exchange/binance/parse.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/pkg/exchange/binance/parse.go b/pkg/exchange/binance/parse.go index c89a3c876..48527891d 100644 --- a/pkg/exchange/binance/parse.go +++ b/pkg/exchange/binance/parse.go @@ -480,6 +480,7 @@ type MarketTradeEvent struct { TradeId int64 `json:"t"` IsMaker bool `json:"m"` + Dummy bool `json:"M"` } /* @@ -504,16 +505,28 @@ market trade func (e *MarketTradeEvent) Trade() types.Trade { tt := time.Unix(0, e.OrderTradeTime*int64(time.Millisecond)) + var orderId int64 + var side types.SideType + var isBuyer bool + if e.IsMaker { + orderId = e.SellerOrderId // seller is taker + side = types.SideTypeSell + isBuyer = false + } else { + orderId = e.BuyerOrderId // buyer is taker + side = types.SideTypeBuy + isBuyer = true + } return types.Trade{ ID: uint64(e.TradeId), Exchange: types.ExchangeBinance, Symbol: e.Symbol, - OrderID: uint64(e.BuyerOrderId), - Side: types.SideTypeBoth, + OrderID: uint64(orderId), + Side: side, Price: e.Price, Quantity: e.Quantity, QuoteQuantity: e.Quantity, - IsBuyer: true, + IsBuyer: isBuyer, IsMaker: e.IsMaker, Time: types.Time(tt), Fee: fixedpoint.Zero,