Merge pull request #931 from c9s/fix/binance-listenkey-expired

fix: binance listenkey expired
This commit is contained in:
Yo-An Lin 2022-09-11 14:19:43 +08:00 committed by GitHub
commit 3d1dd18802
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 10 deletions

View File

@ -291,22 +291,22 @@ func parseWebSocketEvent(message []byte) (interface{}, error) {
case "outboundAccountPosition":
var event OutboundAccountPositionEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "outboundAccountInfo":
var event OutboundAccountInfoEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "balanceUpdate":
var event BalanceUpdateEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "executionReport":
var event ExecutionReportEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "depthUpdate":
@ -314,35 +314,40 @@ func parseWebSocketEvent(message []byte) (interface{}, error) {
case "markPriceUpdate":
var event MarkPriceUpdateEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "listenKeyExpired":
var event ListenKeyExpired
err = json.Unmarshal([]byte(message), &event)
return &event, err
// Binance futures data --------------
case "continuousKline":
var event ContinuousKLineEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "ORDER_TRADE_UPDATE":
var event OrderTradeUpdateEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
// Event: Balance and Position Update
case "ACCOUNT_UPDATE":
var event AccountUpdateEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
// Event: Order Update
case "ACCOUNT_CONFIG_UPDATE":
var event AccountConfigUpdateEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "trade":
var event MarketTradeEvent
err := json.Unmarshal([]byte(message), &event)
err = json.Unmarshal([]byte(message), &event)
return &event, err
default:
@ -618,6 +623,10 @@ func (k *KLine) KLine() types.KLine {
}
}
type ListenKeyExpired struct {
EventBase
}
type MarkPriceUpdateEvent struct {
EventBase

View File

@ -62,6 +62,8 @@ type Stream struct {
accountUpdateEventCallbacks []func(e *AccountUpdateEvent)
accountConfigUpdateEventCallbacks []func(e *AccountConfigUpdateEvent)
listenKeyExpiredCallbacks []func(e *ListenKeyExpired)
depthBuffers map[string]*depth.Buffer
}
@ -125,6 +127,9 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
stream.OnOrderTradeUpdateEvent(stream.handleOrderTradeUpdateEvent)
stream.OnDisconnect(stream.handleDisconnect)
stream.OnConnect(stream.handleConnect)
stream.OnListenKeyExpired(func(e *ListenKeyExpired) {
stream.Reconnect()
})
return stream
}
@ -363,6 +368,10 @@ func (s *Stream) dispatchEvent(e interface{}) {
case *AccountConfigUpdateEvent:
s.EmitAccountConfigUpdateEvent(e)
case *ListenKeyExpired:
s.EmitListenKeyExpired(e)
}
}

View File

@ -154,6 +154,16 @@ func (s *Stream) EmitAccountConfigUpdateEvent(e *AccountConfigUpdateEvent) {
}
}
func (s *Stream) OnListenKeyExpired(cb func(e *ListenKeyExpired)) {
s.listenKeyExpiredCallbacks = append(s.listenKeyExpiredCallbacks, cb)
}
func (s *Stream) EmitListenKeyExpired(e *ListenKeyExpired) {
for _, cb := range s.listenKeyExpiredCallbacks {
cb(e)
}
}
type StreamEventHub interface {
OnDepthEvent(cb func(e *DepthEvent))
@ -184,4 +194,6 @@ type StreamEventHub interface {
OnAccountUpdateEvent(cb func(e *AccountUpdateEvent))
OnAccountConfigUpdateEvent(cb func(e *AccountConfigUpdateEvent))
OnListenKeyExpired(cb func(e *ListenKeyExpired))
}