clean up binance stream

This commit is contained in:
c9s 2022-01-02 11:25:34 +08:00
parent b22bb4b28d
commit cc0e5f71b0

View File

@ -3,8 +3,6 @@ package binance
import (
"context"
"net"
"os"
"strconv"
"time"
"github.com/c9s/bbgo/pkg/depth"
@ -16,8 +14,6 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
var debugBinanceDepth bool
// from Binance document:
// The websocket server will send a ping frame every 3 minutes.
// If the websocket server does not receive a pong frame back from the connection within a 10 minute period, the connection will be disconnected.
@ -29,13 +25,6 @@ var debugBinanceDepth bool
// A JSON controlled message (e.g. subscribe, unsubscribe)
const listenKeyKeepAliveInterval = 10 * time.Minute
func init() {
debugBinanceDepth, _ = strconv.ParseBool(os.Getenv("DEBUG_BINANCE_DEPTH"))
if debugBinanceDepth {
log.Info("binance depth debugging is enabled")
}
}
type WebSocketCommand struct {
// request ID is required
ID int `json:"id"`
@ -78,9 +67,9 @@ type Stream struct {
func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Client) *Stream {
stream := &Stream{
StandardStream: types.NewStandardStream(),
client: client,
futuresClient: futuresClient,
depthBuffers: make(map[string]*depth.Buffer),
client: client,
futuresClient: futuresClient,
depthBuffers: make(map[string]*depth.Buffer),
}
stream.SetParser(parseWebSocketEvent)
@ -88,10 +77,6 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
stream.SetEndpointCreator(stream.createEndpoint)
stream.OnDepthEvent(func(e *DepthEvent) {
if debugBinanceDepth {
log.Infof("received %s depth event updateID %d ~ %d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
f, ok := stream.depthBuffers[e.Symbol]
if ok {
f.AddUpdate(types.SliceOrderBook{
@ -103,8 +88,6 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
return ex.QueryDepth(context.Background(), e.Symbol)
})
stream.depthBuffers[e.Symbol] = f
f.SetBufferingPeriod(time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid {
@ -120,6 +103,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
f.OnPush(func(update depth.Update) {
stream.EmitBookUpdate(update.Object)
})
stream.depthBuffers[e.Symbol] = f
}
})
@ -131,45 +115,46 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
// Event type ACCOUNT_UPDATE from user data stream updates Balance and FuturesPosition.
stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent)
stream.OnAccountConfigUpdateEvent(stream.handleAccountConfigUpdateEvent)
stream.OnOrderTradeUpdateEvent(stream.handleOrderTradeUpdateEvent)
stream.OnDisconnect(func() {
log.Infof("resetting depth snapshots...")
for _, f := range stream.depthBuffers {
f.Reset()
}
})
stream.OnConnect(func() {
if stream.PublicOnly {
var params []string
for _, subscription := range stream.Subscriptions {
params = append(params, convertSubscription(subscription))
}
if len(params) == 0 {
return
}
log.Infof("subscribing channels: %+v", params)
err := stream.Conn.WriteJSON(WebSocketCommand{
Method: "SUBSCRIBE",
Params: params,
ID: 1,
})
if err != nil {
log.WithError(err).Error("subscribe error")
}
}
})
stream.OnDisconnect(stream.handleDisconnect)
stream.OnConnect(stream.handleConnect)
return stream
}
func (s *Stream) handleDisconnect() {
log.Debugf("resetting depth snapshots...")
for _, f := range s.depthBuffers {
f.Reset()
}
}
func (s *Stream) handleConnect() {
if !s.PublicOnly {
return
}
var params []string
for _, subscription := range s.Subscriptions {
params = append(params, convertSubscription(subscription))
}
if len(params) == 0 {
return
}
log.Infof("subscribing channels: %+v", params)
err := s.Conn.WriteJSON(WebSocketCommand{
Method: "SUBSCRIBE",
Params: params,
ID: 1,
})
if err != nil {
log.WithError(err).Error("subscribe error")
}
}
func (s *Stream) handleContinuousKLineEvent(e *ContinuousKLineEvent) {
kline := e.KLine.KLine()
if e.KLine.Closed {
@ -328,7 +313,6 @@ func (s *Stream) createEndpoint(ctx context.Context) (string, error) {
return url, nil
}
func (s *Stream) dispatchEvent(e interface{}) {
switch e := e.(type) {