mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 06:53:52 +00:00
Merge pull request #1699 from c9s/c9s/refactor-twap
REFACTOR: [twap] upgrade twap command and add optional order update rate limiter
This commit is contained in:
commit
055cfbb3ff
221
pkg/cmd/execute_order.go
Normal file
221
pkg/cmd/execute_order.go
Normal file
|
@ -0,0 +1,221 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/bbgo"
|
||||||
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
|
_ "github.com/c9s/bbgo/pkg/twap"
|
||||||
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
"github.com/c9s/bbgo/pkg/util"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/twap/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
executeOrderCmd.Flags().String("session", "", "the exchange session name for sync")
|
||||||
|
executeOrderCmd.Flags().String("symbol", "", "the trading pair, like btcusdt")
|
||||||
|
executeOrderCmd.Flags().String("side", "", "the trading side: buy or sell")
|
||||||
|
executeOrderCmd.Flags().String("target-quantity", "", "target quantity")
|
||||||
|
executeOrderCmd.Flags().String("slice-quantity", "", "slice quantity")
|
||||||
|
executeOrderCmd.Flags().String("stop-price", "0", "stop price")
|
||||||
|
executeOrderCmd.Flags().String("order-update-rate-limit", "1s", "order update rate limit, syntax: 1+1/1m")
|
||||||
|
executeOrderCmd.Flags().Duration("update-interval", time.Second*10, "order update time")
|
||||||
|
executeOrderCmd.Flags().Duration("delay-interval", time.Second*3, "order delay time after filled")
|
||||||
|
executeOrderCmd.Flags().Duration("deadline", 0, "deadline duration of the order execution, e.g. 1h")
|
||||||
|
executeOrderCmd.Flags().Int("price-ticks", 0, "the number of price tick for the jump spread, default to 0")
|
||||||
|
RootCmd.AddCommand(executeOrderCmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
var executeOrderCmd = &cobra.Command{
|
||||||
|
Use: "execute-order --session SESSION --symbol SYMBOL --side SIDE --target-quantity TOTAL_QUANTITY --slice-quantity SLICE_QUANTITY",
|
||||||
|
Short: "execute buy/sell on the balance/position you have on specific symbol",
|
||||||
|
SilenceUsage: true,
|
||||||
|
PreRunE: cobraInitRequired([]string{
|
||||||
|
"symbol",
|
||||||
|
"side",
|
||||||
|
"target-quantity",
|
||||||
|
"slice-quantity",
|
||||||
|
}),
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
sessionName, err := cmd.Flags().GetString("session")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
symbol, err := cmd.Flags().GetString("symbol")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can not get the symbol from flags: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if symbol == "" {
|
||||||
|
return fmt.Errorf("symbol not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
sideS, err := cmd.Flags().GetString("side")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't get side: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
side, err := types.StrToSideType(sideS)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
targetQuantityS, err := cmd.Flags().GetString("target-quantity")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(targetQuantityS) == 0 {
|
||||||
|
return errors.New("--target-quantity can not be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
targetQuantity, err := fixedpoint.NewFromString(targetQuantityS)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
sliceQuantityS, err := cmd.Flags().GetString("slice-quantity")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(sliceQuantityS) == 0 {
|
||||||
|
return errors.New("--slice-quantity can not be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
sliceQuantity, err := fixedpoint.NewFromString(sliceQuantityS)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfPriceTicks, err := cmd.Flags().GetInt("price-ticks")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
stopPriceS, err := cmd.Flags().GetString("stop-price")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
stopPrice, err := fixedpoint.NewFromString(stopPriceS)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
orderUpdateRateLimitStr, err := cmd.Flags().GetString("order-update-rate-limit")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
updateInterval, err := cmd.Flags().GetDuration("update-interval")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
delayInterval, err := cmd.Flags().GetDuration("delay-interval")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
deadlineDuration, err := cmd.Flags().GetDuration("deadline")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var deadlineTime time.Time
|
||||||
|
if deadlineDuration > 0 {
|
||||||
|
deadlineTime = time.Now().Add(deadlineDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
environ := bbgo.NewEnvironment()
|
||||||
|
if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := environ.Init(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
session, ok := environ.Session(sessionName)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("session %s not found", sessionName)
|
||||||
|
}
|
||||||
|
|
||||||
|
executionCtx, cancelExecution := context.WithCancel(ctx)
|
||||||
|
defer cancelExecution()
|
||||||
|
|
||||||
|
market, ok := session.Market(symbol)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("market %s not found", symbol)
|
||||||
|
}
|
||||||
|
|
||||||
|
executor := twap.NewFixedQuantityExecutor(session.Exchange, symbol, market, side, targetQuantity, sliceQuantity)
|
||||||
|
|
||||||
|
if updateInterval > 0 {
|
||||||
|
executor.SetUpdateInterval(updateInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(orderUpdateRateLimitStr) > 0 {
|
||||||
|
rateLimit, err := util.ParseRateLimitSyntax(orderUpdateRateLimitStr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
executor.SetOrderUpdateRateLimit(rateLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
if delayInterval > 0 {
|
||||||
|
executor.SetDelayInterval(delayInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
if stopPrice.Sign() > 0 {
|
||||||
|
executor.SetStopPrice(stopPrice)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NumOfTicks: numOfPriceTicks,
|
||||||
|
if !deadlineTime.IsZero() {
|
||||||
|
executor.SetDeadlineTime(deadlineTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
if numOfPriceTicks > 0 {
|
||||||
|
executor.SetNumOfTicks(numOfPriceTicks)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := executor.Start(executionCtx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var sigC = make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigC, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
defer signal.Stop(sigC)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
|
||||||
|
case sig := <-sigC:
|
||||||
|
logrus.Warnf("signal %v", sig)
|
||||||
|
logrus.Infof("shutting down order executor...")
|
||||||
|
shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(10*time.Second))
|
||||||
|
executor.Shutdown(shutdownCtx)
|
||||||
|
cancelShutdown()
|
||||||
|
|
||||||
|
case <-executor.Done():
|
||||||
|
logrus.Infof("the order execution is completed")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
|
@ -3,20 +3,14 @@ package cmd
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
|
||||||
"github.com/c9s/bbgo/pkg/twap"
|
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/bbgo"
|
"github.com/c9s/bbgo/pkg/bbgo"
|
||||||
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -146,155 +140,6 @@ var listOrdersCmd = &cobra.Command{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
var executeOrderCmd = &cobra.Command{
|
|
||||||
Use: "execute-order --session SESSION --symbol SYMBOL --side SIDE --target-quantity TOTAL_QUANTITY --slice-quantity SLICE_QUANTITY",
|
|
||||||
Short: "execute buy/sell on the balance/position you have on specific symbol",
|
|
||||||
SilenceUsage: true,
|
|
||||||
PreRunE: cobraInitRequired([]string{
|
|
||||||
"symbol",
|
|
||||||
"side",
|
|
||||||
"target-quantity",
|
|
||||||
"slice-quantity",
|
|
||||||
}),
|
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
sessionName, err := cmd.Flags().GetString("session")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
symbol, err := cmd.Flags().GetString("symbol")
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can not get the symbol from flags: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if symbol == "" {
|
|
||||||
return fmt.Errorf("symbol not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
sideS, err := cmd.Flags().GetString("side")
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't get side: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
side, err := types.StrToSideType(sideS)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
targetQuantityS, err := cmd.Flags().GetString("target-quantity")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if len(targetQuantityS) == 0 {
|
|
||||||
return errors.New("--target-quantity can not be empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
targetQuantity, err := fixedpoint.NewFromString(targetQuantityS)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
sliceQuantityS, err := cmd.Flags().GetString("slice-quantity")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if len(sliceQuantityS) == 0 {
|
|
||||||
return errors.New("--slice-quantity can not be empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
sliceQuantity, err := fixedpoint.NewFromString(sliceQuantityS)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
numOfPriceTicks, err := cmd.Flags().GetInt("price-ticks")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
stopPriceS, err := cmd.Flags().GetString("stop-price")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
stopPrice, err := fixedpoint.NewFromString(stopPriceS)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
updateInterval, err := cmd.Flags().GetDuration("update-interval")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
deadlineDuration, err := cmd.Flags().GetDuration("deadline")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var deadlineTime time.Time
|
|
||||||
if deadlineDuration > 0 {
|
|
||||||
deadlineTime = time.Now().Add(deadlineDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
environ := bbgo.NewEnvironment()
|
|
||||||
if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := environ.Init(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
session, ok := environ.Session(sessionName)
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("session %s not found", sessionName)
|
|
||||||
}
|
|
||||||
|
|
||||||
executionCtx, cancelExecution := context.WithCancel(ctx)
|
|
||||||
defer cancelExecution()
|
|
||||||
|
|
||||||
execution := &twap.StreamExecutor{
|
|
||||||
Session: session,
|
|
||||||
Symbol: symbol,
|
|
||||||
Side: side,
|
|
||||||
TargetQuantity: targetQuantity,
|
|
||||||
SliceQuantity: sliceQuantity,
|
|
||||||
StopPrice: stopPrice,
|
|
||||||
NumOfTicks: numOfPriceTicks,
|
|
||||||
UpdateInterval: updateInterval,
|
|
||||||
DeadlineTime: deadlineTime,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := execution.Run(executionCtx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var sigC = make(chan os.Signal, 1)
|
|
||||||
signal.Notify(sigC, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
defer signal.Stop(sigC)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case sig := <-sigC:
|
|
||||||
log.Warnf("signal %v", sig)
|
|
||||||
log.Infof("shutting down order executor...")
|
|
||||||
shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(10*time.Second))
|
|
||||||
execution.Shutdown(shutdownCtx)
|
|
||||||
cancelShutdown()
|
|
||||||
|
|
||||||
case <-execution.Done():
|
|
||||||
log.Infof("the order execution is completed")
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// go run ./cmd/bbgo submit-order --session=ftx --symbol=BTCUSDT --side=buy --price=18000 --quantity=0.001
|
// go run ./cmd/bbgo submit-order --session=ftx --symbol=BTCUSDT --side=buy --price=18000 --quantity=0.001
|
||||||
var submitOrderCmd = &cobra.Command{
|
var submitOrderCmd = &cobra.Command{
|
||||||
Use: "submit-order --session SESSION --symbol SYMBOL --side SIDE --quantity QUANTITY [--price PRICE]",
|
Use: "submit-order --session SESSION --symbol SYMBOL --side SIDE --quantity QUANTITY [--price PRICE]",
|
||||||
|
@ -414,18 +259,7 @@ func init() {
|
||||||
submitOrderCmd.Flags().Bool("market", false, "submit order as a market order")
|
submitOrderCmd.Flags().Bool("market", false, "submit order as a market order")
|
||||||
submitOrderCmd.Flags().String("margin-side-effect", "", "margin order side effect")
|
submitOrderCmd.Flags().String("margin-side-effect", "", "margin order side effect")
|
||||||
|
|
||||||
executeOrderCmd.Flags().String("session", "", "the exchange session name for sync")
|
|
||||||
executeOrderCmd.Flags().String("symbol", "", "the trading pair, like btcusdt")
|
|
||||||
executeOrderCmd.Flags().String("side", "", "the trading side: buy or sell")
|
|
||||||
executeOrderCmd.Flags().String("target-quantity", "", "target quantity")
|
|
||||||
executeOrderCmd.Flags().String("slice-quantity", "", "slice quantity")
|
|
||||||
executeOrderCmd.Flags().String("stop-price", "0", "stop price")
|
|
||||||
executeOrderCmd.Flags().Duration("update-interval", time.Second*10, "order update time")
|
|
||||||
executeOrderCmd.Flags().Duration("deadline", 0, "deadline of the order execution")
|
|
||||||
executeOrderCmd.Flags().Int("price-ticks", 0, "the number of price tick for the jump spread, default to 0")
|
|
||||||
|
|
||||||
RootCmd.AddCommand(listOrdersCmd)
|
RootCmd.AddCommand(listOrdersCmd)
|
||||||
RootCmd.AddCommand(getOrderCmd)
|
RootCmd.AddCommand(getOrderCmd)
|
||||||
RootCmd.AddCommand(submitOrderCmd)
|
RootCmd.AddCommand(submitOrderCmd)
|
||||||
RootCmd.AddCommand(executeOrderCmd)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ func NewBboMonitor() *BboMonitor {
|
||||||
return &BboMonitor{}
|
return &BboMonitor{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *BboMonitor) OnUpdateFromBook(book *types.StreamOrderBook) bool {
|
func (m *BboMonitor) UpdateFromBook(book *types.StreamOrderBook) bool {
|
||||||
bestBid, ok1 := book.BestBid()
|
bestBid, ok1 := book.BestBid()
|
||||||
bestAsk, ok2 := book.BestAsk()
|
bestAsk, ok2 := book.BestAsk()
|
||||||
if !ok1 || !ok2 {
|
if !ok1 || !ok2 {
|
||||||
|
|
39
pkg/twap/v2/done.go
Normal file
39
pkg/twap/v2/done.go
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
package twap
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type DoneSignal struct {
|
||||||
|
doneC chan struct{}
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDoneSignal() *DoneSignal {
|
||||||
|
return &DoneSignal{
|
||||||
|
doneC: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *DoneSignal) Emit() {
|
||||||
|
e.mu.Lock()
|
||||||
|
if e.doneC == nil {
|
||||||
|
e.doneC = make(chan struct{})
|
||||||
|
}
|
||||||
|
|
||||||
|
close(e.doneC)
|
||||||
|
e.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Chan returns a channel that emits a signal when the execution is done.
|
||||||
|
func (e *DoneSignal) Chan() (c <-chan struct{}) {
|
||||||
|
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
|
||||||
|
e.mu.Lock()
|
||||||
|
if e.doneC == nil {
|
||||||
|
e.doneC = make(chan struct{})
|
||||||
|
c = e.doneC
|
||||||
|
} else {
|
||||||
|
c = e.doneC
|
||||||
|
}
|
||||||
|
e.mu.Unlock()
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/bbgo"
|
"github.com/c9s/bbgo/pkg/bbgo"
|
||||||
"github.com/c9s/bbgo/pkg/core"
|
"github.com/c9s/bbgo/pkg/core"
|
||||||
|
@ -17,42 +18,6 @@ import (
|
||||||
|
|
||||||
var defaultUpdateInterval = time.Minute
|
var defaultUpdateInterval = time.Minute
|
||||||
|
|
||||||
type DoneSignal struct {
|
|
||||||
doneC chan struct{}
|
|
||||||
mu sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDoneSignal() *DoneSignal {
|
|
||||||
return &DoneSignal{
|
|
||||||
doneC: make(chan struct{}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *DoneSignal) Emit() {
|
|
||||||
e.mu.Lock()
|
|
||||||
if e.doneC == nil {
|
|
||||||
e.doneC = make(chan struct{})
|
|
||||||
}
|
|
||||||
|
|
||||||
close(e.doneC)
|
|
||||||
e.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Chan returns a channel that emits a signal when the execution is done.
|
|
||||||
func (e *DoneSignal) Chan() (c <-chan struct{}) {
|
|
||||||
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
|
|
||||||
e.mu.Lock()
|
|
||||||
if e.doneC == nil {
|
|
||||||
e.doneC = make(chan struct{})
|
|
||||||
c = e.doneC
|
|
||||||
} else {
|
|
||||||
c = e.doneC
|
|
||||||
}
|
|
||||||
e.mu.Unlock()
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
// FixedQuantityExecutor is a TWAP executor that places orders on the exchange using the exchange's stream API.
|
// FixedQuantityExecutor is a TWAP executor that places orders on the exchange using the exchange's stream API.
|
||||||
// It uses a fixed target quantity to place orders.
|
// It uses a fixed target quantity to place orders.
|
||||||
type FixedQuantityExecutor struct {
|
type FixedQuantityExecutor struct {
|
||||||
|
@ -94,10 +59,11 @@ type FixedQuantityExecutor struct {
|
||||||
|
|
||||||
userDataStream types.Stream
|
userDataStream types.Stream
|
||||||
|
|
||||||
activeMakerOrders *bbgo.ActiveOrderBook
|
orderUpdateRateLimit *rate.Limiter
|
||||||
orderStore *core.OrderStore
|
activeMakerOrders *bbgo.ActiveOrderBook
|
||||||
position *types.Position
|
orderStore *core.OrderStore
|
||||||
tradeCollector *core.TradeCollector
|
position *types.Position
|
||||||
|
tradeCollector *core.TradeCollector
|
||||||
|
|
||||||
logger logrus.FieldLogger
|
logger logrus.FieldLogger
|
||||||
|
|
||||||
|
@ -108,7 +74,7 @@ type FixedQuantityExecutor struct {
|
||||||
done *DoneSignal
|
done *DoneSignal
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStreamExecutor(
|
func NewFixedQuantityExecutor(
|
||||||
exchange types.Exchange,
|
exchange types.Exchange,
|
||||||
symbol string,
|
symbol string,
|
||||||
market types.Market,
|
market types.Market,
|
||||||
|
@ -197,6 +163,14 @@ func (e *FixedQuantityExecutor) SetUpdateInterval(updateInterval time.Duration)
|
||||||
e.updateInterval = updateInterval
|
e.updateInterval = updateInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *FixedQuantityExecutor) SetNumOfTicks(numOfTicks int) {
|
||||||
|
e.numOfTicks = numOfTicks
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *FixedQuantityExecutor) SetStopPrice(price fixedpoint.Value) {
|
||||||
|
e.stopPrice = price
|
||||||
|
}
|
||||||
|
|
||||||
func (e *FixedQuantityExecutor) connectMarketData(ctx context.Context) {
|
func (e *FixedQuantityExecutor) connectMarketData(ctx context.Context) {
|
||||||
e.logger.Infof("connecting market data stream...")
|
e.logger.Infof("connecting market data stream...")
|
||||||
if err := e.marketDataStream.Connect(ctx); err != nil {
|
if err := e.marketDataStream.Connect(ctx); err != nil {
|
||||||
|
@ -234,6 +208,10 @@ func (e *FixedQuantityExecutor) cancelContextIfTargetQuantityFilled() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *FixedQuantityExecutor) SetOrderUpdateRateLimit(rateLimit *rate.Limiter) {
|
||||||
|
e.orderUpdateRateLimit = rateLimit
|
||||||
|
}
|
||||||
|
|
||||||
func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error {
|
func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error {
|
||||||
gracefulCtx, gracefulCancel := context.WithTimeout(ctx, 30*time.Second)
|
gracefulCtx, gracefulCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||||
defer gracefulCancel()
|
defer gracefulCancel()
|
||||||
|
@ -241,8 +219,6 @@ func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
||||||
// updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 2)
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := e.cancelActiveOrders(ctx); err != nil {
|
if err := e.cancelActiveOrders(ctx); err != nil {
|
||||||
e.logger.WithError(err).Error("cancel active orders error")
|
e.logger.WithError(err).Error("cancel active orders error")
|
||||||
|
@ -263,18 +239,12 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
||||||
return
|
return
|
||||||
|
|
||||||
case <-e.orderBook.C:
|
case <-e.orderBook.C:
|
||||||
changed := monitor.OnUpdateFromBook(e.orderBook)
|
changed := monitor.UpdateFromBook(e.orderBook)
|
||||||
if !changed {
|
if !changed {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// orderBook.C sends a signal when any price or quantity changes in the order book
|
// orderBook.C sends a signal when any price or quantity changes in the order book
|
||||||
/*
|
|
||||||
if !updateLimiter.Allow() {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
if e.cancelContextIfTargetQuantityFilled() {
|
if e.cancelContextIfTargetQuantityFilled() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -286,17 +256,11 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
changed := monitor.OnUpdateFromBook(e.orderBook)
|
changed := monitor.UpdateFromBook(e.orderBook)
|
||||||
if !changed {
|
if !changed {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
if !updateLimiter.Allow() {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
if e.cancelContextIfTargetQuantityFilled() {
|
if e.cancelContextIfTargetQuantityFilled() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -309,6 +273,11 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
|
func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
|
||||||
|
if e.orderUpdateRateLimit != nil && !e.orderUpdateRateLimit.Allow() {
|
||||||
|
e.logger.Infof("rate limit exceeded, skip updating order")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
book := e.orderBook.Copy()
|
book := e.orderBook.Copy()
|
||||||
sideBook := book.SideBook(e.side)
|
sideBook := book.SideBook(e.side)
|
||||||
|
|
||||||
|
@ -350,24 +319,28 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
|
||||||
// DO NOT UPDATE IF:
|
// DO NOT UPDATE IF:
|
||||||
// tickSpread > 0 AND current order price == second price + tickSpread
|
// tickSpread > 0 AND current order price == second price + tickSpread
|
||||||
// current order price == first price
|
// current order price == first price
|
||||||
logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String())
|
logrus.Infof("orderPrice = %s, best price = %s, second level price = %s, tickSpread = %s",
|
||||||
|
orderPrice.String(),
|
||||||
|
first.Price.String(),
|
||||||
|
second.Price.String(),
|
||||||
|
tickSpread.String())
|
||||||
|
|
||||||
switch e.side {
|
switch e.side {
|
||||||
case types.SideTypeBuy:
|
case types.SideTypeBuy:
|
||||||
if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) {
|
if tickSpread.Sign() > 0 && orderPrice.Compare(second.Price.Add(tickSpread)) == 0 {
|
||||||
logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
|
e.logger.Infof("the current order is already on the best ask price %s, skip update", orderPrice.String())
|
||||||
return nil
|
return nil
|
||||||
} else if orderPrice == first.Price {
|
} else if orderPrice == first.Price {
|
||||||
logrus.Infof("the current order is already on the best bid price %s", orderPrice.String())
|
e.logger.Infof("the current order is already on the best bid price %s, skip update", orderPrice.String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
case types.SideTypeSell:
|
case types.SideTypeSell:
|
||||||
if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) {
|
if tickSpread.Sign() > 0 && orderPrice.Compare(second.Price.Sub(tickSpread)) == 0 {
|
||||||
logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
|
e.logger.Infof("the current order is already on the best ask price %s, skip update", orderPrice.String())
|
||||||
return nil
|
return nil
|
||||||
} else if orderPrice == first.Price {
|
} else if orderPrice == first.Price {
|
||||||
logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
|
e.logger.Infof("the current order is already on the best ask price %s, skip update", orderPrice.String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -379,6 +352,10 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
|
||||||
|
|
||||||
e.tradeCollector.Process()
|
e.tradeCollector.Process()
|
||||||
|
|
||||||
|
if e.delayInterval > 0 {
|
||||||
|
time.Sleep(e.delayInterval)
|
||||||
|
}
|
||||||
|
|
||||||
orderForm, err := e.generateOrder()
|
orderForm, err := e.generateOrder()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -386,7 +363,11 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
createdOrder, err := e.exchange.SubmitOrder(ctx, *orderForm)
|
return e.submitOrder(ctx, *orderForm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *FixedQuantityExecutor) submitOrder(ctx context.Context, orderForm types.SubmitOrder) error {
|
||||||
|
createdOrder, err := e.exchange.SubmitOrder(ctx, orderForm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -603,6 +584,8 @@ func (e *FixedQuantityExecutor) Done() <-chan struct{} {
|
||||||
// 1. Stop the order updater (by using the execution context)
|
// 1. Stop the order updater (by using the execution context)
|
||||||
// 2. The order updater cancels all open orders and closes the user data stream
|
// 2. The order updater cancels all open orders and closes the user data stream
|
||||||
func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) {
|
func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) {
|
||||||
|
e.tradeCollector.Process()
|
||||||
|
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
if e.cancelExecution != nil {
|
if e.cancelExecution != nil {
|
||||||
e.cancelExecution()
|
e.cancelExecution()
|
||||||
|
|
|
@ -170,7 +170,7 @@ func TestNewStreamExecutor(t *testing.T) {
|
||||||
}
|
}
|
||||||
mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), firstSubmitOrder).Return(&firstOrder, nil)
|
mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), firstSubmitOrder).Return(&firstOrder, nil)
|
||||||
|
|
||||||
executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, targetQuantity, sliceQuantity)
|
executor := NewFixedQuantityExecutor(mockEx, symbol, market, types.SideTypeBuy, targetQuantity, sliceQuantity)
|
||||||
executor.SetUpdateInterval(200 * time.Millisecond)
|
executor.SetUpdateInterval(200 * time.Millisecond)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,3 +18,41 @@ func NewValidLimiter(r rate.Limit, b int) (*rate.Limiter, error) {
|
||||||
}
|
}
|
||||||
return rate.NewLimiter(r, b), nil
|
return rate.NewLimiter(r, b), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParseRateLimitSyntax parses the rate limit syntax into the rate.Limiter parameters
|
||||||
|
// sample inputs:
|
||||||
|
//
|
||||||
|
// 2+1/5s (2 initial tokens, 1 token per 5 seconds)
|
||||||
|
// 5+3/1m (5 initial tokens, 3 tokens per minute)
|
||||||
|
// 3m (3 tokens per minute)
|
||||||
|
// 1/3m (1 token per 3 minutes)
|
||||||
|
func ParseRateLimitSyntax(desc string) (*rate.Limiter, error) {
|
||||||
|
var b = 0
|
||||||
|
var r = 1.0
|
||||||
|
var durStr string
|
||||||
|
var duration time.Duration
|
||||||
|
|
||||||
|
_, err := fmt.Sscanf(desc, "%d+%f/%s", &b, &r, &durStr)
|
||||||
|
if err != nil {
|
||||||
|
b = 0
|
||||||
|
r = 1.0
|
||||||
|
_, err = fmt.Sscanf(desc, "%f/%s", &r, &durStr)
|
||||||
|
if err != nil {
|
||||||
|
durStr = desc
|
||||||
|
// need to reset
|
||||||
|
b = 1
|
||||||
|
r = 1.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
duration, err = time.ParseDuration(durStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid rate limit syntax: b+n/duration, err: %v", err)
|
||||||
|
}
|
||||||
|
if r == 1.0 {
|
||||||
|
return NewValidLimiter(rate.Every(duration), b)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("%v %v", duration, r)
|
||||||
|
return NewValidLimiter(rate.Every(time.Duration(float64(duration)/r)), b)
|
||||||
|
}
|
||||||
|
|
|
@ -41,3 +41,32 @@ func TestShouldDelay(t *testing.T) {
|
||||||
assert.True(t, ShouldDelay(limiter, minInterval) > 0)
|
assert.True(t, ShouldDelay(limiter, minInterval) > 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseRateLimitSyntax(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
desc string
|
||||||
|
expectedBurst int
|
||||||
|
expectedRate float64
|
||||||
|
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{"2+1/5s", 2, 1.0 / 5.0, false},
|
||||||
|
{"5+1/3m", 5, 1 / 3 * 60.0, false},
|
||||||
|
{"1m", 1, 1.0 / 60.0, false},
|
||||||
|
}
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
|
limiter, err := ParseRateLimitSyntax(test.desc)
|
||||||
|
if test.wantErr {
|
||||||
|
assert.Error(t, err)
|
||||||
|
} else if assert.NoError(t, err) {
|
||||||
|
assert.NotNil(t, limiter)
|
||||||
|
burst := limiter.Burst()
|
||||||
|
assert.Equal(t, test.expectedBurst, burst)
|
||||||
|
|
||||||
|
limit := limiter.Limit()
|
||||||
|
assert.InDeltaf(t, test.expectedRate, float64(limit), 0.01, "expected rate %f, got %f", test.expectedRate, limit)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user