okex: handle kline close event

This commit is contained in:
c9s 2021-05-27 18:43:42 +08:00
parent 2844b7c3a7
commit 545d0f18e3

View File

@ -32,6 +32,13 @@ type Stream struct {
candleDataCallbacks []func(candle Candle) candleDataCallbacks []func(candle Candle)
bookDataCallbacks []func(book BookData) bookDataCallbacks []func(book BookData)
eventCallbacks []func(event WebSocketEvent) eventCallbacks []func(event WebSocketEvent)
lastCandle map[CandleKey]Candle
}
type CandleKey struct {
InstrumentID string
Channel string
} }
func NewStream(client *okexapi.RestClient) *Stream { func NewStream(client *okexapi.RestClient) *Stream {
@ -40,11 +47,23 @@ func NewStream(client *okexapi.RestClient) *Stream {
StandardStream: types.StandardStream{ StandardStream: types.StandardStream{
ReconnectC: make(chan struct{}, 1), ReconnectC: make(chan struct{}, 1),
}, },
lastCandle: make(map[CandleKey]Candle),
} }
stream.OnCandleData(func(candle Candle) { stream.OnCandleData(func(candle Candle) {
key := CandleKey{Channel: candle.Channel, InstrumentID: candle.InstrumentID}
kline := candle.KLine() kline := candle.KLine()
// check if we need to close previous kline
lastCandle, ok := stream.lastCandle[key]
if ok && candle.StartTime.After(lastCandle.StartTime) {
lastKline := lastCandle.KLine()
lastKline.Closed = true
stream.EmitKLineClosed(lastKline)
}
stream.EmitKLine(kline) stream.EmitKLine(kline)
stream.lastCandle[key] = candle
}) })
stream.OnBookData(func(data BookData) { stream.OnBookData(func(data BookData) {
@ -186,13 +205,11 @@ func (s *Stream) read(ctx context.Context) {
return return
default: default:
s.connLock.Lock()
if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil { if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error()) log.WithError(err).Errorf("set read deadline error: %s", err.Error())
} }
mt, message, err := s.Conn.ReadMessage() mt, message, err := s.Conn.ReadMessage()
s.connLock.Unlock()
if err != nil { if err != nil {
// if it's a network timeout error, we should re-connect // if it's a network timeout error, we should re-connect
@ -232,8 +249,6 @@ func (s *Stream) read(ctx context.Context) {
} }
if e != nil { if e != nil {
log.Infof("%+v", e)
switch et := e.(type) { switch et := e.(type) {
case *WebSocketEvent: case *WebSocketEvent:
s.EmitEvent(*et) s.EmitEvent(*et)