Merge pull request #145 from c9s/ftx/order-update

feature: emit orderbook update
This commit is contained in:
YC 2021-03-04 13:25:37 +08:00 committed by GitHub
commit bd4168d3fc
7 changed files with 125 additions and 45 deletions

View File

@ -37,7 +37,10 @@ var orderbookCmd = &cobra.Command{
s := ex.NewStream() s := ex.NewStream()
s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
s.OnBookSnapshot(func(book types.OrderBook) { s.OnBookSnapshot(func(book types.OrderBook) {
log.Infof("orderbook snapshot: %+v", book) log.Infof("orderbook snapshot: %s", book.String())
})
s.OnBookUpdate(func(book types.OrderBook) {
log.Infof("orderbook update: %s", book.String())
}) })
if err := s.Connect(ctx); err != nil { if err := s.Connect(ctx); err != nil {

View File

@ -0,0 +1,26 @@
{
"channel": "orderbook",
"market": "BTC/USDT",
"type": "update",
"data": {
"time": 1614737706.650016,
"checksum": 3976343467,
"bids": [
[
48763.0,
0.5001
]
],
"asks": [
[
48826.0,
0.3385
],
[
48929.0,
26.8713
]
],
"action": "update"
}
}

View File

@ -23,7 +23,7 @@ func NewStream(key, secret string) *Stream {
wsService: wss, wsService: wss,
} }
wss.OnMessage(messageHandler{StandardStream: s.StandardStream}.handleMessage) wss.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage)
return s return s
} }

View File

@ -22,11 +22,8 @@ func (h messageHandler) handleMessage(message []byte) {
switch r.Type { switch r.Type {
case subscribedRespType: case subscribedRespType:
h.handleSubscribedMessage(r) h.handleSubscribedMessage(r)
case partialRespType: case partialRespType, updateRespType:
// snapshot of current market data h.handleMarketData(r)
h.handleSnapshot(r)
case updateRespType:
//log.Infof("update=> %s", string(message))
default: default:
logger.Errorf("unsupported message type: %+v", r.Type) logger.Errorf("unsupported message type: %+v", r.Type)
} }
@ -38,11 +35,36 @@ func (h messageHandler) handleSubscribedMessage(response rawResponse) {
logger.Infof("%s %s is subscribed", r.Market, r.Channel) logger.Infof("%s %s is subscribed", r.Market, r.Channel)
} }
func (h messageHandler) handleSnapshot(response rawResponse) { func (h *messageHandler) handleMarketData(response rawResponse) {
r, err := response.toSnapshotResp() r, err := response.toDataResponse()
if err != nil { if err != nil {
log.WithError(err).Errorf("failed to convert the partial response to snapshot") log.WithError(err).Errorf("failed to convert the partial response to data response")
return
}
switch r.Channel {
case orderbook:
h.handleOrderBook(r)
default:
log.Errorf("unsupported market data channel %s", r.Channel)
return
}
}
func (h messageHandler) handleOrderBook(r dataResponse) {
ob, err := toGlobalOrderBook(r)
if err != nil {
log.WithError(err).Errorf("failed to generate orderbook snapshot")
return
}
switch r.Type {
case partialRespType:
h.EmitBookSnapshot(ob)
case updateRespType:
h.EmitBookUpdate(ob)
default:
log.Errorf("unsupported order book data type %s", r.Type)
return return
} }
h.EmitBookSnapshot(r.toGlobalOrderBook())
} }

View File

