From 30a5193923ffc33f9a86363208e303b2dbeb64b2 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 3 Oct 2020 19:38:35 +0800 Subject: [PATCH] refactor binance event passing and convertion --- bbgo/exchange/binance/exchange.go | 22 +------- bbgo/exchange/binance/stream.go | 85 +++++++++++++++++++++---------- 2 files changed, 58 insertions(+), 49 deletions(-) diff --git a/bbgo/exchange/binance/exchange.go b/bbgo/exchange/binance/exchange.go index 24399e392..aa89fa236 100644 --- a/bbgo/exchange/binance/exchange.go +++ b/bbgo/exchange/binance/exchange.go @@ -43,27 +43,7 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6 } func (e *Exchange) NewPrivateStream() (types.PrivateStream, error) { - return NewPrivateStream(e.Client) -} - -func NewPrivateStream(client *binance.Client) (*Stream, error) { - // binance BalanceUpdate = withdrawal or deposit changes - /* - stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) { - a.mu.Lock() - defer a.mu.Unlock() - - delta := util.MustParseFloat(e.Delta) - if balance, ok := a.Balances[e.Asset]; ok { - balance.Available += delta - a.Balances[e.Asset] = balance - } - }) - */ - - return &Stream{ - Client: client, - }, nil + return NewStream(e.Client) } type Withdraw struct { diff --git a/bbgo/exchange/binance/stream.go b/bbgo/exchange/binance/stream.go index 12fa6b69e..08d8720f9 100644 --- a/bbgo/exchange/binance/stream.go +++ b/bbgo/exchange/binance/stream.go @@ -40,6 +40,61 @@ type Stream struct { executionReportEventCallbacks []func(event *ExecutionReportEvent) } +func NewStream(client *binance.Client) (*Stream, error) { + // binance BalanceUpdate = withdrawal or deposit changes + /* + stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) { + a.mu.Lock() + defer a.mu.Unlock() + + delta := util.MustParseFloat(e.Delta) + if balance, ok := a.Balances[e.Asset]; ok { + balance.Available += delta + a.Balances[e.Asset] = balance + } + }) + */ + + stream := &Stream{ + Client: client, + } + + stream.OnOutboundAccountInfoEvent(func(e *OutboundAccountInfoEvent) { + snapshot := map[string]types.Balance{} + for _, balance := range e.Balances { + available := util.MustParseFloat(balance.Free) + locked := util.MustParseFloat(balance.Locked) + snapshot[balance.Asset] = types.Balance{ + Currency: balance.Asset, + Available: available, + Locked: locked, + } + } + stream.EmitBalanceSnapshot(snapshot) + }) + + stream.OnKLineEvent(func(e *KLineEvent) { + if e.KLine.Closed { + stream.EmitKLineClosedEvent(e) + stream.EmitKLineClosed(e.KLine.KLine()) + } + }) + + stream.OnExecutionReportEvent(func(e *ExecutionReportEvent) { + switch e.CurrentExecutionType { + case "TRADE": + trade, err := e.Trade() + if err != nil { + break + } + stream.EmitTrade(trade) + } + }) + + return stream, nil +} + + func (s *Stream) dial(listenKey string) (*websocket.Conn, error) { url := "wss://stream.binance.com:9443/ws/" + listenKey conn, _, err := websocket.DefaultDialer.Dial(url, nil) @@ -176,25 +231,12 @@ func (s *Stream) read(ctx context.Context) { } // log.Notify("[binance] event: %+v", e) - switch e := e.(type) { case *OutboundAccountInfoEvent: log.Info(e.Event, " ", e.Balances) s.EmitOutboundAccountInfoEvent(e) - snapshot := map[string]types.Balance{} - for _, balance := range e.Balances { - available := util.MustParseFloat(balance.Free) - locked := util.MustParseFloat(balance.Locked) - snapshot[balance.Asset] = types.Balance{ - Currency: balance.Asset, - Available: available, - Locked: locked, - } - } - s.EmitBalanceSnapshot(snapshot) - case *BalanceUpdateEvent: log.Info(e.Event, " ", e.Asset, " ", e.Delta) s.EmitBalanceUpdateEvent(e) @@ -203,24 +245,9 @@ func (s *Stream) read(ctx context.Context) { log.Info(e.Event, " ", e.KLine, " ", e.KLine.Interval) s.EmitKLineEvent(e) - if e.KLine.Closed { - s.EmitKLineClosedEvent(e) - s.EmitKLineClosed(e.KLine.KLine()) - } - case *ExecutionReportEvent: log.Info(e.Event, " ", e) - s.EmitExecutionReportEvent(e) - - switch e.CurrentExecutionType { - case "TRADE": - trade, err := e.Trade() - if err != nil { - break - } - s.EmitTrade(trade) - } } } } @@ -250,3 +277,5 @@ func maskListenKey(listenKey string) string { maskKey := listenKey[0:5] return maskKey + strings.Repeat("*", len(listenKey)-1-5) } + +