okex: implement basic stream

This commit is contained in:
c9s 2021-05-27 01:07:25 +08:00
parent 2381df5009
commit 2538824661
3 changed files with 226 additions and 3 deletions

View File

@ -22,7 +22,6 @@ func toLocalSymbol(symbol string) string {
return symbol return symbol
} }
func toGlobalTicker(marketTicker okexapi.MarketTicker) *types.Ticker { func toGlobalTicker(marketTicker okexapi.MarketTicker) *types.Ticker {
return &types.Ticker{ return &types.Ticker{
Time: marketTicker.Timestamp.Time(), Time: marketTicker.Timestamp.Time(),
@ -42,7 +41,7 @@ func toGlobalBalance(balanceSummaries []okexapi.BalanceSummary) types.BalanceMap
for _, balanceDetail := range balanceSummary.Details { for _, balanceDetail := range balanceSummary.Details {
balanceMap[balanceDetail.Currency] = types.Balance{ balanceMap[balanceDetail.Currency] = types.Balance{
Currency: balanceDetail.Currency, Currency: balanceDetail.Currency,
Available: balanceDetail.Available, Available: balanceDetail.CashBalance,
Locked: balanceDetail.Frozen, Locked: balanceDetail.Frozen,
} }
} }

View File

@ -31,6 +31,7 @@ func New(key, secret, passphrase string) *Exchange {
key: key, key: key,
secret: secret, secret: secret,
passphrase: passphrase, passphrase: passphrase,
client: client,
} }
} }
@ -162,7 +163,7 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) erro
} }
func (e *Exchange) NewStream() types.Stream { func (e *Exchange) NewStream() types.Stream {
panic("implement me") return NewStream(e.client)
} }
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) { func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {

223
pkg/exchange/okex/stream.go Normal file
View File

@ -0,0 +1,223 @@
package okex
import (
"context"
"net"
"sync"
"time"
"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
"github.com/c9s/bbgo/pkg/types"
"github.com/gorilla/websocket"
)
//go:generate callbackgen -type Stream -interface
type Stream struct {
types.StandardStream
Client *okexapi.RestClient
ListenKey string
Conn *websocket.Conn
connLock sync.Mutex
reconnectC chan struct{}
connCtx context.Context
connCancel context.CancelFunc
publicOnly bool
klineCallbacks []func()
}
func NewStream(client *okexapi.RestClient) *Stream {
stream := &Stream{
Client: client,
reconnectC: make(chan struct{}, 1),
}
return stream
}
func (s *Stream) SetPublicOnly() {
s.publicOnly = true
}
func (s *Stream) Close() error {
return nil
}
func (s *Stream) Connect(ctx context.Context) error {
err := s.connect(ctx)
if err != nil {
return err
}
// start one re-connector goroutine with the base context
go s.reconnector(ctx)
s.EmitStart()
return nil
}
func (s *Stream) emitReconnect() {
select {
case s.reconnectC <- struct{}{}:
default:
}
}
func (s *Stream) reconnector(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-s.reconnectC:
// ensure the previous context is cancelled
if s.connCancel != nil {
s.connCancel()
}
log.Warnf("received reconnect signal, reconnecting...")
time.Sleep(3 * time.Second)
if err := s.connect(ctx); err != nil {
log.WithError(err).Errorf("connect error, try to reconnect again...")
s.emitReconnect()
}
}
}
}
func (s *Stream) dial() (*websocket.Conn, error) {
var url string
if s.publicOnly {
url = okexapi.PublicWebSocketURL
} else {
url = okexapi.PrivateWebSocketURL
}
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
// use the default ping handler
conn.SetPingHandler(nil)
return conn, nil
}
func (s *Stream) connect(ctx context.Context) error {
// should only start one connection one time, so we lock the mutex
s.connLock.Lock()
// create a new context
s.connCtx, s.connCancel = context.WithCancel(ctx)
if s.publicOnly {
log.Infof("stream is set to public only mode")
} else {
log.Infof("request listen key for creating user data stream...")
}
// when in public mode, the listen key is an empty string
conn, err := s.dial()
if err != nil {
s.connCancel()
s.connLock.Unlock()
return err
}
log.Infof("websocket connected")
s.Conn = conn
s.connLock.Unlock()
s.EmitConnect()
go s.read(s.connCtx)
go s.ping(s.connCtx)
return nil
}
func (s *Stream) read(ctx context.Context) {
defer func() {
if s.connCancel != nil {
s.connCancel()
}
s.EmitDisconnect()
}()
for {
select {
case <-ctx.Done():
return
default:
s.connLock.Lock()
if err := s.Conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
}
mt, message, err := s.Conn.ReadMessage()
s.connLock.Unlock()
if err != nil {
// if it's a network timeout error, we should re-connect
switch err := err.(type) {
// if it's a websocket related error
case *websocket.CloseError:
if err.Code == websocket.CloseNormalClosure {
return
}
// for unexpected close error, we should re-connect
// emit reconnect to start a new connection
s.emitReconnect()
return
case net.Error:
log.WithError(err).Error("network error")
s.emitReconnect()
return
default:
log.WithError(err).Error("unexpected connection error")
s.emitReconnect()
return
}
}
// skip non-text messages
if mt != websocket.TextMessage {
continue
}
log.Debug(string(message))
}
}
}
func (s *Stream) ping(ctx context.Context) {
pingTicker := time.NewTicker(15 * time.Second)
defer pingTicker.Stop()
for {
select {
case <-ctx.Done():
log.Info("ping worker stopped")
return
case <-pingTicker.C:
s.connLock.Lock()
if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err)
s.emitReconnect()
}
s.connLock.Unlock()
}
}
}