Merge pull request #1753 from c9s/c9s/xdepthmaker/improvements
Some checks are pending
Go / build (1.21, 6.2) (push) Waiting to run
golang-lint / lint (push) Waiting to run

IMPROVE: [xdepthmaker] use order query to update the canceled order, fix depth price, fix symbol column lengths, fix covered position
This commit is contained in:
c9s 2024-09-26 15:06:12 +08:00 committed by GitHub
commit 0c842e0eb5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 251 additions and 60 deletions

View File

@ -7,12 +7,12 @@ CREATE TABLE `trades`
`id` BIGINT UNSIGNED,
`order_id` BIGINT UNSIGNED NOT NULL,
`exchange` VARCHAR(24) NOT NULL DEFAULT '',
`symbol` VARCHAR(20) NOT NULL,
`symbol` VARCHAR(32) NOT NULL,
`price` DECIMAL(16, 8) UNSIGNED NOT NULL,
`quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,
`quote_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,
`fee` DECIMAL(16, 8) UNSIGNED NOT NULL,
`fee_currency` VARCHAR(10) NOT NULL,
`fee_currency` VARCHAR(16) NOT NULL,
`is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,
`is_maker` BOOLEAN NOT NULL DEFAULT FALSE,
`side` VARCHAR(4) NOT NULL DEFAULT '',

View File

@ -9,7 +9,7 @@ CREATE TABLE `orders`
`order_id` BIGINT UNSIGNED NOT NULL,
`client_order_id` VARCHAR(122) NOT NULL DEFAULT '',
`order_type` VARCHAR(16) NOT NULL,
`symbol` VARCHAR(20) NOT NULL,
`symbol` VARCHAR(32) NOT NULL,
`status` VARCHAR(12) NOT NULL,
`time_in_force` VARCHAR(4) NOT NULL,
`price` DECIMAL(16, 8) UNSIGNED NOT NULL,

View File

@ -6,7 +6,7 @@ CREATE TABLE `profits`
`strategy` VARCHAR(32) NOT NULL,
`strategy_instance_id` VARCHAR(64) NOT NULL,
`symbol` VARCHAR(8) NOT NULL,
`symbol` VARCHAR(32) NOT NULL,
-- average_cost is the position average cost
`average_cost` DECIMAL(16, 8) UNSIGNED NOT NULL,
@ -25,7 +25,7 @@ CREATE TABLE `profits`
`quote_currency` VARCHAR(10) NOT NULL,
`base_currency` VARCHAR(10) NOT NULL,
`base_currency` VARCHAR(16) NOT NULL,
-- -------------------------------------------------------
-- embedded trade data --
@ -61,7 +61,7 @@ CREATE TABLE `profits`
-- fee
`fee_in_usd` DECIMAL(16, 8),
`fee` DECIMAL(16, 8) NOT NULL,
`fee_currency` VARCHAR(10) NOT NULL,
`fee_currency` VARCHAR(16) NOT NULL,
PRIMARY KEY (`gid`),
UNIQUE KEY `trade_id` (`trade_id`)

View File

@ -6,9 +6,9 @@ CREATE TABLE `positions`
`strategy` VARCHAR(32) NOT NULL,
`strategy_instance_id` VARCHAR(64) NOT NULL,
`symbol` VARCHAR(20) NOT NULL,
`symbol` VARCHAR(32) NOT NULL,
`quote_currency` VARCHAR(10) NOT NULL,
`base_currency` VARCHAR(10) NOT NULL,
`base_currency` VARCHAR(16) NOT NULL,
-- average_cost is the position average cost
`average_cost` DECIMAL(16, 8) UNSIGNED NOT NULL,
@ -19,7 +19,7 @@ CREATE TABLE `positions`
-- trade related columns
`trade_id` BIGINT UNSIGNED NOT NULL, -- the trade id in the exchange
`side` VARCHAR(4) NOT NULL, -- side of the trade
`exchange` VARCHAR(12) NOT NULL, -- exchange of the trade
`exchange` VARCHAR(20) NOT NULL, -- exchange of the trade
`traded_at` DATETIME(3) NOT NULL, -- millisecond timestamp
PRIMARY KEY (`gid`),

View File

@ -1,7 +1,6 @@
-- +up
-- +begin
ALTER TABLE profits
CHANGE symbol symbol VARCHAR(20) NOT NULL;
ALTER TABLE profits CHANGE symbol symbol VARCHAR(32) NOT NULL;
-- +end
-- +down

View File

@ -0,0 +1,26 @@
-- +up
-- +begin
ALTER TABLE profits MODIFY COLUMN symbol VARCHAR(32) NOT NULL;
-- +end
-- +begin
ALTER TABLE profits MODIFY COLUMN base_currency VARCHAR(16) NOT NULL;
-- +end
-- +begin
ALTER TABLE profits MODIFY COLUMN fee_currency VARCHAR(16) NOT NULL;
-- +end
-- +begin
ALTER TABLE positions MODIFY COLUMN base_currency VARCHAR(16) NOT NULL;
-- +end
-- +begin
ALTER TABLE positions MODIFY COLUMN symbol VARCHAR(32) NOT NULL;
-- +end
-- +down
-- +begin
SELECT 1;
-- +end

View File

@ -0,0 +1,10 @@
-- +up
-- +begin
SELECT 1;
-- +end
-- +down
-- +begin
SELECT 1;
-- +end

View File

@ -3,17 +3,21 @@ package bbgo
import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/types"
)
const DefaultCancelOrderWaitTime = 20 * time.Millisecond
const DefaultOrderCancelTimeout = 5 * time.Second
// ActiveOrderBook manages the local active order books.
//
@ -35,6 +39,7 @@ type ActiveOrderBook struct {
mu sync.Mutex
cancelOrderWaitTime time.Duration
cancelOrderTimeout time.Duration
}
func NewActiveOrderBook(symbol string) *ActiveOrderBook {
@ -44,6 +49,7 @@ func NewActiveOrderBook(symbol string) *ActiveOrderBook {
pendingOrderUpdates: types.NewSyncOrderMap(),
C: sigchan.New(1),
cancelOrderWaitTime: DefaultCancelOrderWaitTime,
cancelOrderTimeout: DefaultOrderCancelTimeout,
}
}
@ -146,12 +152,11 @@ func (b *ActiveOrderBook) FastCancel(ctx context.Context, ex types.Exchange, ord
// optimize order cancel for back-testing
if IsBackTesting {
return ex.CancelOrders(context.Background(), orders...)
return ex.CancelOrders(ctx, orders...)
}
log.Debugf("[ActiveOrderBook] no wait cancelling %s orders...", b.Symbol)
// since ctx might be canceled, we should use background context here
if err := ex.CancelOrders(context.Background(), orders...); err != nil {
if err := ex.CancelOrders(ctx, orders...); err != nil {
log.WithError(err).Errorf("[ActiveOrderBook] no wait can not cancel %s orders", b.Symbol)
}
@ -175,7 +180,7 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
hasSymbol := b.Symbol != ""
for _, o := range orders {
if hasSymbol && o.Symbol != b.Symbol {
return errors.New("[ActiveOrderBook] cancel " + b.Symbol + " orderbook with different symbol: " + o.Symbol)
return fmt.Errorf("[ActiveOrderBook] canceling %s orderbook with different symbol: %s", b.Symbol, o.Symbol)
}
}
}
@ -187,7 +192,6 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
log.Debugf("[ActiveOrderBook] gracefully cancelling %s orders...", b.Symbol)
waitTime := b.cancelOrderWaitTime
orderCancelTimeout := 5 * time.Second
startTime := time.Now()
// ensure every order is canceled
@ -205,7 +209,7 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
log.Debugf("[ActiveOrderBook] waiting %s for %d %s orders to be cancelled...", waitTime, len(orders), b.Symbol)
if cancelAll {
clear, err := b.waitAllClear(ctx, waitTime, orderCancelTimeout)
clear, err := b.waitAllClear(ctx, waitTime, b.cancelOrderTimeout)
if err != nil {
if !errors.Is(err, context.Canceled) {
log.WithError(err).Errorf("order cancel error")
@ -231,36 +235,63 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
}
// verify the current open orders via the RESTful API
log.Warnf("[ActiveOrderBook] using open orders API to verify the active orders...")
if orderQueryService, ok := ex.(types.ExchangeOrderQueryService); ok {
for idx, o := range orders {
retOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{
Symbol: o.Symbol,
OrderID: strconv.FormatUint(o.OrderID, 10),
})
var symbolOrdersMap = categorizeOrderBySymbol(orders)
var errOccurred bool
var leftOrders types.OrderSlice
for symbol, symbolOrders := range symbolOrdersMap {
openOrders, err := ex.QueryOpenOrders(ctx, symbol)
if err != nil {
errOccurred = true
log.WithError(err).Errorf("can not query %s open orders", symbol)
break
}
if err != nil {
log.WithError(err).Errorf("unable to update order #%d", o.OrderID)
continue
} else if retOrder != nil {
b.Update(*retOrder)
openOrderMap := types.NewOrderMap(openOrders...)
for _, o := range symbolOrders {
// if it's not on the order book (open orders),
// we should remove it from our local side
if !openOrderMap.Exists(o.OrderID) {
b.Remove(o)
} else {
leftOrders.Add(o)
orders[idx] = *retOrder
}
}
if cancelAll {
orders = b.Orders()
} else {
// for partial cancel
orders = filterCanceledOrders(orders)
}
} else {
log.Warnf("[ActiveOrderBook] using open orders API to verify the active orders...")
var symbolOrdersMap = categorizeOrderBySymbol(orders)
var errOccurred bool
var leftOrders types.OrderSlice
for symbol, symbolOrders := range symbolOrdersMap {
openOrders, err := ex.QueryOpenOrders(ctx, symbol)
if err != nil {
errOccurred = true
log.WithError(err).Errorf("can not query %s open orders", symbol)
break
}
openOrderMap := types.NewOrderMap(openOrders...)
for _, o := range symbolOrders {
// if it's not on the order book (open orders),
// we should remove it from our local side
if !openOrderMap.Exists(o.OrderID) {
b.Remove(o)
} else {
leftOrders.Add(o)
}
}
}
// if an error occurs, we cannot update the orders because it will result in an empty order slice.
if !errOccurred {
// update order slice for the next try
orders = leftOrders
}
}
// if an error occurs, we cannot update the orders because it will result in an empty order slice.
if !errOccurred {
// update order slice for the next try
orders = leftOrders
}
}
log.Debugf("[ActiveOrderBook] all %s orders are cancelled successfully in %s", b.Symbol, time.Since(startTime))
@ -491,3 +522,15 @@ func categorizeOrderBySymbol(orders types.OrderSlice) map[string]types.OrderSlic
return orderMap
}
func filterCanceledOrders(orders types.OrderSlice) (ret types.OrderSlice) {
for _, o := range orders {
if o.Status == types.OrderStatusCanceled {
continue
}
ret = append(ret, o)
}
return ret
}

View File

@ -12,7 +12,7 @@ func init() {
func up_main_trades(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `trades`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `id` BIGINT UNSIGNED,\n `order_id` BIGINT UNSIGNED NOT NULL,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n `symbol` VARCHAR(20) NOT NULL,\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quote_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `fee` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `fee_currency` VARCHAR(10) NOT NULL,\n `is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_maker` BOOLEAN NOT NULL DEFAULT FALSE,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `traded_at` DATETIME(3) NOT NULL,\n `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE,\n `strategy` VARCHAR(32) NULL,\n `pnl` DECIMAL NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `id` (`exchange`, `symbol`, `side`, `id`)\n);")
_, err = tx.ExecContext(ctx, "CREATE TABLE `trades`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `id` BIGINT UNSIGNED,\n `order_id` BIGINT UNSIGNED NOT NULL,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n `symbol` VARCHAR(32) NOT NULL,\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quote_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `fee` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `fee_currency` VARCHAR(16) NOT NULL,\n `is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_maker` BOOLEAN NOT NULL DEFAULT FALSE,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `traded_at` DATETIME(3) NOT NULL,\n `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE,\n `strategy` VARCHAR(32) NULL,\n `pnl` DECIMAL NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `id` (`exchange`, `symbol`, `side`, `id`)\n);")
if err != nil {
return err
}

