Merge pull request #155 from c9s/refactor/orderbook-cmd

refactor: revise orderbook cmd and add ftx to the standard session factory
This commit is contained in:
Yo-An Lin 2021-03-15 13:24:49 +08:00 committed by GitHub
commit a8a914f696
9 changed files with 108 additions and 39 deletions

View File

@ -218,7 +218,7 @@ func NewExchangeSessionFromConfig(name string, sessionConfig *ExchangeSession) (
var exchange types.Exchange
if sessionConfig.Key != "" && sessionConfig.Secret != "" {
exchange, err = cmdutil.NewExchangeStandard(exchangeName, sessionConfig.Key, sessionConfig.Secret)
exchange, err = cmdutil.NewExchangeStandard(exchangeName, sessionConfig.Key, sessionConfig.Secret, sessionConfig.SubAccount)
} else {
exchange, err = cmdutil.NewExchangeWithEnvVarPrefix(exchangeName, sessionConfig.EnvVarPrefix)
}

View File

@ -110,6 +110,7 @@ type ExchangeSession struct {
EnvVarPrefix string `json:"envVarPrefix" yaml:"envVarPrefix"`
Key string `json:"key,omitempty" yaml:"key,omitempty"`
Secret string `json:"secret,omitempty" yaml:"secret,omitempty"`
SubAccount string `json:"subAccount,omitempty" yaml:"subAccount,omitempty"`
PublicOnly bool `json:"publicOnly,omitempty" yaml:"publicOnly"`
Margin bool `json:"margin,omitempty" yaml:"margin"`
@ -560,4 +561,3 @@ func (session *ExchangeSession) FindPossibleSymbols() (symbols []string, err err
return symbols, nil
}

View File

@ -8,17 +8,21 @@ import (
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/exchange/binance"
"github.com/c9s/bbgo/pkg/exchange/ftx"
"github.com/c9s/bbgo/pkg/exchange/max"
"github.com/c9s/bbgo/pkg/types"
)
func NewExchangeStandard(n types.ExchangeName, key, secret string) (types.Exchange, error) {
func NewExchangeStandard(n types.ExchangeName, key, secret, subAccount string) (types.Exchange, error) {
if len(key) == 0 || len(secret) == 0 {
return nil, errors.New("binance: empty key or secret")
}
switch n {
case types.ExchangeFTX:
return ftx.NewExchange(key, secret, subAccount), nil
case types.ExchangeBinance:
return binance.New(key, secret), nil
@ -44,7 +48,8 @@ func NewExchangeWithEnvVarPrefix(n types.ExchangeName, varPrefix string) (types.
return nil, fmt.Errorf("%s: empty key or secret, env var prefix: %s", n, varPrefix)
}
return NewExchangeStandard(n, key, secret)
subAccount := os.Getenv(varPrefix + "_SUBACCOUNT")
return NewExchangeStandard(n, key, secret, subAccount)
}
// NewExchange constructor exchange object from viper config.

View File

@ -12,26 +12,35 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
// go run ./cmd/bbgo orderbook --session=ftx --symbol=BTC/USDT
// go run ./cmd/bbgo orderbook --exchange=ftx --symbol=BTC/USDT
var orderbookCmd = &cobra.Command{
Use: "orderbook",
Use: "orderbook",
Short: "connect to the order book market data streaming service of an exchange",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
session, err := cmd.Flags().GetString("session")
exName, err := cmd.Flags().GetString("exchange")
if err != nil {
return fmt.Errorf("can't get session from flags: %w", err)
return fmt.Errorf("can not get exchange from flags: %w", err)
}
ex, err := newExchange(session)
exchangeName, err := types.ValidExchangeName(exName)
if err != nil {
return err
}
ex, err := cmdutil.NewExchange(exchangeName)
if err != nil {
return err
}
symbol, err := cmd.Flags().GetString("symbol")
if err != nil {
return fmt.Errorf("can't get the symbol from flags: %w", err)
return fmt.Errorf("can not get the symbol from flags: %w", err)
}
if symbol == "" {
return fmt.Errorf("symbol is not found")
return fmt.Errorf("--symbol option is required")
}
s := ex.NewStream()
@ -43,8 +52,9 @@ var orderbookCmd = &cobra.Command{
log.Infof("orderbook update: %s", book.String())
})
log.Infof("connecting...")
if err := s.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to %s", session)
return fmt.Errorf("failed to connect to %s", exchangeName)
}
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
@ -53,8 +63,8 @@ var orderbookCmd = &cobra.Command{
}
func init() {
orderbookCmd.Flags().String("session", "", "the exchange session name for sync")
orderbookCmd.Flags().String("symbol", "", "the trading pair, like btcusdt")
// since the public data does not require trading authentication, we use --exchange option here.
orderbookCmd.Flags().String("exchange", "", "the exchange name for sync")
orderbookCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...")
RootCmd.AddCommand(orderbookCmd)
}

View File

@ -3,11 +3,13 @@ package max
import (
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
var WebSocketURL = "wss://max-stream.maicoin.com/ws"
@ -42,6 +44,7 @@ var UnsubscribeAction = "unsubscribe"
type WebSocketService struct {
baseURL, key, secret string
mu sync.Mutex
conn *websocket.Conn
reconnectC chan struct{}
@ -90,7 +93,8 @@ func (s *WebSocketService) Connect(ctx context.Context) error {
if err := s.connect(ctx); err != nil {
return err
}
go s.read(ctx)
go s.reconnector(ctx)
return nil
}
@ -107,6 +111,9 @@ func (s *WebSocketService) Auth() error {
}
func (s *WebSocketService) connect(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
dialer := websocket.DefaultDialer
conn, _, err := dialer.DialContext(ctx, s.baseURL, nil)
if err != nil {
@ -116,6 +123,8 @@ func (s *WebSocketService) connect(ctx context.Context) error {
s.conn = conn
s.EmitConnect(conn)
go s.read(ctx)
return nil
}
@ -126,7 +135,7 @@ func (s *WebSocketService) emitReconnect() {
}
}
func (s *WebSocketService) read(ctx context.Context) {
func (s *WebSocketService) reconnector(ctx context.Context) {
for {
select {
case <-ctx.Done():
@ -137,12 +146,29 @@ func (s *WebSocketService) read(ctx context.Context) {
if err := s.connect(ctx); err != nil {
s.emitReconnect()
}
}
}
}
func (s *WebSocketService) read(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
s.mu.Lock()
mt, msg, err := s.conn.ReadMessage()
s.mu.Unlock()
if err != nil {
s.emitReconnect()
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
// emit reconnect to start a new connection
s.emitReconnect()
return
}
log.WithError(err).Error("websocket error")
continue
}
@ -219,9 +245,9 @@ func (s *WebSocketService) Reconnect() {
// (Internal public method)
func (s *WebSocketService) Subscribe(channel, market string, options SubscribeOptions) {
s.AddSubscription(Subscription{
Channel: channel,
Market: market,
Depth: options.Depth,
Channel: channel,
Market: market,
Depth: options.Depth,
Resolution: options.Resolution,
})
}

View File

@ -61,20 +61,17 @@ type Exchange interface {
PlatformFeeCurrency() string
NewStream() Stream
// required implementation
ExchangeMarketDataService
QueryMarkets(ctx context.Context) (MarketMap, error)
ExchangeTradingService
}
type ExchangeTradingService interface {
QueryAccount(ctx context.Context) (*Account, error)
QueryAccountBalances(ctx context.Context) (BalanceMap, error)
QueryTicker(ctx context.Context, symbol string) (*Ticker, error)
QueryTickers(ctx context.Context, symbol ...string) (map[string]Ticker, error)
QueryKLines(ctx context.Context, symbol string, interval Interval, options KLineQueryOptions) ([]KLine, error)
QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error)
SubmitOrders(ctx context.Context, orders ...SubmitOrder) (createdOrders OrderSlice, err error)
@ -86,6 +83,18 @@ type Exchange interface {
CancelOrders(ctx context.Context, orders ...Order) error
}
type ExchangeMarketDataService interface {
NewStream() Stream
QueryMarkets(ctx context.Context) (MarketMap, error)
QueryTicker(ctx context.Context, symbol string) (*Ticker, error)
QueryTickers(ctx context.Context, symbol ...string) (map[string]Ticker, error)
QueryKLines(ctx context.Context, symbol string, interval Interval, options KLineQueryOptions) ([]KLine, error)
}
type ExchangeTransferService interface {
QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error)
QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error)

View File

@ -237,18 +237,23 @@ func (b *OrderBook) String() string {
sb.WriteString("BOOK ")
sb.WriteString(b.Symbol)
sb.WriteString("\n")
sb.WriteString("ASKS:\n")
for i := len(b.Asks) - 1; i >= 0; i-- {
sb.WriteString("- ASK: ")
sb.WriteString(b.Asks[i].String())
sb.WriteString("\n")
if len(b.Asks) > 0 {
sb.WriteString("ASKS:\n")
for i := len(b.Asks) - 1; i >= 0; i-- {
sb.WriteString("- ASK: ")
sb.WriteString(b.Asks[i].String())
sb.WriteString("\n")
}
}
sb.WriteString("BIDS:\n")
for _, bid := range b.Bids {
sb.WriteString("- BID: ")
sb.WriteString(bid.String())
sb.WriteString("\n")
if len(b.Bids) > 0 {
sb.WriteString("BIDS:\n")
for _, bid := range b.Bids {
sb.WriteString("- BID: ")
sb.WriteString(bid.String())
sb.WriteString("\n")
}
}
return sb.String()

View File

@ -14,6 +14,16 @@ func (stream *StandardStream) EmitConnect() {
}
}
func (stream *StandardStream) OnDisconnect(cb func()) {
stream.disconnectCallbacks = append(stream.disconnectCallbacks, cb)
}
func (stream *StandardStream) EmitDisconnect() {
for _, cb := range stream.disconnectCallbacks {
cb()
}
}
func (stream *StandardStream) OnTradeUpdate(cb func(trade Trade)) {
stream.tradeUpdateCallbacks = append(stream.tradeUpdateCallbacks, cb)
}
@ -97,6 +107,8 @@ func (stream *StandardStream) EmitBookSnapshot(book OrderBook) {
type StandardStreamEventHub interface {
OnConnect(cb func())
OnDisconnect(cb func())
OnTradeUpdate(cb func(trade Trade))
OnOrderUpdate(cb func(order Order))

View File

@ -25,6 +25,8 @@ type StandardStream struct {
connectCallbacks []func()
disconnectCallbacks []func()
// private trade update callbacks
tradeUpdateCallbacks []func(trade Trade)