max: implement kline event parser for websocket

This commit is contained in:
c9s 2020-10-19 22:46:34 +08:00
parent d68564de28
commit fc687f3174
5 changed files with 101 additions and 3 deletions

View File

@ -63,9 +63,9 @@ func NewTrader(environ *Environment) *Trader {
} }
} }
// AttachStrategy attaches the single exchange strategy on an exchange session. // AttachStrategyOn attaches the single exchange strategy on an exchange session.
// Single exchange strategy is the default behavior. // Single exchange strategy is the default behavior.
func (trader *Trader) AttachStrategy(session string, strategies ...SingleExchangeStrategy) *Trader { func (trader *Trader) AttachStrategyOn(session string, strategies ...SingleExchangeStrategy) *Trader {
if _, ok := trader.environment.sessions[session]; !ok { if _, ok := trader.environment.sessions[session]; !ok {
log.Panicf("session %s is not defined", session) log.Panicf("session %s is not defined", session)
} }

View File

@ -1,6 +1,7 @@
package max package max
import ( import (
"encoding/json"
"strings" "strings"
"time" "time"
@ -9,6 +10,7 @@ import (
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
) )
var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length") var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length")
@ -27,6 +29,8 @@ func ParseMessage(payload []byte) (interface{}, error) {
if channel := string(val.GetStringBytes("c")); len(channel) > 0 { if channel := string(val.GetStringBytes("c")); len(channel) > 0 {
switch channel { switch channel {
case "kline":
return parseKLineEvent(val)
case "book": case "book":
return parseBookEvent(val) return parseBookEvent(val)
case "trade": case "trade":
@ -68,6 +72,67 @@ func parseTradeEntry(val *fastjson.Value) TradeEntry {
} }
} }
type KLineEvent struct {
Event string `json:"e"`
Market string `json:"M"`
Channel string `json:"c"`
KLine KLine `json:"k"`
Timestamp int64 `json:"T"`
}
/*
{
"c": "kline",
"M": "btcusdt",
"e": "update",
"T": 1602999650179,
"k": {
"ST": 1602999900000,
"ET": 1602999900000,
"M": "btcusdt",
"R": "5m",
"O": "11417.21",
"H": "11417.21",
"L": "11417.21",
"C": "11417.21",
"v": "0",
"ti": 0,
"x": false
}
}
*/
type KLinePayload struct {
StartTime int64 `json:"ST"`
EndTime int64 `json:"ET"`
Market string `json:"M"`
Resolution string `json:"R"`
Open string `json:"O"`
High string `json:"H"`
Low string `json:"L"`
Close string `json:"C"`
Volume string `json:"v"`
LastTradeID int `json:"ti"`
Closed bool `json:"x"`
}
func (k KLinePayload) KLine() types.KLine {
return types.KLine{
StartTime: time.Unix(0, k.StartTime*int64(time.Millisecond)),
EndTime: time.Unix(0, k.EndTime*int64(time.Millisecond)),
Symbol: k.Market,
Interval: k.Resolution,
Open: util.MustParseFloat(k.Open),
Close: util.MustParseFloat(k.Close),
High: util.MustParseFloat(k.High),
Low: util.MustParseFloat(k.Low),
Volume: util.MustParseFloat(k.Volume),
QuoteVolume: 0,
LastTradeID: k.LastTradeID,
// NumberOfTrades: 0,
Closed: k.Closed,
}
}
type PublicTradeEvent struct { type PublicTradeEvent struct {
Event string `json:"e"` Event string `json:"e"`
Market string `json:"M"` Market string `json:"M"`
@ -130,6 +195,24 @@ func (e *BookEvent) OrderBook() (snapshot types.OrderBook, err error) {
return snapshot, nil return snapshot, nil
} }
func parseKLineEvent(val *fastjson.Value) (*KLineEvent, error) {
event := KLineEvent{
Event: string(val.GetStringBytes("e")),
Market: string(val.GetStringBytes("M")),
Channel: string(val.GetStringBytes("c")),
Timestamp: val.GetInt64("T"),
}
out := val.MarshalTo(nil)
err := json.Unmarshal(out, &event.KLine)
if err != nil {
return nil, err
}
return &event, nil
}
func parseBookEvent(val *fastjson.Value) (*BookEvent, error) { func parseBookEvent(val *fastjson.Value) (*BookEvent, error) {
event := BookEvent{ event := BookEvent{
Event: string(val.GetStringBytes("e")), Event: string(val.GetStringBytes("e")),

View File

@ -52,6 +52,7 @@ type WebSocketService struct {
messageCallbacks []func(message []byte) messageCallbacks []func(message []byte)
bookEventCallbacks []func(e BookEvent) bookEventCallbacks []func(e BookEvent)
tradeEventCallbacks []func(e PublicTradeEvent) tradeEventCallbacks []func(e PublicTradeEvent)
kLineEventCallbacks []func(e KLineEvent)
errorEventCallbacks []func(e ErrorEvent) errorEventCallbacks []func(e ErrorEvent)
subscriptionEventCallbacks []func(e SubscriptionEvent) subscriptionEventCallbacks []func(e SubscriptionEvent)
@ -174,6 +175,9 @@ func (s *WebSocketService) dispatch(msg interface{}) {
case *PublicTradeEvent: case *PublicTradeEvent:
s.EmitTradeEvent(*e) s.EmitTradeEvent(*e)
case *KLineEvent:
s.EmitKLineEvent(*e)
case *ErrorEvent: case *ErrorEvent:
s.EmitErrorEvent(*e) s.EmitErrorEvent(*e)

View File

@ -66,6 +66,16 @@ func (s *WebSocketService) EmitTradeEvent(e PublicTradeEvent) {
} }
} }
func (s *WebSocketService) OnKLineEvent(cb func(e KLineEvent)) {
s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb)
}
func (s *WebSocketService) EmitKLineEvent(e KLineEvent) {
for _, cb := range s.kLineEventCallbacks {
cb(e)
}
}
func (s *WebSocketService) OnErrorEvent(cb func(e ErrorEvent)) { func (s *WebSocketService) OnErrorEvent(cb func(e ErrorEvent)) {
s.errorEventCallbacks = append(s.errorEventCallbacks, cb) s.errorEventCallbacks = append(s.errorEventCallbacks, cb)
} }

View File

@ -63,7 +63,8 @@ var Cmd = &cobra.Command{
environ := bbgo.NewDefaultEnvironment(db) environ := bbgo.NewDefaultEnvironment(db)
trader := bbgo.NewTrader(environ) trader := bbgo.NewTrader(environ)
trader.AttachStrategy(string(exchangeName), New(symbol, interval, baseQuantity)) trader.AttachStrategyOn(string(exchangeName), New(symbol, interval, baseQuantity))
err = trader.Run(ctx) err = trader.Run(ctx)
if err != nil { if err != nil {
return err return err