move to stream callbacks

This commit is contained in:
c9s 2020-07-11 16:07:09 +08:00
parent a105af35ce
commit 73276996a7
6 changed files with 290 additions and 147 deletions

View File

@ -1,52 +0,0 @@
package bbgo
import (
types2 "github.com/c9s/bbgo/pkg/bbgo/types"
"math"
)
// this is for BTC
const MinQuantity = 0.00000100
// https://www.desmos.com/calculator/ircjhtccbn
func BuyVolumeModifier(price float64) float64 {
targetPrice := 7500.0 // we will get 1 at price 7500, and more below 7500
flatness := 1000.0 // higher number buys more in the middle section. higher number gets more flat line, reduced to 0 at price 2000 * 10
return math.Min(2, math.Exp(-(price-targetPrice)/flatness))
}
func SellVolumeModifier(price float64) float64 {
// \exp\left(\frac{x-10000}{500}\right)
targetPrice := 10500.0 // target to sell most x1 at 10000.0
flatness := 500.0 // higher number sells more in the middle section, lower number sells fewer in the middle section.
return math.Min(2, math.Exp((price-targetPrice)/flatness))
}
func VolumeByPriceChange(market Market, currentPrice float64, change float64, side types2.SideType) float64 {
volume := BaseVolumeByPriceChange(change)
if side == types2.SideTypeSell {
volume *= SellVolumeModifier(currentPrice)
} else {
volume *= BuyVolumeModifier(currentPrice)
}
// at least the minimal quantity
volume = math.Max(market.MinQuantity, volume)
// modify volume for the min amount
amount := currentPrice * volume
if amount < market.MinAmount {
ratio := market.MinAmount / amount
volume *= ratio
}
volume = math.Trunc(volume * math.Pow10(market.VolumePrecision)) / math.Pow10(market.VolumePrecision)
return volume
}
func BaseVolumeByPriceChange(change float64) float64 {
return 0.2 * math.Exp((math.Abs(change)-3100.0)/1600.0)
// 0.116*math.Exp(math.Abs(change)/2400) - 0.1
}

View File

