From 884e764fe7377da623a43bda55d3e2a607153b06 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 27 May 2021 15:48:51 +0800 Subject: [PATCH] okex: order book parsing --- pkg/exchange/okex/parse.go | 150 ++++++++++++++++++++++++++++++++++++ pkg/exchange/okex/stream.go | 7 +- 2 files changed, 156 insertions(+), 1 deletion(-) diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index 30d7b8c92..9c3472444 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -1 +1,151 @@ package okex + +import ( + "errors" + "fmt" + "strconv" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/valyala/fastjson" +) + +func Parse(str string) (interface{}, error) { + v, err := fastjson.Parse(str) + if err != nil { + return nil, err + } + + if v.Exists("event") { + return parseEvent(v) + } + + if v.Exists("data") { + return parseData(v) + } + + return nil, nil +} + +type WebSocketEvent struct { + Event string + Code string + Message string +} + +func parseEvent(v *fastjson.Value) (*WebSocketEvent, error) { + // event could be "subscribe", "unsubscribe" or "error" + event := string(v.GetStringBytes("event")) + code := string(v.GetStringBytes("code")) + message := string(v.GetStringBytes("message")) + return &WebSocketEvent{ + Event: event, + Code: code, + Message: message, + }, nil +} + +type BookData struct { + InstrumentID string + Symbol string + Action string + Bids []BookEntry + Asks []BookEntry + MillisecondTimestamp int64 + Checksum int +} + +type BookEntry struct { + Price fixedpoint.Value + Volume fixedpoint.Value + NumLiquidated int + NumOrders int +} + +func parseBookEntry(v *fastjson.Value) (*BookEntry, error) { + arr, err := v.Array() + if err != nil { + return nil, err + } + + if len(arr) < 4 { + return nil, fmt.Errorf("unexpected book entry size: %d", len(arr)) + } + + price := fixedpoint.Must(fixedpoint.NewFromString(string(arr[0].GetStringBytes()))) + volume := fixedpoint.Must(fixedpoint.NewFromString(string(arr[1].GetStringBytes()))) + numLiquidated, err := strconv.Atoi(string(arr[2].GetStringBytes())) + if err != nil { + return nil, err + } + + numOrders, err := strconv.Atoi(string(arr[3].GetStringBytes())) + if err != nil { + return nil, err + } + + return &BookEntry{ + Price: price, + Volume: volume, + NumLiquidated: numLiquidated, + NumOrders: numOrders, + }, nil +} + +func parseBookData(instrumentId string, v *fastjson.Value) (interface{}, error) { + data := v.GetArray("data") + if len(data) == 0 { + return nil, errors.New("empty data payload") + } + + // "snapshot" or "update" + action := string(v.GetStringBytes("action")) + + millisecondTimestamp, err := strconv.ParseInt(string(data[0].GetStringBytes("ts")), 10, 64) + if err != nil { + return nil, err + } + + checksum := data[0].GetInt("checksum") + + var asks []BookEntry + var bids []BookEntry + + for _, v := range data[0].GetArray("asks") { + entry, err := parseBookEntry(v) + if err != nil { + return nil, err + } + asks = append(asks, *entry) + } + + for _, v := range data[0].GetArray("bids") { + entry, err := parseBookEntry(v) + if err != nil { + return nil, err + } + bids = append(bids, *entry) + } + + return &BookData{ + InstrumentID: instrumentId, + Symbol: toGlobalSymbol(instrumentId), + Action: action, + Bids: bids, + Asks: asks, + Checksum: checksum, + MillisecondTimestamp: millisecondTimestamp, + }, nil +} + +func parseData(v *fastjson.Value) (interface{}, error) { + instrumentId := string(v.GetStringBytes("arg", "instId")) + channel := string(v.GetStringBytes("arg", "channel")) + + switch channel { + case "books": + return parseBookData(instrumentId, v) + + } + + return nil, nil +} diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 996e0931a..b85d54dc2 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -208,7 +208,12 @@ func (s *Stream) read(ctx context.Context) { continue } - log.Infof(string(message)) + e, err := Parse(string(message)) + if err != nil { + log.WithError(err).Error("message parse error") + } + + log.Infof("%+v", e) } } }