support binance orderbook

This commit is contained in:
c9s 2020-10-05 14:16:55 +08:00
parent 1bbed2e477
commit 9cc4505594
3 changed files with 210 additions and 1 deletions

View File

@ -0,0 +1,25 @@
// Code generated by "callbackgen -type DepthFrame"; DO NOT EDIT.
package binance
import ()
func (f *DepthFrame) OnReady(cb func(snapshotDepth DepthEvent, bufEvents []DepthEvent)) {
f.readyCallbacks = append(f.readyCallbacks, cb)
}
func (f *DepthFrame) EmitReady(snapshotDepth DepthEvent, bufEvents []DepthEvent) {
for _, cb := range f.readyCallbacks {
cb(snapshotDepth, bufEvents)
}
}
func (f *DepthFrame) OnPush(cb func(e DepthEvent)) {
f.pushCallbacks = append(f.pushCallbacks, cb)
}
func (f *DepthFrame) EmitPush(e DepthEvent) {
for _, cb := range f.pushCallbacks {
cb(e)
}
}

View File

@ -6,8 +6,10 @@ import (
"fmt"
"time"
"github.com/adshao/go-binance"
"github.com/valyala/fastjson"
"github.com/c9s/bbgo/pkg/bbgo/fixedpoint"
"github.com/c9s/bbgo/pkg/bbgo/types"
"github.com/c9s/bbgo/pkg/util"
)
@ -261,6 +263,50 @@ type DepthEvent struct {
Asks []DepthEntry
}
func (e *DepthEvent) OrderBook() (book types.OrderBook, err error) {
book.Symbol = e.Symbol
for _, entry := range e.Bids {
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
continue
}
price, err := fixedpoint.NewFromString(entry.PriceLevel)
if err != nil {
continue
}
pv := types.PriceVolume{
Price: price,
Volume: quantity,
}
book.Bids = book.Bids.Upsert(pv, true)
}
for _, entry := range e.Asks {
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
continue
}
price, err := fixedpoint.NewFromString(entry.PriceLevel)
if err != nil {
continue
}
pv := types.PriceVolume{
Price: price,
Volume: quantity,
}
book.Asks = book.Asks.Upsert(pv, false)
}
return
}
func parseDepthEntry(val *fastjson.Value) (*DepthEntry, error) {
arr, err := val.Array()
if err != nil {
@ -393,3 +439,6 @@ type EventBase struct {
Event string `json:"e"` // event
Time int64 `json:"E"`
}
func convertDepthResponseToSnapshot(client *binance.Client) {
}

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/c9s/bbgo/pkg/util"
@ -60,6 +61,49 @@ func NewStream(client *binance.Client) *Stream {
Client: client,
}
var depthFrames = make(map[string]*DepthFrame)
stream.OnDepthEvent(func(e *DepthEvent) {
f, ok := depthFrames[e.Symbol]
if !ok {
f = &DepthFrame{
client: client,
Symbol: e.Symbol,
}
f.OnReady(func(e DepthEvent, bufEvents []DepthEvent) {
snapshot, err := e.OrderBook()
if err != nil {
log.WithError(err).Error("book convert error")
return
}
stream.EmitBookSnapshot(snapshot)
for _, e := range bufEvents {
book, err := e.OrderBook()
if err != nil {
log.WithError(err).Error("book convert error")
return
}
stream.EmitBookUpdate(book)
}
})
f.OnPush(func(e DepthEvent) {
book, err := e.OrderBook()
if err != nil {
log.WithError(err).Error("book convert error")
return
}
stream.EmitBookUpdate(book)
})
depthFrames[e.Symbol] = f
} else {
f.PushEvent(*e)
}
})
stream.OnOutboundAccountInfoEvent(func(e *OutboundAccountInfoEvent) {
snapshot := map[string]types.Balance{}
for _, balance := range e.Balances {
@ -86,8 +130,10 @@ func NewStream(client *binance.Client) *Stream {
case "TRADE":
trade, err := e.Trade()
if err != nil {
log.WithError(err).Error("trade convert error")
break
}
stream.EmitTrade(trade)
}
})
@ -195,7 +241,6 @@ func (s *Stream) read(ctx context.Context) {
log.WithError(err).Error("ping error", err)
}
default:
if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
@ -288,3 +333,93 @@ func maskListenKey(listenKey string) string {
maskKey := listenKey[0:5]
return maskKey + strings.Repeat("*", len(listenKey)-1-5)
}
//go:generate callbackgen -type DepthFrame
type DepthFrame struct {
client *binance.Client
mu sync.Mutex
once sync.Once
SnapshotDepth *DepthEvent
Symbol string
BufEvents []DepthEvent
readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent)
pushCallbacks []func(e DepthEvent)
}
func (f *DepthFrame) Reset() {
f.mu.Lock()
f.SnapshotDepth = nil
f.once = sync.Once{}
f.mu.Unlock()
}
func (f *DepthFrame) PushEvent(e DepthEvent) {
f.mu.Lock()
if f.SnapshotDepth == nil {
f.BufEvents = append(f.BufEvents, e)
f.mu.Unlock()
go f.once.Do(func() {
depth, err := f.fetch(context.Background())
if err != nil {
return
}
f.mu.Lock()
f.SnapshotDepth = depth
var events []DepthEvent
for _, e := range f.BufEvents {
/*
if i == 0 {
if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID+1 {
// FIXME: we missed some events
log.Warn("miss matched final update id for order book")
f.SnapshotDepth = nil
f.mu.Unlock()
return
}
}
*/
if e.FirstUpdateID <= f.SnapshotDepth.FinalUpdateID || e.FinalUpdateID <= f.SnapshotDepth.FinalUpdateID {
continue
}
events = append(events, e)
}
f.BufEvents = nil
f.EmitReady(*depth, events)
f.mu.Unlock()
})
} else {
if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID || e.FinalUpdateID > f.SnapshotDepth.FinalUpdateID {
f.EmitPush(e)
}
f.mu.Unlock()
}
}
// fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type
func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) {
response, err := f.client.NewDepthService().Symbol(f.Symbol).Do(ctx)
if err != nil {
return nil, err
}
event := DepthEvent{
FirstUpdateID: 0,
FinalUpdateID: response.LastUpdateID,
}
for _, entry := range response.Bids {
event.Bids = append(event.Bids, DepthEntry{PriceLevel: entry.Price, Quantity: entry.Quantity})
}
for _, entry := range response.Asks {
event.Asks = append(event.Asks, DepthEntry{PriceLevel: entry.Price, Quantity: entry.Quantity})
}
return &event, nil
}