bbgo_origin/pkg/exchange/max/maxapi/websocket.go

315 lines
7.5 KiB
Go
Raw Normal View History

2020-10-01 08:07:18 +00:00
package max
import (
"context"
2020-10-01 08:48:08 +00:00
"fmt"
"net"
"sync"
2020-10-01 08:07:18 +00:00
"time"
2020-10-01 08:48:08 +00:00
"github.com/google/uuid"
2020-10-01 08:07:18 +00:00
"github.com/gorilla/websocket"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2020-10-01 08:07:18 +00:00
)
2020-10-02 04:43:14 +00:00
var WebSocketURL = "wss://max-stream.maicoin.com/ws"
2020-10-01 08:07:18 +00:00
var ErrMessageTypeNotSupported = errors.New("message type currently not supported")
2021-02-22 09:36:23 +00:00
type SubscribeOptions struct {
Depth int `json:"depth,omitempty"`
Resolution string `json:"resolution,omitempty"`
}
2020-10-01 08:07:18 +00:00
// Subscription is used for presenting the subscription metadata.
// This is used for sending subscribe and unsubscribe requests
type Subscription struct {
2020-12-21 07:43:54 +00:00
Channel string `json:"channel"`
Market string `json:"market"`
Depth int `json:"depth,omitempty"`
Resolution string `json:"resolution,omitempty"`
2020-10-01 08:07:18 +00:00
}
type WebsocketCommand struct {
// Action is used for specify the action of the websocket session.
// Valid values are "subscribe", "unsubscribe" and "auth"
Action string `json:"action"`
Subscriptions []Subscription `json:"subscriptions,omitempty"`
}
var SubscribeAction = "subscribe"
var UnsubscribeAction = "unsubscribe"
2020-10-02 04:43:14 +00:00
//go:generate callbackgen -type WebSocketService
2020-10-01 08:48:08 +00:00
type WebSocketService struct {
baseURL, key, secret string
2020-10-01 08:07:18 +00:00
mu sync.Mutex
2020-10-01 08:48:08 +00:00
conn *websocket.Conn
2020-10-01 08:07:18 +00:00
reconnectC chan struct{}
// Subscriptions is the subscription request payloads that will be used for sending subscription request
Subscriptions []Subscription
2020-10-02 13:29:56 +00:00
connectCallbacks []func(conn *websocket.Conn)
2021-03-15 02:27:01 +00:00
disconnectCallbacks []func()
2020-10-01 08:07:18 +00:00
errorCallbacks []func(err error)
messageCallbacks []func(message []byte)
bookEventCallbacks []func(e BookEvent)
2020-10-02 13:29:56 +00:00
tradeEventCallbacks []func(e PublicTradeEvent)
kLineEventCallbacks []func(e KLineEvent)
2020-10-01 08:07:18 +00:00
errorEventCallbacks []func(e ErrorEvent)
subscriptionEventCallbacks []func(e SubscriptionEvent)
2020-10-02 13:29:56 +00:00
2020-10-09 05:21:42 +00:00
tradeUpdateEventCallbacks []func(e TradeUpdateEvent)
tradeSnapshotEventCallbacks []func(e TradeSnapshotEvent)
orderUpdateEventCallbacks []func(e OrderUpdateEvent)
orderSnapshotEventCallbacks []func(e OrderSnapshotEvent)
2020-10-02 13:29:56 +00:00
accountSnapshotEventCallbacks []func(e AccountSnapshotEvent)
2020-10-09 05:21:42 +00:00
accountUpdateEventCallbacks []func(e AccountUpdateEvent)
2020-10-01 08:07:18 +00:00
}
2020-10-01 08:48:08 +00:00
func NewWebSocketService(wsURL string, key, secret string) *WebSocketService {
return &WebSocketService{
key: key,
secret: secret,
2020-10-01 08:07:18 +00:00
reconnectC: make(chan struct{}, 1),
2020-10-01 08:48:08 +00:00
baseURL: wsURL,
2020-10-01 08:07:18 +00:00
}
}
2020-10-01 08:48:08 +00:00
func (s *WebSocketService) Connect(ctx context.Context) error {
2020-10-02 13:15:00 +00:00
s.OnConnect(func(c *websocket.Conn) {
if err := s.SendSubscriptionRequest(SubscribeAction); err != nil {
s.EmitError(err)
logger.WithError(err).Error("failed to subscribe")
}
})
2020-10-01 08:07:18 +00:00
// pre-allocate the websocket client, the websocket client can be used for reconnecting.
2020-10-02 13:29:56 +00:00
if err := s.connect(ctx); err != nil {
2020-10-02 04:43:14 +00:00
return err
}
go s.reconnector(ctx)
2020-10-02 04:43:14 +00:00
return nil
2020-10-01 08:07:18 +00:00
}
2020-10-02 04:43:14 +00:00
func (s *WebSocketService) Auth() error {
2020-10-01 08:48:08 +00:00
nonce := time.Now().UnixNano() / int64(time.Millisecond)
auth := &AuthMessage{
Action: "auth",
APIKey: s.key,
Nonce: nonce,
Signature: signPayload(fmt.Sprintf("%d", nonce), s.secret),
ID: uuid.New().String(),
}
return s.conn.WriteJSON(auth)
}
func (s *WebSocketService) connect(ctx context.Context) error {
2020-10-01 08:07:18 +00:00
dialer := websocket.DefaultDialer
2020-10-01 08:48:08 +00:00
conn, _, err := dialer.DialContext(ctx, s.baseURL, nil)
2020-10-01 08:07:18 +00:00
if err != nil {
return err
}
2021-05-18 05:59:58 +00:00
s.mu.Lock()
2020-10-01 08:48:08 +00:00
s.conn = conn
2021-05-18 05:59:58 +00:00
s.mu.Unlock()
2020-10-02 04:43:14 +00:00
s.EmitConnect(conn)
2020-10-02 13:15:00 +00:00
go s.read(ctx)
2020-10-01 08:07:18 +00:00
return nil
}
2020-10-01 08:48:08 +00:00
func (s *WebSocketService) emitReconnect() {
2020-10-01 08:07:18 +00:00
select {
case s.reconnectC <- struct{}{}:
default:
}
}
func (s *WebSocketService) reconnector(ctx context.Context) {
2020-10-01 08:07:18 +00:00
for {
select {
case <-ctx.Done():
return
case <-s.reconnectC:
2021-05-18 05:59:58 +00:00
log.Warnf("received reconnect signal, reconnecting...")
2020-10-01 08:07:18 +00:00
time.Sleep(3 * time.Second)
if err := s.connect(ctx); err != nil {
s.emitReconnect()
}
}
}
}
func (s *WebSocketService) read(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
2020-10-01 08:07:18 +00:00
default:
s.mu.Lock()
if err := s.conn.SetReadDeadline(time.Now().Add(time.Second * 5)); err != nil {
log.WithError(err).Error("can not set read deadline")
}
2020-10-01 08:48:08 +00:00
mt, msg, err := s.conn.ReadMessage()
s.mu.Unlock()
2020-10-01 08:07:18 +00:00
if err != nil {
// if it's a network timeout error, we should re-connect
switch err := err.(type) {
// if it's a websocket related error
case *websocket.CloseError:
if err.Code == websocket.CloseNormalClosure {
return
}
// for unexpected close error, we should re-connect
// emit reconnect to start a new connection
s.EmitDisconnect()
s.emitReconnect()
return
case net.Error:
s.EmitDisconnect()
s.emitReconnect()
return
default:
log.WithError(err).Error("unexpected websocket connection error")
continue
}
2020-10-01 08:07:18 +00:00
}
if mt != websocket.TextMessage {
continue
}
s.EmitMessage(msg)
2020-10-02 04:43:14 +00:00
m, err := ParseMessage(msg)
2020-10-01 08:07:18 +00:00
if err != nil {
2020-12-28 08:24:57 +00:00
s.EmitError(errors.Wrapf(err, "failed to parse message: %s", msg))
2020-10-01 08:07:18 +00:00
continue
}
2020-12-29 10:18:32 +00:00
if m != nil {
s.dispatch(m)
}
2020-10-01 08:07:18 +00:00
}
}
}
2020-10-01 08:48:08 +00:00
func (s *WebSocketService) dispatch(msg interface{}) {
2020-10-01 08:07:18 +00:00
switch e := msg.(type) {
2020-10-02 13:29:56 +00:00
2020-10-01 08:07:18 +00:00
case *BookEvent:
s.EmitBookEvent(*e)
2020-10-02 13:29:56 +00:00
case *PublicTradeEvent:
2020-10-01 08:07:18 +00:00
s.EmitTradeEvent(*e)
2020-10-02 13:29:56 +00:00
case *KLineEvent:
s.EmitKLineEvent(*e)
2020-10-01 08:07:18 +00:00
case *ErrorEvent:
s.EmitErrorEvent(*e)
2020-10-02 13:29:56 +00:00
2020-10-01 08:07:18 +00:00
case *SubscriptionEvent:
s.EmitSubscriptionEvent(*e)
2020-10-02 13:29:56 +00:00
case *TradeSnapshotEvent:
s.EmitTradeSnapshotEvent(*e)
case *TradeUpdateEvent:
s.EmitTradeUpdateEvent(*e)
case *AccountSnapshotEvent:
s.EmitAccountSnapshotEvent(*e)
case *AccountUpdateEvent:
s.EmitAccountUpdateEvent(*e)
case *OrderSnapshotEvent:
s.EmitOrderSnapshotEvent(*e)
case *OrderUpdateEvent:
s.EmitOrderUpdateEvent(*e)
2020-10-01 08:07:18 +00:00
default:
2020-11-09 08:34:35 +00:00
s.EmitError(fmt.Errorf("unsupported %T event: %+v", e, e))
2020-10-01 08:07:18 +00:00
}
}
2020-10-01 08:48:08 +00:00
func (s *WebSocketService) ClearSubscriptions() {
2020-10-01 08:07:18 +00:00
s.Subscriptions = nil
}
2020-10-01 08:48:08 +00:00
func (s *WebSocketService) Reconnect() {
2020-10-01 08:07:18 +00:00
logger.Info("reconnecting...")
s.emitReconnect()
}
// Subscribe is a helper method for building subscription request from the internal mapping types.
// (Internal public method)
2021-02-22 09:36:23 +00:00
func (s *WebSocketService) Subscribe(channel, market string, options SubscribeOptions) {
2020-10-01 08:07:18 +00:00
s.AddSubscription(Subscription{
Channel: channel,
Market: market,
Depth: options.Depth,
2021-02-22 09:36:23 +00:00
Resolution: options.Resolution,
2020-10-01 08:07:18 +00:00
})
}
// AddSubscription adds the subscription request to the buffer, these requests will be sent to the server right after connecting to the endpoint.
2020-10-01 08:48:08 +00:00
func (s *WebSocketService) AddSubscription(subscription Subscription) {
2020-10-01 08:07:18 +00:00
s.Subscriptions = append(s.Subscriptions, subscription)
}
2020-10-01 08:48:08 +00:00
func (s *WebSocketService) Resubscribe() {
2020-10-01 08:07:18 +00:00
// Calling Resubscribe() by websocket is not enough to refresh orderbook.
// We still need to get orderbook snapshot by rest client.
// Therefore Reconnect() is used to simplify implementation.
logger.Info("resubscribing all subscription...")
if err := s.SendSubscriptionRequest(UnsubscribeAction); err != nil {
logger.WithError(err).Error("failed to unsubscribe")
}
if err := s.SendSubscriptionRequest(SubscribeAction); err != nil {
logger.WithError(err).Error("failed to unsubscribe")
}
}
2020-10-01 08:48:08 +00:00
func (s *WebSocketService) SendSubscriptionRequest(action string) error {
2020-10-01 08:07:18 +00:00
request := WebsocketCommand{
Action: action,
Subscriptions: s.Subscriptions,
}
logger.Debugf("sending websocket subscription: %+v", request)
2020-10-01 08:48:08 +00:00
if err := s.conn.WriteJSON(request); err != nil {
2020-10-01 08:07:18 +00:00
return errors.Wrap(err, "Failed to send subscribe event")
}
return nil
}
// Close web socket connection
2020-10-01 08:48:08 +00:00
func (s *WebSocketService) Close() error {
return s.conn.Close()
2020-10-01 08:07:18 +00:00
}