Merge pull request #146 from c9s/ftx/orderbook-snapshot-checksum

This commit is contained in:
YC 2021-03-07 18:05:02 +08:00 committed by GitHub
commit 1de883e148
3 changed files with 231 additions and 14 deletions

View File

@ -12,7 +12,7 @@ type messageHandler struct {
*types.StandardStream
}
func (h messageHandler) handleMessage(message []byte) {
func (h *messageHandler) handleMessage(message []byte) {
var r rawResponse
if err := json.Unmarshal(message, &r); err != nil {
logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message))
@ -36,7 +36,7 @@ func (h messageHandler) handleSubscribedMessage(response rawResponse) {
}
func (h *messageHandler) handleMarketData(response rawResponse) {
r, err := response.toDataResponse()
r, err := response.toOrderBookResponse()
if err != nil {
log.WithError(err).Errorf("failed to convert the partial response to data response")
return
@ -51,8 +51,8 @@ func (h *messageHandler) handleMarketData(response rawResponse) {
}
}
func (h messageHandler) handleOrderBook(r dataResponse) {
ob, err := toGlobalOrderBook(r)
func (h *messageHandler) handleOrderBook(r orderBookResponse) {
globalOrderBook, err := toGlobalOrderBook(r)
if err != nil {
log.WithError(err).Errorf("failed to generate orderbook snapshot")
return
@ -60,9 +60,14 @@ func (h messageHandler) handleOrderBook(r dataResponse) {
switch r.Type {
case partialRespType:
h.EmitBookSnapshot(ob)
if err := r.verifyChecksum(); err != nil {
log.WithError(err).Errorf("invalid orderbook snapshot")
return
}
h.EmitBookSnapshot(globalOrderBook)
case updateRespType:
h.EmitBookUpdate(ob)
// emit updates, not the whole orderbook
h.EmitBookUpdate(globalOrderBook)
default:
log.Errorf("unsupported order book data type %s", r.Type)
return

View File

@ -3,7 +3,9 @@ package ftx
import (
"encoding/json"
"fmt"
"hash/crc32"
"math"
"strconv"
"strings"
"time"
@ -65,13 +67,13 @@ func (r rawResponse) toSubscribedResp() subscribedResponse {
}
}
func (r rawResponse) toDataResponse() (dataResponse, error) {
o := dataResponse{
func (r rawResponse) toOrderBookResponse() (orderBookResponse, error) {
o := orderBookResponse{
mandatoryFields: r.mandatoryFields,
}
if err := json.Unmarshal(r.Data, &o); err != nil {
return dataResponse{}, err
return orderBookResponse{}, err
}
sec, dec := math.Modf(o.Time)
@ -85,7 +87,7 @@ type subscribedResponse struct {
mandatoryFields
}
type dataResponse struct {
type orderBookResponse struct {
mandatoryFields
Action string `json:"action"`
@ -96,12 +98,130 @@ type dataResponse struct {
Checksum uint32 `json:"checksum"`
// best 100 orders. Ex. {[100,1], [50, 2]}
Bids [][]json.Number `json:"bids"`
// best 100 orders. Ex. {[51, 1], [102, 3]}
Asks [][]json.Number `json:"asks"`
}
func toGlobalOrderBook(r dataResponse) (types.OrderBook, error) {
// only 100 orders so we use linear search here
func (r *orderBookResponse) update(orderUpdates orderBookResponse) {
r.Checksum = orderUpdates.Checksum
r.updateBids(orderUpdates.Bids)
r.updateAsks(orderUpdates.Asks)
}
func (r *orderBookResponse) updateAsks(asks [][]json.Number) {
higherPrice := func(dst, src float64) bool {
return dst < src
}
for _, o := range asks {
if remove := o[1] == "0"; remove {
r.Asks = removePrice(r.Asks, o[0])
} else {
r.Asks = upsertPriceVolume(r.Asks, o, higherPrice)
}
}
}
func (r *orderBookResponse) updateBids(bids [][]json.Number) {
lessPrice := func(dst, src float64) bool {
return dst > src
}
for _, o := range bids {
if remove := o[1] == "0"; remove {
r.Bids = removePrice(r.Bids, o[0])
} else {
r.Bids = upsertPriceVolume(r.Bids, o, lessPrice)
}
}
}
func upsertPriceVolume(dst [][]json.Number, src []json.Number, priceComparator func(dst float64, src float64) bool) [][]json.Number {
for i, pv := range dst {
dstPrice := pv[0]
srcPrice := src[0]
// update volume
if dstPrice == srcPrice {
pv[1] = src[1]
return dst
}
// The value must be a number which is verified by json.Unmarshal, so the err
// should never happen.
dstPriceNum, err := strconv.ParseFloat(string(dstPrice), 64)
if err != nil {
logger.WithError(err).Errorf("unexpected price %s", dstPrice)
continue
}
srcPriceNum, err := strconv.ParseFloat(string(srcPrice), 64)
if err != nil {
logger.WithError(err).Errorf("unexpected price updates %s", srcPrice)
continue
}
if !priceComparator(dstPriceNum, srcPriceNum) {
return insertAt(dst, i, src)
}
}
return append(dst, src)
}
func insertAt(dst [][]json.Number, id int, pv []json.Number) (result [][]json.Number) {
result = append(result, dst[:id]...)
result = append(result, pv)
result = append(result, dst[id:]...)
return
}
func removePrice(dst [][]json.Number, price json.Number) [][]json.Number {
for i, pv := range dst {
if pv[0] == price {
return append(dst[:i], dst[i+1:]...)
}
}
return dst
}
func (r orderBookResponse) verifyChecksum() error {
if crc32Val := crc32.ChecksumIEEE([]byte(checksumString(r.Bids, r.Asks))); crc32Val != r.Checksum {
return fmt.Errorf("expected checksum %d, actual checksum %d: %w", r.Checksum, crc32Val, errUnmatchedChecksum)
}
return nil
}
// <best_bid_price>:<best_bid_size>:<best_ask_price>:<best_ask_size>...
func checksumString(bids, asks [][]json.Number) string {
sb := strings.Builder{}
appendNumber := func(pv []json.Number) {
if sb.Len() != 0 {
sb.WriteString(":")
}
sb.WriteString(string(pv[0]))
sb.WriteString(":")
sb.WriteString(string(pv[1]))
}
bidsLen := len(bids)
asksLen := len(asks)
for i := 0; i < bidsLen || i < asksLen; i++ {
if i < bidsLen {
appendNumber(bids[i])
}
if i < asksLen {
appendNumber(asks[i])
}
}
return sb.String()
}
var errUnmatchedChecksum = fmt.Errorf("unmatched checksum")
func toGlobalOrderBook(r orderBookResponse) (types.OrderBook, error) {
bids, err := toPriceVolumeSlice(r.Bids)
if err != nil {
return types.OrderBook{}, fmt.Errorf("can't convert bids to priceVolumeSlice: %w", err)

View File

@ -26,7 +26,7 @@ func Test_rawResponse_toDataResponse(t *testing.T) {
assert.NoError(t, err)
var m rawResponse
assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toDataResponse()
r, err := m.toOrderBookResponse()
assert.NoError(t, err)
assert.Equal(t, partialRespType, r.Type)
assert.Equal(t, orderbook, r.Channel)
@ -41,12 +41,12 @@ func Test_rawResponse_toDataResponse(t *testing.T) {
assert.Equal(t, []json.Number{"44579.0", "0.15"}, r.Asks[1])
}
func Test_DataResponse_toGlobalOrderBook(t *testing.T) {
func Test_orderBookResponse_toGlobalOrderBook(t *testing.T) {
f, err := ioutil.ReadFile("./orderbook_snapshot.json")
assert.NoError(t, err)
var m rawResponse
assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toDataResponse()
r, err := m.toOrderBookResponse()
assert.NoError(t, err)
b, err := toGlobalOrderBook(r)
@ -78,3 +78,95 @@ func Test_DataResponse_toGlobalOrderBook(t *testing.T) {
}
func Test_checksumString(t *testing.T) {
type args struct {
bids [][]json.Number
asks [][]json.Number
}
tests := []struct {
name string
args args
want string
}{
{
name: "more bids",
args: args{
bids: [][]json.Number{{"5000.5", "10"}, {"4995.0", "5"}},
asks: [][]json.Number{{"5001.0", "6"}},
},
want: "5000.5:10:5001.0:6:4995.0:5",
},
{
name: "lengths of bids and asks are the same",
args: args{
bids: [][]json.Number{{"5000.5", "10"}, {"4995.0", "5"}},
asks: [][]json.Number{{"5001.0", "6"}, {"5002.0", "7"}},
},
want: "5000.5:10:5001.0:6:4995.0:5:5002.0:7",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := checksumString(tt.args.bids, tt.args.asks); got != tt.want {
t.Errorf("checksumString() = %v, want %v", got, tt.want)
}
})
}
}
func Test_orderBookResponse_verifyChecksum(t *testing.T) {
for _, file := range []string{"./orderbook_snapshot.json"} {
f, err := ioutil.ReadFile(file)
assert.NoError(t, err)
var m rawResponse
assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toOrderBookResponse()
assert.NoError(t, err)
assert.NoError(t, r.verifyChecksum(), "filename: "+file)
}
}
func Test_removePrice(t *testing.T) {
pairs := [][]json.Number{{"123.99", "2.0"}, {"2234.12", "3.1"}}
assert.Equal(t, pairs, removePrice(pairs, "99333"))
pairs = removePrice(pairs, "2234.12")
assert.Equal(t, [][]json.Number{{"123.99", "2.0"}}, pairs)
assert.Equal(t, [][]json.Number{}, removePrice(pairs, "123.99"))
}
func Test_orderBookResponse_update(t *testing.T) {
ob := &orderBookResponse{Bids: nil, Asks: nil}
ob.update(orderBookResponse{
Bids: [][]json.Number{{"1.0", "0"}, {"10.0", "1"}, {"11.0", "1"}},
Asks: [][]json.Number{{"1.0", "1"}},
})
assert.Equal(t, [][]json.Number{{"11.0", "1"}, {"10.0", "1"}}, ob.Bids)
assert.Equal(t, [][]json.Number{{"1.0", "1"}}, ob.Asks)
ob.update(orderBookResponse{
Bids: [][]json.Number{{"9.0", "1"}, {"12.0", "1"}, {"10.5", "1"}},
Asks: [][]json.Number{{"1.0", "0"}},
})
assert.Equal(t, [][]json.Number{{"12.0", "1"}, {"11.0", "1"}, {"10.5", "1"}, {"10.0", "1"}, {"9.0", "1"}}, ob.Bids)
assert.Equal(t, [][]json.Number{}, ob.Asks)
// remove them
ob.update(orderBookResponse{
Bids: [][]json.Number{{"9.0", "0"}, {"12.0", "0"}, {"10.5", "0"}},
Asks: [][]json.Number{{"9.0", "1"}, {"12.0", "1"}, {"10.5", "1"}},
})
assert.Equal(t, [][]json.Number{{"11.0", "1"}, {"10.0", "1"}}, ob.Bids)
assert.Equal(t, [][]json.Number{{"9.0", "1"}, {"10.5", "1"}, {"12.0", "1"}}, ob.Asks)
}
func Test_insertAt(t *testing.T) {
r := insertAt([][]json.Number{{"1.2", "2"}, {"1.4", "2"}}, 1, []json.Number{"1.3", "2"})
assert.Equal(t, [][]json.Number{{"1.2", "2"}, {"1.3", "2"}, {"1.4", "2"}}, r)
r = insertAt([][]json.Number{{"1.2", "2"}, {"1.4", "2"}}, 0, []json.Number{"1.1", "2"})
assert.Equal(t, [][]json.Number{{"1.1", "2"}, {"1.2", "2"}, {"1.4", "2"}}, r)
r = insertAt([][]json.Number{{"1.2", "2"}, {"1.4", "2"}}, 2, []json.Number{"1.5", "2"})
assert.Equal(t, [][]json.Number{{"1.2", "2"}, {"1.4", "2"}, {"1.5", "2"}}, r)
}