@ -2,6 +2,7 @@ package ftx
import ( import (
"encoding/json" "encoding/json"
"fmt"
"math" "math"
"strings" "strings"
"time" "time"
@ -64,13 +65,13 @@ func (r rawResponse) toSubscribedResp() subscribedResponse {
} }
} }
func (r rawResponse) toSnapshotResp() (snapshotResponse, error) { func (r rawResponse) toDataResponse() (dataResponse, error) {
o := snapshotResponse{ o := dataResponse{
mandatoryFields: r.mandatoryFields, mandatoryFields: r.mandatoryFields,
} }
if err := json.Unmarshal(r.Data, &o); err != nil { if err := json.Unmarshal(r.Data, &o); err != nil {
return snapshotResponse{}, err return dataResponse{}, err
} }
sec, dec := math.Modf(o.Time) sec, dec := math.Modf(o.Time)
@ -84,7 +85,7 @@ type subscribedResponse struct {
mandatoryFields mandatoryFields
} }
type snapshotResponse struct { type dataResponse struct {
mandatoryFields mandatoryFields
Action string `json:"action"` Action string `json:"action"`
@ -93,31 +94,42 @@ type snapshotResponse struct {
Timestamp time.Time Timestamp time.Time
Checksum int64 `json:"checksum"` Checksum uint32 `json:"checksum"`
// Best 100 orders Bids [][]json.Number `json:"bids"`
Bids [][]float64 `json:"bids"`
// Best 100 orders Asks [][]json.Number `json:"asks"`
Asks [][]float64 `json:"asks"`
} }
func (r snapshotResponse) toGlobalOrderBook() types.OrderBook { func toGlobalOrderBook(r dataResponse) (types.OrderBook, error) {
bids, err := toPriceVolumeSlice(r.Bids)
if err != nil {
return types.OrderBook{}, fmt.Errorf("can't convert bids to priceVolumeSlice: %w", err)
}
asks, err := toPriceVolumeSlice(r.Asks)
if err != nil {
return types.OrderBook{}, fmt.Errorf("can't convert asks to priceVolumeSlice: %w", err)
}
return types.OrderBook{ return types.OrderBook{
// ex. BTC/USDT // ex. BTC/USDT
Symbol: strings.ToUpper(r.Market), Symbol: strings.ToUpper(r.Market),
Bids: toPriceVolumeSlice(r.Bids), Bids: bids,
Asks: toPriceVolumeSlice(r.Asks), Asks: asks,
} }, nil
} }
func toPriceVolumeSlice(orders [][]float64) types.PriceVolumeSlice { func toPriceVolumeSlice(orders [][]json.Number) (types.PriceVolumeSlice, error) {
var pv types.PriceVolumeSlice var pv types.PriceVolumeSlice
for _, o := range orders { for _, o := range orders {
pv = append(pv, types.PriceVolume{ p, err := fixedpoint.NewFromString(string(o[0]))
Price: fixedpoint.NewFromFloat(o[0]), if err != nil {
Volume: fixedpoint.NewFromFloat(o[1]), return nil, fmt.Errorf("can't convert price %+v to fixedpoint: %w", o[0], err)
})
} }
return pv v, err := fixedpoint.NewFromString(string(o[1]))
if err != nil {
return nil, fmt.Errorf("can't convert volume %+v to fixedpoint: %w", o[0], err)
}
pv = append(pv, types.PriceVolume{Price: p, Volume: v})
}
return pv, nil
} }

View File

@ -21,35 +21,35 @@ func Test_rawResponse_toSubscribedResp(t *testing.T) {
assert.Equal(t, "BTC/USDT", r.Market) assert.Equal(t, "BTC/USDT", r.Market)
} }
func Test_rawResponse_toSnapshotResp(t *testing.T) { func Test_rawResponse_toDataResponse(t *testing.T) {
f, err := ioutil.ReadFile("./orderbook_snapshot.json") f, err := ioutil.ReadFile("./orderbook_snapshot.json")
assert.NoError(t, err) assert.NoError(t, err)
var m rawResponse var m rawResponse
assert.NoError(t, json.Unmarshal(f, &m)) assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toSnapshotResp() r, err := m.toDataResponse()
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, partialRespType, r.Type) assert.Equal(t, partialRespType, r.Type)
assert.Equal(t, orderbook, r.Channel) assert.Equal(t, orderbook, r.Channel)
assert.Equal(t, "BTC/USDT", r.Market) assert.Equal(t, "BTC/USDT", r.Market)
assert.Equal(t, int64(1614520368), r.Timestamp.Unix()) assert.Equal(t, int64(1614520368), r.Timestamp.Unix())
assert.Equal(t, int64(2150525410), r.Checksum) assert.Equal(t, uint32(2150525410), r.Checksum)
assert.Len(t, r.Bids, 100) assert.Len(t, r.Bids, 100)
assert.Equal(t, []float64{44555.0, 3.3968}, r.Bids[0]) assert.Equal(t, []json.Number{"44555.0", "3.3968"}, r.Bids[0])
assert.Equal(t, []float64{44554.0, 0.0561}, r.Bids[1]) assert.Equal(t, []json.Number{"44554.0", "0.0561"}, r.Bids[1])
assert.Len(t, r.Asks, 100) assert.Len(t, r.Asks, 100)
assert.Equal(t, []float64{44574.0, 0.4591}, r.Asks[0]) assert.Equal(t, []json.Number{"44574.0", "0.4591"}, r.Asks[0])
assert.Equal(t, []float64{44579.0, 0.15}, r.Asks[1]) assert.Equal(t, []json.Number{"44579.0", "0.15"}, r.Asks[1])
} }
func Test_snapshotResponse_toGlobalOrderBook(t *testing.T) { func Test_DataResponse_toGlobalOrderBook(t *testing.T) {
f, err := ioutil.ReadFile("./orderbook_snapshot.json") f, err := ioutil.ReadFile("./orderbook_snapshot.json")
assert.NoError(t, err) assert.NoError(t, err)
var m rawResponse var m rawResponse
assert.NoError(t, json.Unmarshal(f, &m)) assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toSnapshotResp() r, err := m.toDataResponse()
assert.NoError(t, err) assert.NoError(t, err)
b := r.toGlobalOrderBook() b, err := toGlobalOrderBook(r)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "BTC/USDT", b.Symbol) assert.Equal(t, "BTC/USDT", b.Symbol)
isValid, err := b.IsValid() isValid, err := b.IsValid()
@ -75,4 +75,6 @@ func Test_snapshotResponse_toGlobalOrderBook(t *testing.T) {
Price: fixedpoint.MustNewFromString("45010.0"), Price: fixedpoint.MustNewFromString("45010.0"),
Volume: fixedpoint.MustNewFromString("0.0003"), Volume: fixedpoint.MustNewFromString("0.0003"),
}, b.Asks[99]) }, b.Asks[99])
} }

