Merge pull request #1697 from c9s/c9s/refactor-twap

FEATURE: redesign and refactor twap order executor
This commit is contained in:
c9s 2024-08-20 14:24:48 +08:00 committed by GitHub
commit d1617b6a0b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1528 additions and 80 deletions

View File

@ -27,7 +27,9 @@ var quantityReduceDelta = fixedpoint.NewFromFloat(0.005)
// This is for the maximum retries // This is for the maximum retries
const submitOrderRetryLimit = 5 const submitOrderRetryLimit = 5
// BaseOrderExecutor provides the common accessors for order executor
type BaseOrderExecutor struct { type BaseOrderExecutor struct {
exchange types.Exchange
session *ExchangeSession session *ExchangeSession
activeMakerOrders *ActiveOrderBook activeMakerOrders *ActiveOrderBook
orderStore *core.OrderStore orderStore *core.OrderStore
@ -43,8 +45,8 @@ func (e *BaseOrderExecutor) ActiveMakerOrders() *ActiveOrderBook {
// GracefulCancel cancels all active maker orders if orders are not given, otherwise cancel all the given orders // GracefulCancel cancels all active maker orders if orders are not given, otherwise cancel all the given orders
func (e *BaseOrderExecutor) GracefulCancel(ctx context.Context, orders ...types.Order) error { func (e *BaseOrderExecutor) GracefulCancel(ctx context.Context, orders ...types.Order) error {
if err := e.activeMakerOrders.GracefulCancel(ctx, e.session.Exchange, orders...); err != nil { if err := e.activeMakerOrders.GracefulCancel(ctx, e.exchange, orders...); err != nil {
return errors.Wrap(err, "graceful cancel error") return errors.Wrap(err, "graceful cancel order error")
} }
return nil return nil
@ -84,6 +86,7 @@ func NewGeneralOrderExecutor(
executor := &GeneralOrderExecutor{ executor := &GeneralOrderExecutor{
BaseOrderExecutor: BaseOrderExecutor{ BaseOrderExecutor: BaseOrderExecutor{
session: session, session: session,
exchange: session.Exchange,
activeMakerOrders: NewActiveOrderBook(symbol), activeMakerOrders: NewActiveOrderBook(symbol),
orderStore: orderStore, orderStore: orderStore,
}, },
@ -111,7 +114,7 @@ func (e *GeneralOrderExecutor) SetMaxRetries(maxRetries uint) {
} }
func (e *GeneralOrderExecutor) startMarginAssetUpdater(ctx context.Context) { func (e *GeneralOrderExecutor) startMarginAssetUpdater(ctx context.Context) {
marginService, ok := e.session.Exchange.(types.MarginBorrowRepayService) marginService, ok := e.exchange.(types.MarginBorrowRepayService)
if !ok { if !ok {
log.Warnf("session %s (%T) exchange does not support MarginBorrowRepayService", e.session.Name, e.session.Exchange) log.Warnf("session %s (%T) exchange does not support MarginBorrowRepayService", e.session.Name, e.session.Exchange)
return return

View File

@ -22,6 +22,7 @@ func NewSimpleOrderExecutor(session *ExchangeSession) *SimpleOrderExecutor {
return &SimpleOrderExecutor{ return &SimpleOrderExecutor{
BaseOrderExecutor: BaseOrderExecutor{ BaseOrderExecutor: BaseOrderExecutor{
session: session, session: session,
exchange: session.Exchange,
activeMakerOrders: NewActiveOrderBook(""), activeMakerOrders: NewActiveOrderBook(""),
orderStore: core.NewOrderStore(""), orderStore: core.NewOrderStore(""),
}, },

View File

@ -14,6 +14,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/twap"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
@ -255,7 +256,7 @@ var executeOrderCmd = &cobra.Command{
executionCtx, cancelExecution := context.WithCancel(ctx) executionCtx, cancelExecution := context.WithCancel(ctx)
defer cancelExecution() defer cancelExecution()
execution := &bbgo.TwapExecution{ execution := &twap.StreamExecutor{
Session: session, Session: session,
Symbol: symbol, Symbol: symbol,
Side: side, Side: side,

View File

@ -1006,13 +1006,15 @@ func (e *Exchange) submitMarginOrder(ctx context.Context, order types.SubmitOrde
} }
// could be IOC or FOK // could be IOC or FOK
if len(order.TimeInForce) > 0 { switch order.Type {
// TODO: check the TimeInForce value case types.OrderTypeLimit, types.OrderTypeStopLimit:
req.TimeInForce(binance.TimeInForceType(order.TimeInForce)) req.TimeInForce(binance.TimeInForceTypeGTC)
} else { case types.OrderTypeLimitMaker:
switch order.Type { // do not set TimeInForce for LimitMaker
case types.OrderTypeLimit, types.OrderTypeStopLimit: default:
req.TimeInForce(binance.TimeInForceTypeGTC) if len(order.TimeInForce) > 0 {
// TODO: check the TimeInForce value
req.TimeInForce(binance.TimeInForceType(order.TimeInForce))
} }
} }

View File

@ -23,7 +23,7 @@ func Test_ModifiedQuantity(t *testing.T) {
BaseCurrency: "BTC", BaseCurrency: "BTC",
}, },
} }
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, "BTCUSDT", "strategy", "strategy-1", pos) orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, "BTCUSDT", "strategy", "strategy-1", pos)
riskControl := NewPositionRiskControl(orderExecutor, fixedpoint.NewFromInt(10), fixedpoint.NewFromInt(2)) riskControl := NewPositionRiskControl(orderExecutor, fixedpoint.NewFromInt(10), fixedpoint.NewFromInt(2))
cases := []struct { cases := []struct {

View File

@ -5,13 +5,14 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2" indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
) )
const ID = "autobuy" const ID = "autobuy"
@ -128,7 +129,7 @@ func (s *Strategy) autobuy(ctx context.Context) {
} }
side := types.SideTypeBuy side := types.SideTypeBuy
price := s.PriceType.Map(ticker, side) price := s.PriceType.GetPrice(ticker, side)
if price.Float64() > s.boll.UpBand.Last(0) { if price.Float64() > s.boll.UpBand.Last(0) {
log.Infof("price %s is higher than upper band %f, skip", price.String(), s.boll.UpBand.Last(0)) log.Infof("price %s is higher than upper band %f, skip", price.String(), s.boll.UpBand.Last(0))

View File

@ -6,9 +6,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/stretchr/testify/assert"
) )
func generateTestOrder(side types.SideType, status types.OrderStatus, createdAt time.Time) types.Order { func generateTestOrder(side types.SideType, status types.OrderStatus, createdAt time.Time) types.Order {
@ -29,7 +30,7 @@ func Test_RecoverState(t *testing.T) {
t.Run("new strategy", func(t *testing.T) { t.Run("new strategy", func(t *testing.T) {
currentRound := Round{} currentRound := Round{}
position := types.NewPositionFromMarket(strategy.Market) position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, WaitToOpenPosition, state) assert.Equal(t, WaitToOpenPosition, state)
@ -47,7 +48,7 @@ func Test_RecoverState(t *testing.T) {
}, },
} }
position := types.NewPositionFromMarket(strategy.Market) position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, OpenPositionReady, state) assert.Equal(t, OpenPositionReady, state)
@ -65,7 +66,7 @@ func Test_RecoverState(t *testing.T) {
}, },
} }
position := types.NewPositionFromMarket(strategy.Market) position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, OpenPositionOrderFilled, state) assert.Equal(t, OpenPositionOrderFilled, state)
@ -83,7 +84,7 @@ func Test_RecoverState(t *testing.T) {
}, },
} }
position := types.NewPositionFromMarket(strategy.Market) position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, OpenPositionOrdersCancelling, state) assert.Equal(t, OpenPositionOrdersCancelling, state)
@ -101,7 +102,7 @@ func Test_RecoverState(t *testing.T) {
}, },
} }
position := types.NewPositionFromMarket(strategy.Market) position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, OpenPositionOrdersCancelling, state) assert.Equal(t, OpenPositionOrdersCancelling, state)
@ -122,7 +123,7 @@ func Test_RecoverState(t *testing.T) {
}, },
} }
position := types.NewPositionFromMarket(strategy.Market) position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, TakeProfitReady, state) assert.Equal(t, TakeProfitReady, state)
@ -143,7 +144,7 @@ func Test_RecoverState(t *testing.T) {
}, },
} }
position := types.NewPositionFromMarket(strategy.Market) position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, WaitToOpenPosition, state) assert.Equal(t, WaitToOpenPosition, state)

