add public only mode to stream

This commit is contained in:
c9s 2020-12-21 15:43:54 +08:00
parent ce0e28708a
commit f56318c9b6
5 changed files with 33 additions and 16 deletions

View File

@ -45,6 +45,7 @@ var rootCmd = &cobra.Command{
var exchange = binance.New(key, secret) var exchange = binance.New(key, secret)
stream := exchange.NewStream() stream := exchange.NewStream()
stream.SetPublicOnly()
stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
stream.OnBookSnapshot(func(book types.OrderBook) { stream.OnBookSnapshot(func(book types.OrderBook) {

View File

@ -186,7 +186,13 @@ func (s *Stream) SetPublicOnly() {
} }
func (s *Stream) dial(listenKey string) (*websocket.Conn, error) { func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
url := "wss://stream.binance.com:9443/ws/" + listenKey var url string
if s.publicOnly {
url = "wss://stream.binance.com:9443/ws"
} else {
url = "wss://stream.binance.com:9443/ws/" + listenKey
}
conn, _, err := websocket.DefaultDialer.Dial(url, nil) conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -6,7 +6,6 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -18,9 +17,9 @@ var ErrMessageTypeNotSupported = errors.New("message type currently not supporte
// Subscription is used for presenting the subscription metadata. // Subscription is used for presenting the subscription metadata.
// This is used for sending subscribe and unsubscribe requests // This is used for sending subscribe and unsubscribe requests
type Subscription struct { type Subscription struct {
Channel string `json:"channel"` Channel string `json:"channel"`
Market string `json:"market"` Market string `json:"market"`
Depth int `json:"depth,omitempty"` Depth int `json:"depth,omitempty"`
Resolution string `json:"resolution,omitempty"` Resolution string `json:"resolution,omitempty"`
} }
@ -82,13 +81,6 @@ func (s *WebSocketService) Connect(ctx context.Context) error {
} }
}) })
s.OnConnect(func(conn *websocket.Conn) {
if err := s.Auth(); err != nil {
s.EmitError(err)
logger.WithError(err).Error("failed to send auth request")
}
})
// pre-allocate the websocket client, the websocket client can be used for reconnecting. // pre-allocate the websocket client, the websocket client can be used for reconnecting.
if err := s.connect(ctx); err != nil { if err := s.connect(ctx); err != nil {
return err return err

View File

@ -19,6 +19,8 @@ type Stream struct {
types.StandardStream types.StandardStream
websocketService *max.WebSocketService websocketService *max.WebSocketService
publicOnly bool
} }
func NewStream(key, secret string) *Stream { func NewStream(key, secret string) *Stream {
@ -28,6 +30,17 @@ func NewStream(key, secret string) *Stream {
websocketService: wss, websocketService: wss,
} }
wss.OnConnect(func(conn *websocket.Conn) {
if key == "" || secret == "" {
log.Warn("MAX API key or secret is empty, will not send authentication command")
} else {
if err := wss.Auth(); err != nil {
wss.EmitError(err)
logger.WithError(err).Error("failed to send auth request")
}
}
})
wss.OnMessage(func(message []byte) { wss.OnMessage(func(message []byte) {
logger.Debugf("M: %s", message) logger.Debugf("M: %s", message)
}) })
@ -77,19 +90,19 @@ func NewStream(key, secret string) *Stream {
}) })
wss.OnBookEvent(func(e max.BookEvent) { wss.OnBookEvent(func(e max.BookEvent) {
newbook, err := e.OrderBook() newBook, err := e.OrderBook()
if err != nil { if err != nil {
logger.WithError(err).Error("book convert error") logger.WithError(err).Error("book convert error")
return return
} }
newbook.Symbol = toGlobalSymbol(e.Market) newBook.Symbol = toGlobalSymbol(e.Market)
switch e.Event { switch e.Event {
case "snapshot": case "snapshot":
stream.EmitBookSnapshot(newbook) stream.EmitBookSnapshot(newBook)
case "update": case "update":
stream.EmitBookUpdate(newbook) stream.EmitBookUpdate(newBook)
} }
}) })
@ -128,6 +141,10 @@ func NewStream(key, secret string) *Stream {
return stream return stream
} }
func (s *Stream) SetPublicOnly() {
s.publicOnly = true
}
func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) { func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) {
s.websocketService.Subscribe(string(channel), toLocalSymbol(symbol)) s.websocketService.Subscribe(string(channel), toLocalSymbol(symbol))
} }

View File

@ -8,6 +8,7 @@ type Stream interface {
StandardStreamEventHub StandardStreamEventHub
Subscribe(channel Channel, symbol string, options SubscribeOptions) Subscribe(channel Channel, symbol string, options SubscribeOptions)
SetPublicOnly()
Connect(ctx context.Context) error Connect(ctx context.Context) error
Close() error Close() error
} }