ftx: refactor websocket_messages structs

This commit is contained in:
ycdesu 2021-03-27 19:40:23 +08:00
parent d3cdd3c2a6
commit 34ea325499
4 changed files with 85 additions and 83 deletions

View File

@ -62,7 +62,7 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.Subscri
}
s.wsService.Subscribe(websocketRequest{
Operation: subscribe,
Channel: orderbook,
Channel: orderBookChannel,
Market: TrimUpperString(symbol),
})
}

View File

@ -3,8 +3,6 @@ package ftx
import (
"encoding/json"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
)
@ -13,56 +11,52 @@ type messageHandler struct {
}
func (h *messageHandler) handleMessage(message []byte) {
log.Infof("raw: %s", string(message))
var r rawResponse
var r websocketResponse
if err := json.Unmarshal(message, &r); err != nil {
logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message))
return
}
switch r.Type {
case subscribedRespType:
h.handleSubscribedMessage(r)
case partialRespType, updateRespType:
h.handleMarketData(r)
switch r.Channel {
case orderBookChannel:
h.handleOrderBook(r)
case privateOrdersChannel:
default:
logger.Errorf("unsupported message type: %+v", r.Type)
}
}
// {"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}
func (h messageHandler) handleSubscribedMessage(response rawResponse) {
r := response.toSubscribedResp()
func (h messageHandler) handleSubscribedMessage(response websocketResponse) {
r, err := response.toSubscribedResponse()
if err != nil {
logger.WithError(err).Errorf("failed to convert the subscribed message")
return
}
logger.Infof("%s %s is subscribed", r.Market, r.Channel)
}
func (h *messageHandler) handleMarketData(response rawResponse) {
r, err := response.toOrderBookResponse()
func (h *messageHandler) handleOrderBook(response websocketResponse) {
if response.Type == subscribedRespType {
h.handleSubscribedMessage(response)
return
}
r, err := response.toPublicOrderBookResponse()
if err != nil {
log.WithError(err).Errorf("failed to convert the partial response to data response")
logger.WithError(err).Errorf("failed to convert the public orderbook")
return
}
switch r.Channel {
case orderbook:
h.handleOrderBook(r)
default:
log.Errorf("unsupported market data channel %s", r.Channel)
return
}
}
func (h *messageHandler) handleOrderBook(r orderBookResponse) {
globalOrderBook, err := toGlobalOrderBook(r)
if err != nil {
log.WithError(err).Errorf("failed to generate orderbook snapshot")
logger.WithError(err).Errorf("failed to generate orderbook snapshot")
return
}
switch r.Type {
case partialRespType:
if err := r.verifyChecksum(); err != nil {
log.WithError(err).Errorf("invalid orderbook snapshot")
logger.WithError(err).Errorf("invalid orderbook snapshot")
return
}
h.EmitBookSnapshot(globalOrderBook)
@ -70,7 +64,7 @@ func (h *messageHandler) handleOrderBook(r orderBookResponse) {
// emit updates, not the whole orderbook
h.EmitBookUpdate(globalOrderBook)
default:
log.Errorf("unsupported order book data type %s", r.Type)
logger.Errorf("unsupported order book data type %s", r.Type)
return
}
}

View File

@ -21,10 +21,10 @@ const unsubscribe operation = "unsubscribe"
type channel string
const orderbook channel = "orderbook"
const orderBookChannel channel = "orderbook"
const privateOrdersChannel channel = "orders"
var errInvalidChannel = fmt.Errorf("invalid channel")
var errUnsupportedConversion = fmt.Errorf("unsupported conversion")
/*
Private:
@ -74,13 +74,22 @@ func loginBody(millis int64) string {
return fmt.Sprintf("%dwebsocket_login", millis)
}
type respType string
const errRespType respType = "error"
const subscribedRespType respType = "subscribed"
const unsubscribedRespType respType = "unsubscribed"
const infoRespType respType = "info"
const partialRespType respType = "partial"
const updateRespType respType = "update"
type websocketResponse struct {
mandatory
mandatoryFields
optionalFields
}
type mandatory struct {
type mandatoryFields struct {
Channel channel `json:"channel"`
Type respType `json:"type"`
@ -98,82 +107,76 @@ type optionalFields struct {
}
type orderUpdateResponse struct {
mandatory
mandatoryFields
Data order `json:"data"`
}
func (r websocketResponse) toOrderUpdateResponse() (orderUpdateResponse, error) {
if r.Channel != privateOrdersChannel {
return orderUpdateResponse{}, fmt.Errorf("can't convert %s channel data to %s: %w", r.Channel, privateOrdersChannel, errInvalidChannel)
if r.Type != subscribedRespType || r.Channel != privateOrdersChannel {
return orderUpdateResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion)
}
var o orderUpdateResponse
if err := json.Unmarshal(r.Data, &o.Data); err != nil {
return orderUpdateResponse{}, err
}
o.mandatory = r.mandatory
o.mandatoryFields = r.mandatoryFields
return o, nil
}
type respType string
/*
Private:
order: {"type": "subscribed", "channel": "orders"}
const errRespType respType = "error"
const subscribedRespType respType = "subscribed"
const unsubscribedRespType respType = "unsubscribed"
const infoRespType respType = "info"
const partialRespType respType = "partial"
const updateRespType respType = "update"
Public
ordeerbook: {"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}
type mandatoryFields struct {
Type respType `json:"type"`
*/
type subscribedResponse struct {
mandatoryFields
// Channel is mandatory
Channel channel `json:"channel"`
// Market is mandatory
Market string `json:"market"`
}
// doc: https://docs.ftx.com/#response-format
type rawResponse struct {
mandatoryFields
// {"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}
func (r websocketResponse) toSubscribedResponse() (subscribedResponse, error) {
if r.Type != subscribedRespType {
return subscribedResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion)
}
// The following fields are optional.
// Example 1: {"type": "error", "code": 404, "msg": "No such market: BTCUSDT"}
Code int64 `json:"code"`
Message string `json:"msg"`
Data json.RawMessage `json:"data"`
}
func (r rawResponse) toSubscribedResp() subscribedResponse {
return subscribedResponse{
mandatoryFields: r.mandatoryFields,
}
Market: r.Market,
}, nil
}
func (r rawResponse) toOrderBookResponse() (orderBookResponse, error) {
o := orderBookResponse{
mandatoryFields: r.mandatoryFields,
func (r websocketResponse) toPublicOrderBookResponse() (orderBookResponse, error) {
if r.Channel != orderBookChannel {
return orderBookResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion)
}
var o orderBookResponse
if err := json.Unmarshal(r.Data, &o); err != nil {
return orderBookResponse{}, err
}
sec, dec := math.Modf(o.Time)
o.Timestamp = time.Unix(int64(sec), int64(dec*1e9))
o.mandatoryFields = r.mandatoryFields
o.Market = r.Market
o.Timestamp = nanoToTime(o.Time)
return o, nil
}
// {"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}
type subscribedResponse struct {
mandatoryFields
func nanoToTime(input float64) time.Time {
sec, dec := math.Modf(input)
return time.Unix(int64(sec), int64(dec*1e9))
}
type orderBookResponse struct {
mandatoryFields
Market string `json:"market"`
Action string `json:"action"`
Time float64 `json:"time"`

View File

@ -15,23 +15,28 @@ import (
func Test_rawResponse_toSubscribedResp(t *testing.T) {
input := `{"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}`
var m rawResponse
var m websocketResponse
assert.NoError(t, json.Unmarshal([]byte(input), &m))
r := m.toSubscribedResp()
assert.Equal(t, subscribedRespType, r.Type)
assert.Equal(t, orderbook, r.Channel)
assert.Equal(t, "BTC/USDT", r.Market)
r, err := m.toSubscribedResponse()
assert.NoError(t, err)
assert.Equal(t, subscribedResponse{
mandatoryFields: mandatoryFields{
Channel: orderBookChannel,
Type: subscribedRespType,
},
Market: "BTC/USDT",
}, r)
}
func Test_rawResponse_toDataResponse(t *testing.T) {
func Test_websocketResponse_toPublicOrderBookResponse(t *testing.T) {
f, err := ioutil.ReadFile("./orderbook_snapshot.json")
assert.NoError(t, err)
var m rawResponse
var m websocketResponse
assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toOrderBookResponse()
r, err := m.toPublicOrderBookResponse()
assert.NoError(t, err)
assert.Equal(t, partialRespType, r.Type)
assert.Equal(t, orderbook, r.Channel)
assert.Equal(t, orderBookChannel, r.Channel)
assert.Equal(t, "BTC/USDT", r.Market)
assert.Equal(t, int64(1614520368), r.Timestamp.Unix())
assert.Equal(t, uint32(2150525410), r.Checksum)
@ -46,9 +51,9 @@ func Test_rawResponse_toDataResponse(t *testing.T) {
func Test_orderBookResponse_toGlobalOrderBook(t *testing.T) {
f, err := ioutil.ReadFile("./orderbook_snapshot.json")
assert.NoError(t, err)
var m rawResponse
var m websocketResponse
assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toOrderBookResponse()
r, err := m.toPublicOrderBookResponse()
assert.NoError(t, err)
b, err := toGlobalOrderBook(r)
@ -120,9 +125,9 @@ func Test_orderBookResponse_verifyChecksum(t *testing.T) {
for _, file := range []string{"./orderbook_snapshot.json"} {
f, err := ioutil.ReadFile(file)
assert.NoError(t, err)
var m rawResponse
var m websocketResponse
assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toOrderBookResponse()
r, err := m.toPublicOrderBookResponse()
assert.NoError(t, err)
assert.NoError(t, r.verifyChecksum(), "filename: "+file)
}
@ -216,7 +221,7 @@ func Test_websocketResponse_toOrderUpdateResponse(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, orderUpdateResponse{
mandatory: mandatory{
mandatoryFields: mandatoryFields{
Channel: privateOrdersChannel,
Type: updateRespType,
},