add max-eqcatch

This commit is contained in:
c9s 2020-10-02 12:43:14 +08:00
parent 59f27cfe2e
commit 3ce49471d8
6 changed files with 71 additions and 24 deletions

View File

@ -0,0 +1,24 @@
package cmdutil
import (
"context"
"os"
"os/signal"
"github.com/sirupsen/logrus"
)
func WaitForSignal(ctx context.Context, signals ...os.Signal) {
var sigC = make(chan os.Signal, 1)
signal.Notify(sigC, signals...)
defer signal.Stop(sigC)
select {
case sig := <-sigC:
logrus.Warnf("%v", sig)
signal.Ignore()
case <-ctx.Done():
}
}

2
bbgo/cmd/signal.go Normal file
View File

@ -0,0 +1,2 @@
package cmd

View File

@ -13,11 +13,9 @@ var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry elemen
const Buy = 1
const Sell = -1
type PublicParser struct{}
// Parse accepts the raw messages from max public websocket channels and parses them into market data
// ParseMessage accepts the raw messages from max public websocket channels and parses them into market data
// Return types: *BookEvent, *TradeEvent, *SubscriptionEvent, *ErrorEvent
func (p *PublicParser) Parse(payload []byte) (interface{}, error) {
func ParseMessage(payload []byte) (interface{}, error) {
parser := fastjson.Parser{}
val, err := parser.ParseBytes(payload)
if err != nil {
@ -30,6 +28,8 @@ func (p *PublicParser) Parse(payload []byte) (interface{}, error) {
return parseBookEvent(val)
case "trade":
return parseTradeEvent(val)
case "user":
return ParseUserEvent(val)
}
}

View File

@ -202,7 +202,7 @@ func parserAccountSnapshotEvent(v *fastjson.Value) (e AccountSnapshotEvent) {
return e
}
func parserAuthEvent(v *fastjson.Value) AuthEvent {
func parseAuthEvent(v *fastjson.Value) AuthEvent {
return AuthEvent{
Event: string(v.GetStringBytes("e")),
ID: string(v.GetStringBytes("i")),
@ -211,13 +211,7 @@ func parserAuthEvent(v *fastjson.Value) AuthEvent {
}
func ParsePrivateEvent(message []byte) (interface{}, error) {
var fp fastjson.Parser
var v, err = fp.ParseBytes(message)
if err != nil {
return nil, errors.Wrap(err, "fail to parse account info raw message")
}
func ParseUserEvent(v *fastjson.Value) (interface{}, error) {
eventType := string(v.GetStringBytes("e"))
switch eventType {
case "order_snapshot":
@ -239,11 +233,11 @@ func ParsePrivateEvent(message []byte) (interface{}, error) {
return parserAccountUpdateEvent(v), nil
case "authenticated":
return parserAuthEvent(v), nil
return parseAuthEvent(v), nil
case "error":
logger.Errorf("error %s", message)
logger.Errorf("error %s", v.MarshalTo(nil))
}
return nil, errors.Wrapf(ErrMessageTypeNotSupported, "private message %s", message)
return nil, errors.Wrapf(ErrMessageTypeNotSupported, "private message %s", v.MarshalTo(nil))
}

View File

@ -11,6 +11,8 @@ import (
"github.com/pkg/errors"
)
var WebSocketURL = "wss://max-stream.maicoin.com/ws"
var ErrMessageTypeNotSupported = errors.New("message type currently not supported")
// Subscription is used for presenting the subscription metadata.
@ -31,7 +33,7 @@ type WebsocketCommand struct {
var SubscribeAction = "subscribe"
var UnsubscribeAction = "unsubscribe"
//go:generate callbackgen -type PublicWebSocketService
//go:generate callbackgen -type WebSocketService
type WebSocketService struct {
baseURL, key, secret string
@ -42,7 +44,8 @@ type WebSocketService struct {
// Subscriptions is the subscription request payloads that will be used for sending subscription request
Subscriptions []Subscription
parser PublicParser
connectCallbacks []func(conn *websocket.Conn)
disconnectCallbacks []func(conn *websocket.Conn)
errorCallbacks []func(err error)
messageCallbacks []func(message []byte)
@ -63,13 +66,15 @@ func NewWebSocketService(wsURL string, key, secret string) *WebSocketService {
func (s *WebSocketService) Connect(ctx context.Context) error {
// pre-allocate the websocket client, the websocket client can be used for reconnecting.
if err := s.connect(ctx) ; err != nil {
return err
}
go s.read(ctx)
return s.connect(ctx)
return nil
}
func (s *WebSocketService) sendAuthMessage() error {
func (s *WebSocketService) Auth() error {
nonce := time.Now().UnixNano() / int64(time.Millisecond)
auth := &AuthMessage{
Action: "auth",
APIKey: s.key,
@ -88,6 +93,7 @@ func (s *WebSocketService) connect(ctx context.Context) error {
}
s.conn = conn
s.EmitConnect(conn)
return nil
}
@ -124,7 +130,7 @@ func (s *WebSocketService) read(ctx context.Context) {
s.EmitMessage(msg)
m, err := s.parser.Parse(msg)
m, err := ParseMessage(msg)
if err != nil {
s.EmitError(errors.Wrapf(err, "failed to parse public message: %s", msg))
continue
@ -161,12 +167,11 @@ func (s *WebSocketService) Reconnect() {
// Subscribe is a helper method for building subscription request from the internal mapping types.
// (Internal public method)
func (s *WebSocketService) Subscribe(channel string, market string) error {
func (s *WebSocketService) Subscribe(channel string, market string) {
s.AddSubscription(Subscription{
Channel: channel,
Market: market,
})
return nil
}
// AddSubscription adds the subscription request to the buffer, these requests will be sent to the server right after connecting to the endpoint.

View File

@ -2,7 +2,29 @@
package max
import ()
import (
"github.com/gorilla/websocket"
)
func (s *WebSocketService) OnConnect(cb func(conn *websocket.Conn)) {
s.connectCallbacks = append(s.connectCallbacks, cb)
}
func (s *WebSocketService) EmitConnect(conn *websocket.Conn) {
for _, cb := range s.connectCallbacks {
cb(conn)
}
}
func (s *WebSocketService) OnDisconnect(cb func(conn *websocket.Conn)) {
s.disconnectCallbacks = append(s.disconnectCallbacks, cb)
}
func (s *WebSocketService) EmitDisconnect(conn *websocket.Conn) {
for _, cb := range s.disconnectCallbacks {
cb(conn)
}
}
func (s *WebSocketService) OnError(cb func(err error)) {
s.errorCallbacks = append(s.errorCallbacks, cb)