Merge pull request #136 from c9s/ftx/print-subscribed-message

This commit is contained in:
YC 2021-02-28 09:07:11 +08:00 committed by GitHub
commit 4730efefa3
9 changed files with 226 additions and 56 deletions

View File

@ -6,10 +6,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/exchange/ftx"
"github.com/c9s/bbgo/pkg/types"
)
//godotenv -f .env.local go run ./cmd/bbgo balances --session=ftx
@ -41,16 +37,3 @@ func init() {
RootCmd.AddCommand(balancesCmd)
}
func newExchange(session string) (types.Exchange, error) {
switch session {
case "ftx":
return ftx.NewExchange(
viper.GetString("ftx-api-key"),
viper.GetString("ftx-api-secret"),
viper.GetString("ftx-subaccount-name"),
), nil
}
return nil, fmt.Errorf("unsupported session %s", session)
}

50
pkg/cmd/orderbook.go Normal file
View File

@ -0,0 +1,50 @@
package cmd
import (
"context"
"fmt"
"syscall"
"github.com/spf13/cobra"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/types"
)
// go run ./cmd/bbgo orderbook --session=ftx --symbol=btc/usdt
var orderbookCmd = &cobra.Command{
Use: "orderbook",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
session, err := cmd.Flags().GetString("session")
if err != nil {
return fmt.Errorf("can't get session from flags: %w", err)
}
ex, err := newExchange(session)
if err != nil {
return err
}
symbol, err := cmd.Flags().GetString("symbol")
if err != nil {
return fmt.Errorf("can't get the symbol from flags: %w", err)
}
s := ex.NewStream()
s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
if err := s.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to %s", session)
}
// TODO: register callbacks to print orderbook and updates
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return nil
},
}
func init() {
orderbookCmd.Flags().String("session", "", "the exchange session name for sync")
orderbookCmd.Flags().String("symbol", "", "the trading pair, like btcusdt")
RootCmd.AddCommand(orderbookCmd)
}

View File

@ -1,6 +1,11 @@
package cmd
import (
"fmt"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/exchange/ftx"
"github.com/c9s/bbgo/pkg/types"
)
@ -10,3 +15,15 @@ func inBaseAsset(balances types.BalanceMap, market types.Market, price float64)
return (base.Locked.Float64() + base.Available.Float64()) + ((quote.Locked.Float64() + quote.Available.Float64()) / price)
}
func newExchange(session string) (types.Exchange, error) {
switch session {
case "ftx":
return ftx.NewExchange(
viper.GetString("ftx-api-key"),
viper.GetString("ftx-api-secret"),
viper.GetString("ftx-subaccount-name"),
), nil
}
return nil, fmt.Errorf("unsupported session %s", session)
}

View File

@ -22,6 +22,8 @@ func NewStream(key, secret string) *Stream {
StandardStream: types.StandardStream{},
wsService: wss,
}
wss.OnMessage(messageHandler{s.StandardStream}.handleMessage)
return s
}
@ -33,42 +35,11 @@ func (s *Stream) SetPublicOnly() {
atomic.StoreInt32(&s.publicOnly, 1)
}
func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.SubscribeOptions) {
if err := s.wsService.Subscribe(channel, symbol); err != nil {
logger.WithError(err).Errorf("subscribe failed, should never happen")
}
}
func (s *Stream) Close() error {
return s.wsService.Close()
}
func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) {
panic("implement me")
}
func (s *Stream) OnTradeUpdate(cb func(trade types.Trade)) {
panic("implement me")
}
func (s *Stream) OnOrderUpdate(cb func(order types.Order)) {
panic("implement me")
}
func (s *Stream) OnBalanceSnapshot(cb func(balances types.BalanceMap)) {
panic("implement me")
}
func (s *Stream) OnBalanceUpdate(cb func(balances types.BalanceMap)) {
panic("implement me")
}
func (s *Stream) OnKLineClosed(cb func(kline types.KLine)) {
panic("implement me")
}
func (s *Stream) OnKLine(cb func(kline types.KLine)) {
panic("implement me")
}
func (s *Stream) OnBookUpdate(cb func(book types.OrderBook)) {
panic("implement me")
}
func (s *Stream) OnBookSnapshot(cb func(book types.OrderBook)) {
panic("implement me")
}

View File

