mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 23:05:15 +00:00
first commit for the max websocket
This commit is contained in:
parent
db72b3b0f0
commit
94fb026149
215
bbgo/exchange/max/maxapi/public_parser.go
Normal file
215
bbgo/exchange/max/maxapi/public_parser.go
Normal file
|
@ -0,0 +1,215 @@
|
||||||
|
package max
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/valyala/fastjson"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length")
|
||||||
|
|
||||||
|
const Buy = 1
|
||||||
|
const Sell = -1
|
||||||
|
|
||||||
|
type PublicParser struct{}
|
||||||
|
|
||||||
|
// Parse accepts the raw messages from max public websocket channels and parses them into market data
|
||||||
|
// Return types: *BookEvent, *TradeEvent, *SubscriptionEvent, *ErrorEvent
|
||||||
|
func (p *PublicParser) Parse(payload []byte) (interface{}, error) {
|
||||||
|
parser := fastjson.Parser{}
|
||||||
|
val, err := parser.ParseBytes(payload)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "failed to parse payload: "+string(payload))
|
||||||
|
}
|
||||||
|
|
||||||
|
if channel := string(val.GetStringBytes("c")); len(channel) > 0 {
|
||||||
|
switch channel {
|
||||||
|
case "book":
|
||||||
|
return parseBookEvent(val)
|
||||||
|
case "trade":
|
||||||
|
return parseTradeEvent(val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
eventType := string(val.GetStringBytes("e"))
|
||||||
|
switch eventType {
|
||||||
|
case "error":
|
||||||
|
return parseErrorEvent(val)
|
||||||
|
case "subscribed", "unsubscribed":
|
||||||
|
return parseSubscriptionEvent(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, errors.Wrapf(ErrMessageTypeNotSupported, "payload %s", payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
type TradeEntry struct {
|
||||||
|
Trend string `json:"tr"`
|
||||||
|
Price string `json:"p"`
|
||||||
|
Volume string `json:"v"`
|
||||||
|
Timestamp int64 `json:"T"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e TradeEntry) Time() time.Time {
|
||||||
|
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseTradeEntry parse the trade content payload
|
||||||
|
func parseTradeEntry(val *fastjson.Value) TradeEntry {
|
||||||
|
return TradeEntry{
|
||||||
|
Trend: strings.ToLower(string(val.GetStringBytes("tr"))),
|
||||||
|
Timestamp: val.GetInt64("T"),
|
||||||
|
Price: string(val.GetStringBytes("p")),
|
||||||
|
Volume: string(val.GetStringBytes("v")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type TradeEvent struct {
|
||||||
|
Event string `json:"e"`
|
||||||
|
Market string `json:"M"`
|
||||||
|
Channel string `json:"c"`
|
||||||
|
Trades []TradeEntry `json:"t"`
|
||||||
|
Timestamp int64 `json:"T"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *TradeEvent) Time() time.Time {
|
||||||
|
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseTradeEvent(val *fastjson.Value) (*TradeEvent, error) {
|
||||||
|
event := TradeEvent{
|
||||||
|
Event: string(val.GetStringBytes("e")),
|
||||||
|
Market: string(val.GetStringBytes("M")),
|
||||||
|
Channel: string(val.GetStringBytes("c")),
|
||||||
|
Timestamp: val.GetInt64("T"),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tradeValue := range val.GetArray("t") {
|
||||||
|
event.Trades = append(event.Trades, parseTradeEntry(tradeValue))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type BookEvent struct {
|
||||||
|
Event string `json:"e"`
|
||||||
|
Market string `json:"M"`
|
||||||
|
Channel string `json:"c"`
|
||||||
|
Timestamp int64 `json:"t"` // Millisecond timestamp
|
||||||
|
Bids []BookEntry
|
||||||
|
Asks []BookEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *BookEvent) Time() time.Time {
|
||||||
|
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseBookEvent(val *fastjson.Value) (*BookEvent, error) {
|
||||||
|
event := BookEvent{
|
||||||
|
Event: string(val.GetStringBytes("e")),
|
||||||
|
Market: string(val.GetStringBytes("M")),
|
||||||
|
Channel: string(val.GetStringBytes("c")),
|
||||||
|
Timestamp: val.GetInt64("T"),
|
||||||
|
}
|
||||||
|
|
||||||
|
t := time.Unix(0, event.Timestamp*int64(time.Millisecond))
|
||||||
|
|
||||||
|
var err error
|
||||||
|
event.Asks, err = parseBookEntries(val.GetArray("a"), Sell, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
event.Bids, err = parseBookEntries(val.GetArray("b"), Buy, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type BookEntry struct {
|
||||||
|
Side int
|
||||||
|
Time time.Time
|
||||||
|
Price string
|
||||||
|
Volume string
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseBookEntries parses JSON struct like `[["233330", "0.33"], ....]`
|
||||||
|
func parseBookEntries(vals []*fastjson.Value, side int, t time.Time) (entries []BookEntry, err error) {
|
||||||
|
for _, entry := range vals {
|
||||||
|
pv, err := entry.Array()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pv) < 2 {
|
||||||
|
return nil, ErrIncorrectBookEntryElementLength
|
||||||
|
}
|
||||||
|
|
||||||
|
entries = append(entries, BookEntry{
|
||||||
|
Side: side,
|
||||||
|
Time: t,
|
||||||
|
Price: pv[0].String(),
|
||||||
|
Volume: pv[1].String(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return entries, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ErrorEvent struct {
|
||||||
|
Timestamp int64
|
||||||
|
Errors []string
|
||||||
|
CommandID string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e ErrorEvent) Time() time.Time {
|
||||||
|
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseErrorEvent(val *fastjson.Value) (*ErrorEvent, error) {
|
||||||
|
event := ErrorEvent{
|
||||||
|
Timestamp: val.GetInt64("T"),
|
||||||
|
CommandID: string(val.GetStringBytes("i")),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, entry := range val.GetArray("E") {
|
||||||
|
event.Errors = append(event.Errors, string(entry.GetStringBytes()))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type SubscriptionEvent struct {
|
||||||
|
Event string `json:"e"`
|
||||||
|
Timestamp int64 `json:"T"`
|
||||||
|
CommandID string `json:"i"`
|
||||||
|
Subscriptions []Subscription `json:"s"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e SubscriptionEvent) Time() time.Time {
|
||||||
|
return time.Unix(0, e.Timestamp*int64(time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseSubscriptionEvent(val *fastjson.Value) (*SubscriptionEvent, error) {
|
||||||
|
event := SubscriptionEvent{
|
||||||
|
Event: string(val.GetStringBytes("e")),
|
||||||
|
Timestamp: val.GetInt64("T"),
|
||||||
|
CommandID: string(val.GetStringBytes("i")),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, entry := range val.GetArray("s") {
|
||||||
|
market := string(entry.GetStringBytes("market"))
|
||||||
|
channel := string(entry.GetStringBytes("channel"))
|
||||||
|
event.Subscriptions = append(event.Subscriptions, Subscription{
|
||||||
|
Market: market,
|
||||||
|
Channel: channel,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return &event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
194
bbgo/exchange/max/maxapi/public_websocket.go
Normal file
194
bbgo/exchange/max/maxapi/public_websocket.go
Normal file
|
@ -0,0 +1,194 @@
|
||||||
|
package max
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrMessageTypeNotSupported = errors.New("message type currently not supported")
|
||||||
|
|
||||||
|
var logger = log.WithField("exchange", "max")
|
||||||
|
|
||||||
|
// Subscription is used for presenting the subscription metadata.
|
||||||
|
// This is used for sending subscribe and unsubscribe requests
|
||||||
|
type Subscription struct {
|
||||||
|
Channel string `json:"channel"`
|
||||||
|
Market string `json:"market"`
|
||||||
|
Depth int `json:"depth,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type WebsocketCommand struct {
|
||||||
|
// Action is used for specify the action of the websocket session.
|
||||||
|
// Valid values are "subscribe", "unsubscribe" and "auth"
|
||||||
|
Action string `json:"action"`
|
||||||
|
Subscriptions []Subscription `json:"subscriptions,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var SubscribeAction = "subscribe"
|
||||||
|
var UnsubscribeAction = "unsubscribe"
|
||||||
|
|
||||||
|
//go:generate callbackgen -type PublicWebSocketService
|
||||||
|
type PublicWebSocketService struct {
|
||||||
|
BaseURL string
|
||||||
|
|
||||||
|
Conn *websocket.Conn
|
||||||
|
|
||||||
|
reconnectC chan struct{}
|
||||||
|
|
||||||
|
// Subscriptions is the subscription request payloads that will be used for sending subscription request
|
||||||
|
Subscriptions []Subscription
|
||||||
|
|
||||||
|
parser PublicParser
|
||||||
|
|
||||||
|
errorCallbacks []func(err error)
|
||||||
|
messageCallbacks []func(message []byte)
|
||||||
|
bookEventCallbacks []func(e BookEvent)
|
||||||
|
tradeEventCallbacks []func(e TradeEvent)
|
||||||
|
errorEventCallbacks []func(e ErrorEvent)
|
||||||
|
subscriptionEventCallbacks []func(e SubscriptionEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPublicWebSocketService(wsURL string) *PublicWebSocketService {
|
||||||
|
return &PublicWebSocketService{
|
||||||
|
reconnectC: make(chan struct{}, 1),
|
||||||
|
BaseURL: wsURL,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) Connect(ctx context.Context) error {
|
||||||
|
// pre-allocate the websocket client, the websocket client can be used for reconnecting.
|
||||||
|
go s.read(ctx)
|
||||||
|
return s.connect(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) connect(ctx context.Context) error {
|
||||||
|
dialer := websocket.DefaultDialer
|
||||||
|
conn, _, err := dialer.DialContext(ctx, s.BaseURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Conn = conn
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) emitReconnect() {
|
||||||
|
select {
|
||||||
|
case s.reconnectC <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) read(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-s.reconnectC:
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
if err := s.connect(ctx); err != nil {
|
||||||
|
s.emitReconnect()
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
mt, msg, err := s.Conn.ReadMessage()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
s.emitReconnect()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if mt != websocket.TextMessage {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s.EmitMessage(msg)
|
||||||
|
|
||||||
|
m, err := s.parser.Parse(msg)
|
||||||
|
if err != nil {
|
||||||
|
s.EmitError(errors.Wrapf(err, "failed to parse public message: %s", msg))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s.dispatch(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) dispatch(msg interface{}) {
|
||||||
|
switch e := msg.(type) {
|
||||||
|
case *BookEvent:
|
||||||
|
s.EmitBookEvent(*e)
|
||||||
|
case *TradeEvent:
|
||||||
|
s.EmitTradeEvent(*e)
|
||||||
|
case *ErrorEvent:
|
||||||
|
s.EmitErrorEvent(*e)
|
||||||
|
case *SubscriptionEvent:
|
||||||
|
s.EmitSubscriptionEvent(*e)
|
||||||
|
default:
|
||||||
|
s.EmitError(errors.Errorf("unsupported %T event: %+v", e, e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) ClearSubscriptions() {
|
||||||
|
s.Subscriptions = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) Reconnect() {
|
||||||
|
logger.Info("reconnecting...")
|
||||||
|
s.emitReconnect()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe is a helper method for building subscription request from the internal mapping types.
|
||||||
|
// (Internal public method)
|
||||||
|
func (s *PublicWebSocketService) Subscribe(channel string, market string) error {
|
||||||
|
s.AddSubscription(Subscription{
|
||||||
|
Channel: channel,
|
||||||
|
Market: market,
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddSubscription adds the subscription request to the buffer, these requests will be sent to the server right after connecting to the endpoint.
|
||||||
|
func (s *PublicWebSocketService) AddSubscription(subscription Subscription) {
|
||||||
|
s.Subscriptions = append(s.Subscriptions, subscription)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) Resubscribe() {
|
||||||
|
// Calling Resubscribe() by websocket is not enough to refresh orderbook.
|
||||||
|
// We still need to get orderbook snapshot by rest client.
|
||||||
|
// Therefore Reconnect() is used to simplify implementation.
|
||||||
|
logger.Info("resubscribing all subscription...")
|
||||||
|
if err := s.SendSubscriptionRequest(UnsubscribeAction); err != nil {
|
||||||
|
logger.WithError(err).Error("failed to unsubscribe")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.SendSubscriptionRequest(SubscribeAction); err != nil {
|
||||||
|
logger.WithError(err).Error("failed to unsubscribe")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) SendSubscriptionRequest(action string) error {
|
||||||
|
request := WebsocketCommand{
|
||||||
|
Action: action,
|
||||||
|
Subscriptions: s.Subscriptions,
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Debugf("sending websocket subscription: %+v", request)
|
||||||
|
|
||||||
|
if err := s.Conn.WriteJSON(request); err != nil {
|
||||||
|
return errors.Wrap(err, "Failed to send subscribe event")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close web socket connection
|
||||||
|
func (s *PublicWebSocketService) Close() error {
|
||||||
|
return s.Conn.Close()
|
||||||
|
}
|
65
bbgo/exchange/max/maxapi/publicwebsocketservice_callbacks.go
Normal file
65
bbgo/exchange/max/maxapi/publicwebsocketservice_callbacks.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
// Code generated by "callbackgen -type PublicWebSocketService"; DO NOT EDIT.
|
||||||
|
|
||||||
|
package max
|
||||||
|
|
||||||
|
import ()
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) OnError(cb func(err error)) {
|
||||||
|
s.errorCallbacks = append(s.errorCallbacks, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) EmitError(err error) {
|
||||||
|
for _, cb := range s.errorCallbacks {
|
||||||
|
cb(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) OnMessage(cb func(message []byte)) {
|
||||||
|
s.messageCallbacks = append(s.messageCallbacks, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) EmitMessage(message []byte) {
|
||||||
|
for _, cb := range s.messageCallbacks {
|
||||||
|
cb(message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) OnBookEvent(cb func(e BookEvent)) {
|
||||||
|
s.bookEventCallbacks = append(s.bookEventCallbacks, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) EmitBookEvent(e BookEvent) {
|
||||||
|
for _, cb := range s.bookEventCallbacks {
|
||||||
|
cb(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) OnTradeEvent(cb func(e TradeEvent)) {
|
||||||
|
s.tradeEventCallbacks = append(s.tradeEventCallbacks, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) EmitTradeEvent(e TradeEvent) {
|
||||||
|
for _, cb := range s.tradeEventCallbacks {
|
||||||
|
cb(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) OnErrorEvent(cb func(e ErrorEvent)) {
|
||||||
|
s.errorEventCallbacks = append(s.errorEventCallbacks, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) EmitErrorEvent(e ErrorEvent) {
|
||||||
|
for _, cb := range s.errorEventCallbacks {
|
||||||
|
cb(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) OnSubscriptionEvent(cb func(e SubscriptionEvent)) {
|
||||||
|
s.subscriptionEventCallbacks = append(s.subscriptionEventCallbacks, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublicWebSocketService) EmitSubscriptionEvent(e SubscriptionEvent) {
|
||||||
|
for _, cb := range s.subscriptionEventCallbacks {
|
||||||
|
cb(e)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user