implement standard private stream support for regression trader

This commit is contained in:
c9s 2020-07-15 21:02:08 +08:00
parent 7d035d75ea
commit 9cf95b7c26
6 changed files with 203 additions and 84 deletions

View File

@ -3,7 +3,6 @@
package binance
import (
"github.com/c9s/bbgo/pkg/bbgo/types"
"reflect"
)
@ -36,35 +35,6 @@ func (s *PrivateStream) RemoveOnConnect(needle func(stream *PrivateStream)) (fou
return found
}
func (s *PrivateStream) OnTrade(cb func(trade *types.Trade)) {
s.tradeCallbacks = append(s.tradeCallbacks, cb)
}
func (s *PrivateStream) EmitTrade(trade *types.Trade) {
for _, cb := range s.tradeCallbacks {
cb(trade)
}
}
func (s *PrivateStream) RemoveOnTrade(needle func(trade *types.Trade)) (found bool) {
var newcallbacks []func(trade *types.Trade)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.tradeCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.tradeCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnKLineEvent(cb func(event *KLineEvent)) {
s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb)
}
@ -213,8 +183,6 @@ func (s *PrivateStream) RemoveOnExecutionReportEvent(needle func(event *Executio
type PrivateStreamEventHub interface {
OnConnect(cb func(stream *PrivateStream))
OnTrade(cb func(trade *types.Trade))
OnKLineEvent(cb func(event *KLineEvent))
OnKLineClosedEvent(cb func(event *KLineEvent))

View File

@ -2,8 +2,7 @@ package binance
import (
"context"
"fmt"
"strings"
"github.com/c9s/bbgo/pkg/util"
"time"
"github.com/adshao/go-binance"
@ -13,30 +12,6 @@ import (
"github.com/c9s/bbgo/pkg/bbgo/types"
)
type SubscribeOptions struct {
Interval string
Depth string
}
func (o SubscribeOptions) String() string {
if len(o.Interval) > 0 {
return o.Interval
}
return o.Depth
}
type Subscription struct {
Symbol string
Channel string
Options SubscribeOptions
}
func (s *Subscription) String() string {
// binance uses lower case symbol name
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
}
type StreamRequest struct {
// request ID is required
ID int `json:"id"`
@ -46,13 +21,13 @@ type StreamRequest struct {
//go:generate callbackgen -type PrivateStream -interface
type PrivateStream struct {
types.StandardPrivateStream
Client *binance.Client
ListenKey string
Conn *websocket.Conn
Subscriptions []Subscription
connectCallbacks []func(stream *PrivateStream)
tradeCallbacks []func(trade *types.Trade)
// custom callbacks
kLineEventCallbacks []func(event *KLineEvent)
@ -63,13 +38,6 @@ type PrivateStream struct {
executionReportEventCallbacks []func(event *ExecutionReportEvent)
}
func (s *PrivateStream) Subscribe(channel string, symbol string, options SubscribeOptions) {
s.Subscriptions = append(s.Subscriptions, Subscription{
Channel: channel,
Symbol: symbol,
Options: options,
})
}
func (s *PrivateStream) dial(listenKey string) (*websocket.Conn, error) {
url := "wss://stream.binance.com:9443/ws/" + listenKey
@ -191,6 +159,18 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
log.Info(e.Event, " ", e.Balances)
s.EmitOutboundAccountInfoEvent(e)
snapshot := map[string]types.Balance{}
for _, balance := range e.Balances {
available := util.MustParseFloat(balance.Free)
locked := util.MustParseFloat(balance.Locked)
snapshot[balance.Asset] = types.Balance{
Currency: balance.Asset,
Available: available,
Locked: locked,
}
}
s.EmitBalanceSnapshot(snapshot)
case *BalanceUpdateEvent:
log.Info(e.Event, " ", e.Asset, " ", e.Delta)
s.EmitBalanceUpdateEvent(e)
@ -201,6 +181,7 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
if e.KLine.Closed {
s.EmitKLineClosedEvent(e)
s.EmitKLineClosed(e.KLine)
}
case *ExecutionReportEvent:

View File

@ -12,19 +12,38 @@ import (
)
type Strategy interface {
Init(trader *Trader) error
OnNewStream(stream *binance.PrivateStream) error
Init(tradingContext *TradingContext, trader types.Trader) error
OnNewStream(stream *types.StandardPrivateStream) error
}
type RegressionTrader struct {
type KLineRegressionTrader struct {
// Context is trading Context
Context *TradingContext
SourceKLines []types.KLine
}
func (trader *KLineRegressionTrader) SubmitOrder(cxt context.Context, order *types.Order) {
}
func (trader *RegressionTrader) RunStrategy(ctx context.Context, strategy Strategy) {
func (trader *KLineRegressionTrader) RunStrategy(ctx context.Context, strategy Strategy) (chan struct{}, error){
done := make(chan struct{})
defer close(done)
if err := strategy.Init(trader.Context, trader) ; err != nil {
return nil, err
}
standardStream := types.StandardPrivateStream{}
if err := strategy.OnNewStream(&standardStream); err != nil {
return nil, err
}
for _, kline := range trader.SourceKLines {
standardStream.EmitKLineClosed(&kline)
}
return done, nil
}
@ -55,7 +74,7 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
}
}
if err := strategy.Init(trader) ; err != nil {
if err := strategy.Init(trader.Context, trader) ; err != nil {
return nil, err
}
@ -64,7 +83,7 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
return nil, err
}
if err := strategy.OnNewStream(stream); err != nil {
if err := strategy.OnNewStream(&stream.StandardPrivateStream); err != nil {
return nil, err
}
@ -93,25 +112,18 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
trader.Context.SetCurrentPrice(e.KLine.GetClose())
})
stream.OnOutboundAccountInfoEvent(func(e *binance.OutboundAccountInfoEvent) {
stream.OnBalanceSnapshot(func(snapshot map[string]types.Balance) {
trader.Context.Lock()
defer trader.Context.Unlock()
for _, balance := range e.Balances {
available := util.MustParseFloat(balance.Free)
locked := util.MustParseFloat(balance.Locked)
trader.Context.Balances[balance.Asset] = types.Balance{
Currency: balance.Asset,
Available: available,
Locked: locked,
}
for _ , balance := range snapshot {
trader.Context.Balances[balance.Currency] = balance
}
})
// stream.OnOutboundAccountInfoEvent(func(e *binance.OutboundAccountInfoEvent) { })
stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) {
trader.Context.Lock()
defer trader.Context.Unlock()
delta := util.MustParseFloat(e.Delta)
if balance, ok := trader.Context.Balances[e.Asset] ; ok {
balance.Available += delta

View File

@ -0,0 +1,102 @@
// Code generated by "callbackgen -type StandardPrivateStream -interface"; DO NOT EDIT.
package types
import (
"reflect"
)
func (stream *StandardPrivateStream) OnTrade(cb func(trade *Trade)) {
stream.tradeCallbacks = append(stream.tradeCallbacks, cb)
}
func (stream *StandardPrivateStream) EmitTrade(trade *Trade) {
for _, cb := range stream.tradeCallbacks {
cb(trade)
}
}
func (stream *StandardPrivateStream) RemoveOnTrade(needle func(trade *Trade)) (found bool) {
var newcallbacks []func(trade *Trade)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range stream.tradeCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
stream.tradeCallbacks = newcallbacks
}
return found
}
func (stream *StandardPrivateStream) OnBalanceSnapshot(cb func(balanceSnapshot map[string]Balance)) {
stream.balanceSnapshotCallbacks = append(stream.balanceSnapshotCallbacks, cb)
}
func (stream *StandardPrivateStream) EmitBalanceSnapshot(balanceSnapshot map[string]Balance) {
for _, cb := range stream.balanceSnapshotCallbacks {
cb(balanceSnapshot)
}
}
func (stream *StandardPrivateStream) RemoveOnBalanceSnapshot(needle func(balanceSnapshot map[string]Balance)) (found bool) {
var newcallbacks []func(balanceSnapshot map[string]Balance)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range stream.balanceSnapshotCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
stream.balanceSnapshotCallbacks = newcallbacks
}
return found
}
func (stream *StandardPrivateStream) OnKLineClosed(cb func(kline *KLine)) {
stream.kLineClosedCallbacks = append(stream.kLineClosedCallbacks, cb)
}
func (stream *StandardPrivateStream) EmitKLineClosed(kline *KLine) {
for _, cb := range stream.kLineClosedCallbacks {
cb(kline)
}
}
func (stream *StandardPrivateStream) RemoveOnKLineClosed(needle func(kline *KLine)) (found bool) {
var newcallbacks []func(kline *KLine)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range stream.kLineClosedCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
stream.kLineClosedCallbacks = newcallbacks
}
return found
}
type StandardPrivateStreamEventHub interface {
OnTrade(cb func(trade *Trade))
OnBalanceSnapshot(cb func(balanceSnapshot map[string]Balance))
OnKLineClosed(cb func(kline *KLine))
}

49
bbgo/types/stream.go Normal file
View File

@ -0,0 +1,49 @@
package types
import (
"fmt"
"strings"
)
//go:generate callbackgen -type StandardPrivateStream -interface
type StandardPrivateStream struct {
Subscriptions []Subscription
tradeCallbacks []func(trade *Trade)
balanceSnapshotCallbacks []func(balanceSnapshot map[string]Balance)
kLineClosedCallbacks []func(kline *KLine)
}
func (stream *StandardPrivateStream) Subscribe(channel string, symbol string, options SubscribeOptions) {
stream.Subscriptions = append(stream.Subscriptions, Subscription{
Channel: channel,
Symbol: symbol,
Options: options,
})
}
type SubscribeOptions struct {
Interval string
Depth string
}
func (o SubscribeOptions) String() string {
if len(o.Interval) > 0 {
return o.Interval
}
return o.Depth
}
type Subscription struct {
Symbol string
Channel string
Options SubscribeOptions
}
func (s *Subscription) String() string {
// binance uses lower case symbol name
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
}

7
bbgo/types/trader.go Normal file
View File

@ -0,0 +1,7 @@
package types
import "context"
type Trader interface {
SubmitOrder(ctx context.Context, order *Order)
}