add signal chan package

This commit is contained in:
c9s 2020-10-03 19:14:15 +08:00
parent 657a145699
commit cea51679ba
5 changed files with 115 additions and 25 deletions

View File

@ -2,6 +2,7 @@ package binance
import ( import (
"context" "context"
"fmt"
"strings" "strings"
"time" "time"
@ -71,7 +72,7 @@ func (s *Stream) connect(ctx context.Context) error {
var params []string var params []string
for _, subscription := range s.Subscriptions { for _, subscription := range s.Subscriptions {
params = append(params, subscription.String()) params = append(params, convertSubscription(subscription))
} }
log.Infof("[binance] subscribing channels: %+v", params) log.Infof("[binance] subscribing channels: %+v", params)
@ -82,6 +83,21 @@ func (s *Stream) connect(ctx context.Context) error {
}) })
} }
func convertSubscription(s types.Subscription) string {
// binance uses lower case symbol name,
// for kline, it's "<symbol>@kline_<interval>"
// for depth, it's "<symbol>@depth OR <symbol>@depth@100ms"
switch s.Channel {
case types.KLineChannel:
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
case types.BookChannel:
return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol))
}
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
}
func (s *Stream) Connect(ctx context.Context) error { func (s *Stream) Connect(ctx context.Context) error {
err := s.connect(ctx) err := s.connect(ctx)
if err != nil { if err != nil {

View File

@ -213,7 +213,7 @@ func (s *WebSocketService) Reconnect() {
// Subscribe is a helper method for building subscription request from the internal mapping types. // Subscribe is a helper method for building subscription request from the internal mapping types.
// (Internal public method) // (Internal public method)
func (s *WebSocketService) Subscribe(channel string, market string) { func (s *WebSocketService) Subscribe(channel, market string) {
s.AddSubscription(Subscription{ s.AddSubscription(Subscription{
Channel: channel, Channel: channel,
Market: market, Market: market,

View File

@ -19,10 +19,15 @@ type Stream struct {
func NewStream(key, secret string) *Stream { func NewStream(key, secret string) *Stream {
wss := max.NewWebSocketService(max.WebSocketURL, key, secret) wss := max.NewWebSocketService(max.WebSocketURL, key, secret)
stream := &Stream{ stream := &Stream{
websocketService: wss, websocketService: wss,
} }
wss.OnMessage(func(message []byte) {
logger.Infof("M: %s", message)
})
wss.OnBookEvent(func(e max.BookEvent) { wss.OnBookEvent(func(e max.BookEvent) {
newbook, err := e.OrderBook() newbook, err := e.OrderBook()
if err != nil { if err != nil {
@ -38,11 +43,50 @@ func NewStream(key, secret string) *Stream {
} }
}) })
wss.OnAccountSnapshotEvent(func(e max.AccountSnapshotEvent) {
snapshot := map[string]types.Balance{}
for _, bm := range e.Balances {
balance, err := bm.Balance()
if err != nil {
continue
}
snapshot[balance.Currency] = *balance
}
stream.EmitBalanceSnapshot(snapshot)
})
wss.OnAccountUpdateEvent(func(e max.AccountUpdateEvent) {
snapshot := map[string]types.Balance{}
for _, bm := range e.Balances {
balance, err := bm.Balance()
if err != nil {
continue
}
snapshot[balance.Currency] = *balance
}
stream.EmitBalanceUpdate(snapshot)
})
return stream return stream
} }
func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) {
// "book"
switch channel {
case types.KLineChannel:
panic("kline channel is not supported in max")
}
s.websocketService.Subscribe(string(channel), symbol)
}
func (s *Stream) Connect(ctx context.Context) error { func (s *Stream) Connect(ctx context.Context) error {
return nil return s.websocketService.Connect(ctx)
} }
func (s *Stream) Close() error { func (s *Stream) Close() error {

39
bbgo/sigchan/sigchan.go Normal file
View File

@ -0,0 +1,39 @@
package sigchan
import "time"
type Chan chan struct{}
func New(cap int) Chan {
return make(Chan, cap)
}
func (c Chan) Drain(duration, deadline time.Duration) (cnt int) {
cnt = 0
deadlineC := time.After(deadline)
for {
select {
case <-c:
cnt++
case <-deadlineC:
return cnt
case <-time.After(duration):
return cnt
}
}
}
func (c Chan) Emit() {
select {
case c <- struct{}{}:
default:
}
}
func (c Chan) Close() {
close(c)
}

View File

@ -2,18 +2,23 @@ package types
import ( import (
"context" "context"
"fmt"
"strings"
) )
type PrivateStream interface { type PrivateStream interface {
StandardStreamEventHub StandardStreamEventHub
Subscribe(channel string, symbol string, options SubscribeOptions) Subscribe(channel Channel, symbol string, options SubscribeOptions)
Connect(ctx context.Context) error Connect(ctx context.Context) error
Close() error Close() error
} }
type Channel string
var BookChannel = Channel("book")
var KLineChannel = Channel("kline")
//go:generate callbackgen -type StandardStream -interface //go:generate callbackgen -type StandardStream -interface
type StandardStream struct { type StandardStream struct {
Subscriptions []Subscription Subscriptions []Subscription
@ -33,7 +38,7 @@ type StandardStream struct {
bookSnapshotCallbacks []func(book OrderBook) bookSnapshotCallbacks []func(book OrderBook)
} }
func (stream *StandardStream) Subscribe(channel string, symbol string, options SubscribeOptions) { func (stream *StandardStream) Subscribe(channel Channel, symbol string, options SubscribeOptions) {
stream.Subscriptions = append(stream.Subscriptions, Subscription{ stream.Subscriptions = append(stream.Subscriptions, Subscription{
Channel: channel, Channel: channel,
Symbol: symbol, Symbol: symbol,
@ -41,6 +46,7 @@ func (stream *StandardStream) Subscribe(channel string, symbol string, options S
}) })
} }
// SubscribeOptions provides the standard stream options
type SubscribeOptions struct { type SubscribeOptions struct {
Interval string Interval string
Depth string Depth string
@ -56,21 +62,6 @@ func (o SubscribeOptions) String() string {
type Subscription struct { type Subscription struct {
Symbol string Symbol string
Channel string Channel Channel
Options SubscribeOptions Options SubscribeOptions
} }
func (s *Subscription) String() string {
// binance uses lower case symbol name,
// for kline, it's "<symbol>@kline_<interval>"
// for depth, it's "<symbol>@depth OR <symbol>@depth@100ms"
switch s.Channel {
case "kline":
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
case "depth", "book":
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
default:
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
}
}