bbgo_origin/pkg/exchange/max/stream.go

372 lines
8.9 KiB
Go
Raw Normal View History

2020-08-31 04:32:51 +00:00
package max
2020-10-03 10:35:28 +00:00
import (
"context"
2022-01-02 04:00:06 +00:00
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"os"
2023-10-02 09:22:03 +00:00
"strconv"
2020-10-19 14:23:49 +00:00
"time"
2020-10-03 10:35:28 +00:00
2022-01-02 04:00:06 +00:00
"github.com/google/uuid"
2020-11-07 04:38:57 +00:00
2024-10-04 01:53:28 +00:00
"github.com/c9s/bbgo/pkg/depth"
2020-10-11 08:46:15 +00:00
max "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/types"
2020-10-03 10:35:28 +00:00
)
2022-01-02 04:00:06 +00:00
//go:generate callbackgen -type Stream
2020-10-03 10:35:28 +00:00
type Stream struct {
types.StandardStream
2022-05-25 06:38:09 +00:00
types.MarginSettings
2020-10-03 10:35:28 +00:00
2022-01-02 04:00:06 +00:00
key, secret string
2023-10-02 09:22:03 +00:00
privateChannels []string
authEventCallbacks []func(e max.AuthEvent)
2022-01-02 04:00:06 +00:00
bookEventCallbacks []func(e max.BookEvent)
tradeEventCallbacks []func(e max.PublicTradeEvent)
kLineEventCallbacks []func(e max.KLineEvent)
errorEventCallbacks []func(e max.ErrorEvent)
subscriptionEventCallbacks []func(e max.SubscriptionEvent)
tradeUpdateEventCallbacks []func(e max.TradeUpdateEvent)
tradeSnapshotEventCallbacks []func(e max.TradeSnapshotEvent)
orderUpdateEventCallbacks []func(e max.OrderUpdateEvent)
orderSnapshotEventCallbacks []func(e max.OrderSnapshotEvent)
2022-05-25 12:34:25 +00:00
adRatioEventCallbacks []func(e max.ADRatioEvent)
debtEventCallbacks []func(e max.DebtEvent)
2022-01-02 04:00:06 +00:00
accountSnapshotEventCallbacks []func(e max.AccountSnapshotEvent)
accountUpdateEventCallbacks []func(e max.AccountUpdateEvent)
2024-10-04 01:53:28 +00:00
// depthBuffers is used for storing the depth info
depthBuffers map[string]*depth.Buffer
2020-10-03 10:35:28 +00:00
}
2024-10-04 01:53:28 +00:00
func NewStream(ex *Exchange, key, secret string) *Stream {
2022-01-02 04:00:06 +00:00
stream := &Stream{
StandardStream: types.NewStandardStream(),
key: key,
2022-06-17 07:04:23 +00:00
// pragma: allowlist nextline secret
2024-10-04 01:53:28 +00:00
secret: secret,
depthBuffers: make(map[string]*depth.Buffer),
2022-01-02 04:00:06 +00:00
}
stream.SetEndpointCreator(stream.getEndpoint)
stream.SetParser(max.ParseMessage)
stream.SetDispatcher(stream.dispatchEvent)
stream.OnConnect(stream.handleConnect)
2024-10-04 01:53:28 +00:00
stream.OnDisconnect(stream.handleDisconnect)
2023-04-14 10:57:13 +00:00
stream.OnAuthEvent(func(e max.AuthEvent) {
log.Infof("max websocket connection authenticated: %+v", e)
stream.EmitAuth()
2023-04-14 10:57:13 +00:00
})
2023-10-02 09:22:03 +00:00
2022-01-02 04:00:06 +00:00
stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnOrderSnapshotEvent(stream.handleOrderSnapshotEvent)
stream.OnOrderUpdateEvent(stream.handleOrderUpdateEvent)
stream.OnTradeUpdateEvent(stream.handleTradeEvent)
stream.OnAccountSnapshotEvent(stream.handleAccountSnapshotEvent)
stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent)
2024-10-04 05:59:58 +00:00
stream.OnBookEvent(stream.handleBookEvent(ex))
2022-01-02 04:00:06 +00:00
return stream
}
func (s *Stream) getEndpoint(ctx context.Context) (string, error) {
url := os.Getenv("MAX_API_WS_URL")
if url == "" {
url = max.WebSocketURL
}
2022-01-02 04:00:06 +00:00
return url, nil
}
2020-10-03 11:14:15 +00:00
2023-10-02 09:22:03 +00:00
func (s *Stream) SetPrivateChannels(channels []string) {
s.privateChannels = channels
}
func ToLocalDepth(depth types.Depth) int {
if len(depth) > 0 {
switch depth {
case types.DepthLevelFull:
return 50
case types.DepthLevelMedium:
return 20
case types.DepthLevel1:
return 1
case types.DepthLevel5:
return 5
default:
return 20
}
}
return 0
}
2022-01-02 04:00:06 +00:00
func (s *Stream) handleConnect() {
if s.PublicOnly {
cmd := &max.WebsocketCommand{
Action: "subscribe",
2020-11-05 07:04:56 +00:00
}
2022-01-02 04:00:06 +00:00
for _, sub := range s.Subscriptions {
depth := ToLocalDepth(sub.Options.Depth)
2022-01-02 04:00:06 +00:00
cmd.Subscriptions = append(cmd.Subscriptions, max.Subscription{
Channel: string(sub.Channel),
Market: toLocalSymbol(sub.Symbol),
Depth: depth,
2022-05-19 01:48:36 +00:00
Resolution: sub.Options.Interval.String(),
2022-01-02 04:00:06 +00:00
})
}
log.Infof("public subscription commands: %+v", cmd)
2022-05-25 06:40:43 +00:00
if err := s.Conn.WriteJSON(cmd); err != nil {
log.WithError(err).Error("failed to send subscription request")
}
2022-01-02 04:00:06 +00:00
} else {
2022-05-25 06:40:43 +00:00
var filters []string
2023-10-02 09:22:03 +00:00
if len(s.privateChannels) > 0 {
// TODO: maybe check the valid private channels
filters = s.privateChannels
} else if s.MarginSettings.IsMargin {
2022-05-25 06:40:43 +00:00
filters = []string{
"mwallet_order",
"mwallet_trade",
"mwallet_account",
"ad_ratio",
"borrowing",
}
}
2023-10-02 09:22:03 +00:00
log.Debugf("user data websocket filters: %v", filters)
2022-01-02 04:00:06 +00:00
nonce := time.Now().UnixNano() / int64(time.Millisecond)
auth := &max.AuthMessage{
2022-06-17 07:04:23 +00:00
// pragma: allowlist nextline secret
Action: "auth",
// pragma: allowlist nextline secret
2022-01-02 04:00:06 +00:00
APIKey: s.key,
Nonce: nonce,
2023-10-02 09:22:03 +00:00
Signature: signPayload(strconv.FormatInt(nonce, 10), s.secret),
2022-01-02 04:00:06 +00:00
ID: uuid.New().String(),
2022-05-25 06:40:43 +00:00
Filters: filters,
}
2022-05-25 06:40:43 +00:00
2022-01-02 04:00:06 +00:00
if err := s.Conn.WriteJSON(auth); err != nil {
log.WithError(err).Error("failed to send auth request")
2020-10-19 14:23:49 +00:00
}
2022-01-02 04:00:06 +00:00
}
}
2020-10-19 14:23:49 +00:00
2024-10-04 01:53:28 +00:00
func (s *Stream) handleDisconnect() {
log.Debugf("resetting depth snapshots...")
for _, f := range s.depthBuffers {
f.Reset()
}
}
2022-01-02 04:00:06 +00:00
func (s *Stream) handleKLineEvent(e max.KLineEvent) {
kline := e.KLine.KLine()
s.EmitKLine(kline)
if kline.Closed {
s.EmitKLineClosed(kline)
}
}
func (s *Stream) handleOrderSnapshotEvent(e max.OrderSnapshotEvent) {
for _, o := range e.Orders {
globalOrder, err := convertWebSocketOrderUpdate(o)
2020-10-03 10:35:28 +00:00
if err != nil {
2022-01-02 04:00:06 +00:00
log.WithError(err).Error("websocket order snapshot convert error")
continue
2020-10-03 10:35:28 +00:00
}
2022-01-02 04:00:06 +00:00
s.EmitOrderUpdate(*globalOrder)
}
}
2020-10-03 11:14:15 +00:00
2022-01-02 04:00:06 +00:00
func (s *Stream) handleOrderUpdateEvent(e max.OrderUpdateEvent) {
for _, o := range e.Orders {
globalOrder, err := convertWebSocketOrderUpdate(o)
2022-01-02 04:00:06 +00:00
if err != nil {
log.WithError(err).Error("websocket order update convert error")
continue
2020-10-03 11:14:15 +00:00
}
2022-01-02 04:00:06 +00:00
s.EmitOrderUpdate(*globalOrder)
}
}
2020-10-03 11:14:15 +00:00
2022-01-02 04:00:06 +00:00
func (s *Stream) handleTradeEvent(e max.TradeUpdateEvent) {
for _, tradeUpdate := range e.Trades {
trade, err := convertWebSocketTrade(tradeUpdate)
if err != nil {
log.WithError(err).Error("websocket trade update convert error")
return
2020-10-03 11:14:15 +00:00
}
2022-01-02 04:00:06 +00:00
s.EmitTradeUpdate(*trade)
}
}
2020-10-03 11:14:15 +00:00
2024-10-04 05:59:58 +00:00
func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
2024-10-04 01:53:28 +00:00
return func(e max.BookEvent) {
symbol := toGlobalSymbol(e.Market)
f, ok := s.depthBuffers[symbol]
if !ok {
bookDepth := 0
for _, subscription := range s.Subscriptions {
if subscription.Channel == types.BookChannel && toLocalSymbol(subscription.Symbol) == e.Market {
bookDepth = ToLocalDepth(subscription.Options.Depth)
break
}
}
// the default depth of websocket channel is 50, we need to make sure both RESTful API and WS channel use the same depth
if bookDepth == 0 {
bookDepth = 50
}
2024-10-04 01:53:28 +00:00
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
log.Infof("fetching %s depth with depth = %d...", e.Market, bookDepth)
2024-10-04 01:53:28 +00:00
// the depth of websocket orderbook event is 50 by default, so we use 50 as limit here
return ex.QueryDepth(context.Background(), e.Market, bookDepth)
2024-10-04 01:53:28 +00:00
})
2024-11-15 10:09:15 +00:00
f.SetBufferingPeriod(3 * time.Second)
2024-10-04 01:53:28 +00:00
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
s.EmitBookSnapshot(snapshot)
for _, u := range updates {
s.EmitBookUpdate(u.Object)
}
})
f.OnPush(func(update depth.Update) {
s.EmitBookUpdate(update.Object)
})
s.depthBuffers[symbol] = f
}
// if we receive orderbook event with both asks and bids are empty, it means we need to rebuild this orderbook
shouldReset := len(e.Asks) == 0 && len(e.Bids) == 0
if shouldReset {
f.Reset()
return
}
if err := f.AddUpdate(types.SliceOrderBook{
Symbol: symbol,
Time: e.Time(),
Bids: e.Bids,
Asks: e.Asks,
}, e.FirstUpdateID, e.LastUpdateID); err != nil {
log.WithError(err).Errorf("found missing %s update event", e.Market)
}
2024-10-04 01:53:28 +00:00
}
}
2024-11-14 17:48:32 +00:00
func (s *Stream) String() string {
ss := "max.Stream"
if s.PublicOnly {
ss += " (public only)"
} else {
ss += " (user data)"
}
if s.MarginSettings.IsMargin {
ss += " (margin)"
}
return ss
}
2022-01-02 04:00:06 +00:00
func (s *Stream) handleAccountSnapshotEvent(e max.AccountSnapshotEvent) {
snapshot := map[string]types.Balance{}
for _, bm := range e.Balances {
balance, err := bm.Balance()
2021-02-22 09:36:23 +00:00
if err != nil {
2022-01-02 04:00:06 +00:00
continue
2021-02-22 09:36:23 +00:00
}
2022-01-02 04:00:06 +00:00
snapshot[balance.Currency] = *balance
2021-02-22 09:36:23 +00:00
}
2022-01-02 04:00:06 +00:00
s.EmitBalanceSnapshot(snapshot)
2020-10-03 11:14:15 +00:00
}
2022-01-02 04:00:06 +00:00
func (s *Stream) handleAccountUpdateEvent(e max.AccountUpdateEvent) {
snapshot := map[string]types.Balance{}
for _, bm := range e.Balances {
balance, err := bm.Balance()
if err != nil {
continue
}
2022-01-02 04:00:06 +00:00
snapshot[toGlobalCurrency(balance.Currency)] = *balance
}
2020-10-03 10:35:28 +00:00
2022-01-02 04:00:06 +00:00
s.EmitBalanceUpdate(snapshot)
2020-10-03 10:35:28 +00:00
}
2020-10-19 14:23:49 +00:00
2022-01-02 04:00:06 +00:00
func (s *Stream) dispatchEvent(e interface{}) {
switch e := e.(type) {
case *max.AuthEvent:
s.EmitAuthEvent(*e)
2022-01-02 04:00:06 +00:00
case *max.BookEvent:
s.EmitBookEvent(*e)
case *max.PublicTradeEvent:
s.EmitTradeEvent(*e)
case *max.KLineEvent:
s.EmitKLineEvent(*e)
case *max.ErrorEvent:
s.EmitErrorEvent(*e)
case *max.SubscriptionEvent:
s.EmitSubscriptionEvent(*e)
case *max.TradeSnapshotEvent:
s.EmitTradeSnapshotEvent(*e)
case *max.TradeUpdateEvent:
s.EmitTradeUpdateEvent(*e)
case *max.AccountSnapshotEvent:
s.EmitAccountSnapshotEvent(*e)
case *max.AccountUpdateEvent:
s.EmitAccountUpdateEvent(*e)
case *max.OrderSnapshotEvent:
s.EmitOrderSnapshotEvent(*e)
case *max.OrderUpdateEvent:
s.EmitOrderUpdateEvent(*e)
2022-05-25 12:34:25 +00:00
case *max.ADRatioEvent:
s.EmitAdRatioEvent(*e)
2022-05-26 10:07:17 +00:00
case *max.DebtEvent:
s.EmitDebtEvent(*e)
2022-05-25 12:34:25 +00:00
2022-01-02 04:00:06 +00:00
default:
2022-05-25 12:34:25 +00:00
log.Warnf("unhandled %T event: %+v", e, e)
2022-01-02 04:00:06 +00:00
}
}
func signPayload(payload string, secret string) string {
var sig = hmac.New(sha256.New, []byte(secret))
_, err := sig.Write([]byte(payload))
if err != nil {
return ""
}
return hex.EncodeToString(sig.Sum(nil))
}