mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
fix reconnect
This commit is contained in:
parent
707f96ae08
commit
c329f6dc86
|
@ -34,16 +34,9 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6
|
|||
}
|
||||
|
||||
func (e *Exchange) NewPrivateStream(ctx context.Context) (*PrivateStream, error) {
|
||||
logrus.Infof("[binance] creating user data stream...")
|
||||
listenKey, err := e.Client.NewStartUserStreamService().Do(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logrus.Infof("[binance] user data stream created. listenKey: %s", listenKey)
|
||||
return &PrivateStream{
|
||||
Client: e.Client,
|
||||
ListenKey: listenKey,
|
||||
// ListenKey: listenKey,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -69,9 +69,27 @@ func (s *PrivateStream) Subscribe(channel string, symbol string, options Subscri
|
|||
})
|
||||
}
|
||||
|
||||
func (s *PrivateStream) Connect(ctx context.Context, eventC chan interface{}) error {
|
||||
url := "wss://stream.binance.com:9443/ws/" + s.ListenKey
|
||||
func (s *PrivateStream) dial(listenKey string) (*websocket.Conn, error) {
|
||||
url := "wss://stream.binance.com:9443/ws/" + listenKey
|
||||
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (s *PrivateStream) connect(ctx context.Context) error {
|
||||
log.Infof("[binance] creating user data stream...")
|
||||
listenKey, err := s.Client.NewStartUserStreamService().Do(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.ListenKey = listenKey
|
||||
log.Infof("[binance] user data stream created. listenKey: %s", s.ListenKey)
|
||||
|
||||
conn, err := s.dial(s.ListenKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -87,18 +105,20 @@ func (s *PrivateStream) Connect(ctx context.Context, eventC chan interface{}) er
|
|||
}
|
||||
|
||||
log.Infof("[binance] subscribing channels: %+v", params)
|
||||
err = conn.WriteJSON(StreamRequest{
|
||||
return conn.WriteJSON(StreamRequest{
|
||||
Method: "SUBSCRIBE",
|
||||
Params: params,
|
||||
ID: 1,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *PrivateStream) Connect(ctx context.Context, eventC chan interface{}) error {
|
||||
err := s.connect(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go s.read(ctx, eventC)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -121,14 +141,21 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
|
|||
}
|
||||
|
||||
default:
|
||||
if err := s.Conn.SetReadDeadline(time.Now().Add(15 * time.Second)); err != nil {
|
||||
if err := s.Conn.SetReadDeadline(time.Now().Add(6 * time.Second)); err != nil {
|
||||
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
|
||||
}
|
||||
|
||||
mt, message, err := s.Conn.ReadMessage()
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("read error: %s", err.Error())
|
||||
return
|
||||
|
||||
// reconnect
|
||||
for err != nil {
|
||||
err = s.connect(ctx)
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// skip non-text messages
|
||||
|
@ -184,18 +211,22 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *PrivateStream) Close() error {
|
||||
log.Infof("[binance] closing user data stream...")
|
||||
|
||||
defer s.Conn.Close()
|
||||
|
||||
// use background context to close user stream
|
||||
err := s.Client.NewCloseUserStreamService().ListenKey(s.ListenKey).Do(context.Background())
|
||||
func (s *PrivateStream) invalidateListenKey(ctx context.Context, listenKey string) error {
|
||||
// use background context to invalidate the user stream
|
||||
err := s.Client.NewCloseUserStreamService().ListenKey(listenKey).Do(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("[binance] error close user data stream")
|
||||
log.WithError(err).Error("[binance] error deleting listen key")
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("[binance] user data stream closed")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PrivateStream) Close() error {
|
||||
log.Infof("[binance] closing user data stream...")
|
||||
defer s.Conn.Close()
|
||||
err := s.invalidateListenKey(context.Background(), s.ListenKey)
|
||||
|
||||
log.Infof("[binance] user data stream closed")
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user