Merge pull request #489 from zenixls2/feature/market_trade

feature: add market trade subscription in binance
This commit is contained in:
Yo-An Lin 2022-03-22 20:18:39 +08:00 committed by GitHub
commit ae4a3d81fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 117 additions and 3 deletions

View File

@ -358,6 +358,10 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
session.lastPrices[kline.Symbol] = kline.Close
})
session.MarketDataStream.OnMarketTrade(func(trade types.Trade) {
session.lastPrices[trade.Symbol] = trade.Price
})
session.IsInitialized = true
return nil
}

View File

@ -10,8 +10,8 @@ import (
func Test_newClientOrderID(t *testing.T) {
cID := newSpotClientOrderID("")
assert.Len(t, cID, 32)
strings.HasPrefix(cID, "x-" + spotBrokerID)
strings.HasPrefix(cID, "x-"+spotBrokerID)
cID = newSpotClientOrderID("myid1")
assert.Equal(t, cID, "x-" + spotBrokerID + "myid1")
assert.Equal(t, cID, "x-"+spotBrokerID+"myid1")
}

View File

@ -121,7 +121,7 @@ func (e *ExecutionReportEvent) Order() (*types.Order, error) {
Price: e.OrderPrice,
TimeInForce: types.TimeInForce(e.TimeInForce),
},
Exchange: types.ExchangeBinance,
Exchange: types.ExchangeBinance,
IsWorking: e.IsOnBook,
OrderID: uint64(e.OrderID),
Status: toGlobalOrderStatus(binance.OrderStatusType(e.CurrentOrderStatus)),
@ -337,6 +337,11 @@ func parseWebSocketEvent(message []byte) (interface{}, error) {
err := json.Unmarshal([]byte(message), &event)
return &event, err
case "trade":
var event MarketTradeEvent
err := json.Unmarshal([]byte(message), &event)
return &event, err
default:
id := val.GetInt("id")
if id > 0 {
@ -462,6 +467,73 @@ func parseDepthEvent(val *fastjson.Value) (*DepthEvent, error) {
return depth, err
}
type MarketTradeEvent struct {
EventBase
Symbol string `json:"s"`
Quantity fixedpoint.Value `json:"q"`
Price fixedpoint.Value `json:"p"`
BuyerOrderId int64 `json:"b"`
SellerOrderId int64 `json:"a"`
OrderTradeTime int64 `json:"T"`
TradeId int64 `json:"t"`
IsMaker bool `json:"m"`
Dummy bool `json:"M"`
}
/*
market trade
{
"e": "trade", // Event type
"E": 123456789, // Event time
"s": "BNBBTC", // Symbol
"t": 12345, // Trade ID
"p": "0.001", // Price
"q": "100", // Quantity
"b": 88, // Buyer order ID
"a": 50, // Seller order ID
"T": 123456785, // Trade time
"m": true, // Is the buyer the market maker?
"M": true // Ignore
}
*/
func (e *MarketTradeEvent) Trade() types.Trade {
tt := time.Unix(0, e.OrderTradeTime*int64(time.Millisecond))
var orderId int64
var side types.SideType
var isBuyer bool
if e.IsMaker {
orderId = e.SellerOrderId // seller is taker
side = types.SideTypeSell
isBuyer = false
} else {
orderId = e.BuyerOrderId // buyer is taker
side = types.SideTypeBuy
isBuyer = true
}
return types.Trade{
ID: uint64(e.TradeId),
Exchange: types.ExchangeBinance,
Symbol: e.Symbol,
OrderID: uint64(orderId),
Side: side,
Price: e.Price,
Quantity: e.Quantity,
QuoteQuantity: e.Quantity,
IsBuyer: isBuyer,
IsMaker: e.IsMaker,
Time: types.Time(tt),
Fee: fixedpoint.Zero,
FeeCurrency: "",
}
}
type KLine struct {
StartTime int64 `json:"t"`
EndTime int64 `json:"T"`

View File

@ -47,6 +47,7 @@ type Stream struct {
kLineClosedEventCallbacks []func(e *KLineEvent)
markPriceUpdateEventCallbacks []func(e *MarkPriceUpdateEvent)
marketTradeEventCallbacks []func(e *MarketTradeEvent)
continuousKLineEventCallbacks []func(e *ContinuousKLineEvent)
continuousKLineClosedEventCallbacks []func(e *ContinuousKLineEvent)
@ -116,6 +117,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
stream.OnBookTickerEvent(stream.handleBookTickerEvent)
stream.OnExecutionReportEvent(stream.handleExecutionReportEvent)
stream.OnContinuousKLineEvent(stream.handleContinuousKLineEvent)
stream.OnMarketTradeEvent(stream.handleMarketTradeEvent)
// Event type ACCOUNT_UPDATE from user data stream updates Balance and FuturesPosition.
stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent)
@ -207,6 +209,10 @@ func (s *Stream) handleBookTickerEvent(e *BookTickerEvent) {
s.EmitBookTickerUpdate(e.BookTicker())
}
func (s *Stream) handleMarketTradeEvent(e *MarketTradeEvent) {
s.EmitMarketTrade(e.Trade())
}
func (s *Stream) handleKLineEvent(e *KLineEvent) {
kline := e.KLine.KLine()
if e.KLine.Closed {
@ -317,6 +323,9 @@ func (s *Stream) dispatchEvent(e interface{}) {
case *BalanceUpdateEvent:
s.EmitBalanceUpdateEvent(e)
case *MarketTradeEvent:
s.EmitMarketTradeEvent(e)
case *KLineEvent:
s.EmitKLineEvent(e)

View File

@ -44,6 +44,16 @@ func (s *Stream) EmitMarkPriceUpdateEvent(e *MarkPriceUpdateEvent) {
}
}
func (s *Stream) OnMarketTradeEvent(cb func(e *MarketTradeEvent)) {
s.marketTradeEventCallbacks = append(s.marketTradeEventCallbacks, cb)
}
func (s *Stream) EmitMarketTradeEvent(e *MarketTradeEvent) {
for _, cb := range s.marketTradeEventCallbacks {
cb(e)
}
}
func (s *Stream) OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent)) {
s.continuousKLineEventCallbacks = append(s.continuousKLineEventCallbacks, cb)
}
@ -153,6 +163,8 @@ type StreamEventHub interface {
OnMarkPriceUpdateEvent(cb func(e *MarkPriceUpdateEvent))
OnMarketTradeEvent(cb func(e *MarketTradeEvent))
OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent))
OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent))

View File

@ -5,3 +5,4 @@ type Channel string
var BookChannel = Channel("book")
var KLineChannel = Channel("kline")
var BookTickerChannel = Channel("bookticker")
var MarketTradeChannel = Channel("trade")

View File

@ -60,6 +60,8 @@ func ValidExchangeName(a string) (ExchangeName, error) {
return ExchangeFTX, nil
case "okex":
return ExchangeOKEx, nil
case "kucoin":
return ExchangeKucoin, nil
}
return "", fmt.Errorf("invalid exchange name: %s", a)

View File

@ -124,6 +124,16 @@ func (s *StandardStream) EmitBookSnapshot(book SliceOrderBook) {
}
}
func (s *StandardStream) OnMarketTrade(cb func(trade Trade)) {
s.marketTradeCallbacks = append(s.marketTradeCallbacks, cb)
}
func (s *StandardStream) EmitMarketTrade(trade Trade) {
for _, cb := range s.marketTradeCallbacks {
cb(trade)
}
}
func (s *StandardStream) OnFuturesPositionUpdate(cb func(futuresPositions FuturesPositionMap)) {
s.FuturesPositionUpdateCallbacks = append(s.FuturesPositionUpdateCallbacks, cb)
}
@ -169,6 +179,8 @@ type StandardStreamEventHub interface {
OnBookSnapshot(cb func(book SliceOrderBook))
OnMarketTrade(cb func(trade Trade))
OnFuturesPositionUpdate(cb func(futuresPositions FuturesPositionMap))
OnFuturesPositionSnapshot(cb func(futuresPositions FuturesPositionMap))

View File

@ -96,6 +96,8 @@ type StandardStream struct {
bookSnapshotCallbacks []func(book SliceOrderBook)
marketTradeCallbacks []func(trade Trade)
// Futures
FuturesPositionUpdateCallbacks []func(futuresPositions FuturesPositionMap)