Merge pull request #1464 from c9s/kbearXD/dca2/run-state-and-recover

FEATURE: [dca2] run state machine
This commit is contained in:
c9s 2023-12-28 17:35:57 +08:00 committed by GitHub
commit 60043d6239
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 795 additions and 25 deletions

View File

@ -23,8 +23,9 @@ exchangeStrategies:
dca2: dca2:
symbol: ETHUSDT symbol: ETHUSDT
short: false short: false
budget: 5000 budget: 200
maxOrderNum: 10 maxOrderNum: 5
priceDeviation: 1% priceDeviation: 1%
takeProfitRatio: 1% takeProfitRatio: 0.2%
coolDownInterval: 5m coolDownInterval: 3m
circuitBreakLossThreshold: -0.9

View File

@ -72,7 +72,6 @@ func (g *GetWalletOpenOrdersRequest) GetParameters() (map[string]interface{}, er
// assign parameter of timestamp // assign parameter of timestamp
// convert time.Time to milliseconds time stamp // convert time.Time to milliseconds time stamp
params["timestamp"] = strconv.FormatInt(timestamp.UnixNano()/int64(time.Millisecond), 10) params["timestamp"] = strconv.FormatInt(timestamp.UnixNano()/int64(time.Millisecond), 10)
fmt.Println(params["timestamp"], timestamp)
} else { } else {
} }
// check orderBy field -> json key order_by // check orderBy field -> json key order_by

View File

@ -5,6 +5,7 @@ import (
"strings" "strings"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
) )
func (s *Strategy) debugOrders(submitOrders []types.Order) { func (s *Strategy) debugOrders(submitOrders []types.Order) {
@ -17,3 +18,15 @@ func (s *Strategy) debugOrders(submitOrders []types.Order) {
s.logger.Info(sb.String()) s.logger.Info(sb.String())
} }
func debugRoundOrders(logger *logrus.Entry, roundName string, round Round) {
var sb strings.Builder
sb.WriteString("ROUND " + roundName + " [\n")
sb.WriteString(round.TakeProfitOrder.String() + "\n")
sb.WriteString("------------------------------------------------\n")
for i, order := range round.OpenPositionOrders {
sb.WriteString(fmt.Sprintf("%3d) ", i+1) + order.String() + "\n")
}
sb.WriteString("] END OF ROUND")
logger.Info(sb.String())
}

View File

@ -38,8 +38,6 @@ func newTestStrategy(va ...string) *Strategy {
Market: market, Market: market,
Short: false, Short: false,
TakeProfitRatio: Number("10%"), TakeProfitRatio: Number("10%"),
openPositionSide: types.SideTypeBuy,
takeProfitSide: types.SideTypeSell,
} }
return s return s
} }

View File

