Merge pull request #1335 from c9s/c9s/feature/private-channels

FEATURE: add custom private channel support to max
This commit is contained in:
c9s 2023-10-12 17:20:06 +08:00 committed by GitHub
commit 20df2ef3c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 5 deletions

View File

@ -94,6 +94,7 @@ type NotificationConfig struct {
type LoggingConfig struct {
Trade bool `json:"trade,omitempty"`
Order bool `json:"order,omitempty"`
Balance bool `json:"balance,omitempty"`
FilledOrderOnly bool `json:"filledOrder,omitempty"`
Fields map[string]interface{} `json:"fields,omitempty"`
}

View File

@ -51,7 +51,13 @@ type ExchangeSession struct {
TakerFeeRate fixedpoint.Value `json:"takerFeeRate" yaml:"takerFeeRate"`
ModifyOrderAmountForFee bool `json:"modifyOrderAmountForFee" yaml:"modifyOrderAmountForFee"`
PublicOnly bool `json:"publicOnly,omitempty" yaml:"publicOnly"`
// PublicOnly is used for setting the session to public only (without authentication, no private user data)
PublicOnly bool `json:"publicOnly,omitempty" yaml:"publicOnly"`
// PrivateChannels is used for filtering the private user data channel, .e.g, orders, trades, balances.. etc
// This option is exchange specific
PrivateChannels []string `json:"privateChannels,omitempty" yaml:"privateChannels,omitempty"`
Margin bool `json:"margin,omitempty" yaml:"margin"`
IsolatedMargin bool `json:"isolatedMargin,omitempty" yaml:"isolatedMargin,omitempty"`
IsolatedMarginSymbol string `json:"isolatedMarginSymbol,omitempty" yaml:"isolatedMarginSymbol,omitempty"`
@ -237,8 +243,13 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
// query and initialize the balances
if !session.PublicOnly {
logger.Infof("querying account balances...")
if len(session.PrivateChannels) > 0 {
if setter, ok := session.UserDataStream.(types.PrivateChannelSetter); ok {
setter.SetPrivateChannels(session.PrivateChannels)
}
}
logger.Infof("querying account balances...")
account, err := session.Exchange.QueryAccount(ctx)
if err != nil {
return err
@ -276,6 +287,15 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
}
if environ.loggingConfig != nil {
if environ.loggingConfig.Balance {
session.UserDataStream.OnBalanceSnapshot(func(balances types.BalanceMap) {
logger.Info(balances.String())
})
session.UserDataStream.OnBalanceUpdate(func(balances types.BalanceMap) {
logger.Info(balances.String())
})
}
if environ.loggingConfig.Trade {
session.UserDataStream.OnTradeUpdate(func(trade types.Trade) {
logger.Info(trade.String())

View File

@ -5,8 +5,8 @@ import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"fmt"
"os"
"strconv"
"time"
"github.com/google/uuid"
@ -22,6 +22,8 @@ type Stream struct {
key, secret string
privateChannels []string
authEventCallbacks []func(e max.AuthEvent)
bookEventCallbacks []func(e max.BookEvent)
tradeEventCallbacks []func(e max.PublicTradeEvent)
@ -55,6 +57,7 @@ func NewStream(key, secret string) *Stream {
log.Infof("max websocket connection authenticated: %+v", e)
stream.EmitAuth()
})
stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnOrderSnapshotEvent(stream.handleOrderSnapshotEvent)
stream.OnOrderUpdateEvent(stream.handleOrderUpdateEvent)
@ -73,6 +76,10 @@ func (s *Stream) getEndpoint(ctx context.Context) (string, error) {
return url, nil
}
func (s *Stream) SetPrivateChannels(channels []string) {
s.privateChannels = channels
}
func (s *Stream) handleConnect() {
if s.PublicOnly {
cmd := &max.WebsocketCommand{
@ -109,7 +116,11 @@ func (s *Stream) handleConnect() {
} else {
var filters []string
if s.MarginSettings.IsMargin {
if len(s.privateChannels) > 0 {
// TODO: maybe check the valid private channels
filters = s.privateChannels
} else if s.MarginSettings.IsMargin {
filters = []string{
"mwallet_order",
"mwallet_trade",
@ -119,6 +130,8 @@ func (s *Stream) handleConnect() {
}
}
log.Debugf("user data websocket filters: %v", filters)
nonce := time.Now().UnixNano() / int64(time.Millisecond)
auth := &max.AuthMessage{
// pragma: allowlist nextline secret
@ -126,7 +139,7 @@ func (s *Stream) handleConnect() {
// pragma: allowlist nextline secret
APIKey: s.key,
Nonce: nonce,
Signature: signPayload(fmt.Sprintf("%d", nonce), s.secret),
Signature: signPayload(strconv.FormatInt(nonce, 10), s.secret),
ID: uuid.New().String(),
Filters: filters,
}

View File

@ -42,6 +42,10 @@ type Stream interface {
Close() error
}
type PrivateChannelSetter interface {
SetPrivateChannels(channels []string)
}
type Unsubscriber interface {
// Unsubscribe unsubscribes the all subscriptions.
Unsubscribe()