From 78ea940569ba11b8cdcf8b85822f13c08d5f5493 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 2 Oct 2023 17:22:03 +0800 Subject: [PATCH 1/2] max: support private channel setter --- pkg/bbgo/session.go | 15 +++++++++++++-- pkg/exchange/max/stream.go | 19 ++++++++++++++++--- pkg/types/stream.go | 4 ++++ 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 1343eb5d8..80d24d5c6 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -51,7 +51,13 @@ type ExchangeSession struct { TakerFeeRate fixedpoint.Value `json:"takerFeeRate" yaml:"takerFeeRate"` ModifyOrderAmountForFee bool `json:"modifyOrderAmountForFee" yaml:"modifyOrderAmountForFee"` - PublicOnly bool `json:"publicOnly,omitempty" yaml:"publicOnly"` + // PublicOnly is used for setting the session to public only (without authentication, no private user data) + PublicOnly bool `json:"publicOnly,omitempty" yaml:"publicOnly"` + + // PrivateChannels is used for filtering the private user data channel, .e.g, orders, trades, balances.. etc + // This option is exchange specific + PrivateChannels []string `json:"privateChannels,omitempty" yaml:"privateChannels,omitempty"` + Margin bool `json:"margin,omitempty" yaml:"margin"` IsolatedMargin bool `json:"isolatedMargin,omitempty" yaml:"isolatedMargin,omitempty"` IsolatedMarginSymbol string `json:"isolatedMarginSymbol,omitempty" yaml:"isolatedMarginSymbol,omitempty"` @@ -237,8 +243,13 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment) // query and initialize the balances if !session.PublicOnly { - logger.Infof("querying account balances...") + if len(session.PrivateChannels) > 0 { + if setter, ok := session.UserDataStream.(types.PrivateChannelSetter); ok { + setter.SetPrivateChannels(session.PrivateChannels) + } + } + logger.Infof("querying account balances...") account, err := session.Exchange.QueryAccount(ctx) if err != nil { return err diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index bd02663b1..72bece012 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -5,8 +5,8 @@ import ( "crypto/hmac" "crypto/sha256" "encoding/hex" - "fmt" "os" + "strconv" "time" "github.com/google/uuid" @@ -22,6 +22,8 @@ type Stream struct { key, secret string + privateChannels []string + authEventCallbacks []func(e max.AuthEvent) bookEventCallbacks []func(e max.BookEvent) tradeEventCallbacks []func(e max.PublicTradeEvent) @@ -55,6 +57,7 @@ func NewStream(key, secret string) *Stream { log.Infof("max websocket connection authenticated: %+v", e) stream.EmitAuth() }) + stream.OnKLineEvent(stream.handleKLineEvent) stream.OnOrderSnapshotEvent(stream.handleOrderSnapshotEvent) stream.OnOrderUpdateEvent(stream.handleOrderUpdateEvent) @@ -73,6 +76,10 @@ func (s *Stream) getEndpoint(ctx context.Context) (string, error) { return url, nil } +func (s *Stream) SetPrivateChannels(channels []string) { + s.privateChannels = channels +} + func (s *Stream) handleConnect() { if s.PublicOnly { cmd := &max.WebsocketCommand{ @@ -109,7 +116,11 @@ func (s *Stream) handleConnect() { } else { var filters []string - if s.MarginSettings.IsMargin { + + if len(s.privateChannels) > 0 { + // TODO: maybe check the valid private channels + filters = s.privateChannels + } else if s.MarginSettings.IsMargin { filters = []string{ "mwallet_order", "mwallet_trade", @@ -119,6 +130,8 @@ func (s *Stream) handleConnect() { } } + log.Debugf("user data websocket filters: %v", filters) + nonce := time.Now().UnixNano() / int64(time.Millisecond) auth := &max.AuthMessage{ // pragma: allowlist nextline secret @@ -126,7 +139,7 @@ func (s *Stream) handleConnect() { // pragma: allowlist nextline secret APIKey: s.key, Nonce: nonce, - Signature: signPayload(fmt.Sprintf("%d", nonce), s.secret), + Signature: signPayload(strconv.FormatInt(nonce, 10), s.secret), ID: uuid.New().String(), Filters: filters, } diff --git a/pkg/types/stream.go b/pkg/types/stream.go index 73bd44856..51337f260 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -42,6 +42,10 @@ type Stream interface { Close() error } +type PrivateChannelSetter interface { + SetPrivateChannels(channels []string) +} + type Unsubscriber interface { // Unsubscribe unsubscribes the all subscriptions. Unsubscribe() From 378425a3aa22d72acecb2ee3620e4bf7f2a9bfa6 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 2 Oct 2023 17:35:10 +0800 Subject: [PATCH 2/2] bbgo: add balance logger support --- pkg/bbgo/config.go | 1 + pkg/bbgo/session.go | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/pkg/bbgo/config.go b/pkg/bbgo/config.go index 6abcff37b..4cbc23048 100644 --- a/pkg/bbgo/config.go +++ b/pkg/bbgo/config.go @@ -94,6 +94,7 @@ type NotificationConfig struct { type LoggingConfig struct { Trade bool `json:"trade,omitempty"` Order bool `json:"order,omitempty"` + Balance bool `json:"balance,omitempty"` FilledOrderOnly bool `json:"filledOrder,omitempty"` Fields map[string]interface{} `json:"fields,omitempty"` } diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 80d24d5c6..3d2cc82e9 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -287,6 +287,15 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment) } if environ.loggingConfig != nil { + if environ.loggingConfig.Balance { + session.UserDataStream.OnBalanceSnapshot(func(balances types.BalanceMap) { + logger.Info(balances.String()) + }) + session.UserDataStream.OnBalanceUpdate(func(balances types.BalanceMap) { + logger.Info(balances.String()) + }) + } + if environ.loggingConfig.Trade { session.UserDataStream.OnTradeUpdate(func(trade types.Trade) { logger.Info(trade.String())