@ -0,0 +1,270 @@
package dca2
import (
"context"
"fmt"
"strconv"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type descendingClosedOrderQueryService interface {
QueryClosedOrdersDesc(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) ([]types.Order, error)
}
type RecoverApiQueryService interface {
types.ExchangeOrderQueryService
types.ExchangeTradeService
descendingClosedOrderQueryService
}
func (s *Strategy) recover(ctx context.Context) error {
s.logger.Info("[DCA] recover")
queryService, ok := s.Session.Exchange.(RecoverApiQueryService)
if !ok {
return fmt.Errorf("[DCA] exchange %s doesn't support queryAPI interface", s.Session.ExchangeName)
}
openOrders, err := queryService.QueryOpenOrders(ctx, s.Symbol)
if err != nil {
return err
}
closedOrders, err := queryService.QueryClosedOrdersDesc(ctx, s.Symbol, time.Time{}, time.Now(), 0)
if err != nil {
return err
}
currentRound, err := getCurrentRoundOrders(s.Short, openOrders, closedOrders, s.OrderGroupID)
if err != nil {
return err
}
debugRoundOrders(s.logger, "current", currentRound)
// recover state
state, err := recoverState(ctx, s.Symbol, s.Short, int(s.MaxOrderNum), openOrders, currentRound, s.OrderExecutor.ActiveMakerOrders(), s.OrderExecutor.OrderStore(), s.OrderGroupID)
if err != nil {
return err
}
// recover position
if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil {
return err
}
// recover budget
budget := recoverBudget(currentRound)
// recover startTimeOfNextRound
startTimeOfNextRound := recoverStartTimeOfNextRound(ctx, currentRound, s.CoolDownInterval)
s.state = state
if !budget.IsZero() {
s.Budget = budget
}
s.startTimeOfNextRound = startTimeOfNextRound
return nil
}
// recover state
func recoverState(ctx context.Context, symbol string, short bool, maxOrderNum int, openOrders []types.Order, currentRound Round, activeOrderBook *bbgo.ActiveOrderBook, orderStore *core.OrderStore, groupID uint32) (State, error) {
if len(currentRound.OpenPositionOrders) == 0 {
// new strategy
return WaitToOpenPosition, nil
}
numOpenOrders := len(openOrders)
// dca stop at take profit order stage
if currentRound.TakeProfitOrder.OrderID != 0 {
if numOpenOrders == 0 {
// current round's take-profit order filled, wait to open next round
return WaitToOpenPosition, nil
}
// check the open orders is take profit order or not
if numOpenOrders == 1 {
if openOrders[0].OrderID == currentRound.TakeProfitOrder.OrderID {
activeOrderBook.Add(openOrders[0])
// current round's take-profit order still opened, wait to fill
return TakeProfitReady, nil
} else {
return None, fmt.Errorf("stop at taking profit stage, but the open order's OrderID is not the take-profit order's OrderID")
}
}
return None, fmt.Errorf("stop at taking profit stage, but the number of open orders is > 1")
}
numOpenPositionOrders := len(currentRound.OpenPositionOrders)
if numOpenPositionOrders > maxOrderNum {
return None, fmt.Errorf("the number of open-position orders is > max order number")
} else if numOpenPositionOrders < maxOrderNum {
// The number of open-position orders should be the same as maxOrderNum
// If not, it may be the following possible cause
// 1. This strategy at position opening, so it may not place all orders we want successfully
// 2. There are some errors when placing open-position orders. e.g. cannot lock fund.....
return None, fmt.Errorf("the number of open-position orders is < max order number")
}
if numOpenOrders > numOpenPositionOrders {
return None, fmt.Errorf("the number of open orders is > the number of open-position orders")
}
if numOpenOrders == numOpenPositionOrders {
activeOrderBook.Add(openOrders...)
orderStore.Add(openOrders...)
return OpenPositionReady, nil
}
var openedCnt, filledCnt, cancelledCnt int64
for _, order := range currentRound.OpenPositionOrders {
switch order.Status {
case types.OrderStatusNew, types.OrderStatusPartiallyFilled:
openedCnt++
case types.OrderStatusFilled:
filledCnt++
case types.OrderStatusCanceled:
cancelledCnt++
default:
return None, fmt.Errorf("there is unexpected status %s of order %s", order.Status, order)
}
}
if filledCnt > 0 && cancelledCnt == 0 {
activeOrderBook.Add(openOrders...)
orderStore.Add(openOrders...)
return OpenPositionOrderFilled, nil
}
if openedCnt > 0 && filledCnt > 0 && cancelledCnt > 0 {
return OpenPositionOrdersCancelling, nil
}
if openedCnt == 0 && filledCnt > 0 && cancelledCnt > 0 {
return OpenPositionOrdersCancelled, nil
}
return None, fmt.Errorf("unexpected order status combination")
}
func recoverPosition(ctx context.Context, position *types.Position, queryService RecoverApiQueryService, currentRound Round) error {
if position == nil {
return nil
}
var positionOrders []types.Order
position.Reset()
if currentRound.TakeProfitOrder.OrderID != 0 {
if !types.IsActiveOrder(currentRound.TakeProfitOrder) {
return nil
}
positionOrders = append(positionOrders, currentRound.TakeProfitOrder)
}
for _, order := range currentRound.OpenPositionOrders {
// no executed quantity order, no need to get trades
if order.ExecutedQuantity.IsZero() {
continue
}
positionOrders = append(positionOrders, order)
}
for _, positionOrder := range positionOrders {
trades, err := queryService.QueryOrderTrades(ctx, types.OrderQuery{
Symbol: position.Symbol,
OrderID: strconv.FormatUint(positionOrder.OrderID, 10),
})
if err != nil {
return fmt.Errorf("failed to get trades of order (%d)", positionOrder.OrderID)
}
position.AddTrades(trades)
}
return nil
}
func recoverBudget(currentRound Round) fixedpoint.Value {
if len(currentRound.OpenPositionOrders) == 0 {
return fixedpoint.Zero
}
total := fixedpoint.Zero
for _, order := range currentRound.OpenPositionOrders {
total = total.Add(order.Quantity.Mul(order.Price))
}
if currentRound.TakeProfitOrder.OrderID != 0 && currentRound.TakeProfitOrder.Status == types.OrderStatusFilled {
total = total.Add(currentRound.TakeProfitOrder.Quantity.Mul(currentRound.TakeProfitOrder.Price))
for _, order := range currentRound.OpenPositionOrders {
total = total.Sub(order.ExecutedQuantity.Mul(order.Price))
}
}
return total
}
func recoverStartTimeOfNextRound(ctx context.Context, currentRound Round, coolDownInterval types.Duration) time.Time {
if currentRound.TakeProfitOrder.OrderID != 0 && currentRound.TakeProfitOrder.Status == types.OrderStatusFilled {
return currentRound.TakeProfitOrder.UpdateTime.Time().Add(coolDownInterval.Duration())
}
return time.Time{}
}
type Round struct {
OpenPositionOrders []types.Order
TakeProfitOrder types.Order
}
func getCurrentRoundOrders(short bool, openOrders, closedOrders []types.Order, groupID uint32) (Round, error) {
openPositionSide := types.SideTypeBuy
takeProfitSide := types.SideTypeSell
if short {
openPositionSide = types.SideTypeSell
takeProfitSide = types.SideTypeBuy
}
var allOrders []types.Order
allOrders = append(allOrders, openOrders...)
allOrders = append(allOrders, closedOrders...)
types.SortOrdersDescending(allOrders)
var currentRound Round
lastSide := takeProfitSide
for _, order := range allOrders {
// group id filter is used for debug when local running
if order.GroupID != groupID {
continue
}
if order.Side == takeProfitSide && lastSide == openPositionSide {
break
}
switch order.Side {
case openPositionSide:
currentRound.OpenPositionOrders = append(currentRound.OpenPositionOrders, order)
case takeProfitSide:
if currentRound.TakeProfitOrder.OrderID != 0 {
return currentRound, fmt.Errorf("there are two take-profit orders in one round, please check it")
}
currentRound.TakeProfitOrder = order
default:
}
lastSide = order.Side
}
return currentRound, nil
}

View File

@ -0,0 +1,233 @@
package dca2
import (
"context"
"math/rand"
"testing"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/types"
"github.com/stretchr/testify/assert"
)
func generateTestOrder(side types.SideType, status types.OrderStatus, createdAt time.Time) types.Order {
return types.Order{
OrderID: rand.Uint64(),
SubmitOrder: types.SubmitOrder{
Side: side,
},
Status: status,
CreationTime: types.Time(createdAt),
}
}
func Test_GetCurrenctAndLastRoundOrders(t *testing.T) {
t.Run("case 1", func(t *testing.T) {
now := time.Now()
openOrders := []types.Order{
generateTestOrder(types.SideTypeSell, types.OrderStatusNew, now),
}
closedOrders := []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-2*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-3*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-4*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-5*time.Second)),
}
currentRound, err := getCurrentRoundOrders(false, openOrders, closedOrders, 0)
assert.NoError(t, err)
assert.NotEqual(t, 0, currentRound.TakeProfitOrder.OrderID)
assert.Equal(t, 5, len(currentRound.OpenPositionOrders))
})
t.Run("case 2", func(t *testing.T) {
now := time.Now()
openOrders := []types.Order{
generateTestOrder(types.SideTypeSell, types.OrderStatusNew, now),
}
closedOrders := []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-2*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-3*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-4*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-5*time.Second)),
generateTestOrder(types.SideTypeSell, types.OrderStatusFilled, now.Add(-6*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-7*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-8*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-9*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-10*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-11*time.Second)),
generateTestOrder(types.SideTypeSell, types.OrderStatusFilled, now.Add(-12*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-13*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-14*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-15*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-16*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-17*time.Second)),
}
currentRound, err := getCurrentRoundOrders(false, openOrders, closedOrders, 0)
assert.NoError(t, err)
assert.NotEqual(t, 0, currentRound.TakeProfitOrder.OrderID)
assert.Equal(t, 5, len(currentRound.OpenPositionOrders))
})
}
type MockQueryOrders struct {
OpenOrders []types.Order
ClosedOrders []types.Order
}
func (m *MockQueryOrders) QueryOpenOrders(ctx context.Context, symbol string) ([]types.Order, error) {
return m.OpenOrders, nil
}
func (m *MockQueryOrders) QueryClosedOrdersDesc(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) ([]types.Order, error) {
return m.ClosedOrders, nil
}
func Test_RecoverState(t *testing.T) {
symbol := "BTCUSDT"
t.Run("new strategy", func(t *testing.T) {
openOrders := []types.Order{}
currentRound := Round{}
activeOrderBook := bbgo.NewActiveOrderBook(symbol)
orderStore := core.NewOrderStore(symbol)
state, err := recoverState(context.Background(), symbol, false, 5, openOrders, currentRound, activeOrderBook, orderStore, 0)
assert.NoError(t, err)
assert.Equal(t, WaitToOpenPosition, state)
})
t.Run("at open position stage and no filled order", func(t *testing.T) {
now := time.Now()
openOrders := []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusPartiallyFilled, now.Add(-1*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusNew, now.Add(-2*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusNew, now.Add(-3*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusNew, now.Add(-4*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusNew, now.Add(-5*time.Second)),
}
currentRound := Round{
OpenPositionOrders: openOrders,
}
orderStore := core.NewOrderStore(symbol)
activeOrderBook := bbgo.NewActiveOrderBook(symbol)
state, err := recoverState(context.Background(), symbol, false, 5, openOrders, currentRound, activeOrderBook, orderStore, 0)
assert.NoError(t, err)
assert.Equal(t, OpenPositionReady, state)
})
t.Run("at open position stage and there at least one filled order", func(t *testing.T) {
now := time.Now()
openOrders := []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusNew, now.Add(-2*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusNew, now.Add(-3*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusNew, now.Add(-4*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusNew, now.Add(-5*time.Second)),
}
currentRound := Round{
OpenPositionOrders: []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)),
openOrders[0],
openOrders[1],
openOrders[2],
openOrders[3],
},
}
orderStore := core.NewOrderStore(symbol)
activeOrderBook := bbgo.NewActiveOrderBook(symbol)
state, err := recoverState(context.Background(), symbol, false, 5, openOrders, currentRound, activeOrderBook, orderStore, 0)
assert.NoError(t, err)
assert.Equal(t, OpenPositionOrderFilled, state)
})
t.Run("open position stage finish, but stop at cancelling", func(t *testing.T) {
now := time.Now()
openOrders := []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusNew, now.Add(-5*time.Second)),
}
currentRound := Round{
OpenPositionOrders: []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-2*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-3*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-4*time.Second)),
openOrders[0],
},
}
orderStore := core.NewOrderStore(symbol)
activeOrderBook := bbgo.NewActiveOrderBook(symbol)
state, err := recoverState(context.Background(), symbol, false, 5, openOrders, currentRound, activeOrderBook, orderStore, 0)
assert.NoError(t, err)
assert.Equal(t, OpenPositionOrdersCancelling, state)
})
t.Run("open-position orders are cancelled", func(t *testing.T) {
now := time.Now()
openOrders := []types.Order{}
currentRound := Round{
OpenPositionOrders: []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-2*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-3*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-4*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-5*time.Second)),
},
}
orderStore := core.NewOrderStore(symbol)
activeOrderBook := bbgo.NewActiveOrderBook(symbol)
state, err := recoverState(context.Background(), symbol, false, 5, openOrders, currentRound, activeOrderBook, orderStore, 0)
assert.NoError(t, err)
assert.Equal(t, OpenPositionOrdersCancelled, state)
})
t.Run("at take profit stage, and not filled yet", func(t *testing.T) {
now := time.Now()
openOrders := []types.Order{
generateTestOrder(types.SideTypeSell, types.OrderStatusNew, now),
}
currentRound := Round{
TakeProfitOrder: openOrders[0],
OpenPositionOrders: []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-2*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-3*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-4*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-5*time.Second)),
},
}
orderStore := core.NewOrderStore(symbol)
activeOrderBook := bbgo.NewActiveOrderBook(symbol)
state, err := recoverState(context.Background(), symbol, false, 5, openOrders, currentRound, activeOrderBook, orderStore, 0)
assert.NoError(t, err)
assert.Equal(t, TakeProfitReady, state)
})
t.Run("at take profit stage, take-profit order filled", func(t *testing.T) {
now := time.Now()
openOrders := []types.Order{}
currentRound := Round{
TakeProfitOrder: generateTestOrder(types.SideTypeSell, types.OrderStatusFilled, now),
OpenPositionOrders: []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-2*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-3*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-4*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-5*time.Second)),
},
}
orderStore := core.NewOrderStore(symbol)
activeOrderBook := bbgo.NewActiveOrderBook(symbol)
state, err := recoverState(context.Background(), symbol, false, 5, openOrders, currentRound, activeOrderBook, orderStore, 0)
assert.NoError(t, err)
assert.Equal(t, WaitToOpenPosition, state)
})
}

