okex: convert book data to book snapshot and book update

This commit is contained in:
c9s 2021-05-27 16:01:15 +08:00
parent 884e764fe7
commit 1d400e281c
3 changed files with 72 additions and 8 deletions

View File

@ -6,6 +6,7 @@ import (
"strconv"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/valyala/fastjson"
)
@ -54,6 +55,22 @@ type BookData struct {
Checksum int
}
func (data *BookData) Book() types.SliceOrderBook {
book := types.SliceOrderBook{
Symbol: data.Symbol,
}
for _, bid := range data.Bids {
book.Bids = append(book.Bids, types.PriceVolume{Price: bid.Price, Volume: bid.Volume})
}
for _, ask := range data.Asks {
book.Asks = append(book.Asks, types.PriceVolume{Price: ask.Price, Volume: ask.Volume})
}
return book
}
type BookEntry struct {
Price fixedpoint.Value
Volume fixedpoint.Value

View File

@ -28,7 +28,10 @@ type Stream struct {
publicOnly bool
klineCallbacks []func()
// public callbacks
cancelDataCallbacks []func()
bookDataCallbacks []func(data BookData)
eventCallbacks []func(event WebSocketEvent)
}
func NewStream(client *okexapi.RestClient) *Stream {
@ -39,6 +42,16 @@ func NewStream(client *okexapi.RestClient) *Stream {
},
}
stream.OnBookData(func(data BookData) {
book := data.Book()
switch data.Action {
case "snapshot":
stream.EmitBookSnapshot(book)
case "update":
stream.EmitBookUpdate(book)
}
})
stream.OnConnect(func() {
var subs []WebsocketSubscription
for _, subscription := range stream.Subscriptions {
@ -213,7 +226,17 @@ func (s *Stream) read(ctx context.Context) {
log.WithError(err).Error("message parse error")
}
log.Infof("%+v", e)
if e != nil {
log.Infof("%+v", e)
switch et := e.(type) {
case *WebSocketEvent:
s.EmitEvent(*et)
case *BookData:
s.EmitBookData(*et)
}
}
}
}
}

View File

@ -4,16 +4,40 @@ package okex
import ()
func (s *Stream) OnKline(cb func()) {
s.klineCallbacks = append(s.klineCallbacks, cb)
func (s *Stream) OnCancelData(cb func()) {
s.cancelDataCallbacks = append(s.cancelDataCallbacks, cb)
}
func (s *Stream) EmitKline() {
for _, cb := range s.klineCallbacks {
func (s *Stream) EmitCancelData() {
for _, cb := range s.cancelDataCallbacks {
cb()
}
}
type StreamEventHub interface {
OnKline(cb func())
func (s *Stream) OnBookData(cb func(data BookData)) {
s.bookDataCallbacks = append(s.bookDataCallbacks, cb)
}
func (s *Stream) EmitBookData(data BookData) {
for _, cb := range s.bookDataCallbacks {
cb(data)
}
}
func (s *Stream) OnEvent(cb func(event WebSocketEvent)) {
s.eventCallbacks = append(s.eventCallbacks, cb)
}
func (s *Stream) EmitEvent(event WebSocketEvent) {
for _, cb := range s.eventCallbacks {
cb(event)
}
}
type StreamEventHub interface {
OnCancelData(cb func())
OnBookData(cb func(data BookData))
OnEvent(cb func(event WebSocketEvent))
}