bbgo_origin/pkg/exchange/max/maxapi/public_parser.go

363 lines
9.1 KiB
Go
Raw Normal View History

2020-10-01 08:07:18 +00:00
package max
import (
"strings"
"time"
"github.com/pkg/errors"
"github.com/valyala/fastjson"
2020-10-02 13:15:00 +00:00
2020-10-11 08:46:15 +00:00
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
2020-10-01 08:07:18 +00:00
)
var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length")
const Buy = 1
const Sell = -1
2021-06-26 13:03:03 +00:00
var parserPool fastjson.ParserPool
2020-10-02 04:43:14 +00:00
// ParseMessage accepts the raw messages from max public websocket channels and parses them into market data
2020-10-02 13:29:56 +00:00
// Return types: *BookEvent, *PublicTradeEvent, *SubscriptionEvent, *ErrorEvent
2020-10-02 04:43:14 +00:00
func ParseMessage(payload []byte) (interface{}, error) {
2021-06-26 13:03:03 +00:00
parser := parserPool.Get()
2020-10-01 08:07:18 +00:00
val, err := parser.ParseBytes(payload)
if err != nil {
return nil, errors.Wrap(err, "failed to parse payload: "+string(payload))
}
if channel := string(val.GetStringBytes("c")); len(channel) > 0 {
switch channel {
case "kline":
return parseKLineEvent(val)
2020-10-01 08:07:18 +00:00
case "book":
return parseBookEvent(val)
case "trade":
2020-10-02 13:29:56 +00:00
return parsePublicTradeEvent(val)
2020-10-02 04:43:14 +00:00
case "user":
return ParseUserEvent(val)
2020-10-01 08:07:18 +00:00
}
}
eventType := string(val.GetStringBytes("e"))
switch eventType {
2020-12-28 08:24:17 +00:00
case "authenticated":
return parseAuthEvent(val)
2020-10-01 08:07:18 +00:00
case "error":
return parseErrorEvent(val)
case "subscribed", "unsubscribed":
return parseSubscriptionEvent(val)
}
return nil, errors.Wrapf(ErrMessageTypeNotSupported, "payload %s", payload)
}
type TradeEntry struct {
Trend string `json:"tr"`
Price string `json:"p"`
Volume string `json:"v"`
Timestamp int64 `json:"T"`
}
func (e TradeEntry) Time() time.Time {
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
}
// parseTradeEntry parse the trade content payload
func parseTradeEntry(val *fastjson.Value) TradeEntry {
return TradeEntry{
Trend: strings.ToLower(string(val.GetStringBytes("tr"))),
Timestamp: val.GetInt64("T"),
Price: string(val.GetStringBytes("p")),
Volume: string(val.GetStringBytes("v")),
}
}
type KLineEvent struct {
Event string `json:"e"`
Market string `json:"M"`
Channel string `json:"c"`
KLine KLine `json:"k"`
Timestamp int64 `json:"T"`
}
/*
{
"c": "kline",
"M": "btcusdt",
"e": "update",
"T": 1602999650179,
"k": {
"ST": 1602999900000,
"ET": 1602999900000,
"M": "btcusdt",
"R": "5m",
"O": "11417.21",
"H": "11417.21",
"L": "11417.21",
"C": "11417.21",
"v": "0",
"ti": 0,
"x": false
}
}
*/
type KLinePayload struct {
StartTime int64 `json:"ST"`
EndTime int64 `json:"ET"`
Market string `json:"M"`
Resolution string `json:"R"`
Open string `json:"O"`
High string `json:"H"`
Low string `json:"L"`
Close string `json:"C"`
Volume string `json:"v"`
LastTradeID int `json:"ti"`
Closed bool `json:"x"`
}
func (k KLinePayload) KLine() types.KLine {
return types.KLine{
StartTime: types.Time(time.Unix(0, k.StartTime*int64(time.Millisecond))),
EndTime: types.Time(time.Unix(0, k.EndTime*int64(time.Millisecond))),
2020-11-06 16:49:17 +00:00
Symbol: k.Market,
Interval: types.Interval(k.Resolution),
Open: fixedpoint.MustNewFromString(k.Open),
Close: fixedpoint.MustNewFromString(k.Close),
High: fixedpoint.MustNewFromString(k.High),
Low: fixedpoint.MustNewFromString(k.Low),
Volume: fixedpoint.MustNewFromString(k.Volume),
QuoteVolume: fixedpoint.Zero, // TODO: add this from kingfisher
2020-11-06 16:49:17 +00:00
LastTradeID: uint64(k.LastTradeID),
NumberOfTrades: 0, // TODO: add this from kingfisher
Closed: k.Closed,
}
}
2020-10-02 13:29:56 +00:00
type PublicTradeEvent struct {
2020-10-01 08:07:18 +00:00
Event string `json:"e"`
Market string `json:"M"`
Channel string `json:"c"`
Trades []TradeEntry `json:"t"`
Timestamp int64 `json:"T"`
}
2020-10-02 13:29:56 +00:00
func (e *PublicTradeEvent) Time() time.Time {
2020-10-01 08:07:18 +00:00
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
}
2020-10-02 13:29:56 +00:00
func parsePublicTradeEvent(val *fastjson.Value) (*PublicTradeEvent, error) {
event := PublicTradeEvent{
2020-10-01 08:07:18 +00:00
Event: string(val.GetStringBytes("e")),
Market: string(val.GetStringBytes("M")),
Channel: string(val.GetStringBytes("c")),
Timestamp: val.GetInt64("T"),
}
for _, tradeValue := range val.GetArray("t") {
event.Trades = append(event.Trades, parseTradeEntry(tradeValue))
}
return &event, nil
}
type BookEvent struct {
Event string `json:"e"`
Market string `json:"M"`
Channel string `json:"c"`
Timestamp int64 `json:"t"` // Millisecond timestamp
2021-06-28 05:11:26 +00:00
Bids types.PriceVolumeSlice
Asks types.PriceVolumeSlice
2020-10-01 08:07:18 +00:00
}
func (e *BookEvent) Time() time.Time {
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
}
2021-05-22 03:36:58 +00:00
func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook, err error) {
snapshot.Symbol = strings.ToUpper(e.Market)
snapshot.Time = e.Time()
2021-06-28 05:11:26 +00:00
snapshot.Bids = e.Bids
snapshot.Asks = e.Asks
2020-10-02 13:15:00 +00:00
return snapshot, nil
}
func parseKLineEvent(val *fastjson.Value) (*KLineEvent, error) {
event := KLineEvent{
Event: string(val.GetStringBytes("e")),
Market: string(val.GetStringBytes("M")),
Channel: string(val.GetStringBytes("c")),
Timestamp: val.GetInt64("T"),
}
2020-11-05 07:04:56 +00:00
event.KLine = KLine{
Symbol: string(val.GetStringBytes("k", "M")),
Interval: string(val.GetStringBytes("k", "R")),
StartTime: time.Unix(0, val.GetInt64("k", "ST")*int64(time.Millisecond)),
EndTime: time.Unix(0, val.GetInt64("k", "ET")*int64(time.Millisecond)),
Open: fixedpoint.MustNewFromBytes(val.GetStringBytes("k", "O")),
High: fixedpoint.MustNewFromBytes(val.GetStringBytes("k", "H")),
Low: fixedpoint.MustNewFromBytes(val.GetStringBytes("k", "L")),
Close: fixedpoint.MustNewFromBytes(val.GetStringBytes("k", "C")),
Volume: fixedpoint.MustNewFromBytes(val.GetStringBytes("k", "v")),
2020-11-05 07:04:56 +00:00
Closed: val.GetBool("k", "x"),
}
return &event, nil
}
2020-10-01 08:07:18 +00:00
func parseBookEvent(val *fastjson.Value) (*BookEvent, error) {
event := BookEvent{
Event: string(val.GetStringBytes("e")),
Market: string(val.GetStringBytes("M")),
Channel: string(val.GetStringBytes("c")),
Timestamp: val.GetInt64("T"),
}
2021-06-28 05:13:32 +00:00
// t := time.Unix(0, event.Timestamp*int64(time.Millisecond))
2020-10-01 08:07:18 +00:00
var err error
2021-06-28 05:11:26 +00:00
event.Asks, err = parseBookEntries2(val.GetArray("a"))
2020-10-01 08:07:18 +00:00
if err != nil {
return nil, err
}
2021-06-28 05:11:26 +00:00
event.Bids, err = parseBookEntries2(val.GetArray("b"))
2020-10-01 08:07:18 +00:00
if err != nil {
return nil, err
}
return &event, nil
}
type BookEntry struct {
Side int
Time time.Time
Price string
Volume string
}
2020-10-03 01:19:38 +00:00
func (e *BookEntry) PriceVolumePair() (pv types.PriceVolume, err error) {
2020-10-02 13:15:00 +00:00
pv.Price, err = fixedpoint.NewFromString(e.Price)
if err != nil {
return pv, err
}
pv.Volume, err = fixedpoint.NewFromString(e.Volume)
if err != nil {
return pv, err
}
return pv, err
}
2021-06-28 05:11:26 +00:00
// parseBookEntries2 parses JSON struct like `[["233330", "0.33"], ....]`
func parseBookEntries2(vals []*fastjson.Value) (types.PriceVolumeSlice, error) {
entries := make(types.PriceVolumeSlice, 0, 50)
2021-06-28 05:11:26 +00:00
for _, entry := range vals {
pv, err := entry.Array()
if err != nil {
return nil, err
}
if len(pv) < 2 {
return nil, ErrIncorrectBookEntryElementLength
}
price, err := fixedpoint.NewFromString(string(pv[0].GetStringBytes()))
if err != nil {
return nil, err
}
volume, err := fixedpoint.NewFromString(string(pv[1].GetStringBytes()))
if err != nil {
return nil, err
}
entries = append(entries, types.PriceVolume{
Price: price,
Volume: volume,
})
}
return entries, nil
2021-06-28 05:11:26 +00:00
}
2020-10-01 08:07:18 +00:00
// parseBookEntries parses JSON struct like `[["233330", "0.33"], ....]`
func parseBookEntries(vals []*fastjson.Value, side int, t time.Time) (entries []BookEntry, err error) {
for _, entry := range vals {
pv, err := entry.Array()
if err != nil {
return nil, err
}
if len(pv) < 2 {
return nil, ErrIncorrectBookEntryElementLength
}
entries = append(entries, BookEntry{
Side: side,
Time: t,
2020-10-02 13:15:00 +00:00
Price: string(pv[0].GetStringBytes()),
Volume: string(pv[1].GetStringBytes()),
2020-10-01 08:07:18 +00:00
})
}
return entries, nil
}
type ErrorEvent struct {
Timestamp int64
Errors []string
CommandID string
}
func (e ErrorEvent) Time() time.Time {
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
}
func parseErrorEvent(val *fastjson.Value) (*ErrorEvent, error) {
event := ErrorEvent{
Timestamp: val.GetInt64("T"),
CommandID: string(val.GetStringBytes("i")),
}
for _, entry := range val.GetArray("E") {
event.Errors = append(event.Errors, string(entry.GetStringBytes()))
}
return &event, nil
}
type SubscriptionEvent struct {
Event string `json:"e"`
Timestamp int64 `json:"T"`
CommandID string `json:"i"`
Subscriptions []Subscription `json:"s"`
}
func (e SubscriptionEvent) Time() time.Time {
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
}
func parseSubscriptionEvent(val *fastjson.Value) (*SubscriptionEvent, error) {
event := SubscriptionEvent{
Event: string(val.GetStringBytes("e")),
Timestamp: val.GetInt64("T"),
CommandID: string(val.GetStringBytes("i")),
}
for _, entry := range val.GetArray("s") {
market := string(entry.GetStringBytes("market"))
channel := string(entry.GetStringBytes("channel"))
event.Subscriptions = append(event.Subscriptions, Subscription{
Market: market,
Channel: channel,
})
}
return &event, nil
}