support binance depth event parsing

This commit is contained in:
c9s 2020-10-03 20:09:22 +08:00
parent 9d26adb245
commit 1bbed2e477
10 changed files with 118 additions and 27 deletions

View File

@ -24,7 +24,7 @@ func LoadAccount(ctx context.Context, exchange *binance.Exchange) (*Account, err
}, err
}
func (a *Account) BindPrivateStream(stream types.PrivateStream) {
func (a *Account) BindPrivateStream(stream types.Stream) {
stream.OnBalanceSnapshot(func(snapshot map[string]types.Balance) {
a.mu.Lock()
defer a.mu.Unlock()

View File

@ -42,7 +42,7 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6
return util.MustParseFloat(resp.Price), nil
}
func (e *Exchange) NewPrivateStream() (types.PrivateStream, error) {
func (e *Exchange) NewStream() types.Stream {
return NewStream(e.Client)
}

View File

@ -232,6 +232,9 @@ func ParseEvent(message string) (interface{}, error) {
err := json.Unmarshal([]byte(message), &event)
return &event, err
case "depthUpdate":
return parseDepthEvent(val)
default:
id := val.GetInt("id")
if id > 0 {
@ -242,6 +245,73 @@ func ParseEvent(message string) (interface{}, error) {
return nil, fmt.Errorf("unsupported message: %s", message)
}
type DepthEntry struct {
PriceLevel string
Quantity string
}
type DepthEvent struct {
EventBase
Symbol string `json:"s"`
FirstUpdateID int64 `json:"U"`
FinalUpdateID int64 `json:"u"`
Bids []DepthEntry
Asks []DepthEntry
}
func parseDepthEntry(val *fastjson.Value) (*DepthEntry, error) {
arr, err := val.Array()
if err != nil {
return nil, err
}
if len(arr) < 2 {
return nil, errors.New("incorrect depth entry element length")
}
return &DepthEntry{
PriceLevel: string(arr[0].GetStringBytes()),
Quantity: string(arr[1].GetStringBytes()),
}, nil
}
func parseDepthEvent(val *fastjson.Value) (*DepthEvent, error) {
var err error
var depth = &DepthEvent{
EventBase: EventBase{
Event: string(val.GetStringBytes("e")),
Time: val.GetInt64("E"),
},
Symbol: string(val.GetStringBytes("s")),
FirstUpdateID: val.GetInt64("U"),
FinalUpdateID: val.GetInt64("u"),
}
for _, ev := range val.GetArray("b") {
entry, err2 := parseDepthEntry(ev)
if err2 != nil {
err = err2
continue
}
depth.Bids = append(depth.Bids, *entry)
}
for _, ev := range val.GetArray("a") {
entry, err2 := parseDepthEntry(ev)
if err2 != nil {
err = err2
continue
}
depth.Asks = append(depth.Asks, *entry)
}
return depth, err
}
type KLine struct {
StartTime int64 `json:"t"`
EndTime int64 `json:"T"`

View File

@ -32,6 +32,7 @@ type Stream struct {
connectCallbacks []func(stream *Stream)
// custom callbacks
depthEventCallbacks []func(e *DepthEvent)
kLineEventCallbacks []func(e *KLineEvent)
kLineClosedEventCallbacks []func(e *KLineEvent)
@ -40,7 +41,7 @@ type Stream struct {
executionReportEventCallbacks []func(event *ExecutionReportEvent)
}
func NewStream(client *binance.Client) (*Stream, error) {
func NewStream(client *binance.Client) *Stream {
// binance BalanceUpdate = withdrawal or deposit changes
/*
stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) {
@ -91,9 +92,26 @@ func NewStream(client *binance.Client) (*Stream, error) {
}
})
return stream, nil
stream.OnConnect(func(stream *Stream) {
var params []string
for _, subscription := range stream.Subscriptions {
params = append(params, convertSubscription(subscription))
}
log.Infof("[binance] subscribing channels: %+v", params)
err := stream.Conn.WriteJSON(StreamRequest{
Method: "SUBSCRIBE",
Params: params,
ID: 1,
})
if err != nil {
log.WithError(err).Error("subscribe error")
}
})
return stream
}
func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
url := "wss://stream.binance.com:9443/ws/" + listenKey
@ -124,18 +142,7 @@ func (s *Stream) connect(ctx context.Context) error {
s.Conn = conn
s.EmitConnect(s)
var params []string
for _, subscription := range s.Subscriptions {
params = append(params, convertSubscription(subscription))
}
log.Infof("[binance] subscribing channels: %+v", params)
return conn.WriteJSON(StreamRequest{
Method: "SUBSCRIBE",
Params: params,
ID: 1,
})
return nil
}
func convertSubscription(s types.Subscription) string {
@ -245,6 +252,10 @@ func (s *Stream) read(ctx context.Context) {
log.Info(e.Event, " ", e.KLine, " ", e.KLine.Interval)
s.EmitKLineEvent(e)
case *DepthEvent:
log.Info(e.Event, " ", "asks:", e.Asks, "bids:", e.Bids)
s.EmitDepthEvent(e)
case *ExecutionReportEvent:
log.Info(e.Event, " ", e)
s.EmitExecutionReportEvent(e)
@ -277,5 +288,3 @@ func maskListenKey(listenKey string) string {
maskKey := listenKey[0:5]
return maskKey + strings.Repeat("*", len(listenKey)-1-5)
}

View File

@ -14,6 +14,16 @@ func (s *Stream) EmitConnect(stream *Stream) {
}
}
func (s *Stream) OnDepthEvent(cb func(e *DepthEvent)) {
s.depthEventCallbacks = append(s.depthEventCallbacks, cb)
}
func (s *Stream) EmitDepthEvent(e *DepthEvent) {
for _, cb := range s.depthEventCallbacks {
cb(e)
}
}
func (s *Stream) OnKLineEvent(cb func(e *KLineEvent)) {
s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb)
}
@ -67,6 +77,8 @@ func (s *Stream) EmitExecutionReportEvent(event *ExecutionReportEvent) {
type StreamEventHub interface {
OnConnect(cb func(stream *Stream))
OnDepthEvent(cb func(e *DepthEvent))
OnKLineEvent(cb func(e *KLineEvent))
OnKLineClosedEvent(cb func(e *KLineEvent))

View File

@ -28,7 +28,7 @@ func NewMarketDataStore() *MarketDataStore {
}
}
func (store *MarketDataStore) BindPrivateStream(stream types.PrivateStream) {
func (store *MarketDataStore) BindPrivateStream(stream types.Stream) {
stream.OnKLineClosed(store.handleKLineClosed)
}

View File

@ -20,7 +20,7 @@ import (
// MarketStrategy represents the single Exchange strategy
type MarketStrategy interface {
OnLoad(tradingContext *Context, trader types.Trader) error
OnNewStream(stream types.PrivateStream) error
OnNewStream(stream types.Stream) error
}
type ExchangeSession struct {
@ -28,7 +28,7 @@ type ExchangeSession struct {
Account *Account
Stream types.PrivateStream
Stream types.Stream
Subscriptions []types.Subscription
@ -181,7 +181,7 @@ func (trader *Trader) Connect(ctx context.Context) (err error) {
return err
}
session.Stream, err = session.Exchange.NewPrivateStream()
session.Stream = session.Exchange.NewStream()
if err != nil {
return err
}
@ -354,7 +354,7 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy MarketStrategy)
return nil, err
}
stream, err := trader.Exchange.NewPrivateStream()
stream := trader.Exchange.NewStream()
if err != nil {
return nil, err
}

View File

@ -8,7 +8,7 @@ import (
type Exchange interface {
PlatformFeeCurrency() string
NewPrivateStream() (PrivateStream, error)
NewStream() Stream
QueryAccountBalances(ctx context.Context) (map[string]Balance, error)

View File

@ -227,7 +227,7 @@ func NewStreamBook(symbol string) *StreamOrderBook {
}
}
func (sb *StreamOrderBook) BindStream(stream PrivateStream) {
func (sb *StreamOrderBook) BindStream(stream Stream) {
stream.OnBookSnapshot(func(book OrderBook) {
sb.Load(book)
sb.C.Emit()

View File

@ -4,7 +4,7 @@ import (
"context"
)
type PrivateStream interface {
type Stream interface {
StandardStreamEventHub
Subscribe(channel Channel, symbol string, options SubscribeOptions)