fix subscription mech

This commit is contained in:
c9s 2020-07-13 13:25:48 +08:00
parent 79553212a7
commit 9b042b5e8f
5 changed files with 161 additions and 95 deletions

View File

@ -141,7 +141,7 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
}
default:
if err := s.Conn.SetReadDeadline(time.Now().Add(6 * time.Second)); err != nil {
if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
}
@ -158,7 +158,8 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
return
default:
s.Subscriptions = nil
_ = s.invalidateListenKey(ctx, s.ListenKey)
err = s.connect(ctx)
time.Sleep(5 * time.Second)
}
@ -180,7 +181,7 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
continue
}
// log.Infof("[binance] event: %+v", e)
// log.Notify("[binance] event: %+v", e)
switch e := e.(type) {

85
bbgo/slack.go Normal file
View File

@ -0,0 +1,85 @@
package bbgo
import (
"context"
"fmt"
"github.com/c9s/bbgo/pkg/bbgo/types"
"github.com/c9s/bbgo/pkg/slack/slackstyle"
"github.com/c9s/bbgo/pkg/util"
"github.com/sirupsen/logrus"
"github.com/slack-go/slack"
"time"
)
type SlackNotifier struct {
Slack *slack.Client
TradingChannel string
ErrorChannel string
InfoChannel string
}
func (t *SlackNotifier) Notify(format string, args ...interface{}) {
var slackAttachments []slack.Attachment = nil
var slackArgsStartIdx = -1
for idx, arg := range args {
switch a := arg.(type) {
// concrete type assert first
case slack.Attachment:
if slackArgsStartIdx == -1 {
slackArgsStartIdx = idx
}
slackAttachments = append(slackAttachments, a)
case slackstyle.SlackAttachmentCreator:
if slackArgsStartIdx == -1 {
slackArgsStartIdx = idx
}
slackAttachments = append(slackAttachments, a.SlackAttachment())
}
}
var nonSlackArgs = []interface{}{}
if slackArgsStartIdx > 0 {
nonSlackArgs = args[:slackArgsStartIdx]
}
logrus.Infof(format, nonSlackArgs...)
_, _, err := t.Slack.PostMessageContext(context.Background(), t.InfoChannel,
slack.MsgOptionText(fmt.Sprintf(format, nonSlackArgs...), true),
slack.MsgOptionAttachments(slackAttachments...))
if err != nil {
logrus.WithError(err).Errorf("slack error: %s", err.Error())
}
}
func (t *SlackNotifier) ReportTrade(trade *types.Trade) {
_, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel,
slack.MsgOptionText(util.Render(`:handshake: trade execution @ {{ .Price }}`, trade), true),
slack.MsgOptionAttachments(trade.SlackAttachment()))
if err != nil {
logrus.WithError(err).Error("slack send error")
}
}
func (t *SlackNotifier) ReportPnL(report *ProfitAndLossReport) {
attachment := report.SlackAttachment()
_, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel,
slack.MsgOptionText(util.Render(
`:heavy_dollar_sign: Here is your *{{ .symbol }}* PnL report collected since *{{ .startTime }}*`,
map[string]interface{}{
"symbol": report.Symbol,
"startTime": report.StartTime.Format(time.RFC822),
}), true),
slack.MsgOptionAttachments(attachment))
if err != nil {
logrus.WithError(err).Errorf("slack send error")
}
}

View File

