This commit is contained in:
c9s 2021-12-30 15:47:13 +08:00
parent 7fa05b33f8
commit e73866a232
2 changed files with 62 additions and 24 deletions

View File

@ -10,6 +10,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/depth" "github.com/c9s/bbgo/pkg/depth"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
@ -38,6 +40,16 @@ var defaultDialer = &websocket.Dialer{
// A PING frame // A PING frame
// A PONG frame // A PONG frame
// A JSON controlled message (e.g. subscribe, unsubscribe) // A JSON controlled message (e.g. subscribe, unsubscribe)
// The connect() method dials and creates the connection object, then it starts 2 go-routine, 1 for reading message, 2 for writing ping messages.
// The re-connector uses the ReconnectC signal channel to start a new websocket connection.
// When ReconnectC is triggered
// - The context created for the connection must be canceled
// - The read goroutine must close the connection and exit
// - The ping goroutine must stop the ticker and exit
// - the re-connector calls connect() to create the new connection object, go to the 1 step.
// When stream.Close() is called, a close message must be written to the websocket connection.
const readTimeout = 3 * time.Minute const readTimeout = 3 * time.Minute
const writeTimeout = 10 * time.Second const writeTimeout = 10 * time.Second
const listenKeyKeepAliveInterval = 10 * time.Minute const listenKeyKeepAliveInterval = 10 * time.Minute
@ -99,6 +111,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
stream := &Stream{ stream := &Stream{
StandardStream: types.StandardStream{ StandardStream: types.StandardStream{
ReconnectC: make(chan struct{}, 1), ReconnectC: make(chan struct{}, 1),
CloseC: make(chan struct{}),
}, },
Client: client, Client: client,
futuresClient: futuresClient, futuresClient: futuresClient,
@ -325,6 +338,7 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
return conn, nil return conn, nil
} }
// Connect starts the stream and create the websocket connection
func (s *Stream) Connect(ctx context.Context) error { func (s *Stream) Connect(ctx context.Context) error {
err := s.connect(ctx) err := s.connect(ctx)
if err != nil { if err != nil {
@ -341,13 +355,18 @@ func (s *Stream) Connect(ctx context.Context) error {
func (s *Stream) reconnector(ctx context.Context) { func (s *Stream) reconnector(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-s.CloseC:
return
case <-s.ReconnectC: case <-s.ReconnectC:
log.Warnf("received reconnect signal, reconnecting...") log.Warnf("received reconnect signal")
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
log.Warnf("re-connecting...")
if err := s.connect(ctx); err != nil { if err := s.connect(ctx); err != nil {
log.WithError(err).Errorf("connect error, try to reconnect again...") log.WithError(err).Errorf("connect error, try to reconnect again...")
s.Reconnect() s.Reconnect()
@ -387,38 +406,38 @@ func (s *Stream) connect(ctx context.Context) error {
s.connCancel() s.connCancel()
} }
// create a new context // create a new context for this connection
s.connCtx, s.connCancel = context.WithCancel(ctx) s.connCtx, s.connCancel = context.WithCancel(ctx)
s.Conn = conn s.Conn = conn
s.ConnLock.Unlock() s.ConnLock.Unlock()
s.EmitConnect() s.EmitConnect()
if !s.PublicOnly { if !s.PublicOnly {
go s.listenKeyKeepAlive(s.connCtx, listenKey) go s.listenKeyKeepAlive(s.connCtx, listenKey)
} }
go s.read(s.connCtx) go s.read(s.connCtx, conn)
go s.ping(s.connCtx) go s.ping(s.connCtx, conn, readTimeout / 3)
return nil return nil
} }
func (s *Stream) ping(ctx context.Context) { func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, interval time.Duration) {
pingTicker := time.NewTicker(readTimeout / 2) defer log.Debug("ping worker stopped")
var pingTicker = time.NewTicker(interval)
defer pingTicker.Stop() defer pingTicker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Debug("ping worker stopped") return
case <-s.CloseC:
return return
case <-pingTicker.C: case <-pingTicker.C:
s.ConnLock.Lock()
conn := s.Conn
s.ConnLock.Unlock()
log.Debugf("websocket -> ping") log.Debugf("websocket -> ping")
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeTimeout)); err != nil { if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeTimeout)); err != nil {
log.WithError(err).Error("ping error", err) log.WithError(err).Error("ping error", err)
@ -474,11 +493,13 @@ func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) {
} }
} }
func (s *Stream) read(ctx context.Context) { func (s *Stream) read(ctx context.Context, conn *websocket.Conn) {
defer func() { defer func() {
// if we failed to read, we need to cancel the context
if s.connCancel != nil { if s.connCancel != nil {
s.connCancel() s.connCancel()
} }
_ = conn.Close()
s.EmitDisconnect() s.EmitDisconnect()
}() }()
@ -488,12 +509,10 @@ func (s *Stream) read(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-s.CloseC:
return
default: default:
s.ConnLock.Lock()
conn := s.Conn
s.ConnLock.Unlock()
if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error()) log.WithError(err).Errorf("set read deadline error: %s", err.Error())
} }
@ -509,6 +528,8 @@ func (s *Stream) read(ctx context.Context) {
return return
} }
log.WithError(err).Errorf("websocket error abnormal close: %+v", err)
_ = conn.Close() _ = conn.Close()
// for unexpected close error, we should re-connect // for unexpected close error, we should re-connect
// emit reconnect to start a new connection // emit reconnect to start a new connection
@ -516,13 +537,13 @@ func (s *Stream) read(ctx context.Context) {
return return
case net.Error: case net.Error:
log.WithError(err).Error("websocket network error") log.WithError(err).Error("websocket read network error")
_ = conn.Close() _ = conn.Close()
s.Reconnect() s.Reconnect()
return return
default: default:
log.WithError(err).Error("unexpected connection error") log.WithError(err).Error("unexpected websocket error")
_ = conn.Close() _ = conn.Close()
s.Reconnect() s.Reconnect()
return return
@ -581,14 +602,27 @@ func (s *Stream) read(ctx context.Context) {
func (s *Stream) Close() error { func (s *Stream) Close() error {
log.Infof("closing stream...") log.Infof("closing stream...")
// close the close signal channel
close(s.CloseC)
// get the connection object before call the context cancel function
s.ConnLock.Lock()
conn := s.Conn
s.ConnLock.Unlock()
// cancel the context so that the ticker loop and listen key updater will be stopped.
if s.connCancel != nil { if s.connCancel != nil {
s.connCancel() s.connCancel()
} }
s.ConnLock.Lock() // gracefully write the close message to the connection
err := s.Conn.Close() err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
s.ConnLock.Unlock() if err != nil {
return err return errors.Wrap(err, "websocket write close message error")
}
err = s.Conn.Close()
return errors.Wrap(err, "websocket connection close error")
} }
func (s *Stream) fetchListenKey(ctx context.Context) (string, error) { func (s *Stream) fetchListenKey(ctx context.Context) (string, error) {

View File

@ -26,8 +26,12 @@ var BookTickerChannel = Channel("bookticker")
type StandardStream struct { type StandardStream struct {
PublicOnly bool PublicOnly bool
// ReconnectC is a signal channel for reconnecting
ReconnectC chan struct{} ReconnectC chan struct{}
// CloseC is a signal channel for closing stream
CloseC chan struct{}
Subscriptions []Subscription Subscriptions []Subscription
startCallbacks []func() startCallbacks []func()