Merge pull request #354 from austin362667/order-trade

binance: parse OrderTrade event stream & add futures client connection
This commit is contained in:
Yo-An Lin 2021-12-13 02:12:19 +08:00 committed by GitHub
commit a3215d6f31
5 changed files with 370 additions and 70 deletions

View File

@ -50,7 +50,6 @@ func toGlobalMarket(symbol binance.Symbol) types.Market {
return market
}
func toGlobalIsolatedUserAsset(userAsset binance.IsolatedUserAsset) types.IsolatedUserAsset {
return types.IsolatedUserAsset{
Asset: userAsset.Asset,
@ -137,10 +136,9 @@ func toGlobalTicker(stats *binance.PriceChangeStats) (*types.Ticker, error) {
Buy: util.MustParseFloat(stats.BidPrice),
Sell: util.MustParseFloat(stats.AskPrice),
Time: time.Unix(0, stats.CloseTime*int64(time.Millisecond)),
},nil
}, nil
}
func toLocalOrderType(orderType types.OrderType) (binance.OrderType, error) {
switch orderType {
@ -203,7 +201,7 @@ func millisecondTime(t int64) time.Time {
return time.Unix(0, t*int64(time.Millisecond))
}
func ToGlobalTrade(t binance.TradeV3, isMargin bool) (*types.Trade, error) {
func toGlobalTrade(t binance.TradeV3, isMargin bool) (*types.Trade, error) {
// skip trade ID that is the same. however this should not happen
var side types.SideType
if t.IsBuyer {
@ -270,6 +268,20 @@ func toGlobalSideType(side binance.SideType) types.SideType {
}
}
func toGlobalFuturesSideType(side futures.SideType) types.SideType {
switch side {
case futures.SideTypeBuy:
return types.SideTypeBuy
case futures.SideTypeSell:
return types.SideTypeSell
default:
log.Errorf("can not convert futures side type, unknown side type: %q", side)
return ""
}
}
func toGlobalOrderType(orderType binance.OrderType) types.OrderType {
switch orderType {
@ -292,6 +304,27 @@ func toGlobalOrderType(orderType binance.OrderType) types.OrderType {
}
}
func toGlobalFuturesOrderType(orderType futures.OrderType) types.OrderType {
switch orderType {
// TODO
case futures.OrderTypeLimit: // , futures.OrderTypeLimitMaker, futures.OrderTypeTakeProfitLimit:
return types.OrderTypeLimit
case futures.OrderTypeMarket:
return types.OrderTypeMarket
// TODO
// case futures.OrderTypeStopLossLimit:
// return types.OrderTypeStopLimit
// TODO
// case futures.OrderTypeStopLoss:
// return types.OrderTypeStopMarket
default:
log.Errorf("unsupported order type: %v", orderType)
return ""
}
}
func toGlobalOrderStatus(orderStatus binance.OrderStatusType) types.OrderStatus {
switch orderStatus {
case binance.OrderStatusTypeNew:
@ -313,10 +346,31 @@ func toGlobalOrderStatus(orderStatus binance.OrderStatusType) types.OrderStatus
return types.OrderStatus(orderStatus)
}
func toGlobalFuturesOrderStatus(orderStatus futures.OrderStatusType) types.OrderStatus {
switch orderStatus {
case futures.OrderStatusTypeNew:
return types.OrderStatusNew
case futures.OrderStatusTypeRejected:
return types.OrderStatusRejected
case futures.OrderStatusTypeCanceled:
return types.OrderStatusCanceled
case futures.OrderStatusTypePartiallyFilled:
return types.OrderStatusPartiallyFilled
case futures.OrderStatusTypeFilled:
return types.OrderStatusFilled
}
return types.OrderStatus(orderStatus)
}
// ConvertTrades converts the binance v3 trade into the global trade type
func ConvertTrades(remoteTrades []*binance.TradeV3) (trades []types.Trade, err error) {
for _, t := range remoteTrades {
trade, err := ToGlobalTrade(*t, false)
trade, err := toGlobalTrade(*t, false)
if err != nil {
return nil, errors.Wrapf(err, "binance v3 trade parse error, trade: %+v", *t)
}
@ -364,4 +418,3 @@ func convertPremiumIndex(index *futures.PremiumIndex) (*types.PremiumIndex, erro
Time: t,
}, nil
}

View File

@ -3,6 +3,7 @@ package binance
import (
"context"
"fmt"
"github.com/adshao/go-binance/v2/futures"
"net/http"
"os"
"strconv"
@ -14,7 +15,6 @@ import (
"github.com/adshao/go-binance/v2"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/fixedpoint"
@ -27,7 +27,6 @@ const BNB = "BNB"
// 50 per 10 seconds = 5 per second
var orderLimiter = rate.NewLimiter(5, 5)
var log = logrus.WithFields(logrus.Fields{
"exchange": "binance",
})
@ -35,6 +34,7 @@ var log = logrus.WithFields(logrus.Fields{
func init() {
_ = types.Exchange(&Exchange{})
_ = types.MarginExchange(&Exchange{})
_ = types.FuturesExchange(&Exchange{})
// FIXME: this is not effected since dotenv is loaded in the rootCmd, not in the init function
if ok, _ := strconv.ParseBool(os.Getenv("DEBUG_BINANCE_STREAM")); ok {
@ -46,20 +46,38 @@ type Exchange struct {
types.MarginSettings
types.FuturesSettings
key, secret string
Client *binance.Client
key, secret string
Client *binance.Client // Spot & Margin
futuresClient *futures.Client // USDT-M Futures
// deliveryClient *delivery.Client // Coin-M Futures
}
func New(key, secret string) *Exchange {
var client = binance.NewClient(key, secret)
client.HTTPClient = &http.Client{Timeout: 15 * time.Second}
_, _ = client.NewSetServerTimeService().Do(context.Background())
return &Exchange{
key: key,
secret: secret,
Client: client,
var futuresClient = binance.NewFuturesClient(key, secret)
futuresClient.HTTPClient = &http.Client{Timeout: 15 * time.Second}
_, _ = futuresClient.NewSetServerTimeService().Do(context.Background())
var err error
_, err = client.NewSetServerTimeService().Do(context.Background())
if err != nil {
panic(err)
}
_, err = futuresClient.NewSetServerTimeService().Do(context.Background())
if err != nil {
panic(err)
}
return &Exchange{
key: key,
secret: secret,
Client: client,
futuresClient: futuresClient,
// deliveryClient: deliveryClient,
}
}
@ -152,8 +170,9 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6
}
func (e *Exchange) NewStream() types.Stream {
stream := NewStream(e.Client)
stream := NewStream(e.Client, e.futuresClient)
stream.MarginSettings = e.MarginSettings
stream.FuturesSettings = e.FuturesSettings
return stream
}
@ -180,7 +199,6 @@ func (e *Exchange) QueryIsolatedMarginAccount(ctx context.Context, symbols ...st
return toGlobalIsolatedMarginAccount(account), nil
}
func (e *Exchange) Withdrawal(ctx context.Context, asset string, amount fixedpoint.Value, address string, options *types.WithdrawalOptions) error {
req := e.Client.NewCreateWithdrawService()
req.Coin(asset)
@ -700,7 +718,7 @@ func (e *Exchange) submitSpotOrder(ctx context.Context, order types.SubmitOrder)
func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) {
for _, order := range orders {
if err := orderLimiter.Wait(ctx) ; err != nil {
if err := orderLimiter.Wait(ctx); err != nil {
log.WithError(err).Errorf("order rate limiter wait error")
}
@ -847,7 +865,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
}
for _, t := range remoteTrades {
localTrade, err := ToGlobalTrade(*t, e.IsMargin)
localTrade, err := toGlobalTrade(*t, e.IsMargin)
if err != nil {
log.WithError(err).Errorf("can not convert binance trade: %+v", t)
continue
@ -932,4 +950,4 @@ func getLaunchDate() (time.Time, error) {
}
return time.Date(2017, time.July, 14, 0, 0, 0, 0, loc), nil
}
}

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/adshao/go-binance/v2/futures"
"time"
"github.com/adshao/go-binance/v2"
@ -292,15 +293,23 @@ func ParseEvent(message string) (interface{}, error) {
case "depthUpdate":
return parseDepthEvent(val)
case "markPriceUpdate":
var event MarkPriceUpdateEvent
err := json.Unmarshal([]byte(message), &event)
return &event, err
case "continuousKline":
var event ContinuousKLineEvent
err := json.Unmarshal([]byte(message), &event)
return &event, err
// Binance futures data --------------
case "continuousKline":
var event ContinuousKLineEvent
err := json.Unmarshal([]byte(message), &event)
return &event, err
case "ORDER_TRADE_UPDATE":
var event OrderTradeUpdateEvent
err := json.Unmarshal([]byte(message), &event)
return &event, err
default:
id := val.GetInt("id")
if id > 0 {
@ -470,6 +479,37 @@ type KLine struct {
Closed bool `json:"x"`
}
/*
kline
{
"e": "kline", // KLineEvent type
"E": 123456789, // KLineEvent time
"s": "BNBBTC", // Symbol
"k": {
"t": 123400000, // Kline start time
"T": 123460000, // Kline close time
"s": "BNBBTC", // Symbol
"i": "1m", // Interval
"f": 100, // First trade ID
"L": 200, // Last trade ID
"o": "0.0010", // Open price
"c": "0.0020", // Close price
"h": "0.0025", // High price
"l": "0.0015", // Low price
"v": "1000", // Base asset volume
"n": 100, // Number of trades
"x": false, // Is this kline closed?
"q": "1.0000", // Quote asset volume
"V": "500", // Taker buy base asset volume
"Q": "0.500", // Taker buy quote asset volume
"B": "123456" // Ignore
}
}
*/
type KLineEvent struct {
EventBase
Symbol string `json:"s"`
@ -497,18 +537,17 @@ func (k *KLine) KLine() types.KLine {
}
}
type MarkPriceUpdateEvent struct {
EventBase
Symbol string `json:"s"`
Symbol string `json:"s"`
MarkPrice fixedpoint.Value `json:"p"`
IndexPrice fixedpoint.Value `json:"i"`
EstimatedPrice fixedpoint.Value `json:"P"`
FundingRate fixedpoint.Value `json:"r"`
NextFundingTime int64 `json:"T"`
MarkPrice fixedpoint.Value `json:"p"`
IndexPrice fixedpoint.Value `json:"i"`
EstimatedPrice fixedpoint.Value `json:"P"`
FundingRate fixedpoint.Value `json:"r"`
NextFundingTime int64 `json:"T"`
}
/*
@ -558,36 +597,123 @@ type ContinuousKLineEvent struct {
}
*/
/*
// Similar to the ExecutionReportEvent's fields. But with totally different json key.
// e.g., Stop price. So that, we can not merge them.
type OrderTrade struct {
Symbol string `json:"s"`
ClientOrderID string `json:"c"`
Side string `json:"S"`
OrderType string `json:"o"`
TimeInForce string `json:"f"`
OriginalQuantity string `json:"q"`
OriginalPrice string `json:"p"`
kline
AveragePrice string `json:"ap"`
StopPrice string `json:"sp"`
CurrentExecutionType string `json:"x"`
CurrentOrderStatus string `json:"X"`
{
"e": "kline", // KLineEvent type
"E": 123456789, // KLineEvent time
"s": "BNBBTC", // Symbol
"k": {
"t": 123400000, // Kline start time
"T": 123460000, // Kline close time
"s": "BNBBTC", // Symbol
"i": "1m", // Interval
"f": 100, // First trade ID
"L": 200, // Last trade ID
"o": "0.0010", // Open price
"c": "0.0020", // Close price
"h": "0.0025", // High price
"l": "0.0015", // Low price
"v": "1000", // Base asset volume
"n": 100, // Number of trades
"x": false, // Is this kline closed?
"q": "1.0000", // Quote asset volume
"V": "500", // Taker buy base asset volume
"Q": "0.500", // Taker buy quote asset volume
"B": "123456" // Ignore
}
OrderId int64 `json:"i"`
OrderLastFilledQuantity string `json:"l"`
OrderFilledAccumulatedQuantity string `json:"z"`
LastFilledPrice string `json:"L"`
CommissionAmount string `json:"n"`
CommissionAsset string `json:"N"`
OrderTradeTime int64 `json:"T"`
TradeId int64 `json:"t"`
BidsNotional string `json:"b"`
AskNotional string `json:"a"`
IsMaker bool `json:"m"`
IsReduceOnly bool ` json:"r"`
StopPriceWorkingType string `json:"wt"`
OriginalOrderType string `json:"ot"`
PositionSide string `json:"ps"`
RealizedProfit string `json:"rp"`
}
type OrderTradeUpdateEvent struct {
EventBase
Transaction int64 `json:"T"`
OrderTrade OrderTrade `json:"o"`
}
// {
// "e":"ORDER_TRADE_UPDATE", // Event Type
// "E":1568879465651, // Event Time
// "T":1568879465650, // Transaction Time
// "o":{
// "s":"BTCUSDT", // Symbol
// "c":"TEST", // Client Order Id
// // special client order id:
// // starts with "autoclose-": liquidation order
// // "adl_autoclose": ADL auto close order
// "S":"SELL", // Side
// "o":"TRAILING_STOP_MARKET", // Order Type
// "f":"GTC", // Time in Force
// "q":"0.001", // Original Quantity
// "p":"0", // Original Price
// "ap":"0", // Average Price
// "sp":"7103.04", // Stop Price. Please ignore with TRAILING_STOP_MARKET order
// "x":"NEW", // Execution Type
// "X":"NEW", // Order Status
// "i":8886774, // Order Id
// "l":"0", // Order Last Filled Quantity
// "z":"0", // Order Filled Accumulated Quantity
// "L":"0", // Last Filled Price
// "N":"USDT", // Commission Asset, will not push if no commission
// "n":"0", // Commission, will not push if no commission
// "T":1568879465651, // Order Trade Time
// "t":0, // Trade Id
// "b":"0", // Bids Notional
// "a":"9.91", // Ask Notional
// "m":false, // Is this trade the maker side?
// "R":false, // Is this reduce only
// "wt":"CONTRACT_PRICE", // Stop Price Working Type
// "ot":"TRAILING_STOP_MARKET", // Original Order Type
// "ps":"LONG", // Position Side
// "cp":false, // If Close-All, pushed with conditional order
// "AP":"7476.89", // Activation Price, only puhed with TRAILING_STOP_MARKET order
// "cr":"5.0", // Callback Rate, only puhed with TRAILING_STOP_MARKET order
// "rp":"0" // Realized Profit of the trade
// }
// }
func (e *OrderTradeUpdateEvent) OrderFutures() (*types.Order, error) {
switch e.OrderTrade.CurrentExecutionType {
case "NEW", "CANCELED", "EXPIRED":
case "CALCULATED - Liquidation Execution":
case "TRADE": // For Order FILLED status. And the order has been completed.
default:
return nil, errors.New("execution report type is not for futures order")
}
orderCreationTime := time.Unix(0, e.OrderTrade.OrderTradeTime*int64(time.Millisecond))
return &types.Order{
Exchange: types.ExchangeBinance,
SubmitOrder: types.SubmitOrder{
Symbol: e.OrderTrade.Symbol,
ClientOrderID: e.OrderTrade.ClientOrderID,
Side: toGlobalFuturesSideType(futures.SideType(e.OrderTrade.Side)),
Type: toGlobalFuturesOrderType(futures.OrderType(e.OrderTrade.OrderType)),
Quantity: util.MustParseFloat(e.OrderTrade.OriginalQuantity),
Price: util.MustParseFloat(e.OrderTrade.OriginalPrice),
TimeInForce: e.OrderTrade.TimeInForce,
},
OrderID: uint64(e.OrderTrade.OrderId),
Status: toGlobalFuturesOrderStatus(futures.OrderStatusType(e.OrderTrade.CurrentOrderStatus)),
ExecutedQuantity: util.MustParseFloat(e.OrderTrade.OrderFilledAccumulatedQuantity),
CreationTime: types.Time(orderCreationTime),
}, nil
}
*/
type EventBase struct {
Event string `json:"e"` // event
Time int64 `json:"E"`

View File

@ -12,6 +12,8 @@ import (
"time"
"github.com/adshao/go-binance/v2"
"github.com/adshao/go-binance/v2/futures"
"github.com/gorilla/websocket"
"github.com/c9s/bbgo/pkg/types"
@ -61,7 +63,9 @@ type Stream struct {
types.FuturesSettings
types.StandardStream
Client *binance.Client
Client *binance.Client
futuresClient *futures.Client
Conn *websocket.Conn
ConnLock sync.Mutex
@ -76,23 +80,28 @@ type Stream struct {
kLineClosedEventCallbacks []func(e *KLineEvent)
markPriceUpdateEventCallbacks []func(e *MarkPriceUpdateEvent)
continuousKLineEventCallbacks []func(e *ContinuousKLineEvent)
continuousKLineEventCallbacks []func(e *ContinuousKLineEvent)
continuousKLineClosedEventCallbacks []func(e *ContinuousKLineEvent)
balanceUpdateEventCallbacks []func(event *BalanceUpdateEvent)
outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent)
outboundAccountPositionEventCallbacks []func(event *OutboundAccountPositionEvent)
executionReportEventCallbacks []func(event *ExecutionReportEvent)
orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent)
depthFrames map[string]*DepthFrame
}
func NewStream(client *binance.Client) *Stream {
func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
stream := &Stream{
StandardStream: types.StandardStream{
ReconnectC: make(chan struct{}, 1),
},
Client: client,
depthFrames: make(map[string]*DepthFrame),
Client: client,
futuresClient: futuresClient,
depthFrames: make(map[string]*DepthFrame),
}
stream.OnDepthEvent(func(e *DepthEvent) {
@ -207,6 +216,54 @@ func NewStream(client *binance.Client) *Stream {
}
})
stream.OnContinuousKLineEvent(func(e *ContinuousKLineEvent) {
kline := e.KLine.KLine()
if e.KLine.Closed {
stream.EmitContinuousKLineClosedEvent(e)
stream.EmitKLineClosed(kline)
} else {
stream.EmitKLine(kline)
}
})
stream.OnOrderTradeUpdateEvent(func(e *OrderTradeUpdateEvent) {
switch e.OrderTrade.CurrentExecutionType {
case "NEW", "CANCELED", "EXPIRED":
order, err := e.OrderFutures()
if err != nil {
log.WithError(err).Error("order convert error")
return
}
stream.EmitOrderUpdate(*order)
case "TRADE":
// TODO
// trade, err := e.Trade()
// if err != nil {
// log.WithError(err).Error("trade convert error")
// return
// }
// stream.EmitTradeUpdate(*trade)
// order, err := e.OrderFutures()
// if err != nil {
// log.WithError(err).Error("order convert error")
// return
// }
// Update Order with FILLED event
// if order.Status == types.OrderStatusFilled {
// stream.EmitOrderUpdate(*order)
// }
case "CALCULATED - Liquidation Execution":
log.Infof("CALCULATED - Liquidation Execution not support yet.")
}
})
stream.OnDisconnect(func() {
log.Infof("resetting depth snapshots...")
for _, f := range stream.depthFrames {
@ -246,9 +303,17 @@ func (s *Stream) SetPublicOnly() {
func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
var url string
if s.publicOnly {
url = "wss://stream.binance.com:9443/ws"
if s.IsFutures {
url = "wss://fstream.binance.com/ws/"
} else {
url = "wss://stream.binance.com:9443/ws"
}
} else {
url = "wss://stream.binance.com:9443/ws/" + listenKey
if s.IsFutures {
url = "wss://fstream.binance.com/ws/" + listenKey
} else {
url = "wss://stream.binance.com:9443/ws/" + listenKey
}
}
conn, _, err := defaultDialer.Dial(url, nil)
@ -278,7 +343,12 @@ func (s *Stream) fetchListenKey(ctx context.Context) (string, error) {
log.Infof("margin mode is enabled, requesting margin user stream listen key...")
req := s.Client.NewStartMarginUserStreamService()
return req.Do(ctx)
} else if s.IsFutures {
log.Infof("futures mode is enabled, requesting futures user stream listen key...")
req := s.futuresClient.NewStartUserStreamService()
return req.Do(ctx)
}
log.Infof("spot mode is enabled, requesting margin user stream listen key...")
return s.Client.NewStartUserStreamService().Do(ctx)
}
@ -290,9 +360,11 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error
req.Symbol(s.IsolatedMarginSymbol)
return req.Do(ctx)
}
req := s.Client.NewKeepaliveMarginUserStreamService().ListenKey(listenKey)
return req.Do(ctx)
} else if s.IsFutures {
req := s.futuresClient.NewKeepaliveUserStreamService().ListenKey(listenKey)
return req.Do(ctx)
}
return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx)
@ -541,11 +613,15 @@ func (s *Stream) read(ctx context.Context) {
case *ExecutionReportEvent:
s.EmitExecutionReportEvent(e)
case *MarkPriceUpdateEvent:
s.EmitMarkPriceUpdateEvent(e)
case *ContinuousKLineEvent:
s.EmitContinuousKLineEvent(e)
case *ContinuousKLineEvent:
s.EmitContinuousKLineEvent(e)
case *OrderTradeUpdateEvent:
s.EmitOrderTradeUpdateEvent(e)
}
}
}
@ -565,6 +641,9 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err
err = req.Do(ctx)
}
} else if s.IsFutures {
req := s.futuresClient.NewCloseUserStreamService().ListenKey(listenKey)
err = req.Do(ctx)
} else {
err = s.Client.NewCloseUserStreamService().ListenKey(listenKey).Do(ctx)
}

View File

@ -54,6 +54,16 @@ func (s *Stream) EmitContinuousKLineEvent(e *ContinuousKLineEvent) {
}
}
func (s *Stream) OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent)) {
s.continuousKLineClosedEventCallbacks = append(s.continuousKLineClosedEventCallbacks, cb)
}
func (s *Stream) EmitContinuousKLineClosedEvent(e *ContinuousKLineEvent) {
for _, cb := range s.continuousKLineClosedEventCallbacks {
cb(e)
}
}
func (s *Stream) OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent)) {
s.balanceUpdateEventCallbacks = append(s.balanceUpdateEventCallbacks, cb)
}
@ -94,6 +104,16 @@ func (s *Stream) EmitExecutionReportEvent(event *ExecutionReportEvent) {
}
}
func (s *Stream) OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent)) {
s.orderTradeUpdateEventCallbacks = append(s.orderTradeUpdateEventCallbacks, cb)
}
func (s *Stream) EmitOrderTradeUpdateEvent(e *OrderTradeUpdateEvent) {
for _, cb := range s.orderTradeUpdateEventCallbacks {
cb(e)
}
}
type StreamEventHub interface {
OnDepthEvent(cb func(e *DepthEvent))
@ -105,6 +125,8 @@ type StreamEventHub interface {
OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent))
OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent))
OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent))
OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent))
@ -112,4 +134,6 @@ type StreamEventHub interface {
OnOutboundAccountPositionEvent(cb func(event *OutboundAccountPositionEvent))
OnExecutionReportEvent(cb func(event *ExecutionReportEvent))
OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent))
}