195
pkg/strategy/dca2/state.go Normal file
View File

@ -0,0 +1,195 @@
package dca2
import (
"context"
"time"
)
type State int64
const (
None State = iota
WaitToOpenPosition
PositionOpening
OpenPositionReady
OpenPositionOrderFilled
OpenPositionOrdersCancelling
OpenPositionOrdersCancelled
TakeProfitReady
)
var stateTransition map[State]State = map[State]State{
WaitToOpenPosition: PositionOpening,
PositionOpening: OpenPositionReady,
OpenPositionReady: OpenPositionOrderFilled,
OpenPositionOrderFilled: OpenPositionOrdersCancelling,
OpenPositionOrdersCancelling: OpenPositionOrdersCancelled,
OpenPositionOrdersCancelled: TakeProfitReady,
TakeProfitReady: WaitToOpenPosition,
}
func (s *Strategy) initializeNextStateC() bool {
s.mu.Lock()
defer s.mu.Unlock()
isInitialize := false
if s.nextStateC == nil {
s.logger.Info("[DCA] initializing next state channel")
s.nextStateC = make(chan State, 1)
} else {
s.logger.Info("[DCA] nextStateC is already initialized")
isInitialize = true
}
return isInitialize
}
func (s *Strategy) emitNextState(nextState State) {
select {
case s.nextStateC <- nextState:
default:
s.logger.Info("[DCA] nextStateC is full or not initialized")
}
}
// runState
// WaitToOpenPosition -> after startTimeOfNextRound, place dca orders ->
// PositionOpening
// OpenPositionReady -> any dca maker order filled ->
// OpenPositionOrderFilled -> price hit the take profit ration, start cancelling ->
// OpenPositionOrdersCancelled -> place the takeProfit order ->
// TakeProfitReady -> the takeProfit order filled ->
func (s *Strategy) runState(ctx context.Context) {
s.logger.Info("[DCA] runState")
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
s.logger.Info("[DCA] runState DONE")
return
case <-ticker.C:
s.triggerNextState()
case nextState := <-s.nextStateC:
s.logger.Infof("[DCA] currenct state: %d, next state: %d", s.state, nextState)
// check the next state is valid
validNextState, exist := stateTransition[s.state]
if !exist {
s.logger.Warnf("[DCA] %d not in stateTransition", s.state)
continue
}
if nextState != validNextState {
s.logger.Warnf("[DCA] %d is not valid next state of curreny state %d", nextState, s.state)
}
// move to next state
switch s.state {
case WaitToOpenPosition:
s.runWaitToOpenPositionState(ctx, nextState)
case PositionOpening:
s.runPositionOpening(ctx, nextState)
case OpenPositionReady:
s.runOpenPositionReady(ctx, nextState)
case OpenPositionOrderFilled:
s.runOpenPositionOrderFilled(ctx, nextState)
case OpenPositionOrdersCancelling:
s.runOpenPositionOrdersCancelling(ctx, nextState)
case OpenPositionOrdersCancelled:
s.runOpenPositionOrdersCancelled(ctx, nextState)
case TakeProfitReady:
s.runTakeProfitReady(ctx, nextState)
}
}
}
}
func (s *Strategy) triggerNextState() {
switch s.state {
case OpenPositionReady:
// only trigger from order filled event
case OpenPositionOrderFilled:
// only trigger from kline event
case TakeProfitReady:
// only trigger from order filled event
default:
if nextState, ok := stateTransition[s.state]; ok {
s.nextStateC <- nextState
}
}
}
func (s *Strategy) runWaitToOpenPositionState(_ context.Context, next State) {
s.logger.Info("[State] WaitToOpenPosition - check startTimeOfNextRound")
if time.Now().Before(s.startTimeOfNextRound) {
return
}
s.state = PositionOpening
s.logger.Info("[State] WaitToOpenPosition -> PositionOpening")
}
func (s *Strategy) runPositionOpening(ctx context.Context, next State) {
s.logger.Info("[State] PositionOpening - start placing open-position orders")
if err := s.placeOpenPositionOrders(ctx); err != nil {
s.logger.WithError(err).Error("failed to place dca orders, please check it.")
return
}
s.state = OpenPositionReady
s.logger.Info("[State] PositionOpening -> OpenPositionReady")
}
func (s *Strategy) runOpenPositionReady(_ context.Context, next State) {
s.state = OpenPositionOrderFilled
s.logger.Info("[State] OpenPositionReady -> OpenPositionOrderFilled")
}
func (s *Strategy) runOpenPositionOrderFilled(_ context.Context, next State) {
s.state = OpenPositionOrdersCancelling
s.logger.Info("[State] OpenPositionOrderFilled -> OpenPositionOrdersCancelling")
// after open position cancelling, immediately trigger open position cancelled to cancel the other orders
s.nextStateC <- OpenPositionOrdersCancelled
}
func (s *Strategy) runOpenPositionOrdersCancelling(ctx context.Context, next State) {
s.logger.Info("[State] OpenPositionOrdersCancelling - start cancelling open-position orders")
if err := s.cancelOpenPositionOrders(ctx); err != nil {
s.logger.WithError(err).Error("failed to cancel maker orders")
return
}
s.state = OpenPositionOrdersCancelled
s.logger.Info("[State] OpenPositionOrdersCancelling -> OpenPositionOrdersCancelled")
// after open position cancelled, immediately trigger take profit ready to open take-profit order
s.nextStateC <- TakeProfitReady
}
func (s *Strategy) runOpenPositionOrdersCancelled(ctx context.Context, next State) {
s.logger.Info("[State] OpenPositionOrdersCancelled - start placing take-profit orders")
if err := s.placeTakeProfitOrders(ctx); err != nil {
s.logger.WithError(err).Error("failed to open take profit orders")
return
}
s.state = TakeProfitReady
s.logger.Info("[State] OpenPositionOrdersCancelled -> TakeProfitReady")
}
func (s *Strategy) runTakeProfitReady(_ context.Context, next State) {
s.logger.Info("[State] TakeProfitReady - start reseting position and calculate budget for next round")
if s.Short {
s.Budget = s.Budget.Add(s.Position.Base)
} else {
s.Budget = s.Budget.Add(s.Position.Quote)
}
// reset position
s.Position.Reset()
// set the start time of the next round
s.startTimeOfNextRound = time.Now().Add(s.CoolDownInterval.Duration())
s.state = WaitToOpenPosition
s.logger.Info("[State] TakeProfitReady -> WaitToOpenPosition")
}

