add user data message parser

This commit is contained in:
c9s 2020-10-01 17:05:46 +08:00
parent 0708cee962
commit b153f3dea2

View File

@ -5,49 +5,11 @@ import (
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
) )
func ParsePrivateEvent(message []byte) (interface{}, error) { type BaseEvent struct {
var fp fastjson.Parser Event string `json:"e"`
var v, err = fp.ParseBytes(message) Timestamp int64 `json:"T"`
if err != nil {
return nil, errors.Wrap(err, "fail to parse account info raw message")
}
eventType := string(v.GetStringBytes("e"))
switch eventType {
case "order_snapshot":
return parserOrderSnapshotEvent(v)
case "order_update":
return parseOrderUpdateEvent(v)
case "trade_snapshot":
return parseTradeSnapshotEvent(v)
case "trade_update":
return parseTradeUpdateEvent(v)
case "account_snapshot":
return parserAccountSnapshotEvent(v)
case "account_update":
return parserAccountUpdateEvent(v)
case "authenticated":
return parserAuthEvent(v)
case "error":
logger.Errorf("error %s", message)
}
return nil, errors.Wrapf(ErrMessageTypeNotSupported, "private message %s", message)
} }
var (
errParseOrder = errors.New("failed parse order")
errParseTrade = errors.New("failed parse trade")
errParseAccount = errors.New("failed parse account")
)
type OrderUpdate struct { type OrderUpdate struct {
Event string `json:"e"` Event string `json:"e"`
ID uint64 `json:"i"` ID uint64 `json:"i"`
@ -69,7 +31,13 @@ type OrderUpdate struct {
CreatedAtMs int64 `json:"T"` CreatedAtMs int64 `json:"T"`
} }
func parserOrderUpdate(v *fastjson.Value) (OrderUpdate, error) { type OrderUpdateEvent struct {
BaseEvent
Orders []OrderUpdate `json:"o"`
}
func parserOrderUpdate(v *fastjson.Value) OrderUpdate {
return OrderUpdate{ return OrderUpdate{
Event: string(v.GetStringBytes("e")), Event: string(v.GetStringBytes("e")),
ID: v.GetUint64("i"), ID: v.GetUint64("i"),
@ -86,39 +54,37 @@ func parserOrderUpdate(v *fastjson.Value) (OrderUpdate, error) {
GroupID: v.GetInt64("gi"), GroupID: v.GetInt64("gi"),
ClientOID: string(v.GetStringBytes("ci")), ClientOID: string(v.GetStringBytes("ci")),
CreatedAtMs: v.GetInt64("T"), CreatedAtMs: v.GetInt64("T"),
}, nil }
} }
func parseOrderUpdateEvent(v *fastjson.Value) (OrderUpdate, error) { func parseOrderUpdateEvent(v *fastjson.Value) (e OrderUpdateEvent) {
rawOrders := v.GetArray("o") e.Event = string(v.GetStringBytes("e"))
if len(rawOrders) == 0 { e.Timestamp = v.GetInt64("T")
return OrderUpdate{}, errParseOrder
for _, ov := range v.GetArray("o") {
o := parserOrderUpdate(ov)
e.Orders = append(e.Orders, o)
} }
return parserOrderUpdate(rawOrders[0]) return e
} }
type OrderSnapshot []OrderUpdate type OrderSnapshotEvent struct {
BaseEvent
func parserOrderSnapshotEvent(v *fastjson.Value) (orderSnapshot OrderSnapshot, err error) { Orders []OrderUpdate `json:"o"`
var errCount int }
rawOrders := v.GetArray("o") func parserOrderSnapshotEvent(v *fastjson.Value) (e OrderSnapshotEvent) {
for _, ov := range rawOrders { e.Event = string(v.GetStringBytes("e"))
o, e := parserOrderUpdate(ov) e.Timestamp = v.GetInt64("T")
if e != nil {
errCount++ for _, ov := range v.GetArray("o") {
err = e o := parserOrderUpdate(ov)
} else { e.Orders = append(e.Orders, o)
orderSnapshot = append(orderSnapshot, o)
}
} }
if errCount > 0 { return e
err = errors.Wrapf(err, "failed to parse order snapshot. %d errors in order snapshot. The last error: ", errCount)
}
return
} }
type TradeUpdate struct { type TradeUpdate struct {
@ -135,7 +101,7 @@ type TradeUpdate struct {
OrderID uint64 `json:"oi"` OrderID uint64 `json:"oi"`
} }
func parseTradeUpdate(v *fastjson.Value) (TradeUpdate, error) { func parseTradeUpdate(v *fastjson.Value) TradeUpdate {
return TradeUpdate{ return TradeUpdate{
ID: v.GetUint64("i"), ID: v.GetUint64("i"),
Side: string(v.GetStringBytes("sd")), Side: string(v.GetStringBytes("sd")),
@ -146,39 +112,43 @@ func parseTradeUpdate(v *fastjson.Value) (TradeUpdate, error) {
FeeCurrency: string(v.GetStringBytes("fc")), FeeCurrency: string(v.GetStringBytes("fc")),
Timestamp: v.GetInt64("T"), Timestamp: v.GetInt64("T"),
OrderID: v.GetUint64("oi"), OrderID: v.GetUint64("oi"),
}, nil }
} }
func parseTradeUpdateEvent(v *fastjson.Value) (TradeUpdate, error) { type TradeUpdateEvent struct {
rawTrades := v.GetArray("t") BaseEvent
if len(rawTrades) == 0 {
return TradeUpdate{}, errParseTrade Trades []TradeUpdate `json:"t"`
}
func parseTradeUpdateEvent(v *fastjson.Value) (e TradeUpdateEvent) {
e.Event = string(v.GetStringBytes("e"))
e.Timestamp = v.GetInt64("T")
for _, tv := range v.GetArray("t") {
e.Trades = append(e.Trades, parseTradeUpdate(tv))
} }
return parseTradeUpdate(rawTrades[0]) return e
} }
type TradeSnapshot []TradeUpdate type TradeSnapshot []TradeUpdate
func parseTradeSnapshotEvent(v *fastjson.Value) (tradeSnapshot TradeSnapshot, err error) { type TradeSnapshotEvent struct {
var errCount int BaseEvent
rawTrades := v.GetArray("t") Trades []TradeUpdate `json:"t"`
for _, tv := range rawTrades { }
t, e := parseTradeUpdate(tv)
if e != nil { func parseTradeSnapshotEvent(v *fastjson.Value) (e TradeSnapshotEvent) {
errCount++ e.Event = string(v.GetStringBytes("e"))
err = e e.Timestamp = v.GetInt64("T")
} else {
tradeSnapshot = append(tradeSnapshot, t) for _, tv := range v.GetArray("t") {
} e.Trades = append(e.Trades, parseTradeUpdate(tv))
} }
if errCount > 0 { return e
err = errors.Wrapf(err, "failed to parse trade snapshot. %d errors in trade snapshot. The last error: ", errCount)
}
return
} }
type Balance struct { type Balance struct {
@ -187,50 +157,89 @@ type Balance struct {
Locked string `json:"l"` Locked string `json:"l"`
} }
func parseBalance(v *fastjson.Value) (Balance, error) { func parseBalance(v *fastjson.Value) Balance {
return Balance{ return Balance{
Currency: string(v.GetStringBytes("cu")), Currency: string(v.GetStringBytes("cu")),
Available: string(v.GetStringBytes("av")), Available: string(v.GetStringBytes("av")),
Locked: string(v.GetStringBytes("l")), Locked: string(v.GetStringBytes("l")),
}, nil }
} }
func parserAccountUpdateEvent(v *fastjson.Value) (Balance, error) {
rawBalances := v.GetArray("B")
if len(rawBalances) == 0 {
return Balance{}, errParseAccount
}
return parseBalance(rawBalances[0]) type AccountUpdateEvent struct {
BaseEvent
Balances []Balance `json:"B"`
} }
type BalanceSnapshot []Balance func parserAccountUpdateEvent(v *fastjson.Value) (e AccountUpdateEvent) {
e.Event = string(v.GetStringBytes("e"))
e.Timestamp = v.GetInt64("T")
func parserAccountSnapshotEvent(v *fastjson.Value) (balanceSnapshot BalanceSnapshot, err error) { for _, bv := range v.GetArray("B") {
var errCount int e.Balances = append(e.Balances, parseBalance(bv))
rawBalances := v.GetArray("B")
for _, bv := range rawBalances {
b, e := parseBalance(bv)
if e != nil {
errCount++
err = e
} else {
balanceSnapshot = append(balanceSnapshot, b)
}
} }
if errCount > 0 { return e
err = errors.Wrapf(err, "failed to parse balance snapshot. %d errors in balance snapshot. The last error: ", errCount)
}
return
} }
func parserAuthEvent(v *fastjson.Value) (AuthEvent, error) { type AccountSnapshotEvent struct {
BaseEvent
Balances []Balance `json:"B"`
}
func parserAccountSnapshotEvent(v *fastjson.Value) (e AccountSnapshotEvent) {
e.Event = string(v.GetStringBytes("e"))
e.Timestamp = v.GetInt64("T")
for _, bv := range v.GetArray("B") {
e.Balances = append(e.Balances, parseBalance(bv))
}
return e
}
func parserAuthEvent(v *fastjson.Value) AuthEvent {
return AuthEvent{ return AuthEvent{
Event: string(v.GetStringBytes("e")), Event: string(v.GetStringBytes("e")),
ID: string(v.GetStringBytes("i")), ID: string(v.GetStringBytes("i")),
Timestamp: v.GetInt64("T"), Timestamp: v.GetInt64("T"),
}, nil }
}
func ParsePrivateEvent(message []byte) (interface{}, error) {
var fp fastjson.Parser
var v, err = fp.ParseBytes(message)
if err != nil {
return nil, errors.Wrap(err, "fail to parse account info raw message")
}
eventType := string(v.GetStringBytes("e"))
switch eventType {
case "order_snapshot":
return parserOrderSnapshotEvent(v), nil
case "order_update":
return parseOrderUpdateEvent(v), nil
case "trade_snapshot":
return parseTradeSnapshotEvent(v), nil
case "trade_update":
return parseTradeUpdateEvent(v), nil
case "account_snapshot":
return parserAccountSnapshotEvent(v), nil
case "account_update":
return parserAccountUpdateEvent(v), nil
case "authenticated":
return parserAuthEvent(v), nil
case "error":
logger.Errorf("error %s", message)
}
return nil, errors.Wrapf(ErrMessageTypeNotSupported, "private message %s", message)
} }