diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index 98a07e1e6..ba955bce3 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -8,14 +8,15 @@ import ( "strings" "time" + "github.com/valyala/fastjson" + "github.com/c9s/bbgo/pkg/exchange/okex/okexapi" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" - "github.com/valyala/fastjson" ) -func Parse(str string) (interface{}, error) { - v, err := fastjson.Parse(str) +func parseWebSocketEvent(str []byte) (interface{}, error) { + v, err := fastjson.ParseBytes(str) if err != nil { return nil, err } @@ -52,7 +53,7 @@ func parseEvent(v *fastjson.Value) (*WebSocketEvent, error) { }, nil } -type BookData struct { +type BookEvent struct { InstrumentID string Symbol string Action string @@ -63,7 +64,7 @@ type BookData struct { channel string } -func (data *BookData) BookTicker() types.BookTicker { +func (data *BookEvent) BookTicker() types.BookTicker { var askBookData BookEntry = data.Asks[0] var bidBookData BookEntry = data.Bids[0] @@ -77,7 +78,7 @@ func (data *BookData) BookTicker() types.BookTicker { } } -func (data *BookData) Book() types.SliceOrderBook { +func (data *BookEvent) Book() types.SliceOrderBook { book := types.SliceOrderBook{ Symbol: data.Symbol, } @@ -130,7 +131,7 @@ func parseBookEntry(v *fastjson.Value) (*BookEntry, error) { }, nil } -func parseBookData(v *fastjson.Value) (*BookData, error) { +func parseBookData(v *fastjson.Value) (*BookEvent, error) { instrumentId := string(v.GetStringBytes("arg", "instId")) data := v.GetArray("data") if len(data) == 0 { @@ -166,7 +167,7 @@ func parseBookData(v *fastjson.Value) (*BookData, error) { bids = append(bids, *entry) } - return &BookData{ + return &BookEvent{ InstrumentID: instrumentId, Symbol: toGlobalSymbol(instrumentId), Action: action, diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 7cc83716d..d803a6d30 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -2,18 +2,13 @@ package okex import ( "context" - "net" "strconv" - "sync" "time" "github.com/c9s/bbgo/pkg/exchange/okex/okexapi" "github.com/c9s/bbgo/pkg/types" - "github.com/gorilla/websocket" ) -const readTimeout = 15 * time.Second - type WebsocketOp struct { Op string `json:"op"` Args interface{} `json:"args"` @@ -30,18 +25,14 @@ type WebsocketLogin struct { type Stream struct { types.StandardStream - Client *okexapi.RestClient - Conn *websocket.Conn - connLock sync.Mutex - connCtx context.Context - connCancel context.CancelFunc + client *okexapi.RestClient // public callbacks - candleDataCallbacks []func(candle Candle) - bookDataCallbacks []func(book BookData) + candleEventCallbacks []func(candle Candle) + bookEventCallbacks []func(book BookEvent) eventCallbacks []func(event WebSocketEvent) - accountCallbacks []func(account okexapi.Account) - orderDetailsCallbacks []func(orderDetails []okexapi.OrderDetails) + accountEventCallbacks []func(account okexapi.Account) + orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails) lastCandle map[CandleKey]Candle } @@ -53,328 +44,180 @@ type CandleKey struct { func NewStream(client *okexapi.RestClient) *Stream { stream := &Stream{ - Client: client, - StandardStream: types.StandardStream{ - ReconnectC: make(chan struct{}, 1), - }, - lastCandle: make(map[CandleKey]Candle), + client: client, + StandardStream: types.NewStandardStream(), + lastCandle: make(map[CandleKey]Candle), } - stream.OnCandleData(func(candle Candle) { - key := CandleKey{Channel: candle.Channel, InstrumentID: candle.InstrumentID} - kline := candle.KLine() + stream.SetParser(parseWebSocketEvent) + stream.SetDispatcher(stream.dispatchEvent) + stream.SetEndpointCreator(stream.createEndpoint) - // check if we need to close previous kline - lastCandle, ok := stream.lastCandle[key] - if ok && candle.StartTime.After(lastCandle.StartTime) { - lastKline := lastCandle.KLine() - lastKline.Closed = true - stream.EmitKLineClosed(lastKline) + stream.OnCandleEvent(stream.handleCandleEvent) + stream.OnBookEvent(stream.handleBookEvent) + stream.OnAccountEvent(stream.handleAccountEvent) + stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent) + stream.OnEvent(stream.handleEvent) + stream.OnConnect(stream.handleConnect) + return stream +} + +func (s *Stream) handleConnect() { + if s.PublicOnly { + var subs []WebsocketSubscription + for _, subscription := range s.Subscriptions { + sub, err := convertSubscription(subscription) + if err != nil { + log.WithError(err).Errorf("subscription convert error") + continue + } + + subs = append(subs, sub) + } + if len(subs) == 0 { + return } - stream.EmitKLine(kline) - stream.lastCandle[key] = candle - }) + log.Infof("subscribing channels: %+v", subs) + err := s.Conn.WriteJSON(WebsocketOp{ + Op: "subscribe", + Args: subs, + }) - stream.OnBookData(func(data BookData) { - book := data.Book() - switch data.Action { - case "snapshot": - stream.EmitBookSnapshot(book) - case "update": - stream.EmitBookUpdate(book) - } - }) - - stream.OnAccount(func(account okexapi.Account) { - balances := toGlobalBalance(&account) - stream.EmitBalanceSnapshot(balances) - }) - - stream.OnOrderDetails(func(orderDetails []okexapi.OrderDetails) { - detailTrades, detailOrders := segmentOrderDetails(orderDetails) - - trades, err := toGlobalTrades(detailTrades) if err != nil { - log.WithError(err).Errorf("error converting order details into trades") - } else { - for _, trade := range trades { - stream.EmitTradeUpdate(trade) - } + log.WithError(err).Error("subscribe error") + } + } else { + // login as private channel + // sign example: + // sign=CryptoJS.enc.Base64.Stringify(CryptoJS.HmacSHA256(timestamp +'GET'+'/users/self/verify', secretKey)) + msTimestamp := strconv.FormatFloat(float64(time.Now().UnixNano())/float64(time.Second), 'f', -1, 64) + payload := msTimestamp + "GET" + "/users/self/verify" + sign := okexapi.Sign(payload, s.client.Secret) + op := WebsocketOp{ + Op: "login", + Args: []WebsocketLogin{ + { + Key: s.client.Key, + Passphrase: s.client.Passphrase, + Timestamp: msTimestamp, + Sign: sign, + }, + }, } - orders, err := toGlobalOrders(detailOrders) + log.Infof("sending okex login request") + err := s.Conn.WriteJSON(op) if err != nil { - log.WithError(err).Errorf("error converting order details into orders") - } else { - for _, order := range orders { - stream.EmitOrderUpdate(order) - } + log.WithError(err).Errorf("can not send login message") } - }) + } +} - stream.OnEvent(func(event WebSocketEvent) { - switch event.Event { - case "login": - if event.Code == "0" { - var subs = []WebsocketSubscription{ - {Channel: "account"}, - {Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)}, - } - - log.Infof("subscribing private channels: %+v", subs) - err := stream.Conn.WriteJSON(WebsocketOp{ - Op: "subscribe", - Args: subs, - }) - - if err != nil { - log.WithError(err).Error("private channel subscribe error") - } - } - } - }) - - stream.OnConnect(func() { - if stream.PublicOnly { - var subs []WebsocketSubscription - for _, subscription := range stream.Subscriptions { - sub, err := convertSubscription(subscription) - if err != nil { - log.WithError(err).Errorf("subscription convert error") - continue - } - - subs = append(subs, sub) - } - if len(subs) == 0 { - return +func (s *Stream) handleEvent(event WebSocketEvent) { + switch event.Event { + case "login": + if event.Code == "0" { + var subs = []WebsocketSubscription{ + {Channel: "account"}, + {Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)}, } - log.Infof("subscribing channels: %+v", subs) - err := stream.Conn.WriteJSON(WebsocketOp{ + log.Infof("subscribing private channels: %+v", subs) + err := s.Conn.WriteJSON(WebsocketOp{ Op: "subscribe", Args: subs, }) if err != nil { - log.WithError(err).Error("subscribe error") - } - } else { - // login as private channel - // sign example: - // sign=CryptoJS.enc.Base64.Stringify(CryptoJS.HmacSHA256(timestamp +'GET'+'/users/self/verify', secretKey)) - msTimestamp := strconv.FormatFloat(float64(time.Now().UnixNano())/float64(time.Second), 'f', -1, 64) - payload := msTimestamp + "GET" + "/users/self/verify" - sign := okexapi.Sign(payload, stream.Client.Secret) - op := WebsocketOp{ - Op: "login", - Args: []WebsocketLogin{ - { - Key: stream.Client.Key, - Passphrase: stream.Client.Passphrase, - Timestamp: msTimestamp, - Sign: sign, - }, - }, - } - - log.Infof("sending okex login request") - err := stream.Conn.WriteJSON(op) - if err != nil { - log.WithError(err).Errorf("can not send login message") + log.WithError(err).Error("private channel subscribe error") } } - }) - - return stream + } } -func (s *Stream) Close() error { - return nil -} +func (s *Stream) handleOrderDetailsEvent(orderDetails []okexapi.OrderDetails) { + detailTrades, detailOrders := segmentOrderDetails(orderDetails) -func (s *Stream) Connect(ctx context.Context) error { - err := s.connect(ctx) + trades, err := toGlobalTrades(detailTrades) if err != nil { - return err + log.WithError(err).Errorf("error converting order details into trades") + } else { + for _, trade := range trades { + s.EmitTradeUpdate(trade) + } } - // start one re-connector goroutine with the base context - go s.Reconnector(ctx) - - s.EmitStart() - return nil -} - -func (s *Stream) Reconnector(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - - case <-s.ReconnectC: - log.Warnf("received reconnect signal, reconnecting...") - time.Sleep(3 * time.Second) - - if err := s.connect(ctx); err != nil { - log.WithError(err).Errorf("connect error, try to reconnect again...") - s.Reconnect() - } + orders, err := toGlobalOrders(detailOrders) + if err != nil { + log.WithError(err).Errorf("error converting order details into orders") + } else { + for _, order := range orders { + s.EmitOrderUpdate(order) } } } -func (s *Stream) connect(ctx context.Context) error { - // when in public mode, the listen key is an empty string +func (s *Stream) handleAccountEvent(account okexapi.Account) { + balances := toGlobalBalance(&account) + s.EmitBalanceSnapshot(balances) +} + +func (s *Stream) handleBookEvent(data BookEvent) { + book := data.Book() + switch data.Action { + case "snapshot": + s.EmitBookSnapshot(book) + case "update": + s.EmitBookUpdate(book) + } +} + +func (s *Stream) handleCandleEvent(candle Candle) { + key := CandleKey{Channel: candle.Channel, InstrumentID: candle.InstrumentID} + kline := candle.KLine() + + // check if we need to close previous kline + lastCandle, ok := s.lastCandle[key] + if ok && candle.StartTime.After(lastCandle.StartTime) { + lastKline := lastCandle.KLine() + lastKline.Closed = true + s.EmitKLineClosed(lastKline) + } + + s.EmitKLine(kline) + s.lastCandle[key] = candle +} + +func (s *Stream) createEndpoint(ctx context.Context) (string, error) { var url string if s.PublicOnly { url = okexapi.PublicWebSocketURL } else { url = okexapi.PrivateWebSocketURL } - - conn, err := s.StandardStream.Dial(ctx, url) - if err != nil { - return err - } - - log.Infof("websocket connected: %s", url) - - // should only start one connection one time, so we lock the mutex - s.connLock.Lock() - - // ensure the previous context is cancelled - if s.connCancel != nil { - s.connCancel() - } - - // create a new context - s.connCtx, s.connCancel = context.WithCancel(ctx) - - conn.SetReadDeadline(time.Now().Add(readTimeout)) - conn.SetPongHandler(func(string) error { - conn.SetReadDeadline(time.Now().Add(readTimeout)) - return nil - }) - - s.Conn = conn - s.connLock.Unlock() - - s.EmitConnect() - - go s.read(s.connCtx) - go s.ping(s.connCtx) - return nil + return url, nil } -func (s *Stream) read(ctx context.Context) { - defer func() { - if s.connCancel != nil { - s.connCancel() +func (s *Stream) dispatchEvent(e interface{}) { + switch et := e.(type) { + case *WebSocketEvent: + s.EmitEvent(*et) + + case *BookEvent: + // there's "books" for 400 depth and books5 for 5 depth + if et.channel != "books5" { + s.EmitBookEvent(*et) } - s.EmitDisconnect() - }() + s.EmitBookTickerUpdate(et.BookTicker()) + case *Candle: + s.EmitCandleEvent(*et) - for { - select { + case *okexapi.Account: + s.EmitAccountEvent(*et) - case <-ctx.Done(): - return + case []okexapi.OrderDetails: + s.EmitOrderDetailsEvent(et) - default: - s.connLock.Lock() - conn := s.Conn - s.connLock.Unlock() - - if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { - log.WithError(err).Errorf("set read deadline error: %s", err.Error()) - } - - mt, message, err := conn.ReadMessage() - if err != nil { - // if it's a network timeout error, we should re-connect - switch err := err.(type) { - - // if it's a websocket related error - case *websocket.CloseError: - if err.Code == websocket.CloseNormalClosure { - return - } - - // for unexpected close error, we should re-connect - // emit reconnect to start a new connection - s.Reconnect() - return - - case net.Error: - log.WithError(err).Error("network error") - s.Reconnect() - return - - default: - log.WithError(err).Error("unexpected connection error") - s.Reconnect() - return - } - } - - // skip non-text messages - if mt != websocket.TextMessage { - continue - } - - e, err := Parse(string(message)) - if err != nil { - log.WithError(err).Error("message parse error") - } - - if e != nil { - switch et := e.(type) { - case *WebSocketEvent: - s.EmitEvent(*et) - - case *BookData: - // there's "books" for 400 depth and books5 for 5 depth - if et.channel != "books5" { - s.EmitBookData(*et) - } - s.EmitBookTickerUpdate(et.BookTicker()) - case *Candle: - s.EmitCandleData(*et) - - case *okexapi.Account: - s.EmitAccount(*et) - - case []okexapi.OrderDetails: - s.EmitOrderDetails(et) - - } - } - } - } -} - -func (s *Stream) ping(ctx context.Context) { - pingTicker := time.NewTicker(readTimeout / 2) - defer pingTicker.Stop() - - for { - select { - - case <-ctx.Done(): - log.Debug("ping worker stopped") - return - - case <-pingTicker.C: - s.connLock.Lock() - conn := s.Conn - s.connLock.Unlock() - - if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil { - log.WithError(err).Error("ping error", err) - s.Reconnect() - } - } } } diff --git a/pkg/exchange/okex/stream_callbacks.go b/pkg/exchange/okex/stream_callbacks.go index 3c5615ce5..6fb6e7231 100644 --- a/pkg/exchange/okex/stream_callbacks.go +++ b/pkg/exchange/okex/stream_callbacks.go @@ -6,22 +6,22 @@ import ( "github.com/c9s/bbgo/pkg/exchange/okex/okexapi" ) -func (s *Stream) OnCandleData(cb func(candle Candle)) { - s.candleDataCallbacks = append(s.candleDataCallbacks, cb) +func (s *Stream) OnCandleEvent(cb func(candle Candle)) { + s.candleEventCallbacks = append(s.candleEventCallbacks, cb) } -func (s *Stream) EmitCandleData(candle Candle) { - for _, cb := range s.candleDataCallbacks { +func (s *Stream) EmitCandleEvent(candle Candle) { + for _, cb := range s.candleEventCallbacks { cb(candle) } } -func (s *Stream) OnBookData(cb func(book BookData)) { - s.bookDataCallbacks = append(s.bookDataCallbacks, cb) +func (s *Stream) OnBookEvent(cb func(book BookEvent)) { + s.bookEventCallbacks = append(s.bookEventCallbacks, cb) } -func (s *Stream) EmitBookData(book BookData) { - for _, cb := range s.bookDataCallbacks { +func (s *Stream) EmitBookEvent(book BookEvent) { + for _, cb := range s.bookEventCallbacks { cb(book) } } @@ -36,34 +36,34 @@ func (s *Stream) EmitEvent(event WebSocketEvent) { } } -func (s *Stream) OnAccount(cb func(account okexapi.Account)) { - s.accountCallbacks = append(s.accountCallbacks, cb) +func (s *Stream) OnAccountEvent(cb func(account okexapi.Account)) { + s.accountEventCallbacks = append(s.accountEventCallbacks, cb) } -func (s *Stream) EmitAccount(account okexapi.Account) { - for _, cb := range s.accountCallbacks { +func (s *Stream) EmitAccountEvent(account okexapi.Account) { + for _, cb := range s.accountEventCallbacks { cb(account) } } -func (s *Stream) OnOrderDetails(cb func(orderDetails []okexapi.OrderDetails)) { - s.orderDetailsCallbacks = append(s.orderDetailsCallbacks, cb) +func (s *Stream) OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails)) { + s.orderDetailsEventCallbacks = append(s.orderDetailsEventCallbacks, cb) } -func (s *Stream) EmitOrderDetails(orderDetails []okexapi.OrderDetails) { - for _, cb := range s.orderDetailsCallbacks { +func (s *Stream) EmitOrderDetailsEvent(orderDetails []okexapi.OrderDetails) { + for _, cb := range s.orderDetailsEventCallbacks { cb(orderDetails) } } type StreamEventHub interface { - OnCandleData(cb func(candle Candle)) + OnCandleEvent(cb func(candle Candle)) - OnBookData(cb func(book BookData)) + OnBookEvent(cb func(book BookEvent)) OnEvent(cb func(event WebSocketEvent)) - OnAccount(cb func(account okexapi.Account)) + OnAccountEvent(cb func(account okexapi.Account)) - OnOrderDetails(cb func(orderDetails []okexapi.OrderDetails)) + OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails)) }