View File

@ -12,7 +12,7 @@ func init() {
func up_main_orders(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `orders`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- order_id is the order id returned from the exchange\n `order_id` BIGINT UNSIGNED NOT NULL,\n `client_order_id` VARCHAR(122) NOT NULL DEFAULT '',\n `order_type` VARCHAR(16) NOT NULL,\n `symbol` VARCHAR(20) NOT NULL,\n `status` VARCHAR(12) NOT NULL,\n `time_in_force` VARCHAR(4) NOT NULL,\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `stop_price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `executed_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `is_working` BOOL NOT NULL DEFAULT FALSE,\n `created_at` DATETIME(3) NOT NULL,\n `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),\n `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE,\n PRIMARY KEY (`gid`)\n);")
_, err = tx.ExecContext(ctx, "CREATE TABLE `orders`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- order_id is the order id returned from the exchange\n `order_id` BIGINT UNSIGNED NOT NULL,\n `client_order_id` VARCHAR(122) NOT NULL DEFAULT '',\n `order_type` VARCHAR(16) NOT NULL,\n `symbol` VARCHAR(32) NOT NULL,\n `status` VARCHAR(12) NOT NULL,\n `time_in_force` VARCHAR(4) NOT NULL,\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `stop_price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `executed_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `is_working` BOOL NOT NULL DEFAULT FALSE,\n `created_at` DATETIME(3) NOT NULL,\n `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),\n `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE,\n PRIMARY KEY (`gid`)\n);")
if err != nil {
return err
}

View File

@ -12,7 +12,7 @@ func init() {
func up_main_addProfitTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `profits`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `strategy` VARCHAR(32) NOT NULL,\n `strategy_instance_id` VARCHAR(64) NOT NULL,\n `symbol` VARCHAR(8) NOT NULL,\n -- average_cost is the position average cost\n `average_cost` DECIMAL(16, 8) UNSIGNED NOT NULL,\n -- profit is the pnl (profit and loss)\n `profit` DECIMAL(16, 8) NOT NULL,\n -- net_profit is the pnl (profit and loss)\n `net_profit` DECIMAL(16, 8) NOT NULL,\n -- profit_margin is the pnl (profit and loss)\n `profit_margin` DECIMAL(16, 8) NOT NULL,\n -- net_profit_margin is the pnl (profit and loss)\n `net_profit_margin` DECIMAL(16, 8) NOT NULL,\n `quote_currency` VARCHAR(10) NOT NULL,\n `base_currency` VARCHAR(10) NOT NULL,\n -- -------------------------------------------------------\n -- embedded trade data --\n -- -------------------------------------------------------\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n `is_futures` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE,\n `trade_id` BIGINT UNSIGNED NOT NULL,\n -- side is the side of the trade that makes profit\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_maker` BOOLEAN NOT NULL DEFAULT FALSE,\n -- price is the price of the trade that makes profit\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n -- quantity is the quantity of the trade that makes profit\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n -- quote_quantity is the quote quantity of the trade that makes profit\n `quote_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `traded_at` DATETIME(3) NOT NULL,\n -- fee\n `fee_in_usd` DECIMAL(16, 8),\n `fee` DECIMAL(16, 8) NOT NULL,\n `fee_currency` VARCHAR(10) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `trade_id` (`trade_id`)\n);")
_, err = tx.ExecContext(ctx, "CREATE TABLE `profits`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `strategy` VARCHAR(32) NOT NULL,\n `strategy_instance_id` VARCHAR(64) NOT NULL,\n `symbol` VARCHAR(32) NOT NULL,\n -- average_cost is the position average cost\n `average_cost` DECIMAL(16, 8) UNSIGNED NOT NULL,\n -- profit is the pnl (profit and loss)\n `profit` DECIMAL(16, 8) NOT NULL,\n -- net_profit is the pnl (profit and loss)\n `net_profit` DECIMAL(16, 8) NOT NULL,\n -- profit_margin is the pnl (profit and loss)\n `profit_margin` DECIMAL(16, 8) NOT NULL,\n -- net_profit_margin is the pnl (profit and loss)\n `net_profit_margin` DECIMAL(16, 8) NOT NULL,\n `quote_currency` VARCHAR(10) NOT NULL,\n `base_currency` VARCHAR(16) NOT NULL,\n -- -------------------------------------------------------\n -- embedded trade data --\n -- -------------------------------------------------------\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n `is_futures` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE,\n `trade_id` BIGINT UNSIGNED NOT NULL,\n -- side is the side of the trade that makes profit\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_maker` BOOLEAN NOT NULL DEFAULT FALSE,\n -- price is the price of the trade that makes profit\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n -- quantity is the quantity of the trade that makes profit\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n -- quote_quantity is the quote quantity of the trade that makes profit\n `quote_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `traded_at` DATETIME(3) NOT NULL,\n -- fee\n `fee_in_usd` DECIMAL(16, 8),\n `fee` DECIMAL(16, 8) NOT NULL,\n `fee_currency` VARCHAR(16) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `trade_id` (`trade_id`)\n);")
if err != nil {
return err
}

View File

@ -12,7 +12,7 @@ func init() {
func up_main_addPositions(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `positions`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `strategy` VARCHAR(32) NOT NULL,\n `strategy_instance_id` VARCHAR(64) NOT NULL,\n `symbol` VARCHAR(20) NOT NULL,\n `quote_currency` VARCHAR(10) NOT NULL,\n `base_currency` VARCHAR(10) NOT NULL,\n -- average_cost is the position average cost\n `average_cost` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `base` DECIMAL(16, 8) NOT NULL,\n `quote` DECIMAL(16, 8) NOT NULL,\n `profit` DECIMAL(16, 8) NULL,\n -- trade related columns\n `trade_id` BIGINT UNSIGNED NOT NULL, -- the trade id in the exchange\n `side` VARCHAR(4) NOT NULL, -- side of the trade\n `exchange` VARCHAR(12) NOT NULL, -- exchange of the trade\n `traded_at` DATETIME(3) NOT NULL, -- millisecond timestamp\n PRIMARY KEY (`gid`),\n UNIQUE KEY `trade_id` (`trade_id`, `side`, `exchange`)\n);")
_, err = tx.ExecContext(ctx, "CREATE TABLE `positions`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `strategy` VARCHAR(32) NOT NULL,\n `strategy_instance_id` VARCHAR(64) NOT NULL,\n `symbol` VARCHAR(32) NOT NULL,\n `quote_currency` VARCHAR(10) NOT NULL,\n `base_currency` VARCHAR(16) NOT NULL,\n -- average_cost is the position average cost\n `average_cost` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `base` DECIMAL(16, 8) NOT NULL,\n `quote` DECIMAL(16, 8) NOT NULL,\n `profit` DECIMAL(16, 8) NULL,\n -- trade related columns\n `trade_id` BIGINT UNSIGNED NOT NULL, -- the trade id in the exchange\n `side` VARCHAR(4) NOT NULL, -- side of the trade\n `exchange` VARCHAR(20) NOT NULL, -- exchange of the trade\n `traded_at` DATETIME(3) NOT NULL, -- millisecond timestamp\n PRIMARY KEY (`gid`),\n UNIQUE KEY `trade_id` (`trade_id`, `side`, `exchange`)\n);")
if err != nil {
return err
}

View File

@ -12,7 +12,7 @@ func init() {
func up_main_fixProfitSymbolLength(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE profits\n CHANGE symbol symbol VARCHAR(20) NOT NULL;")
_, err = tx.ExecContext(ctx, "ALTER TABLE profits CHANGE symbol symbol VARCHAR(32) NOT NULL;")
if err != nil {
return err
}

View File

@ -0,0 +1,45 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper/v2"
)
func init() {
AddMigration("main", up_main_fixSymbolLength2, down_main_fixSymbolLength2)
}
func up_main_fixSymbolLength2(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE profits MODIFY COLUMN symbol VARCHAR(32) NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE profits MODIFY COLUMN base_currency VARCHAR(16) NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE profits MODIFY COLUMN fee_currency VARCHAR(16) NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE positions MODIFY COLUMN base_currency VARCHAR(16) NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE positions MODIFY COLUMN symbol VARCHAR(32) NOT NULL;")
if err != nil {
return err
}
return err
}
func down_main_fixSymbolLength2(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "SELECT 1;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,29 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper/v2"
)
func init() {
AddMigration("main", up_main_fixSymbolLength2, down_main_fixSymbolLength2)
}
func up_main_fixSymbolLength2(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "SELECT 1;")
if err != nil {
return err
}
return err
}
func down_main_fixSymbolLength2(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "SELECT 1;")
if err != nil {
return err
}
return err
}

View File

@ -148,6 +148,23 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
// bbgo.Sync(ctx, s)
})
s.HedgeOrderExecutor.ActiveMakerOrders().OnCanceled(func(o types.Order) {
remaining := o.Quantity.Sub(o.ExecutedQuantity)
log.Infof("canceled order #%d, remaining quantity: %f", o.OrderID, remaining.Float64())
switch o.Side {
case types.SideTypeSell:
remaining = remaining.Neg()
}
remaining = remaining.Neg()
coveredPosition := s.CoveredPosition.Get()
s.CoveredPosition.Sub(remaining)
log.Infof("coveredPosition %f - %f => %f", coveredPosition.Float64(), remaining.Float64(), s.CoveredPosition.Get().Float64())
})
s.HedgeOrderExecutor.TradeCollector().OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
c := trade.PositionChange()
@ -158,8 +175,6 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
// buy trade -> positive delta ->
// 1) short position -> reduce short position
// 2) short position -> increase short position
// TODO: make this atomic
s.CoveredPosition.Add(c)
})
return nil
@ -197,6 +212,8 @@ type Strategy struct {
HedgeStrategy HedgeStrategy `json:"hedgeStrategy"`
HedgeMaxOrderQuantity fixedpoint.Value `json:"hedgeMaxOrderQuantity"`
FullReplenishInterval types.Duration `json:"fullReplenishInterval"`
OrderCancelWaitTime types.Duration `json:"orderCancelWaitTime"`
@ -589,6 +606,11 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) error {
quantity := pos.Abs()
if s.HedgeMaxOrderQuantity.Sign() > 0 && quantity.Compare(s.HedgeMaxOrderQuantity) > 0 {
s.logger.Infof("hedgeMaxOrderQuantity is set to %s, limiting the given quantity %s", s.HedgeMaxOrderQuantity.String(), quantity.String())
quantity = fixedpoint.Min(s.HedgeMaxOrderQuantity, quantity)
}
switch s.HedgeStrategy {
case HedgeStrategyMarket:
return s.executeHedgeMarket(ctx, side, quantity)
@ -866,6 +888,9 @@ func (s *Strategy) generateMakerOrders(
continue
}
accumulatedDepth := fixedpoint.Zero
lastMakerPrice := fixedpoint.Zero
layerLoop:
for i := 1; i <= maxLayer; i++ {
// simple break, we need to check the market minNotional and minQuantity later
@ -882,8 +907,9 @@ func (s *Strategy) generateMakerOrders(
// requiredDepth is the required depth in quote currency
requiredDepth := fixedpoint.NewFromFloat(requiredDepthFloat)
accumulatedDepth = accumulatedDepth.Add(requiredDepth)
index := sideBook.IndexByQuoteVolumeDepth(requiredDepth)
index := sideBook.IndexByQuoteVolumeDepth(accumulatedDepth)
pvs := types.PriceVolumeSlice{}
if index == -1 {
@ -896,9 +922,7 @@ func (s *Strategy) generateMakerOrders(
continue
}
log.Infof("side: %s required depth: %f, pvs: %+v", side, requiredDepth.Float64(), pvs)
depthPrice := pvs.AverageDepthPriceByQuote(fixedpoint.Zero, 0)
depthPrice := pvs.AverageDepthPriceByQuote(accumulatedDepth, 0)
switch side {
case types.SideTypeBuy:
@ -918,9 +942,19 @@ func (s *Strategy) generateMakerOrders(
depthPrice = s.makerMarket.TruncatePrice(depthPrice)
if lastMakerPrice.Sign() > 0 && depthPrice.Compare(lastMakerPrice) == 0 {
switch side {
case types.SideTypeBuy:
depthPrice = depthPrice.Sub(s.makerMarket.TickSize)
case types.SideTypeSell:
depthPrice = depthPrice.Add(s.makerMarket.TickSize)
}
}
quantity := requiredDepth.Div(depthPrice)
quantity = s.makerMarket.TruncateQuantity(quantity)
log.Infof("side: %s required depth: %f price: %f quantity: %f", side, requiredDepth.Float64(), depthPrice.Float64(), quantity.Float64())
s.logger.Infof("%d) %s required depth: %f %s@%s", i, side, accumulatedDepth.Float64(), quantity.String(), depthPrice.String())
switch side {
case types.SideTypeBuy:
@ -969,6 +1003,8 @@ func (s *Strategy) generateMakerOrders(
Price: depthPrice,
Quantity: quantity,
})
lastMakerPrice = depthPrice
}
}
@ -1035,7 +1071,7 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
balances, err := s.MakerOrderExecutor.Session().Exchange.QueryAccountBalances(ctx)
if err != nil {
log.WithError(err).Errorf("balance query error")
s.logger.WithError(err).Errorf("balance query error")
return
}
@ -1051,22 +1087,22 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
return
}
log.Infof("quote balance: %s, base balance: %s", quoteBalance, baseBalance)
s.logger.Infof("quote balance: %s, base balance: %s", quoteBalance, baseBalance)
submitOrders, err := s.generateMakerOrders(s.sourceBook, maxLayer, baseBalance.Available, quoteBalance.Available)
if err != nil {
log.WithError(err).Errorf("generate order error")
s.logger.WithError(err).Errorf("generate order error")
return
}
if len(submitOrders) == 0 {
log.Warnf("no orders are generated")
s.logger.Warnf("no orders are generated")
return
}
_, err = s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
if err != nil {
log.WithError(err).Errorf("order error: %s", err.Error())
s.logger.WithError(err).Errorf("order error: %s", err.Error())
return
}
}

View File

@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/bbgo"
@ -42,6 +43,7 @@ func TestStrategy_generateMakerOrders(t *testing.T) {
CrossExchangeMarketMakingStrategy: &CrossExchangeMarketMakingStrategy{
makerMarket: newTestBTCUSDTMarket(),
},
logger: logrus.New(),
}
pricingBook := types.NewStreamBook("BTCUSDT", types.ExchangeBinance)
@ -70,6 +72,6 @@ func TestStrategy_generateMakerOrders(t *testing.T) {
{Side: types.SideTypeBuy, Price: Number("24800"), Quantity: Number("0.283123")}, // =~ $7021.4504, accumulated amount =~ $1000.00 + $7005.3111219 + $7021.4504 = $8005.3111219 + $7021.4504 =~ $15026.7615219
{Side: types.SideTypeSell, Price: Number("25100"), Quantity: Number("0.03984")},
{Side: types.SideTypeSell, Price: Number("25233.33"), Quantity: Number("0.2772")},
{Side: types.SideTypeSell, Price: Number("25233.33"), Quantity: Number("0.277411")},
{Side: types.SideTypeSell, Price: Number("25300"), Quantity: Number("0.275845")},
}, orders)
}

View File

@ -168,7 +168,7 @@ func trimTrailingZero(a float64) string {
// String is for console output
func (trade Trade) String() string {
return fmt.Sprintf("TRADE %s %s %4s %-4s @ %-6s | AMOUNT %s | FEE %s %s | OrderID %d | TID %d | %s",
return fmt.Sprintf("TRADE %s %s %4s %-4s @ %-6s | AMOUNT %s | FEE %s %s | OrderID %d | TID %d | %s | %s",
trade.Exchange.String(),
trade.Symbol,
trade.Side,
@ -180,6 +180,7 @@ func (trade Trade) String() string {
trade.OrderID,
trade.ID,
trade.Time.Time().Format(time.StampMilli),
trade.Liquidity(),
)
}