Merge pull request #412 from austin362667/refactor/futures-account

binance: add futures stream
This commit is contained in:
Yo-An Lin 2021-12-31 01:27:34 +08:00 committed by GitHub
commit 8aef3c002a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 134 additions and 60 deletions

View File

@ -154,20 +154,20 @@ func toGlobalFuturesBalance(balances []*futures.Balance) types.BalanceMap {
return retBalances
}
func toGlobalFuturesPositions(positions []*futures.AccountPosition) types.PositionMap {
retPositions := make(types.PositionMap)
for _, position := range positions {
retPositions[position.Symbol] = types.Position{
Isolated: position.Isolated,
func toGlobalFuturesPositions(futuresPositions []*futures.AccountPosition) types.FuturesPositionMap {
retFuturesPositions := make(types.FuturesPositionMap)
for _, futuresPosition := range futuresPositions {
retFuturesPositions[futuresPosition.Symbol] = types.FuturesPosition{ //TODO: types.FuturesPosition
Isolated: futuresPosition.Isolated,
PositionRisk: &types.PositionRisk{
Leverage: fixedpoint.MustNewFromString(position.Leverage),
Leverage: fixedpoint.MustNewFromString(futuresPosition.Leverage),
},
Symbol: position.Symbol,
UpdateTime: position.UpdateTime,
Symbol: futuresPosition.Symbol,
UpdateTime: futuresPosition.UpdateTime,
}
}
return retPositions
return retFuturesPositions
}
func toGlobalFuturesUserAssets(assets []*futures.AccountAsset) (retAssets map[types.Asset]types.FuturesUserAsset) {

View File

@ -100,7 +100,9 @@ type Stream struct {
executionReportEventCallbacks []func(event *ExecutionReportEvent)
bookTickerEventCallbacks []func(event *BookTickerEvent)
orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent)
orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent)
accountUpdateEventCallbacks []func(e *AccountUpdateEvent)
accountConfigUpdateEventCallbacks []func(e *AccountConfigUpdateEvent)
depthBuffers map[string]*depth.Buffer
}
@ -158,43 +160,12 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
stream.OnExecutionReportEvent(stream.handleExecutionReportEvent)
stream.OnContinuousKLineEvent(stream.handleContinuousKLineEvent)
stream.OnOrderTradeUpdateEvent(func(e *OrderTradeUpdateEvent) {
switch e.OrderTrade.CurrentExecutionType {
// Event type ACCOUNT_UPDATE from user data stream updates Balance and FuturesPosition.
stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent)
case "NEW", "CANCELED", "EXPIRED":
order, err := e.OrderFutures()
if err != nil {
log.WithError(err).Error("order convert error")
return
}
stream.OnAccountConfigUpdateEvent(stream.handleAccountConfigUpdateEvent)
stream.EmitOrderUpdate(*order)
case "TRADE":
// TODO
// trade, err := e.Trade()
// if err != nil {
// log.WithError(err).Error("trade convert error")
// return
// }
// stream.EmitTradeUpdate(*trade)
// order, err := e.OrderFutures()
// if err != nil {
// log.WithError(err).Error("order convert error")
// return
// }
// Update Order with FILLED event
// if order.Status == types.OrderStatusFilled {
// stream.EmitOrderUpdate(*order)
// }
case "CALCULATED - Liquidation Execution":
log.Infof("CALCULATED - Liquidation Execution not support yet.")
}
})
stream.OnOrderTradeUpdateEvent(stream.handleOrderTradeUpdateEvent)
stream.OnDisconnect(func() {
log.Infof("resetting depth snapshots...")
@ -298,6 +269,57 @@ func (s *Stream) handleOutboundAccountPositionEvent(e *OutboundAccountPositionEv
s.EmitBalanceSnapshot(snapshot)
}
func (s *Stream) handleAccountUpdateEvent(e *AccountUpdateEvent) {
futuresPositionSnapshot := toGlobalFuturesPositions(e.AccountUpdate.Positions)
s.EmitFuturesPositionSnapshot(futuresPositionSnapshot)
balanceSnapshot := toGlobalFuturesBalance(e.AccountUpdate.Balances)
s.EmitBalanceSnapshot(balanceSnapshot)
}
// TODO: emit account config leverage updates
func (s *Stream) handleAccountConfigUpdateEvent(e *AccountConfigUpdateEvent) {
}
func (s *Stream) handleOrderTradeUpdateEvent(e *OrderTradeUpdateEvent) {
switch e.OrderTrade.CurrentExecutionType {
case "NEW", "CANCELED", "EXPIRED":
order, err := e.OrderFutures()
if err != nil {
log.WithError(err).Error("order convert error")
return
}
s.EmitOrderUpdate(*order)
case "TRADE":
// TODO
// trade, err := e.Trade()
// if err != nil {
// log.WithError(err).Error("trade convert error")
// return
// }
// stream.EmitTradeUpdate(*trade)
// order, err := e.OrderFutures()
// if err != nil {
// log.WithError(err).Error("order convert error")
// return
// }
// Update Order with FILLED event
// if order.Status == types.OrderStatusFilled {
// stream.EmitOrderUpdate(*order)
// }
case "CALCULATED - Liquidation Execution":
log.Infof("CALCULATED - Liquidation Execution not support yet.")
}
}
func (s *Stream) getEndpointUrl(listenKey string) string {
var url string
@ -557,6 +579,12 @@ func (s *Stream) dispatchEvent(e interface{}) {
case *OrderTradeUpdateEvent:
s.EmitOrderTradeUpdateEvent(e)
case *AccountUpdateEvent:
s.EmitAccountUpdateEvent(e)
case *AccountConfigUpdateEvent:
s.EmitAccountConfigUpdateEvent(e)
}
}

View File

@ -124,6 +124,26 @@ func (s *Stream) EmitOrderTradeUpdateEvent(e *OrderTradeUpdateEvent) {
}
}
func (s *Stream) OnAccountUpdateEvent(cb func(e *AccountUpdateEvent)) {
s.accountUpdateEventCallbacks = append(s.accountUpdateEventCallbacks, cb)
}
func (s *Stream) EmitAccountUpdateEvent(e *AccountUpdateEvent) {
for _, cb := range s.accountUpdateEventCallbacks {
cb(e)
}
}
func (s *Stream) OnAccountConfigUpdateEvent(cb func(e *AccountConfigUpdateEvent)) {
s.accountConfigUpdateEventCallbacks = append(s.accountConfigUpdateEventCallbacks, cb)
}
func (s *Stream) EmitAccountConfigUpdateEvent(e *AccountConfigUpdateEvent) {
for _, cb := range s.accountConfigUpdateEventCallbacks {
cb(e)
}
}
type StreamEventHub interface {
OnDepthEvent(cb func(e *DepthEvent))
@ -148,4 +168,8 @@ type StreamEventHub interface {
OnBookTickerEvent(cb func(event *BookTickerEvent))
OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent))
OnAccountUpdateEvent(cb func(e *AccountUpdateEvent))
OnAccountConfigUpdateEvent(cb func(e *AccountConfigUpdateEvent))
}

View File

@ -121,6 +121,7 @@ func (m AssetMap) SlackAttachment() slack.Attachment {
type BalanceMap map[string]Balance
type PositionMap map[string]Position
type FuturesPositionMap map[string]FuturesPosition
func (m BalanceMap) String() string {
var ss []string
@ -229,7 +230,7 @@ type FuturesAccountInfo struct {
Assets map[Asset]FuturesUserAsset `json:"assets"`
FeeTier int `json:"feeTier"`
MaxWithdrawAmount fixedpoint.Value `json:"maxWithdrawAmount"`
Positions PositionMap `json:"positions"`
Positions FuturesPositionMap `json:"positions"`
TotalInitialMargin fixedpoint.Value `json:"totalInitialMargin"`
TotalMaintMargin fixedpoint.Value `json:"totalMaintMargin"`
TotalMarginBalance fixedpoint.Value `json:"totalMarginBalance"`

View File

@ -39,6 +39,27 @@ type Position struct {
FeeRate *ExchangeFee `json:"feeRate,omitempty"`
ExchangeFeeRates map[ExchangeName]ExchangeFee `json:"exchangeFeeRates"`
sync.Mutex
}
type FuturesPosition struct {
Symbol string `json:"symbol"`
BaseCurrency string `json:"baseCurrency"`
QuoteCurrency string `json:"quoteCurrency"`
Market Market `json:"market"`
Base fixedpoint.Value `json:"base"`
Quote fixedpoint.Value `json:"quote"`
AverageCost fixedpoint.Value `json:"averageCost"`
// ApproximateAverageCost adds the computed fee in quote in the average cost
// This is used for calculating net profit
ApproximateAverageCost fixedpoint.Value `json:"approximateAverageCost"`
FeeRate *ExchangeFee `json:"feeRate,omitempty"`
ExchangeFeeRates map[ExchangeName]ExchangeFee `json:"exchangeFeeRates"`
// Futures data fields
Isolated bool `json:"isolated"`
UpdateTime int64 `json:"updateTime"`

View File

@ -124,23 +124,23 @@ func (stream *StandardStream) EmitBookSnapshot(book SliceOrderBook) {
}
}
func (stream *StandardStream) OnPositionUpdate(cb func(position PositionMap)) {
stream.PositionUpdateCallbacks = append(stream.PositionUpdateCallbacks, cb)
func (stream *StandardStream) OnFuturesPositionUpdate(cb func(futuresPositions FuturesPositionMap)) {
stream.FuturesPositionUpdateCallbacks = append(stream.FuturesPositionUpdateCallbacks, cb)
}
func (stream *StandardStream) EmitPositionUpdate(position PositionMap) {
for _, cb := range stream.PositionUpdateCallbacks {
cb(position)
func (stream *StandardStream) EmitFuturesPositionUpdate(futuresPositions FuturesPositionMap) {
for _, cb := range stream.FuturesPositionUpdateCallbacks {
cb(futuresPositions)
}
}
func (stream *StandardStream) OnPositionSnapshot(cb func(position PositionMap)) {
stream.PositionSnapshotCallbacks = append(stream.PositionSnapshotCallbacks, cb)
func (stream *StandardStream) OnFuturesPositionSnapshot(cb func(futuresPositions FuturesPositionMap)) {
stream.FuturesPositionSnapshotCallbacks = append(stream.FuturesPositionSnapshotCallbacks, cb)
}
func (stream *StandardStream) EmitPositionSnapshot(position PositionMap) {
for _, cb := range stream.PositionSnapshotCallbacks {
cb(position)
func (stream *StandardStream) EmitFuturesPositionSnapshot(futuresPositions FuturesPositionMap) {
for _, cb := range stream.FuturesPositionSnapshotCallbacks {
cb(futuresPositions)
}
}
@ -169,7 +169,7 @@ type StandardStreamEventHub interface {
OnBookSnapshot(cb func(book SliceOrderBook))
OnPositionUpdate(cb func(position PositionMap))
OnFuturesPositionUpdate(cb func(futuresPositions FuturesPositionMap))
OnPositionSnapshot(cb func(position PositionMap))
OnFuturesPositionSnapshot(cb func(futuresPositions FuturesPositionMap))
}

View File

@ -62,9 +62,9 @@ type StandardStream struct {
bookSnapshotCallbacks []func(book SliceOrderBook)
// Futures
PositionUpdateCallbacks []func(position PositionMap)
FuturesPositionUpdateCallbacks []func(futuresPositions FuturesPositionMap)
PositionSnapshotCallbacks []func(position PositionMap)
FuturesPositionSnapshotCallbacks []func(futuresPositions FuturesPositionMap)
}
func (stream *StandardStream) Subscribe(channel Channel, symbol string, options SubscribeOptions) {