View File

@ -3,6 +3,7 @@ package types
import ( import (
"fmt" "fmt"
"sort" "sort"
"strings"
"sync" "sync"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -227,16 +228,30 @@ func (b *OrderBook) Update(book OrderBook) {
} }
func (b *OrderBook) Print() { func (b *OrderBook) Print() {
fmt.Printf("BOOK %s\n", b.Symbol) fmt.Printf(b.String())
fmt.Printf("ASKS:\n")
for i := len(b.Asks) - 1; i >= 0; i-- {
fmt.Printf("- ASK: %s\n", b.Asks[i].String())
} }
fmt.Printf("BIDS:\n") func (b *OrderBook) String() string {
for _, bid := range b.Bids { sb := strings.Builder{}
fmt.Printf("- BID: %s\n", bid.String())
sb.WriteString("BOOK ")
sb.WriteString(b.Symbol)
sb.WriteString("\n")
sb.WriteString("ASKS:\n")
for i := len(b.Asks) - 1; i >= 0; i-- {
sb.WriteString("- ASK: ")
sb.WriteString(b.Asks[i].String())
sb.WriteString("\n")
} }
sb.WriteString("BIDS:\n")
for _, bid := range b.Bids {
sb.WriteString("- BID: ")
sb.WriteString(bid.String())
sb.WriteString("\n")
}
return sb.String()
} }
type MutexOrderBook struct { type MutexOrderBook struct {