rename private stream to just stream

This commit is contained in:
c9s 2020-10-03 18:35:28 +08:00
parent 9dd8b026e6
commit 657a145699
9 changed files with 235 additions and 257 deletions

View File

@ -46,7 +46,7 @@ func (e *Exchange) NewPrivateStream() (types.PrivateStream, error) {
return NewPrivateStream(e.Client)
}
func NewPrivateStream(client *binance.Client) (*PrivateStream, error) {
func NewPrivateStream(client *binance.Client) (*Stream, error) {
// binance BalanceUpdate = withdrawal or deposit changes
/*
stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) {
@ -61,7 +61,7 @@ func NewPrivateStream(client *binance.Client) (*PrivateStream, error) {
})
*/
return &PrivateStream{
return &Stream{
Client: client,
}, nil
}

View File

@ -1,195 +0,0 @@
// Code generated by "callbackgen -type PrivateStream -interface"; DO NOT EDIT.
package binance
import (
"reflect"
)
func (s *PrivateStream) OnConnect(cb func(stream *PrivateStream)) {
s.connectCallbacks = append(s.connectCallbacks, cb)
}
func (s *PrivateStream) EmitConnect(stream *PrivateStream) {
for _, cb := range s.connectCallbacks {
cb(stream)
}
}
func (s *PrivateStream) RemoveOnConnect(needle func(stream *PrivateStream)) (found bool) {
var newcallbacks []func(stream *PrivateStream)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.connectCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.connectCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnKLineEvent(cb func(event *KLineEvent)) {
s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb)
}
func (s *PrivateStream) EmitKLineEvent(event *KLineEvent) {
for _, cb := range s.kLineEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnKLineEvent(needle func(event *KLineEvent)) (found bool) {
var newcallbacks []func(event *KLineEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.kLineEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.kLineEventCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnKLineClosedEvent(cb func(event *KLineEvent)) {
s.kLineClosedEventCallbacks = append(s.kLineClosedEventCallbacks, cb)
}
func (s *PrivateStream) EmitKLineClosedEvent(event *KLineEvent) {
for _, cb := range s.kLineClosedEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnKLineClosedEvent(needle func(event *KLineEvent)) (found bool) {
var newcallbacks []func(event *KLineEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.kLineClosedEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.kLineClosedEventCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent)) {
s.balanceUpdateEventCallbacks = append(s.balanceUpdateEventCallbacks, cb)
}
func (s *PrivateStream) EmitBalanceUpdateEvent(event *BalanceUpdateEvent) {
for _, cb := range s.balanceUpdateEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnBalanceUpdateEvent(needle func(event *BalanceUpdateEvent)) (found bool) {
var newcallbacks []func(event *BalanceUpdateEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.balanceUpdateEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.balanceUpdateEventCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent)) {
s.outboundAccountInfoEventCallbacks = append(s.outboundAccountInfoEventCallbacks, cb)
}
func (s *PrivateStream) EmitOutboundAccountInfoEvent(event *OutboundAccountInfoEvent) {
for _, cb := range s.outboundAccountInfoEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnOutboundAccountInfoEvent(needle func(event *OutboundAccountInfoEvent)) (found bool) {
var newcallbacks []func(event *OutboundAccountInfoEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.outboundAccountInfoEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.outboundAccountInfoEventCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnExecutionReportEvent(cb func(event *ExecutionReportEvent)) {
s.executionReportEventCallbacks = append(s.executionReportEventCallbacks, cb)
}
func (s *PrivateStream) EmitExecutionReportEvent(event *ExecutionReportEvent) {
for _, cb := range s.executionReportEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnExecutionReportEvent(needle func(event *ExecutionReportEvent)) (found bool) {
var newcallbacks []func(event *ExecutionReportEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.executionReportEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.executionReportEventCallbacks = newcallbacks
}
return found
}
type PrivateStreamEventHub interface {
OnConnect(cb func(stream *PrivateStream))
OnKLineEvent(cb func(event *KLineEvent))
OnKLineClosedEvent(cb func(event *KLineEvent))
OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent))
OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent))
OnExecutionReportEvent(cb func(event *ExecutionReportEvent))
}

View File

@ -20,15 +20,15 @@ type StreamRequest struct {
Params []string `json:"params"`
}
//go:generate callbackgen -type PrivateStream -interface
type PrivateStream struct {
types.StandardPrivateStream
//go:generate callbackgen -type Stream -interface
type Stream struct {
types.StandardStream
Client *binance.Client
ListenKey string
Conn *websocket.Conn
connectCallbacks []func(stream *PrivateStream)
connectCallbacks []func(stream *Stream)
// custom callbacks
kLineEventCallbacks []func(e *KLineEvent)
@ -39,7 +39,7 @@ type PrivateStream struct {
executionReportEventCallbacks []func(event *ExecutionReportEvent)
}
func (s *PrivateStream) dial(listenKey string) (*websocket.Conn, error) {
func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
url := "wss://stream.binance.com:9443/ws/" + listenKey
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
@ -49,7 +49,7 @@ func (s *PrivateStream) dial(listenKey string) (*websocket.Conn, error) {
return conn, nil
}
func (s *PrivateStream) connect(ctx context.Context) error {
func (s *Stream) connect(ctx context.Context) error {
log.Infof("[binance] creating user data stream...")
listenKey, err := s.Client.NewStartUserStreamService().Do(ctx)
if err != nil {
@ -82,7 +82,7 @@ func (s *PrivateStream) connect(ctx context.Context) error {
})
}
func (s *PrivateStream) Connect(ctx context.Context) error {
func (s *Stream) Connect(ctx context.Context) error {
err := s.connect(ctx)
if err != nil {
return err
@ -92,7 +92,7 @@ func (s *PrivateStream) Connect(ctx context.Context) error {
return nil
}
func (s *PrivateStream) read(ctx context.Context) {
func (s *Stream) read(ctx context.Context) {
pingTicker := time.NewTicker(1 * time.Minute)
defer pingTicker.Stop()
@ -210,7 +210,7 @@ func (s *PrivateStream) read(ctx context.Context) {
}
}
func (s *PrivateStream) invalidateListenKey(ctx context.Context, listenKey string) error {
func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) error {
// use background context to invalidate the user stream
err := s.Client.NewCloseUserStreamService().ListenKey(listenKey).Do(ctx)
if err != nil {
@ -221,7 +221,7 @@ func (s *PrivateStream) invalidateListenKey(ctx context.Context, listenKey strin
return nil
}
func (s *PrivateStream) Close() error {
func (s *Stream) Close() error {
log.Infof("[binance] closing user data stream...")
defer s.Conn.Close()
err := s.invalidateListenKey(context.Background(), s.ListenKey)

View File

@ -0,0 +1,79 @@
// Code generated by "callbackgen -type Stream -interface"; DO NOT EDIT.
package binance
import ()
func (s *Stream) OnConnect(cb func(stream *Stream)) {
s.connectCallbacks = append(s.connectCallbacks, cb)
}
func (s *Stream) EmitConnect(stream *Stream) {
for _, cb := range s.connectCallbacks {
cb(stream)
}
}
func (s *Stream) OnKLineEvent(cb func(e *KLineEvent)) {
s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb)
}
func (s *Stream) EmitKLineEvent(e *KLineEvent) {
for _, cb := range s.kLineEventCallbacks {
cb(e)
}
}
func (s *Stream) OnKLineClosedEvent(cb func(e *KLineEvent)) {
s.kLineClosedEventCallbacks = append(s.kLineClosedEventCallbacks, cb)
}
func (s *Stream) EmitKLineClosedEvent(e *KLineEvent) {
for _, cb := range s.kLineClosedEventCallbacks {
cb(e)
}
}
func (s *Stream) OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent)) {
s.balanceUpdateEventCallbacks = append(s.balanceUpdateEventCallbacks, cb)
}
func (s *Stream) EmitBalanceUpdateEvent(event *BalanceUpdateEvent) {
for _, cb := range s.balanceUpdateEventCallbacks {
cb(event)
}
}
func (s *Stream) OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent)) {
s.outboundAccountInfoEventCallbacks = append(s.outboundAccountInfoEventCallbacks, cb)
}
func (s *Stream) EmitOutboundAccountInfoEvent(event *OutboundAccountInfoEvent) {
for _, cb := range s.outboundAccountInfoEventCallbacks {
cb(event)
}
}
func (s *Stream) OnExecutionReportEvent(cb func(event *ExecutionReportEvent)) {
s.executionReportEventCallbacks = append(s.executionReportEventCallbacks, cb)
}
func (s *Stream) EmitExecutionReportEvent(event *ExecutionReportEvent) {
for _, cb := range s.executionReportEventCallbacks {
cb(event)
}
}
type StreamEventHub interface {
OnConnect(cb func(stream *Stream))
OnKLineEvent(cb func(e *KLineEvent))
OnKLineClosedEvent(cb func(e *KLineEvent))
OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent))
OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent))
OnExecutionReportEvent(cb func(event *ExecutionReportEvent))
}

View File

@ -1 +1,50 @@
package max
import (
"context"
log "github.com/sirupsen/logrus"
max "github.com/c9s/bbgo/pkg/bbgo/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/bbgo/types"
)
var logger = log.WithField("exchange", "max")
type Stream struct {
types.StandardStream
websocketService *max.WebSocketService
}
func NewStream(key, secret string) *Stream {
wss := max.NewWebSocketService(max.WebSocketURL, key, secret)
stream := &Stream{
websocketService: wss,
}
wss.OnBookEvent(func(e max.BookEvent) {
newbook, err := e.OrderBook()
if err != nil {
logger.WithError(err).Error("book convert error")
return
}
switch e.Event {
case "snapshot":
stream.EmitBookSnapshot(newbook)
case "update":
stream.EmitBookUpdate(newbook)
}
})
return stream
}
func (s *Stream) Connect(ctx context.Context) error {
return nil
}
func (s *Stream) Close() error {
return s.websocketService.Close()
}

View File

@ -12,7 +12,7 @@ import (
)
type BackTestStream struct {
types.StandardPrivateStream
types.StandardStream
}

View File

@ -1,43 +0,0 @@
// Code generated by "callbackgen -type StandardPrivateStream -interface"; DO NOT EDIT.
package types
import ()
func (stream *StandardPrivateStream) OnTrade(cb func(trade *Trade)) {
stream.tradeCallbacks = append(stream.tradeCallbacks, cb)
}
func (stream *StandardPrivateStream) EmitTrade(trade *Trade) {
for _, cb := range stream.tradeCallbacks {
cb(trade)
}
}
func (stream *StandardPrivateStream) OnBalanceSnapshot(cb func(balanceSnapshot map[string]Balance)) {
stream.balanceSnapshotCallbacks = append(stream.balanceSnapshotCallbacks, cb)
}
func (stream *StandardPrivateStream) EmitBalanceSnapshot(balanceSnapshot map[string]Balance) {
for _, cb := range stream.balanceSnapshotCallbacks {
cb(balanceSnapshot)
}
}
func (stream *StandardPrivateStream) OnKLineClosed(cb func(kline KLine)) {
stream.kLineClosedCallbacks = append(stream.kLineClosedCallbacks, cb)
}
func (stream *StandardPrivateStream) EmitKLineClosed(kline KLine) {
for _, cb := range stream.kLineClosedCallbacks {
cb(kline)
}
}
type StandardPrivateStreamEventHub interface {
OnTrade(cb func(trade *Trade))
OnBalanceSnapshot(cb func(balanceSnapshot map[string]Balance))
OnKLineClosed(cb func(kline KLine))
}

View File

@ -0,0 +1,79 @@
// Code generated by "callbackgen -type StandardStream -interface"; DO NOT EDIT.
package types
import ()
func (stream *StandardStream) OnTrade(cb func(trade *Trade)) {
stream.tradeCallbacks = append(stream.tradeCallbacks, cb)
}
func (stream *StandardStream) EmitTrade(trade *Trade) {
for _, cb := range stream.tradeCallbacks {
cb(trade)
}
}
func (stream *StandardStream) OnBalanceSnapshot(cb func(balances map[string]Balance)) {
stream.balanceSnapshotCallbacks = append(stream.balanceSnapshotCallbacks, cb)
}
func (stream *StandardStream) EmitBalanceSnapshot(balances map[string]Balance) {
for _, cb := range stream.balanceSnapshotCallbacks {
cb(balances)
}
}
func (stream *StandardStream) OnBalanceUpdate(cb func(balances map[string]Balance)) {
stream.balanceUpdateCallbacks = append(stream.balanceUpdateCallbacks, cb)
}
func (stream *StandardStream) EmitBalanceUpdate(balances map[string]Balance) {
for _, cb := range stream.balanceUpdateCallbacks {
cb(balances)
}
}
func (stream *StandardStream) OnKLineClosed(cb func(kline KLine)) {
stream.kLineClosedCallbacks = append(stream.kLineClosedCallbacks, cb)
}
func (stream *StandardStream) EmitKLineClosed(kline KLine) {
for _, cb := range stream.kLineClosedCallbacks {
cb(kline)
}
}
func (stream *StandardStream) OnBookUpdate(cb func(book OrderBook)) {
stream.bookUpdateCallbacks = append(stream.bookUpdateCallbacks, cb)
}
func (stream *StandardStream) EmitBookUpdate(book OrderBook) {
for _, cb := range stream.bookUpdateCallbacks {
cb(book)
}
}
func (stream *StandardStream) OnBookSnapshot(cb func(book OrderBook)) {
stream.bookSnapshotCallbacks = append(stream.bookSnapshotCallbacks, cb)
}
func (stream *StandardStream) EmitBookSnapshot(book OrderBook) {
for _, cb := range stream.bookSnapshotCallbacks {
cb(book)
}
}
type StandardStreamEventHub interface {
OnTrade(cb func(trade *Trade))
OnBalanceSnapshot(cb func(balances map[string]Balance))
OnBalanceUpdate(cb func(balances map[string]Balance))
OnKLineClosed(cb func(kline KLine))
OnBookUpdate(cb func(book OrderBook))
OnBookSnapshot(cb func(book OrderBook))
}

View File

@ -7,23 +7,33 @@ import (
)
type PrivateStream interface {
StandardPrivateStreamEventHub
StandardStreamEventHub
Subscribe(channel string, symbol string, options SubscribeOptions)
Connect(ctx context.Context) error
Close() error
}
//go:generate callbackgen -type StandardPrivateStream -interface
type StandardPrivateStream struct {
//go:generate callbackgen -type StandardStream -interface
type StandardStream struct {
Subscriptions []Subscription
// private trade callbacks
tradeCallbacks []func(trade *Trade)
balanceSnapshotCallbacks []func(balanceSnapshot map[string]Balance)
// balance snapshot callbacks
balanceSnapshotCallbacks []func(balances map[string]Balance)
balanceUpdateCallbacks []func(balances map[string]Balance)
kLineClosedCallbacks []func(kline KLine)
bookUpdateCallbacks []func(book OrderBook)
bookSnapshotCallbacks []func(book OrderBook)
}
func (stream *StandardPrivateStream) Subscribe(channel string, symbol string, options SubscribeOptions) {
func (stream *StandardStream) Subscribe(channel string, symbol string, options SubscribeOptions) {
stream.Subscriptions = append(stream.Subscriptions, Subscription{
Channel: channel,
Symbol: symbol,
@ -31,7 +41,6 @@ func (stream *StandardPrivateStream) Subscribe(channel string, symbol string, op
})
}
type SubscribeOptions struct {
Interval string
Depth string