diff --git a/pkg/exchange/ftx/stream_message_handler.go b/pkg/exchange/ftx/stream_message_handler.go index a41ea887a..9c6cf19f2 100644 --- a/pkg/exchange/ftx/stream_message_handler.go +++ b/pkg/exchange/ftx/stream_message_handler.go @@ -12,7 +12,7 @@ type messageHandler struct { *types.StandardStream } -func (h messageHandler) handleMessage(message []byte) { +func (h *messageHandler) handleMessage(message []byte) { var r rawResponse if err := json.Unmarshal(message, &r); err != nil { logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message)) @@ -36,7 +36,7 @@ func (h messageHandler) handleSubscribedMessage(response rawResponse) { } func (h *messageHandler) handleMarketData(response rawResponse) { - r, err := response.toDataResponse() + r, err := response.toOrderBookResponse() if err != nil { log.WithError(err).Errorf("failed to convert the partial response to data response") return @@ -51,8 +51,8 @@ func (h *messageHandler) handleMarketData(response rawResponse) { } } -func (h messageHandler) handleOrderBook(r dataResponse) { - ob, err := toGlobalOrderBook(r) +func (h *messageHandler) handleOrderBook(r orderBookResponse) { + globalOrderBook, err := toGlobalOrderBook(r) if err != nil { log.WithError(err).Errorf("failed to generate orderbook snapshot") return @@ -60,9 +60,14 @@ func (h messageHandler) handleOrderBook(r dataResponse) { switch r.Type { case partialRespType: - h.EmitBookSnapshot(ob) + if err := r.verifyChecksum(); err != nil { + log.WithError(err).Errorf("invalid orderbook snapshot") + return + } + h.EmitBookSnapshot(globalOrderBook) case updateRespType: - h.EmitBookUpdate(ob) + // emit updates, not the whole orderbook + h.EmitBookUpdate(globalOrderBook) default: log.Errorf("unsupported order book data type %s", r.Type) return diff --git a/pkg/exchange/ftx/websocket_messages.go b/pkg/exchange/ftx/websocket_messages.go index fbfb56e5b..2422ca672 100644 --- a/pkg/exchange/ftx/websocket_messages.go +++ b/pkg/exchange/ftx/websocket_messages.go @@ -3,7 +3,9 @@ package ftx import ( "encoding/json" "fmt" + "hash/crc32" "math" + "strconv" "strings" "time" @@ -65,13 +67,13 @@ func (r rawResponse) toSubscribedResp() subscribedResponse { } } -func (r rawResponse) toDataResponse() (dataResponse, error) { - o := dataResponse{ +func (r rawResponse) toOrderBookResponse() (orderBookResponse, error) { + o := orderBookResponse{ mandatoryFields: r.mandatoryFields, } if err := json.Unmarshal(r.Data, &o); err != nil { - return dataResponse{}, err + return orderBookResponse{}, err } sec, dec := math.Modf(o.Time) @@ -85,7 +87,7 @@ type subscribedResponse struct { mandatoryFields } -type dataResponse struct { +type orderBookResponse struct { mandatoryFields Action string `json:"action"` @@ -96,12 +98,130 @@ type dataResponse struct { Checksum uint32 `json:"checksum"` + // best 100 orders. Ex. {[100,1], [50, 2]} Bids [][]json.Number `json:"bids"` + // best 100 orders. Ex. {[51, 1], [102, 3]} Asks [][]json.Number `json:"asks"` } -func toGlobalOrderBook(r dataResponse) (types.OrderBook, error) { +// only 100 orders so we use linear search here +func (r *orderBookResponse) update(orderUpdates orderBookResponse) { + r.Checksum = orderUpdates.Checksum + r.updateBids(orderUpdates.Bids) + r.updateAsks(orderUpdates.Asks) +} + +func (r *orderBookResponse) updateAsks(asks [][]json.Number) { + higherPrice := func(dst, src float64) bool { + return dst < src + } + for _, o := range asks { + if remove := o[1] == "0"; remove { + r.Asks = removePrice(r.Asks, o[0]) + } else { + r.Asks = upsertPriceVolume(r.Asks, o, higherPrice) + } + } +} + +func (r *orderBookResponse) updateBids(bids [][]json.Number) { + lessPrice := func(dst, src float64) bool { + return dst > src + } + for _, o := range bids { + if remove := o[1] == "0"; remove { + r.Bids = removePrice(r.Bids, o[0]) + } else { + r.Bids = upsertPriceVolume(r.Bids, o, lessPrice) + } + } +} + +func upsertPriceVolume(dst [][]json.Number, src []json.Number, priceComparator func(dst float64, src float64) bool) [][]json.Number { + for i, pv := range dst { + dstPrice := pv[0] + srcPrice := src[0] + + // update volume + if dstPrice == srcPrice { + pv[1] = src[1] + return dst + } + + // The value must be a number which is verified by json.Unmarshal, so the err + // should never happen. + dstPriceNum, err := strconv.ParseFloat(string(dstPrice), 64) + if err != nil { + logger.WithError(err).Errorf("unexpected price %s", dstPrice) + continue + } + srcPriceNum, err := strconv.ParseFloat(string(srcPrice), 64) + if err != nil { + logger.WithError(err).Errorf("unexpected price updates %s", srcPrice) + continue + } + + if !priceComparator(dstPriceNum, srcPriceNum) { + return insertAt(dst, i, src) + } + } + + return append(dst, src) +} + +func insertAt(dst [][]json.Number, id int, pv []json.Number) (result [][]json.Number) { + result = append(result, dst[:id]...) + result = append(result, pv) + result = append(result, dst[id:]...) + return +} + +func removePrice(dst [][]json.Number, price json.Number) [][]json.Number { + for i, pv := range dst { + if pv[0] == price { + return append(dst[:i], dst[i+1:]...) + } + } + + return dst +} + +func (r orderBookResponse) verifyChecksum() error { + if crc32Val := crc32.ChecksumIEEE([]byte(checksumString(r.Bids, r.Asks))); crc32Val != r.Checksum { + return fmt.Errorf("expected checksum %d, actual checksum %d: %w", r.Checksum, crc32Val, errUnmatchedChecksum) + } + return nil +} + +// :::... +func checksumString(bids, asks [][]json.Number) string { + sb := strings.Builder{} + appendNumber := func(pv []json.Number) { + if sb.Len() != 0 { + sb.WriteString(":") + } + sb.WriteString(string(pv[0])) + sb.WriteString(":") + sb.WriteString(string(pv[1])) + } + + bidsLen := len(bids) + asksLen := len(asks) + for i := 0; i < bidsLen || i < asksLen; i++ { + if i < bidsLen { + appendNumber(bids[i]) + } + if i < asksLen { + appendNumber(asks[i]) + } + } + return sb.String() +} + +var errUnmatchedChecksum = fmt.Errorf("unmatched checksum") + +func toGlobalOrderBook(r orderBookResponse) (types.OrderBook, error) { bids, err := toPriceVolumeSlice(r.Bids) if err != nil { return types.OrderBook{}, fmt.Errorf("can't convert bids to priceVolumeSlice: %w", err) diff --git a/pkg/exchange/ftx/websocket_messages_test.go b/pkg/exchange/ftx/websocket_messages_test.go index 33c236265..1196385f3 100644 --- a/pkg/exchange/ftx/websocket_messages_test.go +++ b/pkg/exchange/ftx/websocket_messages_test.go @@ -26,7 +26,7 @@ func Test_rawResponse_toDataResponse(t *testing.T) { assert.NoError(t, err) var m rawResponse assert.NoError(t, json.Unmarshal(f, &m)) - r, err := m.toDataResponse() + r, err := m.toOrderBookResponse() assert.NoError(t, err) assert.Equal(t, partialRespType, r.Type) assert.Equal(t, orderbook, r.Channel) @@ -41,12 +41,12 @@ func Test_rawResponse_toDataResponse(t *testing.T) { assert.Equal(t, []json.Number{"44579.0", "0.15"}, r.Asks[1]) } -func Test_DataResponse_toGlobalOrderBook(t *testing.T) { +func Test_orderBookResponse_toGlobalOrderBook(t *testing.T) { f, err := ioutil.ReadFile("./orderbook_snapshot.json") assert.NoError(t, err) var m rawResponse assert.NoError(t, json.Unmarshal(f, &m)) - r, err := m.toDataResponse() + r, err := m.toOrderBookResponse() assert.NoError(t, err) b, err := toGlobalOrderBook(r) @@ -78,3 +78,95 @@ func Test_DataResponse_toGlobalOrderBook(t *testing.T) { } +func Test_checksumString(t *testing.T) { + type args struct { + bids [][]json.Number + asks [][]json.Number + } + tests := []struct { + name string + args args + want string + }{ + { + name: "more bids", + args: args{ + bids: [][]json.Number{{"5000.5", "10"}, {"4995.0", "5"}}, + asks: [][]json.Number{{"5001.0", "6"}}, + }, + want: "5000.5:10:5001.0:6:4995.0:5", + }, + { + name: "lengths of bids and asks are the same", + args: args{ + bids: [][]json.Number{{"5000.5", "10"}, {"4995.0", "5"}}, + asks: [][]json.Number{{"5001.0", "6"}, {"5002.0", "7"}}, + }, + want: "5000.5:10:5001.0:6:4995.0:5:5002.0:7", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := checksumString(tt.args.bids, tt.args.asks); got != tt.want { + t.Errorf("checksumString() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_orderBookResponse_verifyChecksum(t *testing.T) { + for _, file := range []string{"./orderbook_snapshot.json"} { + f, err := ioutil.ReadFile(file) + assert.NoError(t, err) + var m rawResponse + assert.NoError(t, json.Unmarshal(f, &m)) + r, err := m.toOrderBookResponse() + assert.NoError(t, err) + assert.NoError(t, r.verifyChecksum(), "filename: "+file) + } +} + +func Test_removePrice(t *testing.T) { + pairs := [][]json.Number{{"123.99", "2.0"}, {"2234.12", "3.1"}} + assert.Equal(t, pairs, removePrice(pairs, "99333")) + + pairs = removePrice(pairs, "2234.12") + assert.Equal(t, [][]json.Number{{"123.99", "2.0"}}, pairs) + assert.Equal(t, [][]json.Number{}, removePrice(pairs, "123.99")) +} + +func Test_orderBookResponse_update(t *testing.T) { + ob := &orderBookResponse{Bids: nil, Asks: nil} + + ob.update(orderBookResponse{ + Bids: [][]json.Number{{"1.0", "0"}, {"10.0", "1"}, {"11.0", "1"}}, + Asks: [][]json.Number{{"1.0", "1"}}, + }) + assert.Equal(t, [][]json.Number{{"11.0", "1"}, {"10.0", "1"}}, ob.Bids) + assert.Equal(t, [][]json.Number{{"1.0", "1"}}, ob.Asks) + ob.update(orderBookResponse{ + Bids: [][]json.Number{{"9.0", "1"}, {"12.0", "1"}, {"10.5", "1"}}, + Asks: [][]json.Number{{"1.0", "0"}}, + }) + assert.Equal(t, [][]json.Number{{"12.0", "1"}, {"11.0", "1"}, {"10.5", "1"}, {"10.0", "1"}, {"9.0", "1"}}, ob.Bids) + assert.Equal(t, [][]json.Number{}, ob.Asks) + + // remove them + ob.update(orderBookResponse{ + Bids: [][]json.Number{{"9.0", "0"}, {"12.0", "0"}, {"10.5", "0"}}, + Asks: [][]json.Number{{"9.0", "1"}, {"12.0", "1"}, {"10.5", "1"}}, + }) + assert.Equal(t, [][]json.Number{{"11.0", "1"}, {"10.0", "1"}}, ob.Bids) + assert.Equal(t, [][]json.Number{{"9.0", "1"}, {"10.5", "1"}, {"12.0", "1"}}, ob.Asks) +} + +func Test_insertAt(t *testing.T) { + r := insertAt([][]json.Number{{"1.2", "2"}, {"1.4", "2"}}, 1, []json.Number{"1.3", "2"}) + assert.Equal(t, [][]json.Number{{"1.2", "2"}, {"1.3", "2"}, {"1.4", "2"}}, r) + + r = insertAt([][]json.Number{{"1.2", "2"}, {"1.4", "2"}}, 0, []json.Number{"1.1", "2"}) + assert.Equal(t, [][]json.Number{{"1.1", "2"}, {"1.2", "2"}, {"1.4", "2"}}, r) + + r = insertAt([][]json.Number{{"1.2", "2"}, {"1.4", "2"}}, 2, []json.Number{"1.5", "2"}) + assert.Equal(t, [][]json.Number{{"1.2", "2"}, {"1.4", "2"}, {"1.5", "2"}}, r) +}