all: fix and improve kucoin orderbook stream

This commit is contained in:
c9s 2021-12-25 19:34:27 +08:00
parent 3d1ca46c77
commit f0d4236169
10 changed files with 186 additions and 26 deletions

View File

@ -69,15 +69,11 @@ var orderbookCmd = &cobra.Command{
bid.Price.Float64(), bid.Volume.Float64())
}
})
s.OnBookUpdate(func(book types.SliceOrderBook) {
log.Infof("orderbook update: %s", book.String())
orderBook.Update(book)
if ok, err := orderBook.IsValid() ; !ok {
log.WithError(err).Panicf("invalid error book update")
}
if bid, ask, ok := orderBook.BestBidAndAsk() ; ok {
log.Infof("ASK | %f x %f / %f x %f | BID",
ask.Volume.Float64(), ask.Price.Float64(),
@ -138,7 +134,15 @@ var orderUpdateCmd = &cobra.Command{
if err := s.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to %s", sessionName)
}
log.Infof("connected")
defer func() {
log.Infof("closing connection...")
if err := s.Close(); err != nil {
log.WithError(err).Errorf("connection close error")
}
time.Sleep(1 * time.Second)
}()
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return nil

View File

@ -3,6 +3,7 @@ package depth
import (
"fmt"
"sync"
"time"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
@ -33,6 +34,8 @@ type Buffer struct {
resetC chan struct{}
mu sync.Mutex
once util.Reonce
bufferingPeriod time.Duration
}
func NewBuffer(fetcher SnapshotFetcher) *Buffer {
@ -42,6 +45,10 @@ func NewBuffer(fetcher SnapshotFetcher) *Buffer {
}
}
func (b *Buffer) SetBufferingPeriod(d time.Duration) {
b.bufferingPeriod = d
}
func (b *Buffer) resetSnapshot() {
b.snapshot = nil
b.finalUpdateID = 0
@ -129,7 +136,7 @@ func (b *Buffer) fetchAndPush() error {
b.resetSnapshot()
b.emitReset()
b.mu.Unlock()
return fmt.Errorf("depth final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID)
return fmt.Errorf("depth snapshot is too early, final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID)
}
}
@ -139,7 +146,7 @@ func (b *Buffer) fetchAndPush() error {
b.resetSnapshot()
b.emitReset()
b.mu.Unlock()
return fmt.Errorf("the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID)
return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID)
}
if u.FirstUpdateID < finalUpdateID+1 {
@ -168,6 +175,10 @@ func (b *Buffer) fetchAndPush() error {
func (b *Buffer) tryFetch() {
for {
if b.bufferingPeriod > 0 {
<-time.After(b.bufferingPeriod)
}
err := b.fetchAndPush()
if err != nil {
log.WithError(err).Errorf("snapshot fetch failed")

View File

@ -117,6 +117,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
})
stream.depthBuffers[e.Symbol] = f
f.SetBufferingPeriod(time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid {
log.Errorf("depth snapshot is invalid, error: %v", err)

View File

@ -230,10 +230,10 @@ func (s *MarketDataService) GetTicker24HStat(symbol string) (*Ticker24H, error)
}
*/
type OrderBook struct {
Sequence string `json:"sequence"`
Sequence string `json:"sequence,omitempty"`
Time types.MillisecondTimestamp `json:"time"`
Bids [][]fixedpoint.Value `json:"bids,omitempty"`
Asks [][]fixedpoint.Value `json:"asks,omitempty"`
Bids types.PriceVolumeSlice `json:"bids,omitempty"`
Asks types.PriceVolumeSlice `json:"asks,omitempty"`
}
func (s *MarketDataService) GetOrderBook(symbol string, depth int) (*OrderBook, error) {
@ -268,6 +268,8 @@ func (s *MarketDataService) GetOrderBook(symbol string, depth int) (*OrderBook,
return nil, err
}
fmt.Println(string(response.Body))
var apiResponse struct {
Code string `json:"code"`
Message string `json:"msg"`

View File

@ -3,9 +3,11 @@ package kucoin
import (
"context"
"net"
"strconv"
"sync"
"time"
"github.com/c9s/bbgo/pkg/depth"
"github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
@ -33,7 +35,8 @@ type Stream struct {
accountBalanceEventCallbacks []func(e *WebSocketAccountBalanceEvent)
privateOrderEventCallbacks []func(e *WebSocketPrivateOrderEvent)
lastCandle map[string]types.KLine
lastCandle map[string]types.KLine
depthBuffers map[string]*depth.Buffer
}
func NewStream(client *kucoinapi.RestClient) *Stream {
@ -42,7 +45,8 @@ func NewStream(client *kucoinapi.RestClient) *Stream {
StandardStream: types.StandardStream{
ReconnectC: make(chan struct{}, 1),
},
lastCandle: make(map[string]types.KLine),
lastCandle: make(map[string]types.KLine),
depthBuffers: make(map[string]*depth.Buffer),
}
stream.OnConnect(stream.handleConnect)
@ -66,7 +70,54 @@ func (s *Stream) handleCandleEvent(candle *WebSocketCandleEvent, e *WebSocketEve
s.lastCandle[e.Topic] = kline
}
func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) {}
func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) {
f, ok := s.depthBuffers[e.Symbol]
if !ok {
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
orderBook, err := s.client.MarketDataService.GetOrderBook(e.Symbol, 100)
if err != nil {
return types.SliceOrderBook{}, 0, err
}
if len(orderBook.Sequence) == 0 {
return types.SliceOrderBook{}, 0, errors.New("sequence is missing")
}
sequence, err := strconv.ParseInt(orderBook.Sequence, 10, 64)
if err != nil {
return types.SliceOrderBook{}, 0, err
}
return types.SliceOrderBook{
Symbol: toGlobalSymbol(e.Symbol),
Bids: orderBook.Bids,
Asks: orderBook.Asks,
}, sequence, nil
})
s.depthBuffers[e.Symbol] = f
f.SetBufferingPeriod(time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid {
log.Errorf("depth snapshot is invalid, error: %v", err)
return
}
s.EmitBookSnapshot(snapshot)
for _, u := range updates {
s.EmitBookUpdate(u.Object)
}
})
f.OnPush(func(update depth.Update) {
s.EmitBookUpdate(update.Object)
})
} else {
f.AddUpdate(types.SliceOrderBook{
Symbol: e.Symbol,
Bids: e.Changes.Bids,
Asks: e.Changes.Asks,
}, e.SequenceStart, e.SequenceEnd)
}
}
func (s *Stream) handleTickerEvent(e *WebSocketTickerEvent) {}

View File

@ -78,8 +78,8 @@ type WebSocketOrderBookL2Event struct {
SequenceEnd int64 `json:"sequenceEnd"`
Symbol string `json:"symbol"`
Changes struct {
Asks [][]string `json:"asks"`
Bids [][]string `json:"bids"`
Asks types.PriceVolumeSlice `json:"asks"`
Bids types.PriceVolumeSlice `json:"bids"`
} `json:"changes"`
}

View File

@ -271,6 +271,20 @@ func Parse(input string) (num int64, numDecimalPoints int, err error) {
return num, numDecimalPoints, nil
}
func NewFromAny(any interface{}) (Value, error) {
switch v := any.(type) {
case string:
return NewFromString(v)
case float64:
return NewFromFloat(v), nil
case int64:
return NewFromInt64(v), nil
default:
return 0, fmt.Errorf("fixedpoint unsupported type %v", v)
}
}
func NewFromString(input string) (Value, error) {
length := len(input)

View File

@ -1,7 +1,6 @@
package types
import (
"fmt"
"os"
"strconv"
"sync"
@ -10,14 +9,6 @@ import (
"github.com/c9s/bbgo/pkg/sigchan"
)
type PriceVolume struct {
Price, Volume fixedpoint.Value
}
func (p PriceVolume) String() string {
return fmt.Sprintf("PriceVolume{ price: %f, volume: %f }", p.Price.Float64(), p.Volume.Float64())
}
type OrderBook interface {
Spread() (fixedpoint.Value, bool)
BestAsk() (PriceVolume, bool)
@ -26,8 +17,8 @@ type OrderBook interface {
Load(book SliceOrderBook)
Update(book SliceOrderBook)
Copy() OrderBook
CopyDepth(depth int) OrderBook
SideBook(sideType SideType) PriceVolumeSlice
CopyDepth(depth int) OrderBook
IsValid() (bool, error)
}

View File

@ -1,11 +1,21 @@
package types
import (
"encoding/json"
"fmt"
"sort"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
type PriceVolume struct {
Price, Volume fixedpoint.Value
}
func (p PriceVolume) String() string {
return fmt.Sprintf("PriceVolume{ price: %f, volume: %f }", p.Price.Float64(), p.Volume.Float64())
}
type PriceVolumeSlice []PriceVolume
func (slice PriceVolumeSlice) Len() int { return len(slice) }
@ -74,7 +84,7 @@ func (slice PriceVolumeSlice) InsertAt(idx int, pv PriceVolume) PriceVolumeSlice
func (slice PriceVolumeSlice) Remove(price fixedpoint.Value, descending bool) PriceVolumeSlice {
matched, idx := slice.Find(price, descending)
if matched.Price != price {
if matched.Price != price || matched.Price == 0 {
return slice
}
@ -116,3 +126,50 @@ func (slice PriceVolumeSlice) Upsert(pv PriceVolume, descending bool) PriceVolum
slice[idx].Volume = pv.Volume
return slice
}
func (slice *PriceVolumeSlice) UnmarshalJSON(b []byte) error {
s, err := ParsePriceVolumeSliceJSON(b)
if err != nil {
return err
}
*slice = s
return nil
}
// ParsePriceVolumeSliceJSON tries to parse a 2 dimensional string array into a PriceVolumeSlice
//
// [["9000", "10"], ["9900", "10"], ... ]
//
func ParsePriceVolumeSliceJSON(b []byte) (slice PriceVolumeSlice, err error) {
var as [][]interface{}
err = json.Unmarshal(b, &as)
if err != nil {
return slice, err
}
for _, a := range as {
var pv PriceVolume
price, err := fixedpoint.NewFromAny(a[0])
if err != nil {
return slice, err
}
volume, err := fixedpoint.NewFromAny(a[1])
if err != nil {
return slice, err
}
// kucoin returns price in 0, we should skip
if price == 0 {
continue
}
pv.Price = price
pv.Volume = volume
slice = append(slice, pv)
}
return slice, nil
}

View File

@ -0,0 +1,29 @@
package types
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestPriceVolumeSlice_Remove(t *testing.T) {
for _, descending := range []bool{true, false} {
slice := PriceVolumeSlice{}
slice = slice.Upsert(PriceVolume{Price: 1}, descending)
slice = slice.Upsert(PriceVolume{Price: 3}, descending)
slice = slice.Upsert(PriceVolume{Price: 5}, descending)
assert.Equal(t, 3, len(slice), "with descending %v", descending)
slice = slice.Remove(2, descending)
assert.Equal(t, 3, len(slice), "with descending %v", descending)
slice = slice.Remove(3, descending)
assert.Equal(t, 2, len(slice), "with descending %v", descending)
slice = slice.Remove(99, descending)
assert.Equal(t, 2, len(slice), "with descending %v", descending)
slice = slice.Remove(0, descending)
assert.Equal(t, 2, len(slice), "with descending %v", descending)
}
}