mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 17:13:51 +00:00
347 lines
8.1 KiB
Go
347 lines
8.1 KiB
Go
package max
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/pkg/errors"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
var WebSocketURL = "wss://max-stream.maicoin.com/ws"
|
|
|
|
var ErrMessageTypeNotSupported = errors.New("message type currently not supported")
|
|
|
|
type SubscribeOptions struct {
|
|
Depth int `json:"depth,omitempty"`
|
|
Resolution string `json:"resolution,omitempty"`
|
|
}
|
|
|
|
// Subscription is used for presenting the subscription metadata.
|
|
// This is used for sending subscribe and unsubscribe requests
|
|
type Subscription struct {
|
|
Channel string `json:"channel"`
|
|
Market string `json:"market"`
|
|
Depth int `json:"depth,omitempty"`
|
|
Resolution string `json:"resolution,omitempty"`
|
|
}
|
|
|
|
type WebsocketCommand struct {
|
|
// Action is used for specify the action of the websocket session.
|
|
// Valid values are "subscribe", "unsubscribe" and "auth"
|
|
Action string `json:"action"`
|
|
Subscriptions []Subscription `json:"subscriptions,omitempty"`
|
|
}
|
|
|
|
var SubscribeAction = "subscribe"
|
|
var UnsubscribeAction = "unsubscribe"
|
|
|
|
//go:generate callbackgen -type WebSocketService
|
|
type WebSocketService struct {
|
|
baseURL, key, secret string
|
|
|
|
mu sync.Mutex
|
|
conn *websocket.Conn
|
|
|
|
reconnectC chan struct{}
|
|
|
|
// Subscriptions is the subscription request payloads that will be used for sending subscription request
|
|
Subscriptions []Subscription
|
|
|
|
connectCallbacks []func(conn *websocket.Conn)
|
|
disconnectCallbacks []func()
|
|
|
|
errorCallbacks []func(err error)
|
|
messageCallbacks []func(message []byte)
|
|
bookEventCallbacks []func(e BookEvent)
|
|
tradeEventCallbacks []func(e PublicTradeEvent)
|
|
kLineEventCallbacks []func(e KLineEvent)
|
|
errorEventCallbacks []func(e ErrorEvent)
|
|
subscriptionEventCallbacks []func(e SubscriptionEvent)
|
|
|
|
tradeUpdateEventCallbacks []func(e TradeUpdateEvent)
|
|
tradeSnapshotEventCallbacks []func(e TradeSnapshotEvent)
|
|
orderUpdateEventCallbacks []func(e OrderUpdateEvent)
|
|
orderSnapshotEventCallbacks []func(e OrderSnapshotEvent)
|
|
|
|
accountSnapshotEventCallbacks []func(e AccountSnapshotEvent)
|
|
accountUpdateEventCallbacks []func(e AccountUpdateEvent)
|
|
}
|
|
|
|
func NewWebSocketService(wsURL string, key, secret string) *WebSocketService {
|
|
return &WebSocketService{
|
|
key: key,
|
|
secret: secret,
|
|
reconnectC: make(chan struct{}, 1),
|
|
baseURL: wsURL,
|
|
}
|
|
}
|
|
|
|
func (s *WebSocketService) Connect(ctx context.Context) error {
|
|
s.OnConnect(func(c *websocket.Conn) {
|
|
if err := s.SendSubscriptionRequest(SubscribeAction); err != nil {
|
|
s.EmitError(err)
|
|
logger.WithError(err).Error("failed to subscribe")
|
|
}
|
|
})
|
|
|
|
// pre-allocate the websocket client, the websocket client can be used for reconnecting.
|
|
if err := s.connect(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
go s.reconnector(ctx)
|
|
go s.ping(ctx)
|
|
return nil
|
|
}
|
|
|
|
func (s *WebSocketService) ping(ctx context.Context) {
|
|
pingTicker := time.NewTicker(5 * time.Second)
|
|
defer pingTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
log.Debug("ping worker stopped")
|
|
return
|
|
|
|
case <-pingTicker.C:
|
|
s.mu.Lock()
|
|
conn := s.conn
|
|
s.mu.Unlock()
|
|
|
|
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil {
|
|
log.WithError(err).Error("ping error", err)
|
|
s.Reconnect()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *WebSocketService) Auth() error {
|
|
nonce := time.Now().UnixNano() / int64(time.Millisecond)
|
|
auth := &AuthMessage{
|
|
Action: "auth",
|
|
APIKey: s.key,
|
|
Nonce: nonce,
|
|
Signature: signPayload(fmt.Sprintf("%d", nonce), s.secret),
|
|
ID: uuid.New().String(),
|
|
}
|
|
return s.conn.WriteJSON(auth)
|
|
}
|
|
|
|
func (s *WebSocketService) connect(ctx context.Context) error {
|
|
dialer := websocket.DefaultDialer
|
|
conn, _, err := dialer.DialContext(ctx, s.baseURL, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
conn.SetReadDeadline(time.Now().Add(15 * time.Second))
|
|
conn.SetPongHandler(func(string) error {
|
|
conn.SetReadDeadline(time.Now().Add(15 * time.Second))
|
|
return nil
|
|
})
|
|
|
|
s.mu.Lock()
|
|
s.conn = conn
|
|
s.mu.Unlock()
|
|
|
|
s.EmitConnect(conn)
|
|
|
|
go s.read(ctx)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *WebSocketService) emitReconnect() {
|
|
select {
|
|
case s.reconnectC <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (s *WebSocketService) reconnector(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case <-s.reconnectC:
|
|
log.Warnf("received reconnect signal, reconnecting...")
|
|
time.Sleep(3 * time.Second)
|
|
if err := s.connect(ctx); err != nil {
|
|
s.emitReconnect()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *WebSocketService) read(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
default:
|
|
s.mu.Lock()
|
|
conn := s.conn
|
|
s.mu.Unlock()
|
|
|
|
if err := conn.SetReadDeadline(time.Now().Add(time.Second * 5)); err != nil {
|
|
log.WithError(err).Error("can not set read deadline")
|
|
}
|
|
|
|
mt, msg, err := conn.ReadMessage()
|
|
|
|
if err != nil {
|
|
// if it's a network timeout error, we should re-connect
|
|
switch err := err.(type) {
|
|
|
|
// if it's a websocket related error
|
|
case *websocket.CloseError:
|
|
if err.Code == websocket.CloseNormalClosure {
|
|
return
|
|
}
|
|
// for unexpected close error, we should re-connect
|
|
// emit reconnect to start a new connection
|
|
s.EmitDisconnect()
|
|
s.emitReconnect()
|
|
return
|
|
|
|
case net.Error:
|
|
s.EmitDisconnect()
|
|
s.emitReconnect()
|
|
return
|
|
|
|
default:
|
|
log.WithError(err).Error("unexpected websocket connection error")
|
|
continue
|
|
}
|
|
}
|
|
|
|
if mt != websocket.TextMessage {
|
|
continue
|
|
}
|
|
|
|
s.EmitMessage(msg)
|
|
|
|
m, err := ParseMessage(msg)
|
|
if err != nil {
|
|
s.EmitError(errors.Wrapf(err, "failed to parse message: %s", msg))
|
|
continue
|
|
}
|
|
|
|
if m != nil {
|
|
s.dispatch(m)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *WebSocketService) dispatch(msg interface{}) {
|
|
switch e := msg.(type) {
|
|
|
|
case *BookEvent:
|
|
s.EmitBookEvent(*e)
|
|
|
|
case *PublicTradeEvent:
|
|
s.EmitTradeEvent(*e)
|
|
|
|
case *KLineEvent:
|
|
s.EmitKLineEvent(*e)
|
|
|
|
case *ErrorEvent:
|
|
s.EmitErrorEvent(*e)
|
|
|
|
case *SubscriptionEvent:
|
|
s.EmitSubscriptionEvent(*e)
|
|
|
|
case *TradeSnapshotEvent:
|
|
s.EmitTradeSnapshotEvent(*e)
|
|
|
|
case *TradeUpdateEvent:
|
|
s.EmitTradeUpdateEvent(*e)
|
|
|
|
case *AccountSnapshotEvent:
|
|
s.EmitAccountSnapshotEvent(*e)
|
|
|
|
case *AccountUpdateEvent:
|
|
s.EmitAccountUpdateEvent(*e)
|
|
|
|
case *OrderSnapshotEvent:
|
|
s.EmitOrderSnapshotEvent(*e)
|
|
|
|
case *OrderUpdateEvent:
|
|
s.EmitOrderUpdateEvent(*e)
|
|
|
|
default:
|
|
s.EmitError(fmt.Errorf("unsupported %T event: %+v", e, e))
|
|
}
|
|
}
|
|
|
|
func (s *WebSocketService) ClearSubscriptions() {
|
|
s.Subscriptions = nil
|
|
}
|
|
|
|
func (s *WebSocketService) Reconnect() {
|
|
logger.Info("reconnecting...")
|
|
s.emitReconnect()
|
|
}
|
|
|
|
// Subscribe is a helper method for building subscription request from the internal mapping types.
|
|
// (Internal public method)
|
|
func (s *WebSocketService) Subscribe(channel, market string, options SubscribeOptions) {
|
|
s.AddSubscription(Subscription{
|
|
Channel: channel,
|
|
Market: market,
|
|
Depth: options.Depth,
|
|
Resolution: options.Resolution,
|
|
})
|
|
}
|
|
|
|
// AddSubscription adds the subscription request to the buffer, these requests will be sent to the server right after connecting to the endpoint.
|
|
func (s *WebSocketService) AddSubscription(subscription Subscription) {
|
|
s.Subscriptions = append(s.Subscriptions, subscription)
|
|
}
|
|
|
|
func (s *WebSocketService) Resubscribe() {
|
|
// Calling Resubscribe() by websocket is not enough to refresh orderbook.
|
|
// We still need to get orderbook snapshot by rest client.
|
|
// Therefore Reconnect() is used to simplify implementation.
|
|
logger.Info("resubscribing all subscription...")
|
|
if err := s.SendSubscriptionRequest(UnsubscribeAction); err != nil {
|
|
logger.WithError(err).Error("failed to unsubscribe")
|
|
}
|
|
|
|
if err := s.SendSubscriptionRequest(SubscribeAction); err != nil {
|
|
logger.WithError(err).Error("failed to unsubscribe")
|
|
}
|
|
}
|
|
|
|
func (s *WebSocketService) SendSubscriptionRequest(action string) error {
|
|
request := WebsocketCommand{
|
|
Action: action,
|
|
Subscriptions: s.Subscriptions,
|
|
}
|
|
|
|
logger.Debugf("sending websocket subscription: %+v", request)
|
|
|
|
if err := s.conn.WriteJSON(request); err != nil {
|
|
return errors.Wrap(err, "Failed to send subscribe event")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close web socket connection
|
|
func (s *WebSocketService) Close() error {
|
|
return s.conn.Close()
|
|
}
|