View File

@ -50,10 +50,10 @@ type Strategy struct {
// private field // private field
mu sync.Mutex mu sync.Mutex
openPositionSide types.SideType
takeProfitSide types.SideType
takeProfitPrice fixedpoint.Value takeProfitPrice fixedpoint.Value
startTimeOfNextRound time.Time startTimeOfNextRound time.Time
nextStateC chan State
state State
} }
func (s *Strategy) ID() string { func (s *Strategy) ID() string {
@ -105,33 +105,77 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.Strategy.Initialize(ctx, s.Environment, session, s.Market, ID, s.InstanceID()) s.Strategy.Initialize(ctx, s.Environment, session, s.Market, ID, s.InstanceID())
instanceID := s.InstanceID() instanceID := s.InstanceID()
if s.Short {
s.openPositionSide = types.SideTypeSell
s.takeProfitSide = types.SideTypeBuy
} else {
s.openPositionSide = types.SideTypeBuy
s.takeProfitSide = types.SideTypeSell
}
if s.OrderGroupID == 0 { if s.OrderGroupID == 0 {
s.OrderGroupID = util.FNV32(instanceID) % math.MaxInt32 s.OrderGroupID = util.FNV32(instanceID) % math.MaxInt32
} }
// order executor // order executor
s.OrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.OrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
s.logger.Infof("position: %s", s.Position.String()) s.logger.Infof("[DCA] POSITION UPDATE: %s", s.Position.String())
bbgo.Sync(ctx, s) bbgo.Sync(ctx, s)
// update take profit price here // update take profit price here
s.updateTakeProfitPrice()
})
s.OrderExecutor.ActiveMakerOrders().OnFilled(func(o types.Order) {
s.logger.Infof("[DCA] FILLED ORDER: %s", o.String())
openPositionSide := types.SideTypeBuy
takeProfitSide := types.SideTypeSell
if s.Short {
openPositionSide = types.SideTypeSell
takeProfitSide = types.SideTypeBuy
}
switch o.Side {
case openPositionSide:
s.emitNextState(OpenPositionOrderFilled)
case takeProfitSide:
s.emitNextState(WaitToOpenPosition)
default:
s.logger.Infof("[DCA] unsupported side (%s) of order: %s", o.Side, o)
}
}) })
session.MarketDataStream.OnKLine(func(kline types.KLine) { session.MarketDataStream.OnKLine(func(kline types.KLine) {
// check price here // check price here
if s.state != OpenPositionOrderFilled {
return
}
compRes := kline.Close.Compare(s.takeProfitPrice)
// price doesn't hit the take profit price
if (s.Short && compRes > 0) || (!s.Short && compRes < 0) {
return
}
s.emitNextState(OpenPositionOrdersCancelling)
}) })
session.UserDataStream.OnAuth(func() { session.UserDataStream.OnAuth(func() {
s.logger.Info("user data stream authenticated, start the process") s.logger.Info("[DCA] user data stream authenticated")
// decide state here time.AfterFunc(3*time.Second, func() {
if isInitialize := s.initializeNextStateC(); !isInitialize {
// recover
if err := s.recover(ctx); err != nil {
s.logger.WithError(err).Error("[DCA] something wrong when state recovering")
return
}
s.logger.Infof("[DCA] recovered state: %d", s.state)
s.logger.Infof("[DCA] recovered position %s", s.Position.String())
s.logger.Infof("[DCA] recovered budget %s", s.Budget)
s.logger.Infof("[DCA] recovered startTimeOfNextRound %s", s.startTimeOfNextRound)
s.updateTakeProfitPrice()
// store persistence
bbgo.Sync(ctx, s)
// start running state machine
s.runState(ctx)
}
})
}) })
balances, err := session.Exchange.QueryAccountBalances(ctx) balances, err := session.Exchange.QueryAccountBalances(ctx)
@ -146,3 +190,12 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
return nil return nil
} }
func (s *Strategy) updateTakeProfitPrice() {
takeProfitRatio := s.TakeProfitRatio
if s.Short {
takeProfitRatio = takeProfitRatio.Neg()
}
s.takeProfitPrice = s.Market.TruncatePrice(s.Position.AverageCost.Mul(fixedpoint.One.Add(takeProfitRatio)))
s.logger.Infof("[DCA] cost: %s, ratio: %s, price: %s", s.Position.AverageCost, takeProfitRatio, s.takeProfitPrice)
}

View File

@ -20,6 +20,14 @@ func SortOrdersAscending(orders []Order) []Order {
return orders return orders
} }
// SortOrdersDescending sorts by creation time descending-ly
func SortOrdersDescending(orders []Order) []Order {
sort.Slice(orders, func(i, j int) bool {
return orders[i].CreationTime.Time().After(orders[j].CreationTime.Time())
})
return orders
}
// SortOrdersByPrice sorts by creation time ascending-ly // SortOrdersByPrice sorts by creation time ascending-ly
func SortOrdersByPrice(orders []Order, descending bool) []Order { func SortOrdersByPrice(orders []Order, descending bool) []Order {
var f func(i, j int) bool var f func(i, j int) bool