View File

@ -262,7 +262,7 @@ func (s *Strategy) generateOrder(ctx context.Context) (*types.SubmitOrder, error
} }
quantity = market.RoundDownQuantityByPrecision(quantity) quantity = market.RoundDownQuantityByPrecision(quantity)
price := s.PriceType.Map(ticker, side) price := s.PriceType.GetPrice(ticker, side)
if s.MaxAmount.Float64() > 0 { if s.MaxAmount.Float64() > 0 {
quantity = bbgo.AdjustQuantityByMaxAmount(quantity, price, s.MaxAmount) quantity = bbgo.AdjustQuantityByMaxAmount(quantity, price, s.MaxAmount)

View File

@ -1,4 +1,4 @@
package bbgo package twap
import ( import (
"context" "context"
@ -7,16 +7,18 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
type TwapExecution struct { // StreamExecutor is a TWAP execution that places orders on the best price by connecting to the real time market data stream.
Session *ExchangeSession type StreamExecutor struct {
Session *bbgo.ExchangeSession
Symbol string Symbol string
Side types.SideType Side types.SideType
TargetQuantity fixedpoint.Value TargetQuantity fixedpoint.Value
@ -37,7 +39,7 @@ type TwapExecution struct {
currentPrice fixedpoint.Value currentPrice fixedpoint.Value
activePosition fixedpoint.Value activePosition fixedpoint.Value
activeMakerOrders *ActiveOrderBook activeMakerOrders *bbgo.ActiveOrderBook
orderStore *core.OrderStore orderStore *core.OrderStore
position *types.Position position *types.Position
@ -46,26 +48,24 @@ type TwapExecution struct {
stoppedC chan struct{} stoppedC chan struct{}
state int
mu sync.Mutex mu sync.Mutex
} }
func (e *TwapExecution) connectMarketData(ctx context.Context) { func (e *StreamExecutor) connectMarketData(ctx context.Context) {
log.Infof("connecting market data stream...") logrus.Infof("connecting market data stream...")
if err := e.marketDataStream.Connect(ctx); err != nil { if err := e.marketDataStream.Connect(ctx); err != nil {
log.WithError(err).Errorf("market data stream connect error") logrus.WithError(err).Errorf("market data stream connect error")
} }
} }
func (e *TwapExecution) connectUserData(ctx context.Context) { func (e *StreamExecutor) connectUserData(ctx context.Context) {
log.Infof("connecting user data stream...") logrus.Infof("connecting user data stream...")
if err := e.userDataStream.Connect(ctx); err != nil { if err := e.userDataStream.Connect(ctx); err != nil {
log.WithError(err).Errorf("user data stream connect error") logrus.WithError(err).Errorf("user data stream connect error")
} }
} }
func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { func (e *StreamExecutor) newBestPriceOrder() (orderForm types.SubmitOrder, err error) {
book := e.orderBook.Copy() book := e.orderBook.Copy()
sideBook := book.SideBook(e.Side) sideBook := book.SideBook(e.Side)
@ -111,7 +111,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
switch e.Side { switch e.Side {
case types.SideTypeSell: case types.SideTypeSell:
if newPrice.Compare(e.StopPrice) < 0 { if newPrice.Compare(e.StopPrice) < 0 {
log.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s",
e.Symbol, e.Symbol,
newPrice.String(), newPrice.String(),
e.StopPrice.String(), e.StopPrice.String(),
@ -121,7 +121,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
case types.SideTypeBuy: case types.SideTypeBuy:
if newPrice.Compare(e.StopPrice) > 0 { if newPrice.Compare(e.StopPrice) > 0 {
log.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s",
e.Symbol, e.Symbol,
newPrice.String(), newPrice.String(),
e.StopPrice.String(), e.StopPrice.String(),
@ -157,7 +157,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
} }
minNotional := e.market.MinNotional minNotional := e.market.MinNotional
orderQuantity = AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional)
switch e.Side { switch e.Side {
case types.SideTypeSell: case types.SideTypeSell:
@ -169,11 +169,11 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
case types.SideTypeBuy: case types.SideTypeBuy:
// check base balance for sell, try to sell as more as possible // check base balance for sell, try to sell as more as possible
if b, ok := e.Session.GetAccount().Balance(e.market.QuoteCurrency); ok { if b, ok := e.Session.GetAccount().Balance(e.market.QuoteCurrency); ok {
orderQuantity = AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available)
} }
} }
if e.DeadlineTime != emptyTime { if !e.DeadlineTime.IsZero() {
now := time.Now() now := time.Now()
if now.After(e.DeadlineTime) { if now.After(e.DeadlineTime) {
orderForm = types.SubmitOrder{ orderForm = types.SubmitOrder{
@ -200,7 +200,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
return orderForm, err return orderForm, err
} }
func (e *TwapExecution) updateOrder(ctx context.Context) error { func (e *StreamExecutor) updateOrder(ctx context.Context) error {
book := e.orderBook.Copy() book := e.orderBook.Copy()
sideBook := book.SideBook(e.Side) sideBook := book.SideBook(e.Side)
@ -224,7 +224,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
orders := e.activeMakerOrders.Orders() orders := e.activeMakerOrders.Orders()
if len(orders) > 1 { if len(orders) > 1 {
log.Warnf("more than 1 %s open orders in the strategy...", e.Symbol) logrus.Warnf("more than 1 %s open orders in the strategy...", e.Symbol)
} }
// get the first order // get the first order
@ -234,7 +234,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity) remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity)
if remainingQuantity.Compare(e.market.MinQuantity) <= 0 { if remainingQuantity.Compare(e.market.MinQuantity) <= 0 {
log.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String())
return nil return nil
} }
@ -243,24 +243,24 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
// DO NOT UPDATE IF: // DO NOT UPDATE IF:
// tickSpread > 0 AND current order price == second price + tickSpread // tickSpread > 0 AND current order price == second price + tickSpread
// current order price == first price // current order price == first price
log.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String())
switch e.Side { switch e.Side {
case types.SideTypeBuy: case types.SideTypeBuy:
if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) { if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) {
log.Infof("the current order is already on the best ask price %s", orderPrice.String()) logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
return nil return nil
} else if orderPrice == first.Price { } else if orderPrice == first.Price {
log.Infof("the current order is already on the best bid price %s", orderPrice.String()) logrus.Infof("the current order is already on the best bid price %s", orderPrice.String())
return nil return nil
} }
case types.SideTypeSell: case types.SideTypeSell:
if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) { if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) {
log.Infof("the current order is already on the best ask price %s", orderPrice.String()) logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
return nil return nil
} else if orderPrice == first.Price { } else if orderPrice == first.Price {
log.Infof("the current order is already on the best ask price %s", orderPrice.String()) logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
return nil return nil
} }
} }
@ -283,13 +283,13 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
return nil return nil
} }
func (e *TwapExecution) cancelActiveOrders() { func (e *StreamExecutor) cancelActiveOrders() {
gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second) gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer gracefulCancel() defer gracefulCancel()
e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange) e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange)
} }
func (e *TwapExecution) orderUpdater(ctx context.Context) { func (e *StreamExecutor) orderUpdater(ctx context.Context) {
updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1)
ticker := time.NewTimer(e.UpdateInterval) ticker := time.NewTimer(e.UpdateInterval)
defer ticker.Stop() defer ticker.Stop()
@ -317,9 +317,9 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) {
return return
} }
log.Infof("%s order book changed, checking order...", e.Symbol) logrus.Infof("%s order book changed, checking order...", e.Symbol)
if err := e.updateOrder(ctx); err != nil { if err := e.updateOrder(ctx); err != nil {
log.WithError(err).Errorf("order update failed") logrus.WithError(err).Errorf("order update failed")
} }
case <-ticker.C: case <-ticker.C:
@ -332,25 +332,26 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) {
} }
if err := e.updateOrder(ctx); err != nil { if err := e.updateOrder(ctx); err != nil {
log.WithError(err).Errorf("order update failed") logrus.WithError(err).Errorf("order update failed")
} }
} }
} }
} }
func (e *TwapExecution) cancelContextIfTargetQuantityFilled() bool { func (e *StreamExecutor) cancelContextIfTargetQuantityFilled() bool {
base := e.position.GetBase() base := e.position.GetBase()
if base.Abs().Compare(e.TargetQuantity) >= 0 { if base.Abs().Compare(e.TargetQuantity) >= 0 {
log.Infof("filled target quantity, canceling the order execution context") logrus.Infof("filled target quantity, canceling the order execution context")
e.cancelExecution() e.cancelExecution()
return true return true
} }
return false return false
} }
func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { func (e *StreamExecutor) handleTradeUpdate(trade types.Trade) {
// ignore trades that are not in the symbol we interested // ignore trades that are not in the symbol we interested
if trade.Symbol != e.Symbol { if trade.Symbol != e.Symbol {
return return
@ -360,21 +361,21 @@ func (e *TwapExecution) handleTradeUpdate(trade types.Trade) {
return return
} }
log.Info(trade.String()) logrus.Info(trade.String())
e.position.AddTrade(trade) e.position.AddTrade(trade)
log.Infof("position updated: %+v", e.position) logrus.Infof("position updated: %+v", e.position)
} }
func (e *TwapExecution) handleFilledOrder(order types.Order) { func (e *StreamExecutor) handleFilledOrder(order types.Order) {
log.Info(order.String()) logrus.Info(order.String())
// filled event triggers the order removal from the active order store // filled event triggers the order removal from the active order store
// we need to ensure we received every order update event before the execution is done. // we need to ensure we received every order update event before the execution is done.
e.cancelContextIfTargetQuantityFilled() e.cancelContextIfTargetQuantityFilled()
} }
func (e *TwapExecution) Run(parentCtx context.Context) error { func (e *StreamExecutor) Run(parentCtx context.Context) error {
e.mu.Lock() e.mu.Lock()
e.stoppedC = make(chan struct{}) e.stoppedC = make(chan struct{})
e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx) e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx)
@ -397,10 +398,10 @@ func (e *TwapExecution) Run(parentCtx context.Context) error {
e.orderBook = types.NewStreamBook(e.Symbol) e.orderBook = types.NewStreamBook(e.Symbol)
e.orderBook.BindStream(e.marketDataStream) e.orderBook.BindStream(e.marketDataStream)
go e.connectMarketData(e.executionCtx)
e.userDataStream = e.Session.Exchange.NewStream() e.userDataStream = e.Session.Exchange.NewStream()
e.userDataStream.OnTradeUpdate(e.handleTradeUpdate) e.userDataStream.OnTradeUpdate(e.handleTradeUpdate)
e.position = &types.Position{ e.position = &types.Position{
Symbol: e.Symbol, Symbol: e.Symbol,
BaseCurrency: e.market.BaseCurrency, BaseCurrency: e.market.BaseCurrency,
@ -409,16 +410,17 @@ func (e *TwapExecution) Run(parentCtx context.Context) error {
e.orderStore = core.NewOrderStore(e.Symbol) e.orderStore = core.NewOrderStore(e.Symbol)
e.orderStore.BindStream(e.userDataStream) e.orderStore.BindStream(e.userDataStream)
e.activeMakerOrders = NewActiveOrderBook(e.Symbol) e.activeMakerOrders = bbgo.NewActiveOrderBook(e.Symbol)
e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.OnFilled(e.handleFilledOrder)
e.activeMakerOrders.BindStream(e.userDataStream) e.activeMakerOrders.BindStream(e.userDataStream)
go e.connectMarketData(e.executionCtx)
go e.connectUserData(e.userDataStreamCtx) go e.connectUserData(e.userDataStreamCtx)
go e.orderUpdater(e.executionCtx) go e.orderUpdater(e.executionCtx)
return nil return nil
} }
func (e *TwapExecution) emitDone() { func (e *StreamExecutor) emitDone() {
e.mu.Lock() e.mu.Lock()
if e.stoppedC == nil { if e.stoppedC == nil {
e.stoppedC = make(chan struct{}) e.stoppedC = make(chan struct{})
@ -427,7 +429,7 @@ func (e *TwapExecution) emitDone() {
e.mu.Unlock() e.mu.Unlock()
} }
func (e *TwapExecution) Done() (c <-chan struct{}) { func (e *StreamExecutor) Done() (c <-chan struct{}) {
e.mu.Lock() e.mu.Lock()
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel // if the channel is not allocated, it means it's not started yet, we need to return a closed channel
if e.stoppedC == nil { if e.stoppedC == nil {
@ -447,7 +449,7 @@ func (e *TwapExecution) Done() (c <-chan struct{}) {
// We need to: // We need to:
// 1. stop the order updater (by using the execution context) // 1. stop the order updater (by using the execution context)
// 2. the order updater cancels all open orders and close the user data stream // 2. the order updater cancels all open orders and close the user data stream
func (e *TwapExecution) Shutdown(shutdownCtx context.Context) { func (e *StreamExecutor) Shutdown(shutdownCtx context.Context) {
e.mu.Lock() e.mu.Lock()
if e.cancelExecution != nil { if e.cancelExecution != nil {
e.cancelExecution() e.cancelExecution()

53
pkg/twap/v2/bbomonitor.go Normal file
View File

@ -0,0 +1,53 @@
package twap
import (
"time"
"github.com/c9s/bbgo/pkg/types"
)
// BboMonitor monitors the best bid and ask price and volume.
//
//go:generate callbackgen -type BboMonitor
type BboMonitor struct {
Bid types.PriceVolume
Ask types.PriceVolume
UpdatedTime time.Time
updateCallbacks []func(bid, ask types.PriceVolume)
}
func NewBboMonitor() *BboMonitor {
return &BboMonitor{}
}
func (m *BboMonitor) OnUpdateFromBook(book *types.StreamOrderBook) bool {
bestBid, ok1 := book.BestBid()
bestAsk, ok2 := book.BestAsk()
if !ok1 || !ok2 {
return false
}
return m.Update(bestBid, bestAsk, book.LastUpdateTime())
}
func (m *BboMonitor) Update(bid, ask types.PriceVolume, t time.Time) bool {
changed := false
if m.Bid.Price.Compare(bid.Price) != 0 || m.Bid.Volume.Compare(bid.Volume) != 0 {
changed = true
}
if m.Ask.Price.Compare(ask.Price) != 0 || m.Ask.Volume.Compare(ask.Volume) != 0 {
changed = true
}
m.Bid = bid
m.Ask = ask
m.UpdatedTime = t
if changed {
m.EmitUpdate(bid, ask)
}
return changed
}

View File

@ -0,0 +1,17 @@
// Code generated by "callbackgen -type BboMonitor"; DO NOT EDIT.
package twap
import (
"github.com/c9s/bbgo/pkg/types"
)
func (m *BboMonitor) OnUpdate(cb func(bid types.PriceVolume, ask types.PriceVolume)) {
m.updateCallbacks = append(m.updateCallbacks, cb)
}
func (m *BboMonitor) EmitUpdate(bid types.PriceVolume, ask types.PriceVolume) {
for _, cb := range m.updateCallbacks {
cb(bid, ask)
}
}

View File

@ -0,0 +1,634 @@
package twap
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
var defaultUpdateInterval = time.Minute
type DoneSignal struct {
doneC chan struct{}
mu sync.Mutex
}
func NewDoneSignal() *DoneSignal {
return &DoneSignal{
doneC: make(chan struct{}),
}
}
func (e *DoneSignal) Emit() {
e.mu.Lock()
if e.doneC == nil {
e.doneC = make(chan struct{})
}
close(e.doneC)
e.mu.Unlock()
}
// Chan returns a channel that emits a signal when the execution is done.
func (e *DoneSignal) Chan() (c <-chan struct{}) {
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
e.mu.Lock()
if e.doneC == nil {
e.doneC = make(chan struct{})
c = e.doneC
} else {
c = e.doneC
}
e.mu.Unlock()
return c
}
// FixedQuantityExecutor is a TWAP executor that places orders on the exchange using the exchange's stream API.
// It uses a fixed target quantity to place orders.
type FixedQuantityExecutor struct {
exchange types.Exchange
// configuration fields
symbol string
side types.SideType
targetQuantity, sliceQuantity fixedpoint.Value
// updateInterval is a fixed update interval for placing new order
updateInterval time.Duration
// delayInterval is the delay interval between each order placement
delayInterval time.Duration
// numOfTicks is the number of price ticks behind the best bid to place the order
numOfTicks int
// stopPrice is the price limit for the order
// for buy-orders, the price limit is the maximum price
// for sell-orders, the price limit is the minimum price
stopPrice fixedpoint.Value
// deadlineTime is the deadline time for the order execution
deadlineTime *time.Time
executionCtx context.Context
cancelExecution context.CancelFunc
userDataStreamCtx context.Context
cancelUserDataStream context.CancelFunc
market types.Market
marketDataStream types.Stream
orderBook *types.StreamOrderBook
userDataStream types.Stream
activeMakerOrders *bbgo.ActiveOrderBook
orderStore *core.OrderStore
position *types.Position
tradeCollector *core.TradeCollector
logger logrus.FieldLogger
mu sync.Mutex
userDataStreamConnectC chan struct{}
marketDataStreamConnectC chan struct{}
done *DoneSignal
}
func NewStreamExecutor(
exchange types.Exchange,
symbol string,
market types.Market,
side types.SideType,
targetQuantity, sliceQuantity fixedpoint.Value,
) *FixedQuantityExecutor {
marketDataStream := exchange.NewStream()
marketDataStream.SetPublicOnly()
marketDataStream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
Depth: types.DepthLevelMedium,
})
orderBook := types.NewStreamBook(symbol)
orderBook.BindStream(marketDataStream)
userDataStream := exchange.NewStream()
orderStore := core.NewOrderStore(symbol)
position := types.NewPositionFromMarket(market)
tradeCollector := core.NewTradeCollector(symbol, position, orderStore)
orderStore.BindStream(userDataStream)
activeMakerOrders := bbgo.NewActiveOrderBook(symbol)
e := &FixedQuantityExecutor{
exchange: exchange,
symbol: symbol,
side: side,
market: market,
targetQuantity: targetQuantity,
sliceQuantity: sliceQuantity,
updateInterval: defaultUpdateInterval,
logger: logrus.WithFields(logrus.Fields{
"executor": "twapStream",
"symbol": symbol,
}),
marketDataStream: marketDataStream,
orderBook: orderBook,
userDataStream: userDataStream,
activeMakerOrders: activeMakerOrders,
orderStore: orderStore,
tradeCollector: tradeCollector,
position: position,
done: NewDoneSignal(),
userDataStreamConnectC: make(chan struct{}),
marketDataStreamConnectC: make(chan struct{}),
}
e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
e.logger.Info(trade.String())
})
e.tradeCollector.BindStream(e.userDataStream)
activeMakerOrders.OnFilled(e.handleFilledOrder)
activeMakerOrders.BindStream(e.userDataStream)
e.marketDataStream.OnConnect(func() {
e.logger.Info("market data stream on connect")
close(e.marketDataStreamConnectC)
e.logger.Infof("marketDataStreamConnectC closed")
})
// private channels
e.userDataStream.OnAuth(func() {
e.logger.Info("user data stream on auth")
close(e.userDataStreamConnectC)
e.logger.Info("userDataStreamConnectC closed")
})
return e
}
func (e *FixedQuantityExecutor) SetDeadlineTime(t time.Time) {
e.deadlineTime = &t
}
func (e *FixedQuantityExecutor) SetDelayInterval(delayInterval time.Duration) {
e.delayInterval = delayInterval
}
func (e *FixedQuantityExecutor) SetUpdateInterval(updateInterval time.Duration) {
e.updateInterval = updateInterval
}
func (e *FixedQuantityExecutor) connectMarketData(ctx context.Context) {
e.logger.Infof("connecting market data stream...")
if err := e.marketDataStream.Connect(ctx); err != nil {
e.logger.WithError(err).Errorf("market data stream connect error")
}
}
func (e *FixedQuantityExecutor) connectUserData(ctx context.Context) {
e.logger.Infof("connecting user data stream...")
if err := e.userDataStream.Connect(ctx); err != nil {
e.logger.WithError(err).Errorf("user data stream connect error")
}
}
func (e *FixedQuantityExecutor) handleFilledOrder(order types.Order) {
e.logger.Info(order.String())
// filled event triggers the order removal from the active order store
// we need to ensure we received every order update event before the execution is done.
e.cancelContextIfTargetQuantityFilled()
}
func (e *FixedQuantityExecutor) cancelContextIfTargetQuantityFilled() bool {
// ensure that the trades are processed
e.tradeCollector.Process()
// now get the base quantity from the position
base := e.position.GetBase()
if base.Abs().Sub(e.targetQuantity).Compare(e.market.MinQuantity.Neg()) >= 0 {
e.logger.Infof("position is filled with target quantity, canceling the order execution context")
e.cancelExecution()
return true
}
return false
}
func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error {
gracefulCtx, gracefulCancel := context.WithTimeout(ctx, 30*time.Second)
defer gracefulCancel()
return e.activeMakerOrders.GracefulCancel(gracefulCtx, e.exchange)
}
func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
// updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 2)
defer func() {
if err := e.cancelActiveOrders(ctx); err != nil {
e.logger.WithError(err).Error("cancel active orders error")
}
e.cancelUserDataStream()
e.done.Emit()
}()
ticker := time.NewTimer(e.updateInterval)
defer ticker.Stop()
monitor := NewBboMonitor()
for {
select {
case <-ctx.Done():
return
case <-e.orderBook.C:
changed := monitor.OnUpdateFromBook(e.orderBook)
if !changed {
continue
}
// orderBook.C sends a signal when any price or quantity changes in the order book
/*
if !updateLimiter.Allow() {
break
}
*/
if e.cancelContextIfTargetQuantityFilled() {
return
}
e.logger.Infof("%s order book changed, checking order...", e.symbol)
if err := e.updateOrder(ctx); err != nil {
e.logger.WithError(err).Errorf("order update failed")
}
case <-ticker.C:
changed := monitor.OnUpdateFromBook(e.orderBook)
if !changed {
continue
}
/*
if !updateLimiter.Allow() {
break
}
*/
if e.cancelContextIfTargetQuantityFilled() {
return
}
if err := e.updateOrder(ctx); err != nil {
e.logger.WithError(err).Errorf("order update failed")
}
}
}
}
func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
book := e.orderBook.Copy()
sideBook := book.SideBook(e.side)
first, ok := sideBook.First()
if !ok {
return fmt.Errorf("empty %s %s side book", e.symbol, e.side)
}
// if there is no gap between the first price entry and the second price entry
second, ok := sideBook.Second()
if !ok {
return fmt.Errorf("no secoond price on the %s order book %s, can not update", e.symbol, e.side)
}
tickSize := e.market.TickSize
numOfTicks := fixedpoint.NewFromInt(int64(e.numOfTicks))
tickSpread := tickSize.Mul(numOfTicks)
// check and see if we need to cancel the existing active orders
for e.activeMakerOrders.NumOfOrders() > 0 {
orders := e.activeMakerOrders.Orders()
if len(orders) > 1 {
e.logger.Warnf("found more than 1 %s open orders on the orderbook", e.symbol)
}
// get the first active order
order := orders[0]
orderPrice := order.Price
// quantity := fixedpoint.NewFromFloat(order.Quantity)
remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity)
if remainingQuantity.Compare(e.market.MinQuantity) <= 0 {
logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String())
return nil
}
// if the first bid price or first ask price is the same to the current active order
// we should skip updating the order
// DO NOT UPDATE IF:
// tickSpread > 0 AND current order price == second price + tickSpread
// current order price == first price
logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String())
switch e.side {
case types.SideTypeBuy:
if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) {
logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
return nil
} else if orderPrice == first.Price {
logrus.Infof("the current order is already on the best bid price %s", orderPrice.String())
return nil
}
case types.SideTypeSell:
if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) {
logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
return nil
} else if orderPrice == first.Price {
logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
return nil
}
}
if err := e.cancelActiveOrders(ctx); err != nil {
e.logger.Warnf("cancel active orders error: %v", err)
}
}
e.tradeCollector.Process()
orderForm, err := e.generateOrder()
if err != nil {
return err
} else if orderForm == nil {
return nil
}
createdOrder, err := e.exchange.SubmitOrder(ctx, *orderForm)
if err != nil {
return err
}
if createdOrder != nil {
e.orderStore.Add(*createdOrder)
e.activeMakerOrders.Add(*createdOrder)
e.tradeCollector.Process()
}
return nil
}
func (e *FixedQuantityExecutor) getNewPrice() (fixedpoint.Value, error) {
newPrice := fixedpoint.Zero
book := e.orderBook.Copy()
sideBook := book.SideBook(e.side)
first, ok := sideBook.First()
if !ok {
return newPrice, fmt.Errorf("empty %s %s side book", e.symbol, e.side)
}
newPrice = first.Price
spread, ok := book.Spread()
if !ok {
return newPrice, errors.New("can not calculate spread, neither bid price or ask price exists")
}
tickSize := e.market.TickSize
tickSpread := tickSize.Mul(fixedpoint.NewFromInt(int64(e.numOfTicks)))
if spread.Compare(tickSize) > 0 {
// there is a gap in the spread
tickSpread = fixedpoint.Min(tickSpread, spread.Sub(tickSize))
switch e.side {
case types.SideTypeSell:
newPrice = newPrice.Sub(tickSpread)
case types.SideTypeBuy:
newPrice = newPrice.Add(tickSpread)
}
}
if e.stopPrice.Sign() > 0 {
switch e.side {
case types.SideTypeSell:
if newPrice.Compare(e.stopPrice) < 0 {
logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s",
e.symbol,
newPrice.String(),
e.stopPrice.String(),
e.stopPrice.String())
newPrice = e.stopPrice
}
case types.SideTypeBuy:
if newPrice.Compare(e.stopPrice) > 0 {
logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s",
e.symbol,
newPrice.String(),
e.stopPrice.String(),
e.stopPrice.String())
newPrice = e.stopPrice
}
}
}
return newPrice, nil
}
func (e *FixedQuantityExecutor) getRemainingQuantity() fixedpoint.Value {
base := e.position.GetBase()
return e.targetQuantity.Sub(base.Abs())
}
func (e *FixedQuantityExecutor) isDeadlineExceeded() bool {
if e.deadlineTime != nil && !e.deadlineTime.IsZero() {
return time.Since(*e.deadlineTime) > 0
}
return false
}
func (e *FixedQuantityExecutor) calculateNewOrderQuantity(price fixedpoint.Value) (fixedpoint.Value, error) {
minQuantity := e.market.MinQuantity
remainingQuantity := e.getRemainingQuantity()
if remainingQuantity.Sign() <= 0 {
e.cancelExecution()
return fixedpoint.Zero, nil
}
if remainingQuantity.Compare(minQuantity) < 0 {
e.logger.Warnf("can not continue placing orders, the remaining quantity %s is less than the min quantity %s", remainingQuantity.String(), minQuantity.String())
e.cancelExecution()
return fixedpoint.Zero, nil
}
// if deadline exceeded, we should return the remaining quantity
if e.isDeadlineExceeded() {
return remainingQuantity, nil
}
// when slice = 1000, if we only have 998, we should adjust our quantity to 998
orderQuantity := fixedpoint.Min(e.sliceQuantity, remainingQuantity)
// if the remaining quantity in the next round is not enough, we should merge the remaining quantity into this round
// if there are rest slices
nextRemainingQuantity := remainingQuantity.Sub(e.sliceQuantity)
if nextRemainingQuantity.Sign() > 0 && e.market.IsDustQuantity(nextRemainingQuantity, price) {
orderQuantity = remainingQuantity
}
orderQuantity = e.market.AdjustQuantityByMinNotional(orderQuantity, price)
return orderQuantity, nil
}
func (e *FixedQuantityExecutor) generateOrder() (*types.SubmitOrder, error) {
newPrice, err := e.getNewPrice()
if err != nil {
return nil, err
}
orderQuantity, err := e.calculateNewOrderQuantity(newPrice)
if err != nil {
return nil, err
}
balances, err := e.exchange.QueryAccountBalances(e.executionCtx)
if err != nil {
return nil, err
}
switch e.side {
case types.SideTypeSell:
// check base balance for sell, try to sell as more as possible
if b, ok := balances[e.market.BaseCurrency]; ok {
orderQuantity = fixedpoint.Min(b.Available, orderQuantity)
}
case types.SideTypeBuy:
// check base balance for sell, try to sell as more as possible
if b, ok := balances[e.market.QuoteCurrency]; ok {
orderQuantity = e.market.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available)
}
}
if e.isDeadlineExceeded() {
return &types.SubmitOrder{
Symbol: e.symbol,
Side: e.side,
Type: types.OrderTypeMarket,
Quantity: orderQuantity,
Market: e.market,
}, nil
}
return &types.SubmitOrder{
Symbol: e.symbol,
Side: e.side,
Type: types.OrderTypeLimitMaker,
Quantity: orderQuantity,
Price: newPrice,
Market: e.market,
TimeInForce: types.TimeInForceGTC,
}, nil
}
func (e *FixedQuantityExecutor) Start(ctx context.Context) error {
if e.executionCtx != nil {
return errors.New("executionCtx is not nil, you can't start the executor twice")
}
e.executionCtx, e.cancelExecution = context.WithCancel(ctx)
e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(ctx)
go e.connectMarketData(e.executionCtx)
go e.connectUserData(e.userDataStreamCtx)
e.logger.Infof("waiting for connections ready...")
if err := e.WaitForConnection(ctx); err != nil {
e.cancelExecution()
return err
}
e.logger.Infof("connections ready, starting order updater...")
go e.orderUpdater(e.executionCtx)
return nil
}
func (e *FixedQuantityExecutor) WaitForConnection(ctx context.Context) error {
if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) {
return fmt.Errorf("market data stream connection timeout")
}
if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) {
return fmt.Errorf("user data stream connection timeout")
}
return nil
}
// Done returns a channel that emits a signal when the execution is done.
func (e *FixedQuantityExecutor) Done() <-chan struct{} {
return e.done.Chan()
}
// Shutdown stops the execution
// If we call this method, it means the execution is still running,
// We need it to:
// 1. Stop the order updater (by using the execution context)
// 2. The order updater cancels all open orders and closes the user data stream
func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) {
e.mu.Lock()
if e.cancelExecution != nil {
e.cancelExecution()
}
e.mu.Unlock()
for {
select {
case <-shutdownCtx.Done():
return
case <-e.done.Chan():
return
}
}
}
func selectSignalOrTimeout(ctx context.Context, c chan struct{}, timeout time.Duration) bool {
select {
case <-ctx.Done():
return false
case <-time.After(timeout):
return false
case <-c:
return true
}
}

