okex: refactor okex stream

This commit is contained in:
c9s 2022-01-02 02:37:33 +08:00
parent 9d382a6b8c
commit 96fedfd311
3 changed files with 166 additions and 322 deletions

View File

@ -8,14 +8,15 @@ import (
"strings"
"time"
"github.com/valyala/fastjson"
"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/valyala/fastjson"
)
func Parse(str string) (interface{}, error) {
v, err := fastjson.Parse(str)
func parseWebSocketEvent(str []byte) (interface{}, error) {
v, err := fastjson.ParseBytes(str)
if err != nil {
return nil, err
}
@ -52,7 +53,7 @@ func parseEvent(v *fastjson.Value) (*WebSocketEvent, error) {
}, nil
}
type BookData struct {
type BookEvent struct {
InstrumentID string
Symbol string
Action string
@ -63,7 +64,7 @@ type BookData struct {
channel string
}
func (data *BookData) BookTicker() types.BookTicker {
func (data *BookEvent) BookTicker() types.BookTicker {
var askBookData BookEntry = data.Asks[0]
var bidBookData BookEntry = data.Bids[0]
@ -77,7 +78,7 @@ func (data *BookData) BookTicker() types.BookTicker {
}
}
func (data *BookData) Book() types.SliceOrderBook {
func (data *BookEvent) Book() types.SliceOrderBook {
book := types.SliceOrderBook{
Symbol: data.Symbol,
}
@ -130,7 +131,7 @@ func parseBookEntry(v *fastjson.Value) (*BookEntry, error) {
}, nil
}
func parseBookData(v *fastjson.Value) (*BookData, error) {
func parseBookData(v *fastjson.Value) (*BookEvent, error) {
instrumentId := string(v.GetStringBytes("arg", "instId"))
data := v.GetArray("data")
if len(data) == 0 {
@ -166,7 +167,7 @@ func parseBookData(v *fastjson.Value) (*BookData, error) {
bids = append(bids, *entry)
}
return &BookData{
return &BookEvent{
InstrumentID: instrumentId,
Symbol: toGlobalSymbol(instrumentId),
Action: action,

View File

@ -2,18 +2,13 @@ package okex
import (
"context"
"net"
"strconv"
"sync"
"time"
"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
"github.com/c9s/bbgo/pkg/types"
"github.com/gorilla/websocket"
)
const readTimeout = 15 * time.Second
type WebsocketOp struct {
Op string `json:"op"`
Args interface{} `json:"args"`
@ -30,18 +25,14 @@ type WebsocketLogin struct {
type Stream struct {
types.StandardStream
Client *okexapi.RestClient
Conn *websocket.Conn
connLock sync.Mutex
connCtx context.Context
connCancel context.CancelFunc
client *okexapi.RestClient
// public callbacks
candleDataCallbacks []func(candle Candle)
bookDataCallbacks []func(book BookData)
candleEventCallbacks []func(candle Candle)
bookEventCallbacks []func(book BookEvent)
eventCallbacks []func(event WebSocketEvent)
accountCallbacks []func(account okexapi.Account)
orderDetailsCallbacks []func(orderDetails []okexapi.OrderDetails)
accountEventCallbacks []func(account okexapi.Account)
orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails)
lastCandle map[CandleKey]Candle
}
@ -53,328 +44,180 @@ type CandleKey struct {
func NewStream(client *okexapi.RestClient) *Stream {
stream := &Stream{
Client: client,
StandardStream: types.StandardStream{
ReconnectC: make(chan struct{}, 1),
},
lastCandle: make(map[CandleKey]Candle),
client: client,
StandardStream: types.NewStandardStream(),
lastCandle: make(map[CandleKey]Candle),
}
stream.OnCandleData(func(candle Candle) {
key := CandleKey{Channel: candle.Channel, InstrumentID: candle.InstrumentID}
kline := candle.KLine()
stream.SetParser(parseWebSocketEvent)
stream.SetDispatcher(stream.dispatchEvent)
stream.SetEndpointCreator(stream.createEndpoint)
// check if we need to close previous kline
lastCandle, ok := stream.lastCandle[key]
if ok && candle.StartTime.After(lastCandle.StartTime) {
lastKline := lastCandle.KLine()
lastKline.Closed = true
stream.EmitKLineClosed(lastKline)
stream.OnCandleEvent(stream.handleCandleEvent)
stream.OnBookEvent(stream.handleBookEvent)
stream.OnAccountEvent(stream.handleAccountEvent)
stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent)
stream.OnEvent(stream.handleEvent)
stream.OnConnect(stream.handleConnect)
return stream
}
func (s *Stream) handleConnect() {
if s.PublicOnly {
var subs []WebsocketSubscription
for _, subscription := range s.Subscriptions {
sub, err := convertSubscription(subscription)
if err != nil {
log.WithError(err).Errorf("subscription convert error")
continue
}
subs = append(subs, sub)
}
if len(subs) == 0 {
return
}
stream.EmitKLine(kline)
stream.lastCandle[key] = candle
})
log.Infof("subscribing channels: %+v", subs)
err := s.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Args: subs,
})
stream.OnBookData(func(data BookData) {
book := data.Book()
switch data.Action {
case "snapshot":
stream.EmitBookSnapshot(book)
case "update":
stream.EmitBookUpdate(book)
}
})
stream.OnAccount(func(account okexapi.Account) {
balances := toGlobalBalance(&account)
stream.EmitBalanceSnapshot(balances)
})
stream.OnOrderDetails(func(orderDetails []okexapi.OrderDetails) {
detailTrades, detailOrders := segmentOrderDetails(orderDetails)
trades, err := toGlobalTrades(detailTrades)
if err != nil {
log.WithError(err).Errorf("error converting order details into trades")
} else {
for _, trade := range trades {
stream.EmitTradeUpdate(trade)
}
log.WithError(err).Error("subscribe error")
}
} else {
// login as private channel
// sign example:
// sign=CryptoJS.enc.Base64.Stringify(CryptoJS.HmacSHA256(timestamp +'GET'+'/users/self/verify', secretKey))
msTimestamp := strconv.FormatFloat(float64(time.Now().UnixNano())/float64(time.Second), 'f', -1, 64)
payload := msTimestamp + "GET" + "/users/self/verify"
sign := okexapi.Sign(payload, s.client.Secret)
op := WebsocketOp{
Op: "login",
Args: []WebsocketLogin{
{
Key: s.client.Key,
Passphrase: s.client.Passphrase,
Timestamp: msTimestamp,
Sign: sign,
},
},
}
orders, err := toGlobalOrders(detailOrders)
log.Infof("sending okex login request")
err := s.Conn.WriteJSON(op)
if err != nil {
log.WithError(err).Errorf("error converting order details into orders")
} else {
for _, order := range orders {
stream.EmitOrderUpdate(order)
}
log.WithError(err).Errorf("can not send login message")
}
})
}
}
stream.OnEvent(func(event WebSocketEvent) {
switch event.Event {
case "login":
if event.Code == "0" {
var subs = []WebsocketSubscription{
{Channel: "account"},
{Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)},
}
log.Infof("subscribing private channels: %+v", subs)
err := stream.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Args: subs,
})
if err != nil {
log.WithError(err).Error("private channel subscribe error")
}
}
}
})
stream.OnConnect(func() {
if stream.PublicOnly {
var subs []WebsocketSubscription
for _, subscription := range stream.Subscriptions {
sub, err := convertSubscription(subscription)
if err != nil {
log.WithError(err).Errorf("subscription convert error")
continue
}
subs = append(subs, sub)
}
if len(subs) == 0 {
return
func (s *Stream) handleEvent(event WebSocketEvent) {
switch event.Event {
case "login":
if event.Code == "0" {
var subs = []WebsocketSubscription{
{Channel: "account"},
{Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)},
}
log.Infof("subscribing channels: %+v", subs)
err := stream.Conn.WriteJSON(WebsocketOp{
log.Infof("subscribing private channels: %+v", subs)
err := s.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Args: subs,
})
if err != nil {
log.WithError(err).Error("subscribe error")
}
} else {
// login as private channel
// sign example:
// sign=CryptoJS.enc.Base64.Stringify(CryptoJS.HmacSHA256(timestamp +'GET'+'/users/self/verify', secretKey))
msTimestamp := strconv.FormatFloat(float64(time.Now().UnixNano())/float64(time.Second), 'f', -1, 64)
payload := msTimestamp + "GET" + "/users/self/verify"
sign := okexapi.Sign(payload, stream.Client.Secret)
op := WebsocketOp{
Op: "login",
Args: []WebsocketLogin{
{
Key: stream.Client.Key,
Passphrase: stream.Client.Passphrase,
Timestamp: msTimestamp,
Sign: sign,
},
},
}
log.Infof("sending okex login request")
err := stream.Conn.WriteJSON(op)
if err != nil {
log.WithError(err).Errorf("can not send login message")
log.WithError(err).Error("private channel subscribe error")
}
}
})
return stream
}
}
func (s *Stream) Close() error {
return nil
}
func (s *Stream) handleOrderDetailsEvent(orderDetails []okexapi.OrderDetails) {
detailTrades, detailOrders := segmentOrderDetails(orderDetails)
func (s *Stream) Connect(ctx context.Context) error {
err := s.connect(ctx)
trades, err := toGlobalTrades(detailTrades)
if err != nil {
return err
log.WithError(err).Errorf("error converting order details into trades")
} else {
for _, trade := range trades {
s.EmitTradeUpdate(trade)
}
}
// start one re-connector goroutine with the base context
go s.Reconnector(ctx)
s.EmitStart()
return nil
}
func (s *Stream) Reconnector(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-s.ReconnectC:
log.Warnf("received reconnect signal, reconnecting...")
time.Sleep(3 * time.Second)
if err := s.connect(ctx); err != nil {
log.WithError(err).Errorf("connect error, try to reconnect again...")
s.Reconnect()
}
orders, err := toGlobalOrders(detailOrders)
if err != nil {
log.WithError(err).Errorf("error converting order details into orders")
} else {
for _, order := range orders {
s.EmitOrderUpdate(order)
}
}
}
func (s *Stream) connect(ctx context.Context) error {
// when in public mode, the listen key is an empty string
func (s *Stream) handleAccountEvent(account okexapi.Account) {
balances := toGlobalBalance(&account)
s.EmitBalanceSnapshot(balances)
}
func (s *Stream) handleBookEvent(data BookEvent) {
book := data.Book()
switch data.Action {
case "snapshot":
s.EmitBookSnapshot(book)
case "update":
s.EmitBookUpdate(book)
}
}
func (s *Stream) handleCandleEvent(candle Candle) {
key := CandleKey{Channel: candle.Channel, InstrumentID: candle.InstrumentID}
kline := candle.KLine()
// check if we need to close previous kline
lastCandle, ok := s.lastCandle[key]
if ok && candle.StartTime.After(lastCandle.StartTime) {
lastKline := lastCandle.KLine()
lastKline.Closed = true
s.EmitKLineClosed(lastKline)
}
s.EmitKLine(kline)
s.lastCandle[key] = candle
}
func (s *Stream) createEndpoint(ctx context.Context) (string, error) {
var url string
if s.PublicOnly {
url = okexapi.PublicWebSocketURL
} else {
url = okexapi.PrivateWebSocketURL
}
conn, err := s.StandardStream.Dial(ctx, url)
if err != nil {
return err
}
log.Infof("websocket connected: %s", url)
// should only start one connection one time, so we lock the mutex
s.connLock.Lock()
// ensure the previous context is cancelled
if s.connCancel != nil {
s.connCancel()
}
// create a new context
s.connCtx, s.connCancel = context.WithCancel(ctx)
conn.SetReadDeadline(time.Now().Add(readTimeout))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(readTimeout))
return nil
})
s.Conn = conn
s.connLock.Unlock()
s.EmitConnect()
go s.read(s.connCtx)
go s.ping(s.connCtx)
return nil
return url, nil
}
func (s *Stream) read(ctx context.Context) {
defer func() {
if s.connCancel != nil {
s.connCancel()
func (s *Stream) dispatchEvent(e interface{}) {
switch et := e.(type) {
case *WebSocketEvent:
s.EmitEvent(*et)
case *BookEvent:
// there's "books" for 400 depth and books5 for 5 depth
if et.channel != "books5" {
s.EmitBookEvent(*et)
}
s.EmitDisconnect()
}()
s.EmitBookTickerUpdate(et.BookTicker())
case *Candle:
s.EmitCandleEvent(*et)
for {
select {
case *okexapi.Account:
s.EmitAccountEvent(*et)
case <-ctx.Done():
return
case []okexapi.OrderDetails:
s.EmitOrderDetailsEvent(et)
default:
s.connLock.Lock()
conn := s.Conn
s.connLock.Unlock()
if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
}
mt, message, err := conn.ReadMessage()
if err != nil {
// if it's a network timeout error, we should re-connect
switch err := err.(type) {
// if it's a websocket related error
case *websocket.CloseError:
if err.Code == websocket.CloseNormalClosure {
return
}
// for unexpected close error, we should re-connect
// emit reconnect to start a new connection
s.Reconnect()
return
case net.Error:
log.WithError(err).Error("network error")
s.Reconnect()
return
default:
log.WithError(err).Error("unexpected connection error")
s.Reconnect()
return
}
}
// skip non-text messages
if mt != websocket.TextMessage {
continue
}
e, err := Parse(string(message))
if err != nil {
log.WithError(err).Error("message parse error")
}
if e != nil {
switch et := e.(type) {
case *WebSocketEvent:
s.EmitEvent(*et)
case *BookData:
// there's "books" for 400 depth and books5 for 5 depth
if et.channel != "books5" {
s.EmitBookData(*et)
}
s.EmitBookTickerUpdate(et.BookTicker())
case *Candle:
s.EmitCandleData(*et)
case *okexapi.Account:
s.EmitAccount(*et)
case []okexapi.OrderDetails:
s.EmitOrderDetails(et)
}
}
}
}
}
func (s *Stream) ping(ctx context.Context) {
pingTicker := time.NewTicker(readTimeout / 2)
defer pingTicker.Stop()
for {
select {
case <-ctx.Done():
log.Debug("ping worker stopped")
return
case <-pingTicker.C:
s.connLock.Lock()
conn := s.Conn
s.connLock.Unlock()
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err)
s.Reconnect()
}
}
}
}

View File

@ -6,22 +6,22 @@ import (
"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
)
func (s *Stream) OnCandleData(cb func(candle Candle)) {
s.candleDataCallbacks = append(s.candleDataCallbacks, cb)
func (s *Stream) OnCandleEvent(cb func(candle Candle)) {
s.candleEventCallbacks = append(s.candleEventCallbacks, cb)
}
func (s *Stream) EmitCandleData(candle Candle) {
for _, cb := range s.candleDataCallbacks {
func (s *Stream) EmitCandleEvent(candle Candle) {
for _, cb := range s.candleEventCallbacks {
cb(candle)
}
}
func (s *Stream) OnBookData(cb func(book BookData)) {
s.bookDataCallbacks = append(s.bookDataCallbacks, cb)
func (s *Stream) OnBookEvent(cb func(book BookEvent)) {
s.bookEventCallbacks = append(s.bookEventCallbacks, cb)
}
func (s *Stream) EmitBookData(book BookData) {
for _, cb := range s.bookDataCallbacks {
func (s *Stream) EmitBookEvent(book BookEvent) {
for _, cb := range s.bookEventCallbacks {
cb(book)
}
}
@ -36,34 +36,34 @@ func (s *Stream) EmitEvent(event WebSocketEvent) {
}
}
func (s *Stream) OnAccount(cb func(account okexapi.Account)) {
s.accountCallbacks = append(s.accountCallbacks, cb)
func (s *Stream) OnAccountEvent(cb func(account okexapi.Account)) {
s.accountEventCallbacks = append(s.accountEventCallbacks, cb)
}
func (s *Stream) EmitAccount(account okexapi.Account) {
for _, cb := range s.accountCallbacks {
func (s *Stream) EmitAccountEvent(account okexapi.Account) {
for _, cb := range s.accountEventCallbacks {
cb(account)
}
}
func (s *Stream) OnOrderDetails(cb func(orderDetails []okexapi.OrderDetails)) {
s.orderDetailsCallbacks = append(s.orderDetailsCallbacks, cb)
func (s *Stream) OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails)) {
s.orderDetailsEventCallbacks = append(s.orderDetailsEventCallbacks, cb)
}
func (s *Stream) EmitOrderDetails(orderDetails []okexapi.OrderDetails) {
for _, cb := range s.orderDetailsCallbacks {
func (s *Stream) EmitOrderDetailsEvent(orderDetails []okexapi.OrderDetails) {
for _, cb := range s.orderDetailsEventCallbacks {
cb(orderDetails)
}
}
type StreamEventHub interface {
OnCandleData(cb func(candle Candle))
OnCandleEvent(cb func(candle Candle))
OnBookData(cb func(book BookData))
OnBookEvent(cb func(book BookEvent))
OnEvent(cb func(event WebSocketEvent))
OnAccount(cb func(account okexapi.Account))
OnAccountEvent(cb func(account okexapi.Account))
OnOrderDetails(cb func(orderDetails []okexapi.OrderDetails))
OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails))
}