@ -1,78 +0,0 @@
package bbgo
import (
"github.com/adshao/go-binance"
"testing"
)
func TestVolumeByPriceChange(t *testing.T) {
type args struct {
market Market
currentPrice float64
change float64
side binance.SideType
}
tests := []struct {
name string
args args
want float64
}{
{
name: "buy-change-50-at-9400",
args: args{
market: MarketBTCUSDT,
currentPrice: 9400,
change: 50,
side: binance.SideTypeBuy,
},
want: 0.00444627,
},
{
name: "buy-change-100-at-9200",
args: args{
market: MarketBTCUSDT,
currentPrice: 9200,
change: 100,
side: binance.SideTypeBuy,
},
want: 0.00560308,
},
{
name: "sell-change-100-at-9500",
args: args{
market: MarketBTCUSDT,
currentPrice: 9500,
change: 100,
side: binance.SideTypeSell,
},
want: 0.00415086,
},
{
name: "sell-change-200-at-9600",
args: args{
market: MarketBTCUSDT,
currentPrice: 9500,
change: 200,
side: binance.SideTypeSell,
},
want: 0.00441857,
},
{
name: "sell-change-500-at-9600",
args: args{
market: MarketBTCUSDT,
currentPrice: 9600,
change: 500,
side: binance.SideTypeSell,
},
want: 0.00650985,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := VolumeByPriceChange(tt.args.market, tt.args.currentPrice, tt.args.change, tt.args.side); got != tt.want {
t.Errorf("VolumeByPriceChange() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -4,7 +4,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
types2 "github.com/c9s/bbgo/pkg/bbgo/types"
"github.com/c9s/bbgo/pkg/bbgo/types" "github.com/c9s/bbgo/pkg/bbgo/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
@ -84,13 +83,13 @@ type ExecutionReportEvent struct {
OrderCreationTime int `json:"O"` OrderCreationTime int `json:"O"`
} }
func (e *ExecutionReportEvent) Trade() (*types2.Trade, error) { func (e *ExecutionReportEvent) Trade() (*types.Trade, error) {
if e.CurrentExecutionType != "TRADE" { if e.CurrentExecutionType != "TRADE" {
return nil, errors.New("execution report is not a trade") return nil, errors.New("execution report is not a trade")
} }
tt := time.Unix(0, e.TransactionTime/1000000) tt := time.Unix(0, e.TransactionTime/1000000)
return &types2.Trade{ return &types.Trade{
ID: e.TradeID, ID: e.TradeID,
Symbol: e.Symbol, Symbol: e.Symbol,
Price: util.MustParseFloat(e.LastExecutedPrice), Price: util.MustParseFloat(e.LastExecutedPrice),

View File

@ -0,0 +1,227 @@
// Code generated by "callbackgen -type PrivateStream -interface"; DO NOT EDIT.
package binance
import (
"github.com/c9s/bbgo/pkg/bbgo/types"
"reflect"
)
func (s *PrivateStream) OnConnect(cb func(stream *PrivateStream)) {
s.connectCallbacks = append(s.connectCallbacks, cb)
}
func (s *PrivateStream) EmitConnect(stream *PrivateStream) {
for _, cb := range s.connectCallbacks {
cb(stream)
}
}
func (s *PrivateStream) RemoveOnConnect(needle func(stream *PrivateStream)) (found bool) {
var newcallbacks []func(stream *PrivateStream)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.connectCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.connectCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnTrade(cb func(trade *types.Trade)) {
s.tradeCallbacks = append(s.tradeCallbacks, cb)
}
func (s *PrivateStream) EmitTrade(trade *types.Trade) {
for _, cb := range s.tradeCallbacks {
cb(trade)
}
}
func (s *PrivateStream) RemoveOnTrade(needle func(trade *types.Trade)) (found bool) {
var newcallbacks []func(trade *types.Trade)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.tradeCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.tradeCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnKLineEvent(cb func(event *KLineEvent)) {
s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb)
}
func (s *PrivateStream) EmitKLineEvent(event *KLineEvent) {
for _, cb := range s.kLineEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnKLineEvent(needle func(event *KLineEvent)) (found bool) {
var newcallbacks []func(event *KLineEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.kLineEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.kLineEventCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnKLineClosedEvent(cb func(event *KLineEvent)) {
s.kLineClosedEventCallbacks = append(s.kLineClosedEventCallbacks, cb)
}
func (s *PrivateStream) EmitKLineClosedEvent(event *KLineEvent) {
for _, cb := range s.kLineClosedEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnKLineClosedEvent(needle func(event *KLineEvent)) (found bool) {
var newcallbacks []func(event *KLineEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.kLineClosedEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.kLineClosedEventCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent)) {
s.balanceUpdateEventCallbacks = append(s.balanceUpdateEventCallbacks, cb)
}
func (s *PrivateStream) EmitBalanceUpdateEvent(event *BalanceUpdateEvent) {
for _, cb := range s.balanceUpdateEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnBalanceUpdateEvent(needle func(event *BalanceUpdateEvent)) (found bool) {
var newcallbacks []func(event *BalanceUpdateEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.balanceUpdateEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.balanceUpdateEventCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent)) {
s.outboundAccountInfoEventCallbacks = append(s.outboundAccountInfoEventCallbacks, cb)
}
func (s *PrivateStream) EmitOutboundAccountInfoEvent(event *OutboundAccountInfoEvent) {
for _, cb := range s.outboundAccountInfoEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnOutboundAccountInfoEvent(needle func(event *OutboundAccountInfoEvent)) (found bool) {
var newcallbacks []func(event *OutboundAccountInfoEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.outboundAccountInfoEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.outboundAccountInfoEventCallbacks = newcallbacks
}
return found
}
func (s *PrivateStream) OnExecutionReportEvent(cb func(event *ExecutionReportEvent)) {
s.executionReportEventCallbacks = append(s.executionReportEventCallbacks, cb)
}
func (s *PrivateStream) EmitExecutionReportEvent(event *ExecutionReportEvent) {
for _, cb := range s.executionReportEventCallbacks {
cb(event)
}
}
func (s *PrivateStream) RemoveOnExecutionReportEvent(needle func(event *ExecutionReportEvent)) (found bool) {
var newcallbacks []func(event *ExecutionReportEvent)
var fp = reflect.ValueOf(needle).Pointer()
for _, cb := range s.executionReportEventCallbacks {
if fp == reflect.ValueOf(cb).Pointer() {
found = true
} else {
newcallbacks = append(newcallbacks, cb)
}
}
if found {
s.executionReportEventCallbacks = newcallbacks
}
return found
}
type PrivateStreamEventHub interface {
OnConnect(cb func(stream *PrivateStream))
OnTrade(cb func(trade *types.Trade))
OnKLineEvent(cb func(event *KLineEvent))
OnKLineClosedEvent(cb func(event *KLineEvent))
OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent))
OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent))
OnExecutionReportEvent(cb func(event *ExecutionReportEvent))
}

View File

@ -4,8 +4,9 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/adshao/go-binance" "github.com/adshao/go-binance"
"github.com/c9s/bbgo/pkg/bbgo/types"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strings" "strings"
"time" "time"
) )
@ -41,11 +42,23 @@ type StreamRequest struct {
Params []string `json:"params"` Params []string `json:"params"`
} }
//go:generate callbackgen -type PrivateStream -interface
type PrivateStream struct { type PrivateStream struct {
Client *binance.Client Client *binance.Client
ListenKey string ListenKey string
Conn *websocket.Conn Conn *websocket.Conn
Subscriptions []Subscription Subscriptions []Subscription
connectCallbacks []func(stream *PrivateStream)
tradeCallbacks []func(trade *types.Trade)
// custom callbacks
kLineEventCallbacks []func(event *KLineEvent)
kLineClosedEventCallbacks []func(event *KLineEvent)
balanceUpdateEventCallbacks []func(event *BalanceUpdateEvent)
outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent)
executionReportEventCallbacks []func(event *ExecutionReportEvent)
} }
func (s *PrivateStream) Subscribe(channel string, symbol string, options SubscribeOptions) { func (s *PrivateStream) Subscribe(channel string, symbol string, options SubscribeOptions) {
@ -63,15 +76,17 @@ func (s *PrivateStream) Connect(ctx context.Context, eventC chan interface{}) er
return err return err
} }
logrus.Infof("[binance] websocket connected") log.Infof("[binance] websocket connected")
s.Conn = conn s.Conn = conn
s.EmitConnect(s)
var params []string var params []string
for _, subscription := range s.Subscriptions { for _, subscription := range s.Subscriptions {
params = append(params, subscription.String()) params = append(params, subscription.String())
} }
logrus.Infof("[binance] subscribing channels: %+v", params) log.Infof("[binance] subscribing channels: %+v", params)
err = conn.WriteJSON(StreamRequest{ err = conn.WriteJSON(StreamRequest{
Method: "SUBSCRIBE", Method: "SUBSCRIBE",
Params: params, Params: params,
@ -102,17 +117,17 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
case <-ticker.C: case <-ticker.C:
err := s.Client.NewKeepaliveUserStreamService().ListenKey(s.ListenKey).Do(ctx) err := s.Client.NewKeepaliveUserStreamService().ListenKey(s.ListenKey).Do(ctx)
if err != nil { if err != nil {
logrus.WithError(err).Error("listen key keep-alive error", err) log.WithError(err).Error("listen key keep-alive error", err)
} }
default: default:
if err := s.Conn.SetReadDeadline(time.Now().Add(15 * time.Second)); err != nil { if err := s.Conn.SetReadDeadline(time.Now().Add(15 * time.Second)); err != nil {
logrus.WithError(err).Errorf("set read deadline error: %s", err.Error()) log.WithError(err).Errorf("set read deadline error: %s", err.Error())
} }
mt, message, err := s.Conn.ReadMessage() mt, message, err := s.Conn.ReadMessage()
if err != nil { if err != nil {
logrus.WithError(err).Errorf("read error: %s", err.Error()) log.WithError(err).Errorf("read error: %s", err.Error())
return return
} }
@ -121,31 +136,63 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
continue continue
} }
logrus.Debugf("[binance] recv: %s", message) log.Debugf("[binance] recv: %s", message)
e, err := ParseEvent(string(message)) e, err := ParseEvent(string(message))
if err != nil { if err != nil {
logrus.WithError(err).Errorf("[binance] event parse error") log.WithError(err).Errorf("[binance] event parse error")
continue continue
} }
log.Infof("[binance] event: %+v", e)
switch e := e.(type) {
case *OutboundAccountInfoEvent:
log.Info(e.Event, " ", e.Balances)
s.EmitOutboundAccountInfoEvent(e)
case *BalanceUpdateEvent:
log.Info(e.Event, " ", e.Asset, " ", e.Delta)
s.EmitBalanceUpdateEvent(e)
case *KLineEvent:
log.Info(e.Event, " ", e.KLine, " ", e.KLine.Interval)
s.EmitKLineEvent(e)
if e.KLine.Closed {
s.EmitKLineClosedEvent(e)
}
case *ExecutionReportEvent:
s.EmitExecutionReportEvent(e)
switch e.CurrentExecutionType {
case "TRADE":
trade, err := e.Trade()
if err != nil {
break
}
s.EmitTrade(trade)
}
}
eventC <- e eventC <- e
} }
} }
} }
func (s *PrivateStream) Close() error { func (s *PrivateStream) Close() error {
logrus.Infof("[binance] closing user data stream...") log.Infof("[binance] closing user data stream...")
defer s.Conn.Close() defer s.Conn.Close()
// use background context to close user stream // use background context to close user stream
err := s.Client.NewCloseUserStreamService().ListenKey(s.ListenKey).Do(context.Background()) err := s.Client.NewCloseUserStreamService().ListenKey(s.ListenKey).Do(context.Background())
if err != nil { if err != nil {
logrus.WithError(err).Error("[binance] error close user data stream") log.WithError(err).Error("[binance] error close user data stream")
return err return err
} }
return err return err
} }

View File

@ -76,7 +76,7 @@ func (t *Trader) Errorf(err error, format string, args ...interface{}) {
} }
} }
func (t *Trader) ReportTrade(e *binance.ExecutionReportEvent, trade *types2.Trade) { func (t *Trader) ReportTrade(trade *types2.Trade) {
var color = "" var color = ""
if trade.IsBuyer { if trade.IsBuyer {
color = "#228B22" color = "#228B22"
@ -85,7 +85,7 @@ func (t *Trader) ReportTrade(e *binance.ExecutionReportEvent, trade *types2.Trad
} }
_, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel, _, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel,
slack.MsgOptionText(util.Render(`:handshake: {{ .CurrentExecutionType }} execution`, e), true), slack.MsgOptionText(util.Render(`:handshake: trade execution`, trade), true),
slack.MsgOptionAttachments(slack.Attachment{ slack.MsgOptionAttachments(slack.Attachment{
Title: "New Trade", Title: "New Trade",
Color: color, Color: color,
@ -93,7 +93,7 @@ func (t *Trader) ReportTrade(e *binance.ExecutionReportEvent, trade *types2.Trad
// Text: "", // Text: "",
Fields: []slack.AttachmentField{ Fields: []slack.AttachmentField{
{Title: "Symbol", Value: trade.Symbol, Short: true,}, {Title: "Symbol", Value: trade.Symbol, Short: true,},
{Title: "Side", Value: e.Side, Short: true,}, {Title: "Side", Value: trade.Side, Short: true,},
{Title: "Price", Value: USD.FormatMoney(trade.Price), Short: true,}, {Title: "Price", Value: USD.FormatMoney(trade.Price), Short: true,},
{Title: "Volume", Value: t.Context.Market.FormatVolume(trade.Volume), Short: true,}, {Title: "Volume", Value: t.Context.Market.FormatVolume(trade.Volume), Short: true,},
}, },