diff --git a/bbgo/cmd/cmdutil/signal.go b/bbgo/cmd/cmdutil/signal.go new file mode 100644 index 000000000..90a94d061 --- /dev/null +++ b/bbgo/cmd/cmdutil/signal.go @@ -0,0 +1,24 @@ +package cmdutil + +import ( + "context" + "os" + "os/signal" + + "github.com/sirupsen/logrus" +) + +func WaitForSignal(ctx context.Context, signals ...os.Signal) { + var sigC = make(chan os.Signal, 1) + signal.Notify(sigC, signals...) + defer signal.Stop(sigC) + + select { + case sig := <-sigC: + logrus.Warnf("%v", sig) + signal.Ignore() + + case <-ctx.Done(): + + } +} diff --git a/bbgo/cmd/signal.go b/bbgo/cmd/signal.go new file mode 100644 index 000000000..89225e6ee --- /dev/null +++ b/bbgo/cmd/signal.go @@ -0,0 +1,2 @@ +package cmd + diff --git a/bbgo/exchange/max/maxapi/public_parser.go b/bbgo/exchange/max/maxapi/public_parser.go index 117ff4454..a861f99e9 100644 --- a/bbgo/exchange/max/maxapi/public_parser.go +++ b/bbgo/exchange/max/maxapi/public_parser.go @@ -13,11 +13,9 @@ var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry elemen const Buy = 1 const Sell = -1 -type PublicParser struct{} - -// Parse accepts the raw messages from max public websocket channels and parses them into market data +// ParseMessage accepts the raw messages from max public websocket channels and parses them into market data // Return types: *BookEvent, *TradeEvent, *SubscriptionEvent, *ErrorEvent -func (p *PublicParser) Parse(payload []byte) (interface{}, error) { +func ParseMessage(payload []byte) (interface{}, error) { parser := fastjson.Parser{} val, err := parser.ParseBytes(payload) if err != nil { @@ -30,6 +28,8 @@ func (p *PublicParser) Parse(payload []byte) (interface{}, error) { return parseBookEvent(val) case "trade": return parseTradeEvent(val) + case "user": + return ParseUserEvent(val) } } diff --git a/bbgo/exchange/max/maxapi/userdata.go b/bbgo/exchange/max/maxapi/userdata.go index d4fb623d4..e5ca14d1a 100644 --- a/bbgo/exchange/max/maxapi/userdata.go +++ b/bbgo/exchange/max/maxapi/userdata.go @@ -202,7 +202,7 @@ func parserAccountSnapshotEvent(v *fastjson.Value) (e AccountSnapshotEvent) { return e } -func parserAuthEvent(v *fastjson.Value) AuthEvent { +func parseAuthEvent(v *fastjson.Value) AuthEvent { return AuthEvent{ Event: string(v.GetStringBytes("e")), ID: string(v.GetStringBytes("i")), @@ -211,13 +211,7 @@ func parserAuthEvent(v *fastjson.Value) AuthEvent { } -func ParsePrivateEvent(message []byte) (interface{}, error) { - var fp fastjson.Parser - var v, err = fp.ParseBytes(message) - if err != nil { - return nil, errors.Wrap(err, "fail to parse account info raw message") - } - +func ParseUserEvent(v *fastjson.Value) (interface{}, error) { eventType := string(v.GetStringBytes("e")) switch eventType { case "order_snapshot": @@ -239,11 +233,11 @@ func ParsePrivateEvent(message []byte) (interface{}, error) { return parserAccountUpdateEvent(v), nil case "authenticated": - return parserAuthEvent(v), nil + return parseAuthEvent(v), nil case "error": - logger.Errorf("error %s", message) + logger.Errorf("error %s", v.MarshalTo(nil)) } - return nil, errors.Wrapf(ErrMessageTypeNotSupported, "private message %s", message) + return nil, errors.Wrapf(ErrMessageTypeNotSupported, "private message %s", v.MarshalTo(nil)) } diff --git a/bbgo/exchange/max/maxapi/public_websocket.go b/bbgo/exchange/max/maxapi/websocket.go similarity index 93% rename from bbgo/exchange/max/maxapi/public_websocket.go rename to bbgo/exchange/max/maxapi/websocket.go index 3c9f98fea..50b570b9a 100644 --- a/bbgo/exchange/max/maxapi/public_websocket.go +++ b/bbgo/exchange/max/maxapi/websocket.go @@ -11,6 +11,8 @@ import ( "github.com/pkg/errors" ) +var WebSocketURL = "wss://max-stream.maicoin.com/ws" + var ErrMessageTypeNotSupported = errors.New("message type currently not supported") // Subscription is used for presenting the subscription metadata. @@ -31,7 +33,7 @@ type WebsocketCommand struct { var SubscribeAction = "subscribe" var UnsubscribeAction = "unsubscribe" -//go:generate callbackgen -type PublicWebSocketService +//go:generate callbackgen -type WebSocketService type WebSocketService struct { baseURL, key, secret string @@ -42,7 +44,8 @@ type WebSocketService struct { // Subscriptions is the subscription request payloads that will be used for sending subscription request Subscriptions []Subscription - parser PublicParser + connectCallbacks []func(conn *websocket.Conn) + disconnectCallbacks []func(conn *websocket.Conn) errorCallbacks []func(err error) messageCallbacks []func(message []byte) @@ -63,13 +66,15 @@ func NewWebSocketService(wsURL string, key, secret string) *WebSocketService { func (s *WebSocketService) Connect(ctx context.Context) error { // pre-allocate the websocket client, the websocket client can be used for reconnecting. + if err := s.connect(ctx) ; err != nil { + return err + } go s.read(ctx) - return s.connect(ctx) + return nil } -func (s *WebSocketService) sendAuthMessage() error { +func (s *WebSocketService) Auth() error { nonce := time.Now().UnixNano() / int64(time.Millisecond) - auth := &AuthMessage{ Action: "auth", APIKey: s.key, @@ -88,6 +93,7 @@ func (s *WebSocketService) connect(ctx context.Context) error { } s.conn = conn + s.EmitConnect(conn) return nil } @@ -124,7 +130,7 @@ func (s *WebSocketService) read(ctx context.Context) { s.EmitMessage(msg) - m, err := s.parser.Parse(msg) + m, err := ParseMessage(msg) if err != nil { s.EmitError(errors.Wrapf(err, "failed to parse public message: %s", msg)) continue @@ -161,12 +167,11 @@ func (s *WebSocketService) Reconnect() { // Subscribe is a helper method for building subscription request from the internal mapping types. // (Internal public method) -func (s *WebSocketService) Subscribe(channel string, market string) error { +func (s *WebSocketService) Subscribe(channel string, market string) { s.AddSubscription(Subscription{ Channel: channel, Market: market, }) - return nil } // AddSubscription adds the subscription request to the buffer, these requests will be sent to the server right after connecting to the endpoint. diff --git a/bbgo/exchange/max/maxapi/publicwebsocketservice_callbacks.go b/bbgo/exchange/max/maxapi/websocketservice_callbacks.go similarity index 73% rename from bbgo/exchange/max/maxapi/publicwebsocketservice_callbacks.go rename to bbgo/exchange/max/maxapi/websocketservice_callbacks.go index f21792091..0644dd761 100644 --- a/bbgo/exchange/max/maxapi/publicwebsocketservice_callbacks.go +++ b/bbgo/exchange/max/maxapi/websocketservice_callbacks.go @@ -2,7 +2,29 @@ package max -import () +import ( + "github.com/gorilla/websocket" +) + +func (s *WebSocketService) OnConnect(cb func(conn *websocket.Conn)) { + s.connectCallbacks = append(s.connectCallbacks, cb) +} + +func (s *WebSocketService) EmitConnect(conn *websocket.Conn) { + for _, cb := range s.connectCallbacks { + cb(conn) + } +} + +func (s *WebSocketService) OnDisconnect(cb func(conn *websocket.Conn)) { + s.disconnectCallbacks = append(s.disconnectCallbacks, cb) +} + +func (s *WebSocketService) EmitDisconnect(conn *websocket.Conn) { + for _, cb := range s.disconnectCallbacks { + cb(conn) + } +} func (s *WebSocketService) OnError(cb func(err error)) { s.errorCallbacks = append(s.errorCallbacks, cb)