View File

@ -0,0 +1,316 @@
package twap
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"github.com/c9s/bbgo/pkg/fixedpoint"
. "github.com/c9s/bbgo/pkg/testing/testhelper"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/types/mocks"
)
func getTestMarket() types.Market {
market := types.Market{
Symbol: "BTCUSDT",
PricePrecision: 8,
VolumePrecision: 8,
QuoteCurrency: "USDT",
BaseCurrency: "BTC",
MinNotional: fixedpoint.MustNewFromString("0.001"),
MinAmount: fixedpoint.MustNewFromString("10.0"),
MinQuantity: fixedpoint.MustNewFromString("0.001"),
}
return market
}
type OrderMatcher struct {
Order types.Order
}
func MatchOrder(o types.Order) *OrderMatcher {
return &OrderMatcher{
Order: o,
}
}
func (m *OrderMatcher) Matches(x interface{}) bool {
order, ok := x.(types.Order)
if !ok {
return false
}
return m.Order.OrderID == order.OrderID && m.Order.Price.Compare(m.Order.Price) == 0
}
func (m *OrderMatcher) String() string {
return fmt.Sprintf("OrderMatcher expects %+v", m.Order)
}
type CatchMatcher struct {
f func(x any)
}
func Catch(f func(x any)) *CatchMatcher {
return &CatchMatcher{
f: f,
}
}
func (m *CatchMatcher) Matches(x interface{}) bool {
m.f(x)
return true
}
func (m *CatchMatcher) String() string {
return "CatchMatcher"
}
func bindMockMarketDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) {
mockStream.EXPECT().OnBookSnapshot(Catch(func(x any) {
stream.OnBookSnapshot(x.(func(book types.SliceOrderBook)))
})).AnyTimes()
mockStream.EXPECT().OnBookUpdate(Catch(func(x any) {
stream.OnBookUpdate(x.(func(book types.SliceOrderBook)))
})).AnyTimes()
mockStream.EXPECT().OnConnect(Catch(func(x any) {
stream.OnConnect(x.(func()))
})).AnyTimes()
}
func bindMockUserDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) {
mockStream.EXPECT().OnOrderUpdate(Catch(func(x any) {
stream.OnOrderUpdate(x.(func(order types.Order)))
})).AnyTimes()
mockStream.EXPECT().OnTradeUpdate(Catch(func(x any) {
stream.OnTradeUpdate(x.(func(order types.Trade)))
})).AnyTimes()
mockStream.EXPECT().OnBalanceUpdate(Catch(func(x any) {
stream.OnBalanceUpdate(x.(func(m types.BalanceMap)))
})).AnyTimes()
mockStream.EXPECT().OnConnect(Catch(func(x any) {
stream.OnConnect(x.(func()))
})).AnyTimes()
mockStream.EXPECT().OnAuth(Catch(func(x any) {
stream.OnAuth(x.(func()))
}))
}
func TestNewStreamExecutor(t *testing.T) {
exchangeName := types.ExchangeBinance
symbol := "BTCUSDT"
market := getTestMarket()
targetQuantity := Number(100)
sliceQuantity := Number(1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockEx := mocks.NewMockExchange(mockCtrl)
marketDataStream := &types.StandardStream{}
userDataStream := &types.StandardStream{}
mockMarketDataStream := mocks.NewMockStream(mockCtrl)
mockMarketDataStream.EXPECT().SetPublicOnly()
mockMarketDataStream.EXPECT().Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
Depth: types.DepthLevelMedium,
})
bindMockMarketDataStream(mockMarketDataStream, marketDataStream)
mockMarketDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx))
mockUserDataStream := mocks.NewMockStream(mockCtrl)
bindMockUserDataStream(mockUserDataStream, userDataStream)
mockUserDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx))
initialBalances := types.BalanceMap{
"BTC": types.Balance{
Available: Number(2),
},
"USDT": types.Balance{
Available: Number(20_000),
},
}
mockEx.EXPECT().NewStream().Return(mockMarketDataStream)
mockEx.EXPECT().NewStream().Return(mockUserDataStream)
mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil)
// first order
firstSubmitOrder := types.SubmitOrder{
Symbol: symbol,
Side: types.SideTypeBuy,
Type: types.OrderTypeLimitMaker,
Quantity: Number(1),
Price: Number(19400),
Market: market,
TimeInForce: types.TimeInForceGTC,
}
firstSubmitOrderTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
firstOrder := types.Order{
SubmitOrder: firstSubmitOrder,
Exchange: exchangeName,
OrderID: 1,
Status: types.OrderStatusNew,
ExecutedQuantity: Number(0.0),
IsWorking: true,
CreationTime: types.Time(firstSubmitOrderTime),
UpdateTime: types.Time(firstSubmitOrderTime),
}
mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), firstSubmitOrder).Return(&firstOrder, nil)
executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, targetQuantity, sliceQuantity)
executor.SetUpdateInterval(200 * time.Millisecond)
go func() {
err := executor.Start(ctx)
assert.NoError(t, err)
}()
go func() {
time.Sleep(500 * time.Millisecond)
marketDataStream.EmitConnect()
userDataStream.EmitConnect()
userDataStream.EmitAuth()
}()
err := executor.WaitForConnection(ctx)
assert.NoError(t, err)
t.Logf("sending book snapshot...")
snapshotTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
marketDataStream.EmitBookSnapshot(types.SliceOrderBook{
Symbol: symbol,
Bids: types.PriceVolumeSlice{
{Price: Number(19400), Volume: Number(1)},
{Price: Number(19300), Volume: Number(2)},
{Price: Number(19200), Volume: Number(3)},
},
Asks: types.PriceVolumeSlice{
{Price: Number(19450), Volume: Number(1)},
{Price: Number(19550), Volume: Number(2)},
{Price: Number(19650), Volume: Number(3)},
},
Time: snapshotTime,
LastUpdateId: 101,
})
time.Sleep(500 * time.Millisecond)
t.Logf("sending book update...")
// we expect the second order will be placed when the order update is received
secondSubmitOrder := types.SubmitOrder{
Symbol: symbol,
Side: types.SideTypeBuy,
Type: types.OrderTypeLimitMaker,
Quantity: Number(1),
Price: Number(19420),
Market: market,
TimeInForce: types.TimeInForceGTC,
}
secondSubmitOrderTime := time.Date(2021, 1, 1, 0, 1, 0, 0, time.UTC)
secondOrder := types.Order{
SubmitOrder: secondSubmitOrder,
Exchange: exchangeName,
OrderID: 2,
Status: types.OrderStatusNew,
ExecutedQuantity: Number(0.0),
IsWorking: true,
CreationTime: types.Time(secondSubmitOrderTime),
UpdateTime: types.Time(secondSubmitOrderTime),
}
mockEx.EXPECT().CancelOrders(context.Background(), MatchOrder(firstOrder)).DoAndReturn(func(
ctx context.Context, orders ...types.Order,
) error {
orderUpdate := firstOrder
orderUpdate.Status = types.OrderStatusCanceled
userDataStream.EmitOrderUpdate(orderUpdate)
t.Logf("emit order update: %+v", orderUpdate)
return nil
})
mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil)
mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), secondSubmitOrder).Return(&secondOrder, nil)
t.Logf("waiting for the order update...")
time.Sleep(500 * time.Millisecond)
{
orders := executor.orderStore.Orders()
assert.Len(t, orders, 1, "should have 1 order in the order store")
}
marketDataStream.EmitBookUpdate(types.SliceOrderBook{
Symbol: symbol,
Bids: types.PriceVolumeSlice{
{Price: Number(19420), Volume: Number(1)},
{Price: Number(19300), Volume: Number(2)},
{Price: Number(19200), Volume: Number(3)},
},
Asks: types.PriceVolumeSlice{
{Price: Number(19450), Volume: Number(1)},
{Price: Number(19550), Volume: Number(2)},
{Price: Number(19650), Volume: Number(3)},
},
Time: snapshotTime,
LastUpdateId: 101,
})
t.Logf("waiting for the next order update...")
time.Sleep(500 * time.Millisecond)
{
orders := executor.orderStore.Orders()
assert.Len(t, orders, 1, "should have 1 order in the order store")
}
t.Logf("emitting trade update...")
userDataStream.EmitTradeUpdate(types.Trade{
ID: 1,
OrderID: 2,
Exchange: exchangeName,
Price: Number(19420.0),
Quantity: Number(100.0),
QuoteQuantity: Number(100.0 * 19420.0),
Symbol: symbol,
Side: types.SideTypeBuy,
IsBuyer: true,
IsMaker: true,
Time: types.Time(secondSubmitOrderTime),
})
t.Logf("waiting for the trade callbacks...")
time.Sleep(500 * time.Millisecond)
executor.tradeCollector.Process()
assert.Equal(t, Number(100), executor.position.GetBase())
mockEx.EXPECT().CancelOrders(context.Background(), MatchOrder(secondOrder)).DoAndReturn(func(
ctx context.Context, orders ...types.Order,
) error {
orderUpdate := secondOrder
orderUpdate.Status = types.OrderStatusCanceled
userDataStream.EmitOrderUpdate(orderUpdate)
t.Logf("emit order #2 update: %+v", orderUpdate)
return nil
})
assert.True(t, executor.cancelContextIfTargetQuantityFilled(), "target quantity should be filled")
// finalizing and stop the executor
select {
case <-ctx.Done():
case <-time.After(10 * time.Second):
case <-executor.Done():
}
t.Logf("executor done")
}

