From 1d400e281ca6da014fd53740387bdea2e8d9e8b2 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 27 May 2021 16:01:15 +0800 Subject: [PATCH] okex: convert book data to book snapshot and book update --- pkg/exchange/okex/parse.go | 17 +++++++++++++ pkg/exchange/okex/stream.go | 27 ++++++++++++++++++-- pkg/exchange/okex/stream_callbacks.go | 36 ++++++++++++++++++++++----- 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index 9c3472444..739c325b9 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -6,6 +6,7 @@ import ( "strconv" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" "github.com/valyala/fastjson" ) @@ -54,6 +55,22 @@ type BookData struct { Checksum int } +func (data *BookData) Book() types.SliceOrderBook { + book := types.SliceOrderBook{ + Symbol: data.Symbol, + } + + for _, bid := range data.Bids { + book.Bids = append(book.Bids, types.PriceVolume{Price: bid.Price, Volume: bid.Volume}) + } + + for _, ask := range data.Asks { + book.Asks = append(book.Asks, types.PriceVolume{Price: ask.Price, Volume: ask.Volume}) + } + + return book +} + type BookEntry struct { Price fixedpoint.Value Volume fixedpoint.Value diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index b85d54dc2..2a5d6d27f 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -28,7 +28,10 @@ type Stream struct { publicOnly bool - klineCallbacks []func() + // public callbacks + cancelDataCallbacks []func() + bookDataCallbacks []func(data BookData) + eventCallbacks []func(event WebSocketEvent) } func NewStream(client *okexapi.RestClient) *Stream { @@ -39,6 +42,16 @@ func NewStream(client *okexapi.RestClient) *Stream { }, } + stream.OnBookData(func(data BookData) { + book := data.Book() + switch data.Action { + case "snapshot": + stream.EmitBookSnapshot(book) + case "update": + stream.EmitBookUpdate(book) + } + }) + stream.OnConnect(func() { var subs []WebsocketSubscription for _, subscription := range stream.Subscriptions { @@ -213,7 +226,17 @@ func (s *Stream) read(ctx context.Context) { log.WithError(err).Error("message parse error") } - log.Infof("%+v", e) + if e != nil { + log.Infof("%+v", e) + + switch et := e.(type) { + case *WebSocketEvent: + s.EmitEvent(*et) + + case *BookData: + s.EmitBookData(*et) + } + } } } } diff --git a/pkg/exchange/okex/stream_callbacks.go b/pkg/exchange/okex/stream_callbacks.go index 8f65e5284..4a95aea36 100644 --- a/pkg/exchange/okex/stream_callbacks.go +++ b/pkg/exchange/okex/stream_callbacks.go @@ -4,16 +4,40 @@ package okex import () -func (s *Stream) OnKline(cb func()) { - s.klineCallbacks = append(s.klineCallbacks, cb) +func (s *Stream) OnCancelData(cb func()) { + s.cancelDataCallbacks = append(s.cancelDataCallbacks, cb) } -func (s *Stream) EmitKline() { - for _, cb := range s.klineCallbacks { +func (s *Stream) EmitCancelData() { + for _, cb := range s.cancelDataCallbacks { cb() } } -type StreamEventHub interface { - OnKline(cb func()) +func (s *Stream) OnBookData(cb func(data BookData)) { + s.bookDataCallbacks = append(s.bookDataCallbacks, cb) +} + +func (s *Stream) EmitBookData(data BookData) { + for _, cb := range s.bookDataCallbacks { + cb(data) + } +} + +func (s *Stream) OnEvent(cb func(event WebSocketEvent)) { + s.eventCallbacks = append(s.eventCallbacks, cb) +} + +func (s *Stream) EmitEvent(event WebSocketEvent) { + for _, cb := range s.eventCallbacks { + cb(event) + } +} + +type StreamEventHub interface { + OnCancelData(cb func()) + + OnBookData(cb func(data BookData)) + + OnEvent(cb func(event WebSocketEvent)) }