refactor private stream

This commit is contained in:
c9s 2020-06-29 03:08:13 +08:00
parent 778d046820
commit 338d2443b9
2 changed files with 145 additions and 7 deletions

View File

@ -2,16 +2,154 @@ package bbgo
import ( import (
"context" "context"
"fmt"
"github.com/adshao/go-binance" "github.com/adshao/go-binance"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strconv" "strconv"
"time" "time"
) )
type SubscribeOptions struct {
Interval string
Depth string
}
func (o SubscribeOptions) String() string {
if len(o.Interval) > 0 {
return o.Interval
}
return o.Depth
}
type Subscription struct {
Symbol string
Channel string
Options SubscribeOptions
}
func (s *Subscription) String() string {
return fmt.Sprintf("%s@%s_%s", s.Symbol, s.Channel, s.Options.String())
}
type StreamCommand struct {
ID int `json:"id,omitempty"`
Method string `json:"method"`
Params []string `json:"params"`
}
type PrivateStream struct {
Client *binance.Client
ListenKey string
Conn *websocket.Conn
Subscriptions []Subscription
}
func (s *PrivateStream) Subscribe(channel string, symbol string, options SubscribeOptions) {
s.Subscriptions = append(s.Subscriptions, Subscription{
Channel: channel,
Symbol: symbol,
Options: options,
})
}
func (s *PrivateStream) Connect(ctx context.Context) error {
url := "wss://stream.binance.com:9443/ws/" + s.ListenKey
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return err
}
log.Infof("[binance] websocket connected")
s.Conn = conn
var params []string
for _, subscription := range s.Subscriptions {
params = append(params, subscription.String())
}
log.Infof("[binance] subscribing channels: %+v", params)
err = conn.WriteJSON(StreamCommand{
Method: "SUBSCRIBE",
Params: params,
ID: 1,
})
if err != nil {
return err
}
return nil
}
func (s *PrivateStream) Read(ctx context.Context, messages chan []byte) {
defer close(messages)
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := s.Client.NewKeepaliveUserStreamService().ListenKey(s.ListenKey).Do(ctx)
if err != nil {
log.WithError(err).Error("listen key keep-alive error", err)
}
default:
if err := s.Conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil {
log.WithError(err).Errorf("set read deadline error", err)
}
_, message, err := s.Conn.ReadMessage()
if err != nil {
log.WithError(err).Errorf("read error", err)
return
}
log.Debugf("[binance] recv: %s", message)
messages <- message
}
}
}
func (s *PrivateStream) Close() error {
log.Infof("[binance] closing user data stream...")
defer s.Conn.Close()
// use background context to close user stream
err := s.Client.NewCloseUserStreamService().ListenKey(s.ListenKey).Do(context.Background())
if err != nil {
log.WithError(err).Error("[binance] error close user data stream")
return err
}
return err
}
type BinanceExchange struct { type BinanceExchange struct {
Client *binance.Client Client *binance.Client
} }
func (e *BinanceExchange) NewPrivateStream(ctx context.Context) (*PrivateStream, error) {
log.Infof("[binance] creating user data stream...")
listenKey, err := e.Client.NewStartUserStreamService().Do(ctx)
if err != nil {
return nil, err
}
log.Infof("[binance] user data stream created. listenKey: %s", listenKey)
return &PrivateStream{
Client: e.Client,
ListenKey: listenKey,
}, nil
}
func (e *BinanceExchange) SubmitOrder(ctx context.Context, order *Order) error { func (e *BinanceExchange) SubmitOrder(ctx context.Context, order *Order) error {
/* /*
limit order example limit order example
@ -140,4 +278,3 @@ func (e *BinanceExchange) QueryTrades(ctx context.Context, market string, startT
return trades, nil return trades, nil
} }

View File

@ -48,7 +48,7 @@ executionReport
"Q": "0.00000000" // Quote Order Qty "Q": "0.00000000" // Quote Order Qty
} }
*/ */
type ExecutionReportEvent struct { type BinanceExecutionReportEvent struct {
EventBase EventBase
Symbol string `json:"s"` Symbol string `json:"s"`
@ -82,7 +82,7 @@ type ExecutionReportEvent struct {
OrderCreationTime int `json:"O"` OrderCreationTime int `json:"O"`
} }
func (e *ExecutionReportEvent) Trade() (*Trade, error) { func (e *BinanceExecutionReportEvent) Trade() (*Trade, error) {
if e.CurrentExecutionType != "TRADE" { if e.CurrentExecutionType != "TRADE" {
return nil, errors.New("execution report is not a trade") return nil, errors.New("execution report is not a trade")
} }
@ -112,7 +112,7 @@ balanceUpdate
"T": 1573200697068 //Clear Time "T": 1573200697068 //Clear Time
} }
*/ */
type BalanceUpdateEvent struct { type BinanceBalanceUpdateEvent struct {
EventBase EventBase
Asset string `json:"a"` Asset string `json:"a"`
@ -173,6 +173,7 @@ type Balance struct {
Free string `json:"f"` Free string `json:"f"`
Locked string `json:"l"` Locked string `json:"l"`
} }
type OutboundAccountInfoEvent struct { type OutboundAccountInfoEvent struct {
EventBase EventBase
@ -191,7 +192,7 @@ type OutboundAccountInfoEvent struct {
Permissions []string `json:"P,omitempty"` Permissions []string `json:"P,omitempty"`
} }
func ParseEvent(message string) (interface{}, error) { func ParseBinanceEvent(message string) (interface{}, error) {
val, err := fastjson.Parse(message) val, err := fastjson.Parse(message)
if err != nil { if err != nil {
return nil, err return nil, err
@ -211,12 +212,12 @@ func ParseEvent(message string) (interface{}, error) {
return &event, err return &event, err
case "balanceUpdate": case "balanceUpdate":
var event BalanceUpdateEvent var event BinanceBalanceUpdateEvent
err := json.Unmarshal([]byte(message), &event) err := json.Unmarshal([]byte(message), &event)
return &event, err return &event, err
case "executionReport": case "executionReport":
var event ExecutionReportEvent var event BinanceExecutionReportEvent
err := json.Unmarshal([]byte(message), &event) err := json.Unmarshal([]byte(message), &event)
return &event, err return &event, err