From fc687f317437e9df2d01db8b35056bfa4bda60bb Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 19 Oct 2020 22:46:34 +0800 Subject: [PATCH] max: implement kline event parser for websocket --- pkg/bbgo/trader.go | 4 +- pkg/exchange/max/maxapi/public_parser.go | 83 +++++++++++++++++++ pkg/exchange/max/maxapi/websocket.go | 4 + .../max/maxapi/websocketservice_callbacks.go | 10 +++ pkg/strategy/buyandhold/cmd.go | 3 +- 5 files changed, 101 insertions(+), 3 deletions(-) diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index c1d3c95cf..c1390c905 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -63,9 +63,9 @@ func NewTrader(environ *Environment) *Trader { } } -// AttachStrategy attaches the single exchange strategy on an exchange session. +// AttachStrategyOn attaches the single exchange strategy on an exchange session. // Single exchange strategy is the default behavior. -func (trader *Trader) AttachStrategy(session string, strategies ...SingleExchangeStrategy) *Trader { +func (trader *Trader) AttachStrategyOn(session string, strategies ...SingleExchangeStrategy) *Trader { if _, ok := trader.environment.sessions[session]; !ok { log.Panicf("session %s is not defined", session) } diff --git a/pkg/exchange/max/maxapi/public_parser.go b/pkg/exchange/max/maxapi/public_parser.go index 0eaab22d2..1a487654a 100644 --- a/pkg/exchange/max/maxapi/public_parser.go +++ b/pkg/exchange/max/maxapi/public_parser.go @@ -1,6 +1,7 @@ package max import ( + "encoding/json" "strings" "time" @@ -9,6 +10,7 @@ import ( "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/util" ) var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length") @@ -27,6 +29,8 @@ func ParseMessage(payload []byte) (interface{}, error) { if channel := string(val.GetStringBytes("c")); len(channel) > 0 { switch channel { + case "kline": + return parseKLineEvent(val) case "book": return parseBookEvent(val) case "trade": @@ -68,6 +72,67 @@ func parseTradeEntry(val *fastjson.Value) TradeEntry { } } +type KLineEvent struct { + Event string `json:"e"` + Market string `json:"M"` + Channel string `json:"c"` + KLine KLine `json:"k"` + Timestamp int64 `json:"T"` +} + +/* +{ + "c": "kline", + "M": "btcusdt", + "e": "update", + "T": 1602999650179, + "k": { + "ST": 1602999900000, + "ET": 1602999900000, + "M": "btcusdt", + "R": "5m", + "O": "11417.21", + "H": "11417.21", + "L": "11417.21", + "C": "11417.21", + "v": "0", + "ti": 0, + "x": false + } +} +*/ +type KLinePayload struct { + StartTime int64 `json:"ST"` + EndTime int64 `json:"ET"` + Market string `json:"M"` + Resolution string `json:"R"` + Open string `json:"O"` + High string `json:"H"` + Low string `json:"L"` + Close string `json:"C"` + Volume string `json:"v"` + LastTradeID int `json:"ti"` + Closed bool `json:"x"` +} + +func (k KLinePayload) KLine() types.KLine { + return types.KLine{ + StartTime: time.Unix(0, k.StartTime*int64(time.Millisecond)), + EndTime: time.Unix(0, k.EndTime*int64(time.Millisecond)), + Symbol: k.Market, + Interval: k.Resolution, + Open: util.MustParseFloat(k.Open), + Close: util.MustParseFloat(k.Close), + High: util.MustParseFloat(k.High), + Low: util.MustParseFloat(k.Low), + Volume: util.MustParseFloat(k.Volume), + QuoteVolume: 0, + LastTradeID: k.LastTradeID, + // NumberOfTrades: 0, + Closed: k.Closed, + } +} + type PublicTradeEvent struct { Event string `json:"e"` Market string `json:"M"` @@ -130,6 +195,24 @@ func (e *BookEvent) OrderBook() (snapshot types.OrderBook, err error) { return snapshot, nil } +func parseKLineEvent(val *fastjson.Value) (*KLineEvent, error) { + event := KLineEvent{ + Event: string(val.GetStringBytes("e")), + Market: string(val.GetStringBytes("M")), + Channel: string(val.GetStringBytes("c")), + Timestamp: val.GetInt64("T"), + } + + out := val.MarshalTo(nil) + + err := json.Unmarshal(out, &event.KLine) + if err != nil { + return nil, err + } + + return &event, nil +} + func parseBookEvent(val *fastjson.Value) (*BookEvent, error) { event := BookEvent{ Event: string(val.GetStringBytes("e")), diff --git a/pkg/exchange/max/maxapi/websocket.go b/pkg/exchange/max/maxapi/websocket.go index c8b4970e2..f16b82805 100644 --- a/pkg/exchange/max/maxapi/websocket.go +++ b/pkg/exchange/max/maxapi/websocket.go @@ -52,6 +52,7 @@ type WebSocketService struct { messageCallbacks []func(message []byte) bookEventCallbacks []func(e BookEvent) tradeEventCallbacks []func(e PublicTradeEvent) + kLineEventCallbacks []func(e KLineEvent) errorEventCallbacks []func(e ErrorEvent) subscriptionEventCallbacks []func(e SubscriptionEvent) @@ -174,6 +175,9 @@ func (s *WebSocketService) dispatch(msg interface{}) { case *PublicTradeEvent: s.EmitTradeEvent(*e) + case *KLineEvent: + s.EmitKLineEvent(*e) + case *ErrorEvent: s.EmitErrorEvent(*e) diff --git a/pkg/exchange/max/maxapi/websocketservice_callbacks.go b/pkg/exchange/max/maxapi/websocketservice_callbacks.go index 7b60604dc..e32d2b5db 100644 --- a/pkg/exchange/max/maxapi/websocketservice_callbacks.go +++ b/pkg/exchange/max/maxapi/websocketservice_callbacks.go @@ -66,6 +66,16 @@ func (s *WebSocketService) EmitTradeEvent(e PublicTradeEvent) { } } +func (s *WebSocketService) OnKLineEvent(cb func(e KLineEvent)) { + s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb) +} + +func (s *WebSocketService) EmitKLineEvent(e KLineEvent) { + for _, cb := range s.kLineEventCallbacks { + cb(e) + } +} + func (s *WebSocketService) OnErrorEvent(cb func(e ErrorEvent)) { s.errorEventCallbacks = append(s.errorEventCallbacks, cb) } diff --git a/pkg/strategy/buyandhold/cmd.go b/pkg/strategy/buyandhold/cmd.go index 286669a94..068d47708 100644 --- a/pkg/strategy/buyandhold/cmd.go +++ b/pkg/strategy/buyandhold/cmd.go @@ -63,7 +63,8 @@ var Cmd = &cobra.Command{ environ := bbgo.NewDefaultEnvironment(db) trader := bbgo.NewTrader(environ) - trader.AttachStrategy(string(exchangeName), New(symbol, interval, baseQuantity)) + trader.AttachStrategyOn(string(exchangeName), New(symbol, interval, baseQuantity)) + err = trader.Run(ctx) if err != nil { return err