call tradeCollector process to check trades

This commit is contained in:
c9s 2021-12-27 00:51:57 +08:00
parent c49b9ef276
commit f7c39290a0
3 changed files with 20 additions and 17 deletions

View File

@ -25,6 +25,9 @@ func NewOrderStore(symbol string) *OrderStore {
} }
func (s *OrderStore) AllFilled() bool { func (s *OrderStore) AllFilled() bool {
s.mu.Lock()
defer s.mu.Unlock()
// If any order is new or partially filled, we return false // If any order is new or partially filled, we return false
for _, o := range s.orders { for _, o := range s.orders {
switch o.Status { switch o.Status {
@ -101,14 +104,12 @@ func (s *OrderStore) Update(o types.Order) bool {
func (s *OrderStore) BindStream(stream types.Stream) { func (s *OrderStore) BindStream(stream types.Stream) {
hasSymbol := s.Symbol != "" hasSymbol := s.Symbol != ""
stream.OnOrderUpdate(func(order types.Order) { stream.OnOrderUpdate(func(order types.Order) {
if hasSymbol { // if we have symbol defined, we should filter out the orders that we are not interested in
if order.Symbol != s.Symbol { if hasSymbol && order.Symbol != s.Symbol {
return return
}
s.handleOrderUpdate(order)
} else {
s.handleOrderUpdate(order)
} }
s.handleOrderUpdate(order)
}) })
} }

View File

@ -6,12 +6,13 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/types"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/types"
) )
// go run ./cmd/bbgo userdatastream --session=ftx // go run ./cmd/bbgo userdatastream --session=ftx
@ -41,16 +42,16 @@ var userDataStreamCmd = &cobra.Command{
s := session.Exchange.NewStream() s := session.Exchange.NewStream()
s.OnOrderUpdate(func(order types.Order) { s.OnOrderUpdate(func(order types.Order) {
log.Infof("orderUpdate: %+v", order) log.Infof("[orderUpdate] %+v", order)
}) })
s.OnTradeUpdate(func(trade types.Trade) { s.OnTradeUpdate(func(trade types.Trade) {
log.Infof("tradeUpdate: %+v", trade) log.Infof("[tradeUpdate] %+v", trade)
}) })
s.OnBalanceUpdate(func(trade types.BalanceMap) { s.OnBalanceUpdate(func(trade types.BalanceMap) {
log.Infof("balanceUpdate: %+v", trade) log.Infof("[balanceUpdate] %+v", trade)
}) })
s.OnBalanceSnapshot(func(trade types.BalanceMap) { s.OnBalanceSnapshot(func(trade types.BalanceMap) {
log.Infof("balanceSnapshot: %+v", trade) log.Infof("[balanceSnapshot] %+v", trade)
}) })
log.Infof("connecting...") log.Infof("connecting...")

View File

@ -769,9 +769,8 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
s.tradeCollector.OnPositionUpdate(func(position *types.Position) { s.tradeCollector.OnPositionUpdate(func(position *types.Position) {
s.Notifiability.Notify(position) s.Notifiability.Notify(position)
}) })
s.tradeCollector.BindStreamForBackground(s.sourceSession.UserDataStream) s.tradeCollector.BindStream(s.sourceSession.UserDataStream)
s.tradeCollector.BindStreamForBackground(s.makerSession.UserDataStream) s.tradeCollector.BindStream(s.makerSession.UserDataStream)
go s.tradeCollector.Run(ctx)
s.stopC = make(chan struct{}) s.stopC = make(chan struct{})
@ -813,6 +812,8 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
// uncover position = 5 - 3 (covered position) = 2 // uncover position = 5 - 3 (covered position) = 2
// for negative position: // for negative position:
// uncover position = -5 - -3 (covered position) = -2 // uncover position = -5 - -3 (covered position) = -2
s.tradeCollector.Process()
position := s.state.HedgePosition.AtomicLoad() position := s.state.HedgePosition.AtomicLoad()
uncoverPosition := position - s.state.CoveredPosition.AtomicLoad() uncoverPosition := position - s.state.CoveredPosition.AtomicLoad()
absPos := math.Abs(uncoverPosition.Float64()) absPos := math.Abs(uncoverPosition.Float64())