refactor binance event passing and convertion

This commit is contained in:
c9s 2020-10-03 19:38:35 +08:00
parent 46fa720f6a
commit 30a5193923
2 changed files with 58 additions and 49 deletions

View File

@ -43,27 +43,7 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6
}
func (e *Exchange) NewPrivateStream() (types.PrivateStream, error) {
return NewPrivateStream(e.Client)
}
func NewPrivateStream(client *binance.Client) (*Stream, error) {
// binance BalanceUpdate = withdrawal or deposit changes
/*
stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) {
a.mu.Lock()
defer a.mu.Unlock()
delta := util.MustParseFloat(e.Delta)
if balance, ok := a.Balances[e.Asset]; ok {
balance.Available += delta
a.Balances[e.Asset] = balance
}
})
*/
return &Stream{
Client: client,
}, nil
return NewStream(e.Client)
}
type Withdraw struct {

View File

@ -40,6 +40,61 @@ type Stream struct {
executionReportEventCallbacks []func(event *ExecutionReportEvent)
}
func NewStream(client *binance.Client) (*Stream, error) {
// binance BalanceUpdate = withdrawal or deposit changes
/*
stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) {
a.mu.Lock()
defer a.mu.Unlock()
delta := util.MustParseFloat(e.Delta)
if balance, ok := a.Balances[e.Asset]; ok {
balance.Available += delta
a.Balances[e.Asset] = balance
}
})
*/
stream := &Stream{
Client: client,
}
stream.OnOutboundAccountInfoEvent(func(e *OutboundAccountInfoEvent) {
snapshot := map[string]types.Balance{}
for _, balance := range e.Balances {
available := util.MustParseFloat(balance.Free)
locked := util.MustParseFloat(balance.Locked)
snapshot[balance.Asset] = types.Balance{
Currency: balance.Asset,
Available: available,
Locked: locked,
}
}
stream.EmitBalanceSnapshot(snapshot)
})
stream.OnKLineEvent(func(e *KLineEvent) {
if e.KLine.Closed {
stream.EmitKLineClosedEvent(e)
stream.EmitKLineClosed(e.KLine.KLine())
}
})
stream.OnExecutionReportEvent(func(e *ExecutionReportEvent) {
switch e.CurrentExecutionType {
case "TRADE":
trade, err := e.Trade()
if err != nil {
break
}
stream.EmitTrade(trade)
}
})
return stream, nil
}
func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
url := "wss://stream.binance.com:9443/ws/" + listenKey
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
@ -176,25 +231,12 @@ func (s *Stream) read(ctx context.Context) {
}
// log.Notify("[binance] event: %+v", e)
switch e := e.(type) {
case *OutboundAccountInfoEvent:
log.Info(e.Event, " ", e.Balances)
s.EmitOutboundAccountInfoEvent(e)
snapshot := map[string]types.Balance{}
for _, balance := range e.Balances {
available := util.MustParseFloat(balance.Free)
locked := util.MustParseFloat(balance.Locked)
snapshot[balance.Asset] = types.Balance{
Currency: balance.Asset,
Available: available,
Locked: locked,
}
}
s.EmitBalanceSnapshot(snapshot)
case *BalanceUpdateEvent:
log.Info(e.Event, " ", e.Asset, " ", e.Delta)
s.EmitBalanceUpdateEvent(e)
@ -203,24 +245,9 @@ func (s *Stream) read(ctx context.Context) {
log.Info(e.Event, " ", e.KLine, " ", e.KLine.Interval)
s.EmitKLineEvent(e)
if e.KLine.Closed {
s.EmitKLineClosedEvent(e)
s.EmitKLineClosed(e.KLine.KLine())
}
case *ExecutionReportEvent:
log.Info(e.Event, " ", e)
s.EmitExecutionReportEvent(e)
switch e.CurrentExecutionType {
case "TRADE":
trade, err := e.Trade()
if err != nil {
break
}
s.EmitTrade(trade)
}
}
}
}
@ -250,3 +277,5 @@ func maskListenKey(listenKey string) string {
maskKey := listenKey[0:5]
return maskKey + strings.Repeat("*", len(listenKey)-1-5)
}