diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index 63e9f2fb9..1da074846 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -218,7 +218,7 @@ func NewExchangeSessionFromConfig(name string, sessionConfig *ExchangeSession) ( var exchange types.Exchange if sessionConfig.Key != "" && sessionConfig.Secret != "" { - exchange, err = cmdutil.NewExchangeStandard(exchangeName, sessionConfig.Key, sessionConfig.Secret) + exchange, err = cmdutil.NewExchangeStandard(exchangeName, sessionConfig.Key, sessionConfig.Secret, sessionConfig.SubAccount) } else { exchange, err = cmdutil.NewExchangeWithEnvVarPrefix(exchangeName, sessionConfig.EnvVarPrefix) } diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 9f82e32c4..7dee34ae9 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -110,6 +110,7 @@ type ExchangeSession struct { EnvVarPrefix string `json:"envVarPrefix" yaml:"envVarPrefix"` Key string `json:"key,omitempty" yaml:"key,omitempty"` Secret string `json:"secret,omitempty" yaml:"secret,omitempty"` + SubAccount string `json:"subAccount,omitempty" yaml:"subAccount,omitempty"` PublicOnly bool `json:"publicOnly,omitempty" yaml:"publicOnly"` Margin bool `json:"margin,omitempty" yaml:"margin"` @@ -560,4 +561,3 @@ func (session *ExchangeSession) FindPossibleSymbols() (symbols []string, err err return symbols, nil } - diff --git a/pkg/cmd/cmdutil/exchange.go b/pkg/cmd/cmdutil/exchange.go index dfbd1c9f6..7bf63269d 100644 --- a/pkg/cmd/cmdutil/exchange.go +++ b/pkg/cmd/cmdutil/exchange.go @@ -8,17 +8,21 @@ import ( "github.com/pkg/errors" "github.com/c9s/bbgo/pkg/exchange/binance" + "github.com/c9s/bbgo/pkg/exchange/ftx" "github.com/c9s/bbgo/pkg/exchange/max" "github.com/c9s/bbgo/pkg/types" ) -func NewExchangeStandard(n types.ExchangeName, key, secret string) (types.Exchange, error) { +func NewExchangeStandard(n types.ExchangeName, key, secret, subAccount string) (types.Exchange, error) { if len(key) == 0 || len(secret) == 0 { return nil, errors.New("binance: empty key or secret") } switch n { + case types.ExchangeFTX: + return ftx.NewExchange(key, secret, subAccount), nil + case types.ExchangeBinance: return binance.New(key, secret), nil @@ -44,7 +48,8 @@ func NewExchangeWithEnvVarPrefix(n types.ExchangeName, varPrefix string) (types. return nil, fmt.Errorf("%s: empty key or secret, env var prefix: %s", n, varPrefix) } - return NewExchangeStandard(n, key, secret) + subAccount := os.Getenv(varPrefix + "_SUBACCOUNT") + return NewExchangeStandard(n, key, secret, subAccount) } // NewExchange constructor exchange object from viper config. diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index ba2a99f52..e0a0fb09d 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -12,26 +12,35 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -// go run ./cmd/bbgo orderbook --session=ftx --symbol=BTC/USDT +// go run ./cmd/bbgo orderbook --exchange=ftx --symbol=BTC/USDT var orderbookCmd = &cobra.Command{ - Use: "orderbook", + Use: "orderbook", + Short: "connect to the order book market data streaming service of an exchange", RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - session, err := cmd.Flags().GetString("session") + + exName, err := cmd.Flags().GetString("exchange") if err != nil { - return fmt.Errorf("can't get session from flags: %w", err) + return fmt.Errorf("can not get exchange from flags: %w", err) } - ex, err := newExchange(session) + + exchangeName, err := types.ValidExchangeName(exName) + if err != nil { + return err + } + + ex, err := cmdutil.NewExchange(exchangeName) if err != nil { return err } symbol, err := cmd.Flags().GetString("symbol") if err != nil { - return fmt.Errorf("can't get the symbol from flags: %w", err) + return fmt.Errorf("can not get the symbol from flags: %w", err) } + if symbol == "" { - return fmt.Errorf("symbol is not found") + return fmt.Errorf("--symbol option is required") } s := ex.NewStream() @@ -43,8 +52,9 @@ var orderbookCmd = &cobra.Command{ log.Infof("orderbook update: %s", book.String()) }) + log.Infof("connecting...") if err := s.Connect(ctx); err != nil { - return fmt.Errorf("failed to connect to %s", session) + return fmt.Errorf("failed to connect to %s", exchangeName) } cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) @@ -53,8 +63,8 @@ var orderbookCmd = &cobra.Command{ } func init() { - orderbookCmd.Flags().String("session", "", "the exchange session name for sync") - orderbookCmd.Flags().String("symbol", "", "the trading pair, like btcusdt") - + // since the public data does not require trading authentication, we use --exchange option here. + orderbookCmd.Flags().String("exchange", "", "the exchange name for sync") + orderbookCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...") RootCmd.AddCommand(orderbookCmd) } diff --git a/pkg/exchange/max/maxapi/websocket.go b/pkg/exchange/max/maxapi/websocket.go index 19e9acb5b..f005416f2 100644 --- a/pkg/exchange/max/maxapi/websocket.go +++ b/pkg/exchange/max/maxapi/websocket.go @@ -3,11 +3,13 @@ package max import ( "context" "fmt" + "sync" "time" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" ) var WebSocketURL = "wss://max-stream.maicoin.com/ws" @@ -42,6 +44,7 @@ var UnsubscribeAction = "unsubscribe" type WebSocketService struct { baseURL, key, secret string + mu sync.Mutex conn *websocket.Conn reconnectC chan struct{} @@ -90,7 +93,8 @@ func (s *WebSocketService) Connect(ctx context.Context) error { if err := s.connect(ctx); err != nil { return err } - go s.read(ctx) + + go s.reconnector(ctx) return nil } @@ -107,6 +111,9 @@ func (s *WebSocketService) Auth() error { } func (s *WebSocketService) connect(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + dialer := websocket.DefaultDialer conn, _, err := dialer.DialContext(ctx, s.baseURL, nil) if err != nil { @@ -116,6 +123,8 @@ func (s *WebSocketService) connect(ctx context.Context) error { s.conn = conn s.EmitConnect(conn) + go s.read(ctx) + return nil } @@ -126,7 +135,7 @@ func (s *WebSocketService) emitReconnect() { } } -func (s *WebSocketService) read(ctx context.Context) { +func (s *WebSocketService) reconnector(ctx context.Context) { for { select { case <-ctx.Done(): @@ -137,12 +146,29 @@ func (s *WebSocketService) read(ctx context.Context) { if err := s.connect(ctx); err != nil { s.emitReconnect() } + } + } +} + +func (s *WebSocketService) read(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return default: + s.mu.Lock() mt, msg, err := s.conn.ReadMessage() + s.mu.Unlock() if err != nil { - s.emitReconnect() + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + // emit reconnect to start a new connection + s.emitReconnect() + return + } + + log.WithError(err).Error("websocket error") continue } @@ -219,9 +245,9 @@ func (s *WebSocketService) Reconnect() { // (Internal public method) func (s *WebSocketService) Subscribe(channel, market string, options SubscribeOptions) { s.AddSubscription(Subscription{ - Channel: channel, - Market: market, - Depth: options.Depth, + Channel: channel, + Market: market, + Depth: options.Depth, Resolution: options.Resolution, }) } diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index 018f66484..4e3121765 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -61,20 +61,17 @@ type Exchange interface { PlatformFeeCurrency() string - NewStream() Stream + // required implementation + ExchangeMarketDataService - QueryMarkets(ctx context.Context) (MarketMap, error) + ExchangeTradingService +} +type ExchangeTradingService interface { QueryAccount(ctx context.Context) (*Account, error) QueryAccountBalances(ctx context.Context) (BalanceMap, error) - QueryTicker(ctx context.Context, symbol string) (*Ticker, error) - - QueryTickers(ctx context.Context, symbol ...string) (map[string]Ticker, error) - - QueryKLines(ctx context.Context, symbol string, interval Interval, options KLineQueryOptions) ([]KLine, error) - QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error) SubmitOrders(ctx context.Context, orders ...SubmitOrder) (createdOrders OrderSlice, err error) @@ -86,6 +83,18 @@ type Exchange interface { CancelOrders(ctx context.Context, orders ...Order) error } +type ExchangeMarketDataService interface { + NewStream() Stream + + QueryMarkets(ctx context.Context) (MarketMap, error) + + QueryTicker(ctx context.Context, symbol string) (*Ticker, error) + + QueryTickers(ctx context.Context, symbol ...string) (map[string]Ticker, error) + + QueryKLines(ctx context.Context, symbol string, interval Interval, options KLineQueryOptions) ([]KLine, error) +} + type ExchangeTransferService interface { QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error) diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index 543079aa4..0f89dcd71 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -237,18 +237,23 @@ func (b *OrderBook) String() string { sb.WriteString("BOOK ") sb.WriteString(b.Symbol) sb.WriteString("\n") - sb.WriteString("ASKS:\n") - for i := len(b.Asks) - 1; i >= 0; i-- { - sb.WriteString("- ASK: ") - sb.WriteString(b.Asks[i].String()) - sb.WriteString("\n") + + if len(b.Asks) > 0 { + sb.WriteString("ASKS:\n") + for i := len(b.Asks) - 1; i >= 0; i-- { + sb.WriteString("- ASK: ") + sb.WriteString(b.Asks[i].String()) + sb.WriteString("\n") + } } - sb.WriteString("BIDS:\n") - for _, bid := range b.Bids { - sb.WriteString("- BID: ") - sb.WriteString(bid.String()) - sb.WriteString("\n") + if len(b.Bids) > 0 { + sb.WriteString("BIDS:\n") + for _, bid := range b.Bids { + sb.WriteString("- BID: ") + sb.WriteString(bid.String()) + sb.WriteString("\n") + } } return sb.String() diff --git a/pkg/types/standardstream_callbacks.go b/pkg/types/standardstream_callbacks.go index db33b748b..7777aba72 100644 --- a/pkg/types/standardstream_callbacks.go +++ b/pkg/types/standardstream_callbacks.go @@ -14,6 +14,16 @@ func (stream *StandardStream) EmitConnect() { } } +func (stream *StandardStream) OnDisconnect(cb func()) { + stream.disconnectCallbacks = append(stream.disconnectCallbacks, cb) +} + +func (stream *StandardStream) EmitDisconnect() { + for _, cb := range stream.disconnectCallbacks { + cb() + } +} + func (stream *StandardStream) OnTradeUpdate(cb func(trade Trade)) { stream.tradeUpdateCallbacks = append(stream.tradeUpdateCallbacks, cb) } @@ -97,6 +107,8 @@ func (stream *StandardStream) EmitBookSnapshot(book OrderBook) { type StandardStreamEventHub interface { OnConnect(cb func()) + OnDisconnect(cb func()) + OnTradeUpdate(cb func(trade Trade)) OnOrderUpdate(cb func(order Order)) diff --git a/pkg/types/stream.go b/pkg/types/stream.go index a811106ee..5a4f35423 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -25,6 +25,8 @@ type StandardStream struct { connectCallbacks []func() + disconnectCallbacks []func() + // private trade update callbacks tradeUpdateCallbacks []func(trade Trade)