From 9b042b5e8f752a08301caaffc1c6eefaf03658f3 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 13 Jul 2020 13:25:48 +0800 Subject: [PATCH] fix subscription mech --- bbgo/exchange/binance/stream.go | 7 +-- bbgo/slack.go | 85 ++++++++++++++++++++++++++++++++ bbgo/trader.go | 76 +++++++++++++++++++++++++++-- slack/logrus_look.go | 2 - slack/notifier.go | 86 --------------------------------- 5 files changed, 161 insertions(+), 95 deletions(-) create mode 100644 bbgo/slack.go diff --git a/bbgo/exchange/binance/stream.go b/bbgo/exchange/binance/stream.go index 4aae85ac2..a295da70b 100644 --- a/bbgo/exchange/binance/stream.go +++ b/bbgo/exchange/binance/stream.go @@ -141,7 +141,7 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) { } default: - if err := s.Conn.SetReadDeadline(time.Now().Add(6 * time.Second)); err != nil { + if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil { log.WithError(err).Errorf("set read deadline error: %s", err.Error()) } @@ -158,7 +158,8 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) { return default: - s.Subscriptions = nil + _ = s.invalidateListenKey(ctx, s.ListenKey) + err = s.connect(ctx) time.Sleep(5 * time.Second) } @@ -180,7 +181,7 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) { continue } - // log.Infof("[binance] event: %+v", e) + // log.Notify("[binance] event: %+v", e) switch e := e.(type) { diff --git a/bbgo/slack.go b/bbgo/slack.go new file mode 100644 index 000000000..bddfe0e9b --- /dev/null +++ b/bbgo/slack.go @@ -0,0 +1,85 @@ +package bbgo + +import ( + "context" + "fmt" + "github.com/c9s/bbgo/pkg/bbgo/types" + "github.com/c9s/bbgo/pkg/slack/slackstyle" + "github.com/c9s/bbgo/pkg/util" + "github.com/sirupsen/logrus" + "github.com/slack-go/slack" + "time" +) + +type SlackNotifier struct { + Slack *slack.Client + + TradingChannel string + ErrorChannel string + InfoChannel string +} + +func (t *SlackNotifier) Notify(format string, args ...interface{}) { + var slackAttachments []slack.Attachment = nil + var slackArgsStartIdx = -1 + for idx, arg := range args { + switch a := arg.(type) { + + // concrete type assert first + case slack.Attachment: + if slackArgsStartIdx == -1 { + slackArgsStartIdx = idx + } + slackAttachments = append(slackAttachments, a) + + case slackstyle.SlackAttachmentCreator: + if slackArgsStartIdx == -1 { + slackArgsStartIdx = idx + } + slackAttachments = append(slackAttachments, a.SlackAttachment()) + + } + } + + var nonSlackArgs = []interface{}{} + if slackArgsStartIdx > 0 { + nonSlackArgs = args[:slackArgsStartIdx] + } + + logrus.Infof(format, nonSlackArgs...) + + _, _, err := t.Slack.PostMessageContext(context.Background(), t.InfoChannel, + slack.MsgOptionText(fmt.Sprintf(format, nonSlackArgs...), true), + slack.MsgOptionAttachments(slackAttachments...)) + if err != nil { + logrus.WithError(err).Errorf("slack error: %s", err.Error()) + } +} + +func (t *SlackNotifier) ReportTrade(trade *types.Trade) { + _, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel, + slack.MsgOptionText(util.Render(`:handshake: trade execution @ {{ .Price }}`, trade), true), + slack.MsgOptionAttachments(trade.SlackAttachment())) + + if err != nil { + logrus.WithError(err).Error("slack send error") + } +} + +func (t *SlackNotifier) ReportPnL(report *ProfitAndLossReport) { + attachment := report.SlackAttachment() + + _, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel, + slack.MsgOptionText(util.Render( + `:heavy_dollar_sign: Here is your *{{ .symbol }}* PnL report collected since *{{ .startTime }}*`, + map[string]interface{}{ + "symbol": report.Symbol, + "startTime": report.StartTime.Format(time.RFC822), + }), true), + slack.MsgOptionAttachments(attachment)) + + if err != nil { + logrus.WithError(err).Errorf("slack send error") + } +} + diff --git a/bbgo/trader.go b/bbgo/trader.go index 5b33c0650..b75b975b2 100644 --- a/bbgo/trader.go +++ b/bbgo/trader.go @@ -2,25 +2,93 @@ package bbgo import ( "context" + "time" log "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/bbgo/exchange/binance" "github.com/c9s/bbgo/pkg/bbgo/types" - "github.com/c9s/bbgo/pkg/slack" ) type Trader struct { - Notifier *slack.SlackNotifier + Notifier *SlackNotifier // Context is trading Context Context *TradingContext Exchange *binance.Exchange + + reportTimer *time.Timer +} + + +type Strategy interface { + Init(trader *Trader, stream *binance.PrivateStream) error +} + +func (t *Trader) RunStrategy(ctx context.Context, strategy Strategy) error { + symbol := t.Context.Symbol + + stream, err := t.Exchange.NewPrivateStream() + if err != nil { + return err + } + + if err := strategy.Init(t, stream); err != nil { + return err + } + + t.reportTimer = time.AfterFunc(1*time.Second, func() { + t.ReportPnL() + }) + + stream.OnTrade(func(trade *types.Trade) { + if trade.Symbol != symbol { + return + } + + t.ReportTrade(trade) + t.Context.ProfitAndLossCalculator.AddTrade(*trade) + + if t.reportTimer != nil { + t.reportTimer.Stop() + } + + t.reportTimer = time.AfterFunc(5*time.Second, func() { + t.ReportPnL() + }) + }) + + stream.OnKLineEvent(func(e *binance.KLineEvent) { + t.Context.SetCurrentPrice(e.KLine.GetClose()) + }) + + var eventC = make(chan interface{}, 20) + if err := stream.Connect(ctx, eventC); err != nil { + return err + } + + go func() { + defer stream.Close() + + for { + select { + + case <-ctx.Done(): + return + + // drain the event channel + case <-eventC: + + } + } + }() + + return nil } func (t *Trader) Infof(format string, args ...interface{}) { - t.Notifier.Infof(format, args...) + t.Notifier.Notify(format, args...) } func (t *Trader) ReportTrade(trade *types.Trade) { @@ -34,7 +102,7 @@ func (t *Trader) ReportPnL() { } func (t *Trader) SubmitOrder(ctx context.Context, order *types.Order) { - t.Infof(":memo: Submitting %s order on side %s with volume: %s", order.Type, order.Side, order.VolumeStr, order.SlackAttachment()) + t.Notifier.Notify(":memo: Submitting %s order on side %s with volume: %s", order.Type, order.Side, order.VolumeStr, order.SlackAttachment()) err := t.Exchange.SubmitOrder(ctx, order) if err != nil { diff --git a/slack/logrus_look.go b/slack/logrus_look.go index 29c383ebe..cb2304962 100644 --- a/slack/logrus_look.go +++ b/slack/logrus_look.go @@ -53,5 +53,3 @@ func (t *SlackLogHook) Fire(e *logrus.Entry) error { return err } - - diff --git a/slack/notifier.go b/slack/notifier.go index 24f430fcf..578351f3c 100644 --- a/slack/notifier.go +++ b/slack/notifier.go @@ -1,87 +1 @@ package slack - -import ( - "context" - "fmt" - log "github.com/sirupsen/logrus" - "github.com/slack-go/slack" - "time" - - "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/bbgo/types" - "github.com/c9s/bbgo/pkg/slack/slackstyle" - "github.com/c9s/bbgo/pkg/util" -) - -type SlackNotifier struct { - Slack *slack.Client - - TradingChannel string - ErrorChannel string - InfoChannel string -} - -func (t *SlackNotifier) Infof(format string, args ...interface{}) { - var slackAttachments []slack.Attachment = nil - var slackArgsStartIdx = -1 - for idx, arg := range args { - switch a := arg.(type) { - - // concrete type assert first - case slack.Attachment: - if slackArgsStartIdx == -1 { - slackArgsStartIdx = idx - } - slackAttachments = append(slackAttachments, a) - - case slackstyle.SlackAttachmentCreator: - if slackArgsStartIdx == -1 { - slackArgsStartIdx = idx - } - slackAttachments = append(slackAttachments, a.SlackAttachment()) - - } - } - - var nonSlackArgs = []interface{}{} - if slackArgsStartIdx > 0 { - nonSlackArgs = args[:slackArgsStartIdx] - } - - log.Infof(format, nonSlackArgs...) - - _, _, err := t.Slack.PostMessageContext(context.Background(), t.InfoChannel, - slack.MsgOptionText(fmt.Sprintf(format, nonSlackArgs...), true), - slack.MsgOptionAttachments(slackAttachments...)) - if err != nil { - log.WithError(err).Errorf("slack error: %s", err.Error()) - } -} - -func (t *SlackNotifier) ReportTrade(trade *types.Trade) { - _, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel, - slack.MsgOptionText(util.Render(`:handshake: trade execution @ {{ .Price }}`, trade), true), - slack.MsgOptionAttachments(trade.SlackAttachment())) - - if err != nil { - log.WithError(err).Error("slack send error") - } -} - -func (t *SlackNotifier) ReportPnL(report *bbgo.ProfitAndLossReport) { - attachment := report.SlackAttachment() - - _, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel, - slack.MsgOptionText(util.Render( - `:heavy_dollar_sign: Here is your *{{ .symbol }}* PnL report collected since *{{ .startTime }}*`, - map[string]interface{}{ - "symbol": report.Symbol, - "startTime": report.StartTime.Format(time.RFC822), - }), true), - slack.MsgOptionAttachments(attachment)) - - if err != nil { - log.WithError(err).Errorf("slack send error") - } -} -