View File

@ -247,6 +247,19 @@ func (m Market) AdjustQuantityByMinNotional(quantity, currentPrice fixedpoint.Va
return quantity return quantity
} }
// AdjustQuantityByMaxAmount adjusts the quantity to make the amount less than the given maxAmount
func (m Market) AdjustQuantityByMaxAmount(quantity, currentPrice, maxAmount fixedpoint.Value) fixedpoint.Value {
// modify quantity for the min amount
amount := currentPrice.Mul(quantity)
if amount.Compare(maxAmount) < 0 {
return quantity
}
ratio := maxAmount.Div(amount)
quantity = quantity.Mul(ratio)
return m.TruncateQuantity(quantity)
}
type MarketMap map[string]Market type MarketMap map[string]Market
func (m MarketMap) Add(market Market) { func (m MarketMap) Add(market Market) {

View File

@ -0,0 +1,375 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/c9s/bbgo/pkg/types (interfaces: Stream)
//
// Generated by this command:
//
// mockgen -destination=mocks/mock_stream.go -package=mocks . Stream
//
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
types "github.com/c9s/bbgo/pkg/types"
gomock "go.uber.org/mock/gomock"
)
// MockStream is a mock of Stream interface.
type MockStream struct {
ctrl *gomock.Controller
recorder *MockStreamMockRecorder
}
// MockStreamMockRecorder is the mock recorder for MockStream.
type MockStreamMockRecorder struct {
mock *MockStream
}
// NewMockStream creates a new mock instance.
func NewMockStream(ctrl *gomock.Controller) *MockStream {
mock := &MockStream{ctrl: ctrl}
mock.recorder = &MockStreamMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockStream) EXPECT() *MockStreamMockRecorder {
return m.recorder
}
// Close mocks base method.
func (m *MockStream) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockStreamMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStream)(nil).Close))
}
// Connect mocks base method.
func (m *MockStream) Connect(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Connect", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Connect indicates an expected call of Connect.
func (mr *MockStreamMockRecorder) Connect(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockStream)(nil).Connect), arg0)
}
// GetPublicOnly mocks base method.
func (m *MockStream) GetPublicOnly() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPublicOnly")
ret0, _ := ret[0].(bool)
return ret0
}
// GetPublicOnly indicates an expected call of GetPublicOnly.
func (mr *MockStreamMockRecorder) GetPublicOnly() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPublicOnly", reflect.TypeOf((*MockStream)(nil).GetPublicOnly))
}
// GetSubscriptions mocks base method.
func (m *MockStream) GetSubscriptions() []types.Subscription {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetSubscriptions")
ret0, _ := ret[0].([]types.Subscription)
return ret0
}
// GetSubscriptions indicates an expected call of GetSubscriptions.
func (mr *MockStreamMockRecorder) GetSubscriptions() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscriptions", reflect.TypeOf((*MockStream)(nil).GetSubscriptions))
}
// OnAggTrade mocks base method.
func (m *MockStream) OnAggTrade(arg0 func(types.Trade)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnAggTrade", arg0)
}
// OnAggTrade indicates an expected call of OnAggTrade.
func (mr *MockStreamMockRecorder) OnAggTrade(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAggTrade", reflect.TypeOf((*MockStream)(nil).OnAggTrade), arg0)
}
// OnAuth mocks base method.
func (m *MockStream) OnAuth(arg0 func()) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnAuth", arg0)
}
// OnAuth indicates an expected call of OnAuth.
func (mr *MockStreamMockRecorder) OnAuth(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAuth", reflect.TypeOf((*MockStream)(nil).OnAuth), arg0)
}
// OnBalanceSnapshot mocks base method.
func (m *MockStream) OnBalanceSnapshot(arg0 func(types.BalanceMap)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBalanceSnapshot", arg0)
}
// OnBalanceSnapshot indicates an expected call of OnBalanceSnapshot.
func (mr *MockStreamMockRecorder) OnBalanceSnapshot(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceSnapshot", reflect.TypeOf((*MockStream)(nil).OnBalanceSnapshot), arg0)
}
// OnBalanceUpdate mocks base method.
func (m *MockStream) OnBalanceUpdate(arg0 func(types.BalanceMap)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBalanceUpdate", arg0)
}
// OnBalanceUpdate indicates an expected call of OnBalanceUpdate.
func (mr *MockStreamMockRecorder) OnBalanceUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceUpdate", reflect.TypeOf((*MockStream)(nil).OnBalanceUpdate), arg0)
}
// OnBookSnapshot mocks base method.
func (m *MockStream) OnBookSnapshot(arg0 func(types.SliceOrderBook)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBookSnapshot", arg0)
}
// OnBookSnapshot indicates an expected call of OnBookSnapshot.
func (mr *MockStreamMockRecorder) OnBookSnapshot(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookSnapshot", reflect.TypeOf((*MockStream)(nil).OnBookSnapshot), arg0)
}
// OnBookTickerUpdate mocks base method.
func (m *MockStream) OnBookTickerUpdate(arg0 func(types.BookTicker)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBookTickerUpdate", arg0)
}
// OnBookTickerUpdate indicates an expected call of OnBookTickerUpdate.
func (mr *MockStreamMockRecorder) OnBookTickerUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookTickerUpdate", reflect.TypeOf((*MockStream)(nil).OnBookTickerUpdate), arg0)
}
// OnBookUpdate mocks base method.
func (m *MockStream) OnBookUpdate(arg0 func(types.SliceOrderBook)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBookUpdate", arg0)
}
// OnBookUpdate indicates an expected call of OnBookUpdate.
func (mr *MockStreamMockRecorder) OnBookUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookUpdate", reflect.TypeOf((*MockStream)(nil).OnBookUpdate), arg0)
}
// OnConnect mocks base method.
func (m *MockStream) OnConnect(arg0 func()) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnConnect", arg0)
}
// OnConnect indicates an expected call of OnConnect.
func (mr *MockStreamMockRecorder) OnConnect(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnConnect", reflect.TypeOf((*MockStream)(nil).OnConnect), arg0)
}
// OnDisconnect mocks base method.
func (m *MockStream) OnDisconnect(arg0 func()) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnDisconnect", arg0)
}
// OnDisconnect indicates an expected call of OnDisconnect.
func (mr *MockStreamMockRecorder) OnDisconnect(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnDisconnect", reflect.TypeOf((*MockStream)(nil).OnDisconnect), arg0)
}
// OnForceOrder mocks base method.
func (m *MockStream) OnForceOrder(arg0 func(types.LiquidationInfo)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnForceOrder", arg0)
}
// OnForceOrder indicates an expected call of OnForceOrder.
func (mr *MockStreamMockRecorder) OnForceOrder(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnForceOrder", reflect.TypeOf((*MockStream)(nil).OnForceOrder), arg0)
}
// OnFuturesPositionSnapshot mocks base method.
func (m *MockStream) OnFuturesPositionSnapshot(arg0 func(types.FuturesPositionMap)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnFuturesPositionSnapshot", arg0)
}
// OnFuturesPositionSnapshot indicates an expected call of OnFuturesPositionSnapshot.
func (mr *MockStreamMockRecorder) OnFuturesPositionSnapshot(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionSnapshot", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionSnapshot), arg0)
}
// OnFuturesPositionUpdate mocks base method.
func (m *MockStream) OnFuturesPositionUpdate(arg0 func(types.FuturesPositionMap)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnFuturesPositionUpdate", arg0)
}
// OnFuturesPositionUpdate indicates an expected call of OnFuturesPositionUpdate.
func (mr *MockStreamMockRecorder) OnFuturesPositionUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionUpdate", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionUpdate), arg0)
}
// OnKLine mocks base method.
func (m *MockStream) OnKLine(arg0 func(types.KLine)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnKLine", arg0)
}
// OnKLine indicates an expected call of OnKLine.
func (mr *MockStreamMockRecorder) OnKLine(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLine", reflect.TypeOf((*MockStream)(nil).OnKLine), arg0)
}
// OnKLineClosed mocks base method.
func (m *MockStream) OnKLineClosed(arg0 func(types.KLine)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnKLineClosed", arg0)
}
// OnKLineClosed indicates an expected call of OnKLineClosed.
func (mr *MockStreamMockRecorder) OnKLineClosed(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLineClosed", reflect.TypeOf((*MockStream)(nil).OnKLineClosed), arg0)
}
// OnMarketTrade mocks base method.
func (m *MockStream) OnMarketTrade(arg0 func(types.Trade)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnMarketTrade", arg0)
}
// OnMarketTrade indicates an expected call of OnMarketTrade.
func (mr *MockStreamMockRecorder) OnMarketTrade(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMarketTrade", reflect.TypeOf((*MockStream)(nil).OnMarketTrade), arg0)
}
// OnOrderUpdate mocks base method.
func (m *MockStream) OnOrderUpdate(arg0 func(types.Order)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnOrderUpdate", arg0)
}
// OnOrderUpdate indicates an expected call of OnOrderUpdate.
func (mr *MockStreamMockRecorder) OnOrderUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnOrderUpdate", reflect.TypeOf((*MockStream)(nil).OnOrderUpdate), arg0)
}
// OnRawMessage mocks base method.
func (m *MockStream) OnRawMessage(arg0 func([]byte)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnRawMessage", arg0)
}
// OnRawMessage indicates an expected call of OnRawMessage.
func (mr *MockStreamMockRecorder) OnRawMessage(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnRawMessage", reflect.TypeOf((*MockStream)(nil).OnRawMessage), arg0)
}
// OnStart mocks base method.
func (m *MockStream) OnStart(arg0 func()) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnStart", arg0)
}
// OnStart indicates an expected call of OnStart.
func (mr *MockStreamMockRecorder) OnStart(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnStart", reflect.TypeOf((*MockStream)(nil).OnStart), arg0)
}
// OnTradeUpdate mocks base method.
func (m *MockStream) OnTradeUpdate(arg0 func(types.Trade)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnTradeUpdate", arg0)
}
// OnTradeUpdate indicates an expected call of OnTradeUpdate.
func (mr *MockStreamMockRecorder) OnTradeUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnTradeUpdate", reflect.TypeOf((*MockStream)(nil).OnTradeUpdate), arg0)
}
// Reconnect mocks base method.
func (m *MockStream) Reconnect() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Reconnect")
}
// Reconnect indicates an expected call of Reconnect.
func (mr *MockStreamMockRecorder) Reconnect() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconnect", reflect.TypeOf((*MockStream)(nil).Reconnect))
}
// Resubscribe mocks base method.
func (m *MockStream) Resubscribe(arg0 func([]types.Subscription) ([]types.Subscription, error)) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Resubscribe", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Resubscribe indicates an expected call of Resubscribe.
func (mr *MockStreamMockRecorder) Resubscribe(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resubscribe", reflect.TypeOf((*MockStream)(nil).Resubscribe), arg0)
}
// SetPublicOnly mocks base method.
func (m *MockStream) SetPublicOnly() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetPublicOnly")
}
// SetPublicOnly indicates an expected call of SetPublicOnly.
func (mr *MockStreamMockRecorder) SetPublicOnly() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPublicOnly", reflect.TypeOf((*MockStream)(nil).SetPublicOnly))
}
// Subscribe mocks base method.
func (m *MockStream) Subscribe(arg0 types.Channel, arg1 string, arg2 types.SubscribeOptions) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Subscribe", arg0, arg1, arg2)
}
// Subscribe indicates an expected call of Subscribe.
func (mr *MockStreamMockRecorder) Subscribe(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockStream)(nil).Subscribe), arg0, arg1, arg2)
}

