diff --git a/bbgo/calc.go b/bbgo/calc.go deleted file mode 100644 index 6689bbdec..000000000 --- a/bbgo/calc.go +++ /dev/null @@ -1,52 +0,0 @@ -package bbgo - -import ( - types2 "github.com/c9s/bbgo/pkg/bbgo/types" - "math" -) - -// this is for BTC -const MinQuantity = 0.00000100 - -// https://www.desmos.com/calculator/ircjhtccbn -func BuyVolumeModifier(price float64) float64 { - targetPrice := 7500.0 // we will get 1 at price 7500, and more below 7500 - flatness := 1000.0 // higher number buys more in the middle section. higher number gets more flat line, reduced to 0 at price 2000 * 10 - return math.Min(2, math.Exp(-(price-targetPrice)/flatness)) -} - -func SellVolumeModifier(price float64) float64 { - // \exp\left(\frac{x-10000}{500}\right) - targetPrice := 10500.0 // target to sell most x1 at 10000.0 - flatness := 500.0 // higher number sells more in the middle section, lower number sells fewer in the middle section. - return math.Min(2, math.Exp((price-targetPrice)/flatness)) -} - -func VolumeByPriceChange(market Market, currentPrice float64, change float64, side types2.SideType) float64 { - volume := BaseVolumeByPriceChange(change) - - if side == types2.SideTypeSell { - volume *= SellVolumeModifier(currentPrice) - } else { - volume *= BuyVolumeModifier(currentPrice) - } - - // at least the minimal quantity - volume = math.Max(market.MinQuantity, volume) - - // modify volume for the min amount - amount := currentPrice * volume - if amount < market.MinAmount { - ratio := market.MinAmount / amount - volume *= ratio - } - - volume = math.Trunc(volume * math.Pow10(market.VolumePrecision)) / math.Pow10(market.VolumePrecision) - return volume -} - -func BaseVolumeByPriceChange(change float64) float64 { - return 0.2 * math.Exp((math.Abs(change)-3100.0)/1600.0) - // 0.116*math.Exp(math.Abs(change)/2400) - 0.1 -} - diff --git a/bbgo/calc_test.go b/bbgo/calc_test.go deleted file mode 100644 index a0b2801f6..000000000 --- a/bbgo/calc_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package bbgo - -import ( - "github.com/adshao/go-binance" - "testing" -) - -func TestVolumeByPriceChange(t *testing.T) { - type args struct { - market Market - currentPrice float64 - change float64 - side binance.SideType - } - tests := []struct { - name string - args args - want float64 - }{ - { - name: "buy-change-50-at-9400", - args: args{ - market: MarketBTCUSDT, - currentPrice: 9400, - change: 50, - side: binance.SideTypeBuy, - }, - want: 0.00444627, - }, - { - name: "buy-change-100-at-9200", - args: args{ - market: MarketBTCUSDT, - currentPrice: 9200, - change: 100, - side: binance.SideTypeBuy, - }, - want: 0.00560308, - }, - { - name: "sell-change-100-at-9500", - args: args{ - market: MarketBTCUSDT, - currentPrice: 9500, - change: 100, - side: binance.SideTypeSell, - }, - want: 0.00415086, - }, - { - name: "sell-change-200-at-9600", - args: args{ - market: MarketBTCUSDT, - currentPrice: 9500, - change: 200, - side: binance.SideTypeSell, - }, - want: 0.00441857, - }, - { - name: "sell-change-500-at-9600", - args: args{ - market: MarketBTCUSDT, - currentPrice: 9600, - change: 500, - side: binance.SideTypeSell, - }, - want: 0.00650985, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := VolumeByPriceChange(tt.args.market, tt.args.currentPrice, tt.args.change, tt.args.side); got != tt.want { - t.Errorf("VolumeByPriceChange() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/bbgo/exchange/binance/parse.go b/bbgo/exchange/binance/parse.go index bf4d5317d..cf6f6b2bc 100644 --- a/bbgo/exchange/binance/parse.go +++ b/bbgo/exchange/binance/parse.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - types2 "github.com/c9s/bbgo/pkg/bbgo/types" "github.com/c9s/bbgo/pkg/bbgo/types" "github.com/c9s/bbgo/pkg/util" "github.com/valyala/fastjson" @@ -84,13 +83,13 @@ type ExecutionReportEvent struct { OrderCreationTime int `json:"O"` } -func (e *ExecutionReportEvent) Trade() (*types2.Trade, error) { +func (e *ExecutionReportEvent) Trade() (*types.Trade, error) { if e.CurrentExecutionType != "TRADE" { return nil, errors.New("execution report is not a trade") } tt := time.Unix(0, e.TransactionTime/1000000) - return &types2.Trade{ + return &types.Trade{ ID: e.TradeID, Symbol: e.Symbol, Price: util.MustParseFloat(e.LastExecutedPrice), diff --git a/bbgo/exchange/binance/privatestream_callbacks.go b/bbgo/exchange/binance/privatestream_callbacks.go new file mode 100644 index 000000000..4cedd1ca8 --- /dev/null +++ b/bbgo/exchange/binance/privatestream_callbacks.go @@ -0,0 +1,227 @@ +// Code generated by "callbackgen -type PrivateStream -interface"; DO NOT EDIT. + +package binance + +import ( + "github.com/c9s/bbgo/pkg/bbgo/types" + "reflect" +) + +func (s *PrivateStream) OnConnect(cb func(stream *PrivateStream)) { + s.connectCallbacks = append(s.connectCallbacks, cb) +} + +func (s *PrivateStream) EmitConnect(stream *PrivateStream) { + for _, cb := range s.connectCallbacks { + cb(stream) + } +} + +func (s *PrivateStream) RemoveOnConnect(needle func(stream *PrivateStream)) (found bool) { + + var newcallbacks []func(stream *PrivateStream) + var fp = reflect.ValueOf(needle).Pointer() + for _, cb := range s.connectCallbacks { + if fp == reflect.ValueOf(cb).Pointer() { + found = true + } else { + newcallbacks = append(newcallbacks, cb) + } + } + + if found { + s.connectCallbacks = newcallbacks + } + + return found +} + +func (s *PrivateStream) OnTrade(cb func(trade *types.Trade)) { + s.tradeCallbacks = append(s.tradeCallbacks, cb) +} + +func (s *PrivateStream) EmitTrade(trade *types.Trade) { + for _, cb := range s.tradeCallbacks { + cb(trade) + } +} + +func (s *PrivateStream) RemoveOnTrade(needle func(trade *types.Trade)) (found bool) { + + var newcallbacks []func(trade *types.Trade) + var fp = reflect.ValueOf(needle).Pointer() + for _, cb := range s.tradeCallbacks { + if fp == reflect.ValueOf(cb).Pointer() { + found = true + } else { + newcallbacks = append(newcallbacks, cb) + } + } + + if found { + s.tradeCallbacks = newcallbacks + } + + return found +} + +func (s *PrivateStream) OnKLineEvent(cb func(event *KLineEvent)) { + s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb) +} + +func (s *PrivateStream) EmitKLineEvent(event *KLineEvent) { + for _, cb := range s.kLineEventCallbacks { + cb(event) + } +} + +func (s *PrivateStream) RemoveOnKLineEvent(needle func(event *KLineEvent)) (found bool) { + + var newcallbacks []func(event *KLineEvent) + var fp = reflect.ValueOf(needle).Pointer() + for _, cb := range s.kLineEventCallbacks { + if fp == reflect.ValueOf(cb).Pointer() { + found = true + } else { + newcallbacks = append(newcallbacks, cb) + } + } + + if found { + s.kLineEventCallbacks = newcallbacks + } + + return found +} + +func (s *PrivateStream) OnKLineClosedEvent(cb func(event *KLineEvent)) { + s.kLineClosedEventCallbacks = append(s.kLineClosedEventCallbacks, cb) +} + +func (s *PrivateStream) EmitKLineClosedEvent(event *KLineEvent) { + for _, cb := range s.kLineClosedEventCallbacks { + cb(event) + } +} + +func (s *PrivateStream) RemoveOnKLineClosedEvent(needle func(event *KLineEvent)) (found bool) { + + var newcallbacks []func(event *KLineEvent) + var fp = reflect.ValueOf(needle).Pointer() + for _, cb := range s.kLineClosedEventCallbacks { + if fp == reflect.ValueOf(cb).Pointer() { + found = true + } else { + newcallbacks = append(newcallbacks, cb) + } + } + + if found { + s.kLineClosedEventCallbacks = newcallbacks + } + + return found +} + +func (s *PrivateStream) OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent)) { + s.balanceUpdateEventCallbacks = append(s.balanceUpdateEventCallbacks, cb) +} + +func (s *PrivateStream) EmitBalanceUpdateEvent(event *BalanceUpdateEvent) { + for _, cb := range s.balanceUpdateEventCallbacks { + cb(event) + } +} + +func (s *PrivateStream) RemoveOnBalanceUpdateEvent(needle func(event *BalanceUpdateEvent)) (found bool) { + + var newcallbacks []func(event *BalanceUpdateEvent) + var fp = reflect.ValueOf(needle).Pointer() + for _, cb := range s.balanceUpdateEventCallbacks { + if fp == reflect.ValueOf(cb).Pointer() { + found = true + } else { + newcallbacks = append(newcallbacks, cb) + } + } + + if found { + s.balanceUpdateEventCallbacks = newcallbacks + } + + return found +} + +func (s *PrivateStream) OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent)) { + s.outboundAccountInfoEventCallbacks = append(s.outboundAccountInfoEventCallbacks, cb) +} + +func (s *PrivateStream) EmitOutboundAccountInfoEvent(event *OutboundAccountInfoEvent) { + for _, cb := range s.outboundAccountInfoEventCallbacks { + cb(event) + } +} + +func (s *PrivateStream) RemoveOnOutboundAccountInfoEvent(needle func(event *OutboundAccountInfoEvent)) (found bool) { + + var newcallbacks []func(event *OutboundAccountInfoEvent) + var fp = reflect.ValueOf(needle).Pointer() + for _, cb := range s.outboundAccountInfoEventCallbacks { + if fp == reflect.ValueOf(cb).Pointer() { + found = true + } else { + newcallbacks = append(newcallbacks, cb) + } + } + + if found { + s.outboundAccountInfoEventCallbacks = newcallbacks + } + + return found +} + +func (s *PrivateStream) OnExecutionReportEvent(cb func(event *ExecutionReportEvent)) { + s.executionReportEventCallbacks = append(s.executionReportEventCallbacks, cb) +} + +func (s *PrivateStream) EmitExecutionReportEvent(event *ExecutionReportEvent) { + for _, cb := range s.executionReportEventCallbacks { + cb(event) + } +} + +func (s *PrivateStream) RemoveOnExecutionReportEvent(needle func(event *ExecutionReportEvent)) (found bool) { + + var newcallbacks []func(event *ExecutionReportEvent) + var fp = reflect.ValueOf(needle).Pointer() + for _, cb := range s.executionReportEventCallbacks { + if fp == reflect.ValueOf(cb).Pointer() { + found = true + } else { + newcallbacks = append(newcallbacks, cb) + } + } + + if found { + s.executionReportEventCallbacks = newcallbacks + } + + return found +} + +type PrivateStreamEventHub interface { + OnConnect(cb func(stream *PrivateStream)) + + OnTrade(cb func(trade *types.Trade)) + + OnKLineEvent(cb func(event *KLineEvent)) + + OnKLineClosedEvent(cb func(event *KLineEvent)) + + OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent)) + + OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent)) + + OnExecutionReportEvent(cb func(event *ExecutionReportEvent)) +} diff --git a/bbgo/exchange/binance/stream.go b/bbgo/exchange/binance/stream.go index ed2df3a11..d4c95b291 100644 --- a/bbgo/exchange/binance/stream.go +++ b/bbgo/exchange/binance/stream.go @@ -4,8 +4,9 @@ import ( "context" "fmt" "github.com/adshao/go-binance" + "github.com/c9s/bbgo/pkg/bbgo/types" "github.com/gorilla/websocket" - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" "strings" "time" ) @@ -41,11 +42,23 @@ type StreamRequest struct { Params []string `json:"params"` } +//go:generate callbackgen -type PrivateStream -interface type PrivateStream struct { Client *binance.Client ListenKey string Conn *websocket.Conn Subscriptions []Subscription + + connectCallbacks []func(stream *PrivateStream) + tradeCallbacks []func(trade *types.Trade) + + // custom callbacks + kLineEventCallbacks []func(event *KLineEvent) + kLineClosedEventCallbacks []func(event *KLineEvent) + + balanceUpdateEventCallbacks []func(event *BalanceUpdateEvent) + outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent) + executionReportEventCallbacks []func(event *ExecutionReportEvent) } func (s *PrivateStream) Subscribe(channel string, symbol string, options SubscribeOptions) { @@ -63,15 +76,17 @@ func (s *PrivateStream) Connect(ctx context.Context, eventC chan interface{}) er return err } - logrus.Infof("[binance] websocket connected") + log.Infof("[binance] websocket connected") s.Conn = conn + s.EmitConnect(s) + var params []string for _, subscription := range s.Subscriptions { params = append(params, subscription.String()) } - logrus.Infof("[binance] subscribing channels: %+v", params) + log.Infof("[binance] subscribing channels: %+v", params) err = conn.WriteJSON(StreamRequest{ Method: "SUBSCRIBE", Params: params, @@ -102,17 +117,17 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) { case <-ticker.C: err := s.Client.NewKeepaliveUserStreamService().ListenKey(s.ListenKey).Do(ctx) if err != nil { - logrus.WithError(err).Error("listen key keep-alive error", err) + log.WithError(err).Error("listen key keep-alive error", err) } default: if err := s.Conn.SetReadDeadline(time.Now().Add(15 * time.Second)); err != nil { - logrus.WithError(err).Errorf("set read deadline error: %s", err.Error()) + log.WithError(err).Errorf("set read deadline error: %s", err.Error()) } mt, message, err := s.Conn.ReadMessage() if err != nil { - logrus.WithError(err).Errorf("read error: %s", err.Error()) + log.WithError(err).Errorf("read error: %s", err.Error()) return } @@ -121,31 +136,63 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) { continue } - logrus.Debugf("[binance] recv: %s", message) + log.Debugf("[binance] recv: %s", message) e, err := ParseEvent(string(message)) if err != nil { - logrus.WithError(err).Errorf("[binance] event parse error") + log.WithError(err).Errorf("[binance] event parse error") continue } + log.Infof("[binance] event: %+v", e) + + switch e := e.(type) { + + case *OutboundAccountInfoEvent: + log.Info(e.Event, " ", e.Balances) + s.EmitOutboundAccountInfoEvent(e) + + case *BalanceUpdateEvent: + log.Info(e.Event, " ", e.Asset, " ", e.Delta) + s.EmitBalanceUpdateEvent(e) + + case *KLineEvent: + log.Info(e.Event, " ", e.KLine, " ", e.KLine.Interval) + s.EmitKLineEvent(e) + + if e.KLine.Closed { + s.EmitKLineClosedEvent(e) + } + + case *ExecutionReportEvent: + s.EmitExecutionReportEvent(e) + + switch e.CurrentExecutionType { + case "TRADE": + trade, err := e.Trade() + if err != nil { + break + } + s.EmitTrade(trade) + } + } + eventC <- e } } } func (s *PrivateStream) Close() error { - logrus.Infof("[binance] closing user data stream...") + log.Infof("[binance] closing user data stream...") defer s.Conn.Close() // use background context to close user stream err := s.Client.NewCloseUserStreamService().ListenKey(s.ListenKey).Do(context.Background()) if err != nil { - logrus.WithError(err).Error("[binance] error close user data stream") + log.WithError(err).Error("[binance] error close user data stream") return err } return err } - diff --git a/bbgo/trader.go b/bbgo/trader.go index 2743f2fe9..441bdacb6 100644 --- a/bbgo/trader.go +++ b/bbgo/trader.go @@ -76,7 +76,7 @@ func (t *Trader) Errorf(err error, format string, args ...interface{}) { } } -func (t *Trader) ReportTrade(e *binance.ExecutionReportEvent, trade *types2.Trade) { +func (t *Trader) ReportTrade(trade *types2.Trade) { var color = "" if trade.IsBuyer { color = "#228B22" @@ -85,7 +85,7 @@ func (t *Trader) ReportTrade(e *binance.ExecutionReportEvent, trade *types2.Trad } _, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel, - slack.MsgOptionText(util.Render(`:handshake: {{ .CurrentExecutionType }} execution`, e), true), + slack.MsgOptionText(util.Render(`:handshake: trade execution`, trade), true), slack.MsgOptionAttachments(slack.Attachment{ Title: "New Trade", Color: color, @@ -93,7 +93,7 @@ func (t *Trader) ReportTrade(e *binance.ExecutionReportEvent, trade *types2.Trad // Text: "", Fields: []slack.AttachmentField{ {Title: "Symbol", Value: trade.Symbol, Short: true,}, - {Title: "Side", Value: e.Side, Short: true,}, + {Title: "Side", Value: trade.Side, Short: true,}, {Title: "Price", Value: USD.FormatMoney(trade.Price), Short: true,}, {Title: "Volume", Value: t.Context.Market.FormatVolume(trade.Volume), Short: true,}, },