@ -2,25 +2,93 @@ package bbgo
import (
"context"
"time"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo/exchange/binance"
"github.com/c9s/bbgo/pkg/bbgo/types"
"github.com/c9s/bbgo/pkg/slack"
)
type Trader struct {
Notifier *slack.SlackNotifier
Notifier *SlackNotifier
// Context is trading Context
Context *TradingContext
Exchange *binance.Exchange
reportTimer *time.Timer
}
type Strategy interface {
Init(trader *Trader, stream *binance.PrivateStream) error
}
func (t *Trader) RunStrategy(ctx context.Context, strategy Strategy) error {
symbol := t.Context.Symbol
stream, err := t.Exchange.NewPrivateStream()
if err != nil {
return err
}
if err := strategy.Init(t, stream); err != nil {
return err
}
t.reportTimer = time.AfterFunc(1*time.Second, func() {
t.ReportPnL()
})
stream.OnTrade(func(trade *types.Trade) {
if trade.Symbol != symbol {
return
}
t.ReportTrade(trade)
t.Context.ProfitAndLossCalculator.AddTrade(*trade)
if t.reportTimer != nil {
t.reportTimer.Stop()
}
t.reportTimer = time.AfterFunc(5*time.Second, func() {
t.ReportPnL()
})
})
stream.OnKLineEvent(func(e *binance.KLineEvent) {
t.Context.SetCurrentPrice(e.KLine.GetClose())
})
var eventC = make(chan interface{}, 20)
if err := stream.Connect(ctx, eventC); err != nil {
return err
}
go func() {
defer stream.Close()
for {
select {
case <-ctx.Done():
return
// drain the event channel
case <-eventC:
}
}
}()
return nil
}
func (t *Trader) Infof(format string, args ...interface{}) {
t.Notifier.Infof(format, args...)
t.Notifier.Notify(format, args...)
}
func (t *Trader) ReportTrade(trade *types.Trade) {
@ -34,7 +102,7 @@ func (t *Trader) ReportPnL() {
}
func (t *Trader) SubmitOrder(ctx context.Context, order *types.Order) {
t.Infof(":memo: Submitting %s order on side %s with volume: %s", order.Type, order.Side, order.VolumeStr, order.SlackAttachment())
t.Notifier.Notify(":memo: Submitting %s order on side %s with volume: %s", order.Type, order.Side, order.VolumeStr, order.SlackAttachment())
err := t.Exchange.SubmitOrder(ctx, order)
if err != nil {

View File

@ -53,5 +53,3 @@ func (t *SlackLogHook) Fire(e *logrus.Entry) error {
return err
}

View File

@ -1,87 +1 @@
package slack
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/slack-go/slack"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/bbgo/types"
"github.com/c9s/bbgo/pkg/slack/slackstyle"
"github.com/c9s/bbgo/pkg/util"
)
type SlackNotifier struct {
Slack *slack.Client
TradingChannel string
ErrorChannel string
InfoChannel string
}
func (t *SlackNotifier) Infof(format string, args ...interface{}) {
var slackAttachments []slack.Attachment = nil
var slackArgsStartIdx = -1
for idx, arg := range args {
switch a := arg.(type) {
// concrete type assert first
case slack.Attachment:
if slackArgsStartIdx == -1 {
slackArgsStartIdx = idx
}
slackAttachments = append(slackAttachments, a)
case slackstyle.SlackAttachmentCreator:
if slackArgsStartIdx == -1 {
slackArgsStartIdx = idx
}
slackAttachments = append(slackAttachments, a.SlackAttachment())
}
}
var nonSlackArgs = []interface{}{}
if slackArgsStartIdx > 0 {
nonSlackArgs = args[:slackArgsStartIdx]
}
log.Infof(format, nonSlackArgs...)
_, _, err := t.Slack.PostMessageContext(context.Background(), t.InfoChannel,
slack.MsgOptionText(fmt.Sprintf(format, nonSlackArgs...), true),
slack.MsgOptionAttachments(slackAttachments...))
if err != nil {
log.WithError(err).Errorf("slack error: %s", err.Error())
}
}
func (t *SlackNotifier) ReportTrade(trade *types.Trade) {
_, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel,
slack.MsgOptionText(util.Render(`:handshake: trade execution @ {{ .Price }}`, trade), true),
slack.MsgOptionAttachments(trade.SlackAttachment()))
if err != nil {
log.WithError(err).Error("slack send error")
}
}
func (t *SlackNotifier) ReportPnL(report *bbgo.ProfitAndLossReport) {
attachment := report.SlackAttachment()
_, _, err := t.Slack.PostMessageContext(context.Background(), t.TradingChannel,
slack.MsgOptionText(util.Render(
`:heavy_dollar_sign: Here is your *{{ .symbol }}* PnL report collected since *{{ .startTime }}*`,
map[string]interface{}{
"symbol": report.Symbol,
"startTime": report.StartTime.Format(time.RFC822),
}), true),
slack.MsgOptionAttachments(attachment))
if err != nil {
log.WithError(err).Errorf("slack send error")
}
}