View File

@ -4,19 +4,38 @@ import (
"encoding/json" "encoding/json"
"strings" "strings"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/fixedpoint"
) )
type PriceType string type PriceType string
const ( const (
PriceTypeLast PriceType = "LAST" // PriceTypeLast uses the last price from the given ticker
PriceTypeBuy PriceType = "BUY" // BID PriceTypeLast PriceType = "LAST"
PriceTypeSell PriceType = "SELL" // ASK
PriceTypeMid PriceType = "MID" // PriceTypeBid uses the bid price from the given ticker
PriceTypeBid PriceType = "BID"
// PriceTypeAsk uses the ask price from the given ticker
PriceTypeAsk PriceType = "ASK"
// PriceTypeMid calculates the middle price from the given ticker
PriceTypeMid PriceType = "MID"
PriceTypeMaker PriceType = "MAKER" PriceTypeMaker PriceType = "MAKER"
PriceTypeTaker PriceType = "TAKER" PriceTypeTaker PriceType = "TAKER"
// See best bid offer types
// https://www.binance.com/en/support/faq/understanding-and-using-bbo-orders-on-binance-futures-7f93c89ef09042678cfa73e8a28612e8
PriceTypeBestBidOfferCounterParty1 PriceType = "COUNTERPARTY1"
PriceTypeBestBidOfferCounterParty5 PriceType = "COUNTERPARTY5"
PriceTypeBestBidOfferQueue1 PriceType = "QUEUE1"
PriceTypeBestBidOfferQueue5 PriceType = "QUEUE5"
) )
var ErrInvalidPriceType = errors.New("invalid price type") var ErrInvalidPriceType = errors.New("invalid price type")
@ -24,7 +43,10 @@ var ErrInvalidPriceType = errors.New("invalid price type")
func ParsePriceType(s string) (p PriceType, err error) { func ParsePriceType(s string) (p PriceType, err error) {
p = PriceType(strings.ToUpper(s)) p = PriceType(strings.ToUpper(s))
switch p { switch p {
case PriceTypeLast, PriceTypeBuy, PriceTypeSell, PriceTypeMid, PriceTypeMaker, PriceTypeTaker: case PriceTypeLast, PriceTypeBid, PriceTypeAsk,
PriceTypeMid, PriceTypeMaker, PriceTypeTaker,
PriceTypeBestBidOfferCounterParty1, PriceTypeBestBidOfferCounterParty5,
PriceTypeBestBidOfferQueue1, PriceTypeBestBidOfferQueue5:
return p, err return p, err
} }
return p, ErrInvalidPriceType return p, ErrInvalidPriceType
@ -47,25 +69,31 @@ func (p *PriceType) UnmarshalJSON(data []byte) error {
return nil return nil
} }
func (p PriceType) Map(ticker *Ticker, side SideType) fixedpoint.Value { // GetPrice returns the price from the given ticker based on the price type
func (p PriceType) GetPrice(ticker *Ticker, side SideType) fixedpoint.Value {
switch p {
case PriceTypeBestBidOfferQueue5, PriceTypeBestBidOfferCounterParty5:
log.Warnf("price type %s is not supported with ticker", p)
}
price := ticker.Last price := ticker.Last
switch p { switch p {
case PriceTypeLast: case PriceTypeLast:
price = ticker.Last price = ticker.Last
case PriceTypeBuy: case PriceTypeBid:
price = ticker.Buy price = ticker.Buy
case PriceTypeSell: case PriceTypeAsk:
price = ticker.Sell price = ticker.Sell
case PriceTypeMid: case PriceTypeMid:
price = ticker.Buy.Add(ticker.Sell).Div(fixedpoint.NewFromInt(2)) price = ticker.Buy.Add(ticker.Sell).Div(fixedpoint.NewFromInt(2))
case PriceTypeMaker: case PriceTypeMaker, PriceTypeBestBidOfferQueue1, PriceTypeBestBidOfferQueue5:
if side == SideTypeBuy { if side == SideTypeBuy {
price = ticker.Buy price = ticker.Buy
} else if side == SideTypeSell { } else if side == SideTypeSell {
price = ticker.Sell price = ticker.Sell
} }
case PriceTypeTaker: case PriceTypeTaker, PriceTypeBestBidOfferCounterParty1, PriceTypeBestBidOfferCounterParty5:
if side == SideTypeBuy { if side == SideTypeBuy {
price = ticker.Sell price = ticker.Sell
} else if side == SideTypeSell { } else if side == SideTypeSell {

View File

@ -24,6 +24,7 @@ var defaultDialer = &websocket.Dialer{
ReadBufferSize: 4096, ReadBufferSize: 4096,
} }
//go:generate mockgen -destination=mocks/mock_stream.go -package=mocks . Stream
type Stream interface { type Stream interface {
StandardStreamEventHub StandardStreamEventHub