ftx: authenticate websocket

This commit is contained in:
ycdesu 2021-03-27 18:07:35 +08:00
parent 24254a869d
commit f60f1ef52e
5 changed files with 98 additions and 27 deletions

View File

@ -3,11 +3,14 @@ package cmd
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"syscall" "syscall"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -63,9 +66,76 @@ var orderbookCmd = &cobra.Command{
}, },
} }
// go run ./cmd/bbgo orderupdate
var orderUpdateCmd = &cobra.Command{
Use: "orderupdate",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
configFile, err := cmd.Flags().GetString("config")
if err != nil {
return err
}
if len(configFile) == 0 {
return errors.New("--config option is required")
}
// if config file exists, use the config loaded from the config file.
// otherwise, use a empty config object
var userConfig *bbgo.Config
if _, err := os.Stat(configFile); err == nil {
// load successfully
userConfig, err = bbgo.Load(configFile, false)
if err != nil {
return err
}
} else if os.IsNotExist(err) {
// config file doesn't exist
userConfig = &bbgo.Config{}
} else {
// other error
return err
}
environ := bbgo.NewEnvironment()
if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err
}
sessionName, err := cmd.Flags().GetString("session")
if err != nil {
return err
}
session, ok := environ.Session(sessionName)
if !ok {
return fmt.Errorf("session %s not found", sessionName)
}
s := session.Exchange.NewStream()
s.OnOrderUpdate(func(order types.Order) {
log.Infof("order update: %+v", order)
})
log.Infof("connecting...")
if err := s.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to %s", sessionName)
}
log.Infof("connected")
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return nil
},
}
func init() { func init() {
// since the public data does not require trading authentication, we use --exchange option here. // since the public data does not require trading authentication, we use --exchange option here.
orderbookCmd.Flags().String("exchange", "", "the exchange name for sync") orderbookCmd.Flags().String("exchange", "", "the exchange name for sync")
orderbookCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...") orderbookCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...")
orderUpdateCmd.Flags().String("session", "", "session name")
RootCmd.AddCommand(orderbookCmd) RootCmd.AddCommand(orderbookCmd)
RootCmd.AddCommand(orderUpdateCmd)
} }

View File

@ -3,6 +3,7 @@ package ftx
import ( import (
"context" "context"
"sync/atomic" "sync/atomic"
"time"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -28,11 +29,16 @@ func NewStream(key, secret string) *Stream {
} }
func (s *Stream) Connect(ctx context.Context) error { func (s *Stream) Connect(ctx context.Context) error {
if err := s.wsService.Connect(ctx); err != nil {
return err
}
// If it's not public only, let's do the authentication. // If it's not public only, let's do the authentication.
if atomic.LoadInt32(&s.publicOnly) == 0 { if atomic.LoadInt32(&s.publicOnly) == 0 {
logger.Infof("subscribe private events")
s.wsService.Subscribe(
newLoginRequest(s.wsService.key, s.wsService.secret, time.Now()),
)
}
if err := s.wsService.Connect(ctx); err != nil {
return err
} }
return nil return nil
@ -43,10 +49,16 @@ func (s *Stream) SetPublicOnly() {
} }
func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.SubscribeOptions) { func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.SubscribeOptions) {
if err := s.wsService.Subscribe(channel, symbol); err != nil { if channel != types.BookChannel {
logger.WithError(err).Errorf("subscribe failed, should never happen") // TODO: return err
} }
s.wsService.Subscribe(websocketRequest{
Operation: subscribe,
Channel: orderbook,
Market: TrimUpperString(symbol),
})
} }
func (s *Stream) Close() error { func (s *Stream) Close() error {
if s.wsService != nil { if s.wsService != nil {
return s.wsService.Close() return s.wsService.Close()

View File

@ -13,6 +13,7 @@ type messageHandler struct {
} }
func (h *messageHandler) handleMessage(message []byte) { func (h *messageHandler) handleMessage(message []byte) {
log.Infof("raw: %s", string(message))
var r rawResponse var r rawResponse
if err := json.Unmarshal(message, &r); err != nil { if err := json.Unmarshal(message, &r); err != nil {
logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message)) logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message))

View File

@ -2,13 +2,11 @@ package ftx
import ( import (
"fmt" "fmt"
"strings"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
) )
type WebsocketService struct { type WebsocketService struct {
@ -17,7 +15,7 @@ type WebsocketService struct {
key string key string
secret string secret string
subscriptions []SubscribeRequest subscriptions []websocketRequest
} }
const endpoint = "wss://ftx.com/ws/" const endpoint = "wss://ftx.com/ws/"
@ -36,18 +34,8 @@ func NewWebsocketService(key string, secret string) *WebsocketService {
return s return s
} }
func (w *WebsocketService) Subscribe(channel types.Channel, symbol string) error { func (w *WebsocketService) Subscribe(request websocketRequest) {
r := SubscribeRequest{ w.subscriptions = append(w.subscriptions, request)
Operation: subscribe,
}
if channel != types.BookChannel {
return fmt.Errorf("unsupported channel %+v", channel)
}
r.Channel = orderbook
r.Market = strings.ToUpper(strings.TrimSpace(symbol))
w.subscriptions = append(w.subscriptions, r)
return nil
} }
var errSubscriptionFailed = fmt.Errorf("failed to subscribe") var errSubscriptionFailed = fmt.Errorf("failed to subscribe")
@ -55,6 +43,7 @@ var errSubscriptionFailed = fmt.Errorf("failed to subscribe")
func (w *WebsocketService) sendSubscriptions() error { func (w *WebsocketService) sendSubscriptions() error {
conn := w.Conn() conn := w.Conn()
for _, s := range w.subscriptions { for _, s := range w.subscriptions {
logger.Infof("s: %+v", s)
if err := conn.WriteJSON(s); err != nil { if err := conn.WriteJSON(s); err != nil {
return fmt.Errorf("can't send subscription request %+v: %w", s, errSubscriptionFailed) return fmt.Errorf("can't send subscription request %+v: %w", s, errSubscriptionFailed)
} }
@ -62,6 +51,11 @@ func (w *WebsocketService) sendSubscriptions() error {
return nil return nil
} }
// After closing the websocket connection, you have to subscribe all events again
func (w *WebsocketService) Close() error { func (w *WebsocketService) Close() error {
return w.Conn().Close() w.subscriptions = nil
if conn := w.Conn(); conn != nil {
return conn.Close()
}
return nil
} }

View File

@ -67,12 +67,6 @@ func loginBody(millis int64) string {
return fmt.Sprintf("%dwebsocket_login", millis) return fmt.Sprintf("%dwebsocket_login", millis)
} }
type SubscribeRequest struct {
Operation operation `json:"op"`
Channel channel `json:"channel"`
Market string `json:"market"`
}
type respType string type respType string
const errRespType respType = "error" const errRespType respType = "error"