@ -0,0 +1,31 @@
package ftx
import (
"encoding/json"
"github.com/c9s/bbgo/pkg/types"
)
type messageHandler struct {
types.StandardStream
}
func (h messageHandler) handleMessage(message []byte) {
var r rawResponse
if err := json.Unmarshal(message, &r); err != nil {
logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message))
return
}
switch r.Type {
case subscribedRespType:
h.handleSubscribedMessage(r)
default:
logger.Errorf("unsupported message type: %+v", r.Type)
}
}
// {"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}
func (h messageHandler) handleSubscribedMessage(response rawResponse) {
logger.Infof("%s orderbook is subscribed", response.toSubscribedResp().Market)
}

View File

@ -1,9 +1,14 @@
package ftx
import (
"fmt"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
)
type WebsocketService struct {
@ -11,6 +16,8 @@ type WebsocketService struct {
key string
secret string
subscriptions []SubscribeRequest
}
const endpoint = "wss://ftx.com/ws/"
@ -21,9 +28,40 @@ func NewWebsocketService(key string, secret string) *WebsocketService {
key: key,
secret: secret,
}
s.OnConnected(func(_ *websocket.Conn) {
if err := s.sendSubscriptions(); err != nil {
s.EmitError(err)
}
})
return s
}
func (w *WebsocketService) Subscribe(channel types.Channel, symbol string) error {
r := SubscribeRequest{
Operation: subscribe,
}
if channel != types.BookChannel {
return fmt.Errorf("unsupported channel %+v", channel)
}
r.Channel = orderbook
r.Market = strings.ToUpper(strings.TrimSpace(symbol))
w.subscriptions = append(w.subscriptions, r)
return nil
}
var errSubscriptionFailed = fmt.Errorf("failed to subscribe")
func (w *WebsocketService) sendSubscriptions() error {
conn := w.Conn()
for _, s := range w.subscriptions {
if err := conn.WriteJSON(s); err != nil {
return fmt.Errorf("can't send subscription request %+v: %w", s, errSubscriptionFailed)
}
}
return nil
}
func (w *WebsocketService) Close() error {
return w.Conn().Close()
}

View File

@ -0,0 +1,62 @@
package ftx
import "encoding/json"
type operation string
const subscribe operation = "subscribe"
const unsubscribe operation = "unsubscribe"
type channel string
const orderbook channel = "orderbook"
const trades channel = "trades"
const ticker channel = "ticker"
// {'op': 'subscribe', 'channel': 'trades', 'market': 'BTC-PERP'}
type SubscribeRequest struct {
Operation operation `json:"op"`
Channel channel `json:"channel"`
Market string `json:"market"`
}
type respType string
const errRespType respType = "error"
const subscribedRespType respType = "subscribed"
const unsubscribedRespType respType = "unsubscribed"
const infoRespType respType = "info"
const partialRespType respType = "partial"
const updateRespType respType = "update"
type mandatoryFields struct {
Type respType `json:"type"`
// Channel is mandatory
Channel channel `json:"channel"`
// Market is mandatory
Market string `json:"market"`
}
// doc: https://docs.ftx.com/#response-format
type rawResponse struct {
mandatoryFields
// The following fields are optional.
// Example 1: {"type": "error", "code": 404, "msg": "No such market: BTCUSDT"}
Code int64 `json:"code"`
Message string `json:"msg"`
Data map[string]json.RawMessage `json:"data"`
}
func (r rawResponse) toSubscribedResp() subscribedResponse {
return subscribedResponse{
mandatoryFields: r.mandatoryFields,
}
}
// {"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}
type subscribedResponse struct {
mandatoryFields
}

View File

@ -0,0 +1,18 @@
package ftx
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_rawResponse_toSubscribedResp(t *testing.T) {
input := `{"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}`
var m rawResponse
assert.NoError(t, json.Unmarshal([]byte(input), &m))
r := m.toSubscribedResp()
assert.Equal(t, subscribedRespType, r.Type)
assert.Equal(t, orderbook, r.Channel)
assert.Equal(t, "BTC/USDT", r.Market)
}

View File

@ -40,14 +40,14 @@ func (s *WebsocketClientBase) Listen(ctx context.Context) {
case <-s.reconnectC:
time.Sleep(s.reconnectDuration)
if err := s.connect(ctx); err != nil {
s.reconnect()
s.Reconnect()
}
default:
conn := s.Conn()
mt, msg, err := conn.ReadMessage()
if err != nil {
s.reconnect()
s.Reconnect()
continue
}
@ -68,7 +68,7 @@ func (s *WebsocketClientBase) Connect(ctx context.Context) error {
return nil
}
func (s *WebsocketClientBase) reconnect() {
func (s *WebsocketClientBase) Reconnect() {
select {
case s.reconnectC <- struct{}{}:
default: