Merge pull request #1438 from c9s/feature/xdepthmaker

STRATEGY: add xdepthmaker strategy
This commit is contained in:
c9s 2023-12-08 09:37:40 +08:00 committed by GitHub
commit a4ae414c1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1378 additions and 256 deletions

View File

@ -16,9 +16,9 @@ jobs:
strategy:
matrix:
redis-version:
- 6.2
- "6.2"
go-version:
- 1.18
- "1.20"
env:
MYSQL_DATABASE: bbgo
MYSQL_USER: "root"

View File

@ -20,7 +20,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: 1.18
go-version: 1.19
- name: Install Node
uses: actions/setup-node@v2
with:

67
config/xdepthmaker.yaml Normal file
View File

@ -0,0 +1,67 @@
---
notifications:
slack:
defaultChannel: "dev-bbgo"
errorChannel: "bbgo-error"
switches:
trade: true
orderUpdate: false
submitOrder: false
persistence:
json:
directory: var/data
redis:
host: 127.0.0.1
port: 6379
db: 0
logging:
trade: true
order: true
fields:
env: staging
sessions:
max:
exchange: max
envVarPrefix: max
binance:
exchange: binance
envVarPrefix: binance
crossExchangeStrategies:
- xdepthmaker:
symbol: "BTCUSDT"
makerExchange: max
hedgeExchange: binance
# disableHedge disables the hedge orders on the source exchange
# disableHedge: true
hedgeInterval: 10s
notifyTrade: true
margin: 0.004
askMargin: 0.4%
bidMargin: 0.4%
depthScale:
byLayer:
linear:
domain: [1, 30]
range: [50, 20_000]
# numLayers means how many order we want to place on each side. 3 means we want 3 bid orders and 3 ask orders
numLayers: 30
# pips is the fraction numbers between each order. for BTC, 1 pip is 0.1,
# 0.1 pip is 0.01, here we use 10, so we will get 18000.00, 18001.00 and
# 18002.00
pips: 10
persistence:
type: redis

View File

@ -10,7 +10,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/types"
)
@ -179,7 +178,7 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
waitTime := CancelOrderWaitTime
startTime := time.Now()
// ensure every order is cancelled
// ensure every order is canceled
for {
// Some orders in the variable are not created on the server side yet,
// If we cancel these orders directly, we will get an unsent order error
@ -204,25 +203,28 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
// verify the current open orders via the RESTful API
log.Warnf("[ActiveOrderBook] using REStful API to verify active orders...")
var symbols = map[string]struct{}{}
var symbolOrdersMap = map[string]types.OrderSlice{}
for _, order := range orders {
symbols[order.Symbol] = struct{}{}
symbolOrdersMap[order.Symbol] = append(symbolOrdersMap[order.Symbol], order)
}
var leftOrders []types.Order
for symbol := range symbols {
var leftOrders []types.Order
for symbol := range symbolOrdersMap {
symbolOrders, ok := symbolOrdersMap[symbol]
if !ok {
continue
}
openOrders, err := ex.QueryOpenOrders(ctx, symbol)
if err != nil {
log.WithError(err).Errorf("can not query %s open orders", symbol)
continue
}
openOrderStore := core.NewOrderStore(symbol)
openOrderStore.Add(openOrders...)
for _, o := range orders {
orderMap := types.NewOrderMap(openOrders...)
for _, o := range symbolOrders {
// if it's not on the order book (open orders), we should remove it from our local side
if !openOrderStore.Exists(o.OrderID) {
if !orderMap.Exists(o.OrderID) {
b.Remove(o)
} else {
leftOrders = append(leftOrders, o)
@ -230,6 +232,7 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
}
}
// update order slice for the next try
orders = leftOrders
}

View File

@ -68,8 +68,12 @@ type GeneralOrderExecutor struct {
disableNotify bool
}
// NewGeneralOrderExecutor allocates a GeneralOrderExecutor
// which has its own order store, trade collector
func NewGeneralOrderExecutor(
session *ExchangeSession, symbol, strategy, strategyInstanceID string, position *types.Position,
session *ExchangeSession,
symbol, strategy, strategyInstanceID string,
position *types.Position,
) *GeneralOrderExecutor {
// Always update the position fields
position.Strategy = strategy

View File

@ -1,6 +1,7 @@
package bbgo
import (
"encoding/json"
"fmt"
"math"
@ -318,6 +319,18 @@ type LayerScale struct {
LayerRule *SlideRule `json:"byLayer"`
}
func (s *LayerScale) UnmarshalJSON(data []byte) error {
type T LayerScale
var p T
err := json.Unmarshal(data, &p)
if err != nil {
return err
}
*s = LayerScale(p)
return nil
}
func (s *LayerScale) Scale(layer int) (quantity float64, err error) {
if s.LayerRule == nil {
err = errors.New("either price or volume scale is not defined")

View File

@ -1,6 +1,7 @@
package bbgo
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
@ -8,6 +9,24 @@ import (
const delta = 1e-9
func TestLayerScale_UnmarshalJSON(t *testing.T) {
var s LayerScale
err := json.Unmarshal([]byte(`{
"byLayer": {
"linear": {
"domain": [ 1, 3 ],
"range": [ 10000.0, 30000.0 ]
}
}
}`), &s)
assert.NoError(t, err)
if assert.NotNil(t, s.LayerRule) {
assert.NotNil(t, s.LayerRule.LinearScale.Range)
assert.NotNil(t, s.LayerRule.LinearScale.Domain)
}
}
func TestExponentialScale(t *testing.T) {
// graph see: https://www.desmos.com/calculator/ip0ijbcbbf
scale := ExponentialScale{

View File

@ -45,10 +45,10 @@ import (
_ "github.com/c9s/bbgo/pkg/strategy/wall"
_ "github.com/c9s/bbgo/pkg/strategy/xalign"
_ "github.com/c9s/bbgo/pkg/strategy/xbalance"
_ "github.com/c9s/bbgo/pkg/strategy/xdepthmaker"
_ "github.com/c9s/bbgo/pkg/strategy/xfixedmaker"
_ "github.com/c9s/bbgo/pkg/strategy/xfunding"
_ "github.com/c9s/bbgo/pkg/strategy/xgap"
_ "github.com/c9s/bbgo/pkg/strategy/xmaker"
_ "github.com/c9s/bbgo/pkg/strategy/xnav"
_ "github.com/c9s/bbgo/pkg/strategy/xpuremaker"
)

View File

@ -75,7 +75,7 @@ func (s *Stream) syncSubscriptions(opType WsEventType) error {
}
logger := log.WithField("opType", opType)
args := []WsArg{}
var args []WsArg
for _, subscription := range s.Subscriptions {
arg, err := convertSubscription(subscription)
if err != nil {
@ -244,9 +244,11 @@ func convertSubscription(sub types.Subscription) (WsArg, error) {
arg.Channel = ChannelOrderBook5
switch sub.Options.Depth {
case types.DepthLevel15:
case types.DepthLevel5:
arg.Channel = ChannelOrderBook5
case types.DepthLevel15, types.DepthLevelMedium:
arg.Channel = ChannelOrderBook15
case types.DepthLevel200:
case types.DepthLevel200, types.DepthLevelFull:
log.Warn("*** The subscription events for the order book may return fewer than 200 bids/asks at a depth of 200. ***")
arg.Channel = ChannelOrderBook
}

View File

@ -3,6 +3,7 @@ package retry
import (
"context"
"errors"
"fmt"
"strconv"
"github.com/cenkalti/backoff/v4"
@ -16,6 +17,34 @@ type advancedOrderCancelService interface {
CancelOrdersByGroupID(ctx context.Context, groupID uint32) ([]types.Order, error)
}
func QueryOrderUntilCanceled(
ctx context.Context, queryOrderService types.ExchangeOrderQueryService, symbol string, orderId uint64,
) (o *types.Order, err error) {
var op = func() (err2 error) {
o, err2 = queryOrderService.QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(orderId, 10),
})
if err2 != nil {
return err2
}
if o == nil {
return fmt.Errorf("order #%d response is nil", orderId)
}
if o.Status == types.OrderStatusCanceled || o.Status == types.OrderStatusFilled {
return nil
}
return fmt.Errorf("order #%d is not canceled yet: %s", o.OrderID, o.Status)
}
err = GeneralBackoff(ctx, op)
return o, err
}
func QueryOrderUntilFilled(
ctx context.Context, queryOrderService types.ExchangeOrderQueryService, symbol string, orderId uint64,
) (o *types.Order, err error) {

View File

@ -1197,7 +1197,7 @@ func align(x, y *Value) bool {
}
yshift = e
// check(0 <= yshift && yshift <= 20)
//y.coef = (y.coef + halfpow10[yshift]) / pow10[yshift]
// y.coef = (y.coef + halfpow10[yshift]) / pow10[yshift]
y.coef = (y.coef) / pow10[yshift]
// check(int(y.exp)+yshift == int(x.exp))
return true
@ -1364,4 +1364,4 @@ func (x Value) Clamp(min, max Value) Value {
return max
}
return x
}
}

View File

@ -0,0 +1,32 @@
package xdepthmaker
import (
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
func aggregatePrice(pvs types.PriceVolumeSlice, requiredQuantity fixedpoint.Value) (price fixedpoint.Value) {
q := requiredQuantity
totalAmount := fixedpoint.Zero
if len(pvs) == 0 {
price = fixedpoint.Zero
return price
} else if pvs[0].Volume.Compare(requiredQuantity) >= 0 {
return pvs[0].Price
}
for i := 0; i < len(pvs); i++ {
pv := pvs[i]
if pv.Volume.Compare(q) >= 0 {
totalAmount = totalAmount.Add(q.Mul(pv.Price))
break
}
q = q.Sub(pv.Volume)
totalAmount = totalAmount.Add(pv.Volume.Mul(pv.Price))
}
price = totalAmount.Div(requiredQuantity.Sub(q))
return price
}

View File

@ -0,0 +1,68 @@
package xdepthmaker
import (
"sync"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type State struct {
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty"`
// Deprecated:
Position *types.Position `json:"position,omitempty"`
// Deprecated:
ProfitStats ProfitStats `json:"profitStats,omitempty"`
}
type ProfitStats struct {
*types.ProfitStats
lock sync.Mutex
MakerExchange types.ExchangeName `json:"makerExchange"`
AccumulatedMakerVolume fixedpoint.Value `json:"accumulatedMakerVolume,omitempty"`
AccumulatedMakerBidVolume fixedpoint.Value `json:"accumulatedMakerBidVolume,omitempty"`
AccumulatedMakerAskVolume fixedpoint.Value `json:"accumulatedMakerAskVolume,omitempty"`
TodayMakerVolume fixedpoint.Value `json:"todayMakerVolume,omitempty"`
TodayMakerBidVolume fixedpoint.Value `json:"todayMakerBidVolume,omitempty"`
TodayMakerAskVolume fixedpoint.Value `json:"todayMakerAskVolume,omitempty"`
}
func (s *ProfitStats) AddTrade(trade types.Trade) {
s.ProfitStats.AddTrade(trade)
if trade.Exchange == s.MakerExchange {
s.lock.Lock()
s.AccumulatedMakerVolume = s.AccumulatedMakerVolume.Add(trade.Quantity)
s.TodayMakerVolume = s.TodayMakerVolume.Add(trade.Quantity)
switch trade.Side {
case types.SideTypeSell:
s.AccumulatedMakerAskVolume = s.AccumulatedMakerAskVolume.Add(trade.Quantity)
s.TodayMakerAskVolume = s.TodayMakerAskVolume.Add(trade.Quantity)
case types.SideTypeBuy:
s.AccumulatedMakerBidVolume = s.AccumulatedMakerBidVolume.Add(trade.Quantity)
s.TodayMakerBidVolume = s.TodayMakerBidVolume.Add(trade.Quantity)
}
s.lock.Unlock()
}
}
func (s *ProfitStats) ResetToday() {
s.ProfitStats.ResetToday(time.Now())
s.lock.Lock()
s.TodayMakerVolume = fixedpoint.Zero
s.TodayMakerBidVolume = fixedpoint.Zero
s.TodayMakerAskVolume = fixedpoint.Zero
s.lock.Unlock()
}

View File

@ -0,0 +1,822 @@
package xdepthmaker
import (
"context"
stderrors "errors"
"fmt"
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
var lastPriceModifier = fixedpoint.NewFromFloat(1.001)
var minGap = fixedpoint.NewFromFloat(1.02)
var defaultMargin = fixedpoint.NewFromFloat(0.003)
var Two = fixedpoint.NewFromInt(2)
const priceUpdateTimeout = 30 * time.Second
const ID = "xdepthmaker"
var log = logrus.WithField("strategy", ID)
func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
func notifyTrade(trade types.Trade, _, _ fixedpoint.Value) {
bbgo.Notify(trade)
}
type CrossExchangeMarketMakingStrategy struct {
ctx, parent context.Context
cancel context.CancelFunc
Environ *bbgo.Environment
makerSession, hedgeSession *bbgo.ExchangeSession
makerMarket, hedgeMarket types.Market
// persistence fields
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"`
MakerOrderExecutor, HedgeOrderExecutor *bbgo.GeneralOrderExecutor
// orderStore is a shared order store between the maker session and the hedge session
orderStore *core.OrderStore
// tradeCollector is a shared trade collector between the maker session and the hedge session
tradeCollector *core.TradeCollector
}
func (s *CrossExchangeMarketMakingStrategy) Initialize(
ctx context.Context, environ *bbgo.Environment,
makerSession, hedgeSession *bbgo.ExchangeSession,
symbol, strategyID, instanceID string,
) error {
s.parent = ctx
s.ctx, s.cancel = context.WithCancel(ctx)
s.Environ = environ
s.makerSession = makerSession
s.hedgeSession = hedgeSession
var ok bool
s.hedgeMarket, ok = s.hedgeSession.Market(symbol)
if !ok {
return fmt.Errorf("source session market %s is not defined", symbol)
}
s.makerMarket, ok = s.makerSession.Market(symbol)
if !ok {
return fmt.Errorf("maker session market %s is not defined", symbol)
}
if s.ProfitStats == nil {
s.ProfitStats = types.NewProfitStats(s.makerMarket)
}
if s.Position == nil {
s.Position = types.NewPositionFromMarket(s.makerMarket)
}
// Always update the position fields
s.Position.Strategy = strategyID
s.Position.StrategyInstanceID = instanceID
// if anyone of the fee rate is defined, this assumes that both are defined.
// so that zero maker fee could be applied
for _, ses := range []*bbgo.ExchangeSession{makerSession, hedgeSession} {
if ses.MakerFeeRate.Sign() > 0 || ses.TakerFeeRate.Sign() > 0 {
s.Position.SetExchangeFeeRate(ses.ExchangeName, types.ExchangeFee{
MakerFeeRate: ses.MakerFeeRate,
TakerFeeRate: ses.TakerFeeRate,
})
}
}
s.MakerOrderExecutor = bbgo.NewGeneralOrderExecutor(
makerSession,
s.makerMarket.Symbol,
strategyID, instanceID,
s.Position)
s.MakerOrderExecutor.BindEnvironment(environ)
s.MakerOrderExecutor.BindProfitStats(s.ProfitStats)
s.MakerOrderExecutor.Bind()
s.MakerOrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
// bbgo.Sync(ctx, s)
})
s.HedgeOrderExecutor = bbgo.NewGeneralOrderExecutor(
hedgeSession,
s.hedgeMarket.Symbol,
strategyID, instanceID,
s.Position)
s.HedgeOrderExecutor.BindEnvironment(environ)
s.HedgeOrderExecutor.BindProfitStats(s.ProfitStats)
s.HedgeOrderExecutor.Bind()
s.HedgeOrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
// bbgo.Sync(ctx, s)
})
s.orderStore = core.NewOrderStore(s.Position.Symbol)
s.orderStore.BindStream(hedgeSession.UserDataStream)
s.orderStore.BindStream(makerSession.UserDataStream)
s.tradeCollector = core.NewTradeCollector(symbol, s.Position, s.orderStore)
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
c := trade.PositionChange()
// sync covered position
// sell trade -> negative delta ->
// 1) long position -> reduce long position
// 2) short position -> increase short position
// buy trade -> positive delta ->
// 1) short position -> reduce short position
// 2) short position -> increase short position
if trade.Exchange == s.hedgeSession.ExchangeName {
// TODO: make this atomic
s.CoveredPosition = s.CoveredPosition.Add(c)
}
s.ProfitStats.AddTrade(trade)
if profit.Compare(fixedpoint.Zero) == 0 {
s.Environ.RecordPosition(s.Position, trade, nil)
} else {
log.Infof("%s generated profit: %v", symbol, profit)
p := s.Position.NewProfit(trade, profit, netProfit)
bbgo.Notify(&p)
s.ProfitStats.AddProfit(p)
s.Environ.RecordPosition(s.Position, trade, &p)
}
})
s.tradeCollector.BindStream(s.hedgeSession.UserDataStream)
s.tradeCollector.BindStream(s.makerSession.UserDataStream)
return nil
}
type Strategy struct {
*CrossExchangeMarketMakingStrategy
Environment *bbgo.Environment
Symbol string `json:"symbol"`
// HedgeExchange session name
HedgeExchange string `json:"hedgeExchange"`
// MakerExchange session name
MakerExchange string `json:"makerExchange"`
UpdateInterval types.Duration `json:"updateInterval"`
HedgeInterval types.Duration `json:"hedgeInterval"`
FullReplenishInterval types.Duration `json:"fullReplenishInterval"`
OrderCancelWaitTime types.Duration `json:"orderCancelWaitTime"`
Margin fixedpoint.Value `json:"margin"`
BidMargin fixedpoint.Value `json:"bidMargin"`
AskMargin fixedpoint.Value `json:"askMargin"`
StopHedgeQuoteBalance fixedpoint.Value `json:"stopHedgeQuoteBalance"`
StopHedgeBaseBalance fixedpoint.Value `json:"stopHedgeBaseBalance"`
// Quantity is used for fixed quantity of the first layer
Quantity fixedpoint.Value `json:"quantity"`
// QuantityScale helps user to define the quantity by layer scale
QuantityScale *bbgo.LayerScale `json:"quantityScale,omitempty"`
// DepthScale helps user to define the depth by layer scale
DepthScale *bbgo.LayerScale `json:"depthScale,omitempty"`
// MaxExposurePosition defines the unhedged quantity of stop
MaxExposurePosition fixedpoint.Value `json:"maxExposurePosition"`
NotifyTrade bool `json:"notifyTrade"`
// RecoverTrade tries to find the missing trades via the REStful API
RecoverTrade bool `json:"recoverTrade"`
RecoverTradeScanPeriod types.Duration `json:"recoverTradeScanPeriod"`
NumLayers int `json:"numLayers"`
// Pips is the pips of the layer prices
Pips fixedpoint.Value `json:"pips"`
// --------------------------------
// private fields
// --------------------------------
// pricingBook is the order book (depth) from the hedging session
pricingBook *types.StreamOrderBook
hedgeErrorLimiter *rate.Limiter
hedgeErrorRateReservation *rate.Reservation
askPriceHeartBeat, bidPriceHeartBeat *types.PriceHeartBeat
lastPrice fixedpoint.Value
stopC chan struct{}
}
func (s *Strategy) ID() string {
return ID
}
func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s:%s", ID, s.Symbol)
}
func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
makerSession, hedgeSession, err := selectSessions2(sessions, s.MakerExchange, s.HedgeExchange)
if err != nil {
panic(err)
}
hedgeSession.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{
Depth: types.DepthLevelMedium,
Speed: types.SpeedLow,
})
hedgeSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
makerSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
}
func (s *Strategy) Validate() error {
if s.MakerExchange == "" {
return errors.New("maker exchange is not configured")
}
if s.HedgeExchange == "" {
return errors.New("maker exchange is not configured")
}
if s.DepthScale == nil {
return errors.New("depthScale can not be empty")
}
if len(s.Symbol) == 0 {
return errors.New("symbol is required")
}
return nil
}
func (s *Strategy) Defaults() error {
if s.UpdateInterval == 0 {
s.UpdateInterval = types.Duration(time.Second)
}
if s.FullReplenishInterval == 0 {
s.FullReplenishInterval = types.Duration(15 * time.Minute)
}
if s.HedgeInterval == 0 {
s.HedgeInterval = types.Duration(3 * time.Second)
}
if s.NumLayers == 0 {
s.NumLayers = 1
}
if s.Margin.IsZero() {
s.Margin = defaultMargin
}
if s.BidMargin.IsZero() {
if !s.Margin.IsZero() {
s.BidMargin = s.Margin
} else {
s.BidMargin = defaultMargin
}
}
if s.AskMargin.IsZero() {
if !s.Margin.IsZero() {
s.AskMargin = s.Margin
} else {
s.AskMargin = defaultMargin
}
}
s.hedgeErrorLimiter = rate.NewLimiter(rate.Every(1*time.Minute), 1)
return nil
}
func (s *Strategy) Initialize() error {
s.bidPriceHeartBeat = types.NewPriceHeartBeat(priceUpdateTimeout)
s.askPriceHeartBeat = types.NewPriceHeartBeat(priceUpdateTimeout)
return nil
}
func (s *Strategy) CrossRun(
ctx context.Context, _ bbgo.OrderExecutionRouter,
sessions map[string]*bbgo.ExchangeSession,
) error {
makerSession, hedgeSession, err := selectSessions2(sessions, s.MakerExchange, s.HedgeExchange)
if err != nil {
return err
}
s.CrossExchangeMarketMakingStrategy = &CrossExchangeMarketMakingStrategy{}
if err := s.CrossExchangeMarketMakingStrategy.Initialize(ctx, s.Environment, makerSession, hedgeSession, s.Symbol, ID, s.InstanceID()); err != nil {
return err
}
s.pricingBook = types.NewStreamBook(s.Symbol)
s.pricingBook.BindStream(s.hedgeSession.MarketDataStream)
if s.NotifyTrade {
s.tradeCollector.OnTrade(notifyTrade)
}
s.tradeCollector.OnPositionUpdate(func(position *types.Position) {
bbgo.Notify(position)
})
s.stopC = make(chan struct{})
if s.RecoverTrade {
s.tradeCollector.OnRecover(func(trade types.Trade) {
bbgo.Notify("Recovered trade", trade)
})
go s.runTradeRecover(ctx)
}
go func() {
posTicker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
defer posTicker.Stop()
fullReplenishTicker := time.NewTicker(util.MillisecondsJitter(s.FullReplenishInterval.Duration(), 200))
defer fullReplenishTicker.Stop()
s.updateQuote(ctx, 0)
for {
select {
case <-s.stopC:
log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol)
return
case <-ctx.Done():
log.Warnf("%s maker goroutine stopped, due to the cancelled context", s.Symbol)
return
case <-fullReplenishTicker.C:
s.updateQuote(ctx, 0)
case sig, ok := <-s.pricingBook.C:
// when any book change event happened
if !ok {
return
}
switch sig.Type {
case types.BookSignalSnapshot:
s.updateQuote(ctx, 0)
case types.BookSignalUpdate:
s.updateQuote(ctx, 5)
}
case <-posTicker.C:
// For positive position and positive covered position:
// uncover position = +5 - +3 (covered position) = 2
//
// For positive position and negative covered position:
// uncover position = +5 - (-3) (covered position) = 8
//
// meaning we bought 5 on MAX and sent buy order with 3 on binance
//
// For negative position:
// uncover position = -5 - -3 (covered position) = -2
s.tradeCollector.Process()
position := s.Position.GetBase()
uncoverPosition := position.Sub(s.CoveredPosition)
absPos := uncoverPosition.Abs()
if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 {
log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
s.Symbol,
position,
s.CoveredPosition,
uncoverPosition,
)
s.Hedge(ctx, uncoverPosition.Neg())
}
}
}
}()
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
close(s.stopC)
// wait for the quoter to stop
time.Sleep(s.UpdateInterval.Duration())
shutdownCtx, cancelShutdown := context.WithTimeout(context.TODO(), time.Minute)
defer cancelShutdown()
if err := s.MakerOrderExecutor.GracefulCancel(shutdownCtx); err != nil {
log.WithError(err).Errorf("graceful cancel %s order error", s.Symbol)
}
if err := s.HedgeOrderExecutor.GracefulCancel(shutdownCtx); err != nil {
log.WithError(err).Errorf("graceful cancel %s order error", s.Symbol)
}
bbgo.Notify("%s: %s position", ID, s.Symbol, s.Position)
})
return nil
}
func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
side := types.SideTypeBuy
if pos.IsZero() {
return
}
quantity := pos.Abs()
if pos.Sign() < 0 {
side = types.SideTypeSell
}
lastPrice := s.lastPrice
sourceBook := s.pricingBook.CopyDepth(1)
switch side {
case types.SideTypeBuy:
if bestAsk, ok := sourceBook.BestAsk(); ok {
lastPrice = bestAsk.Price
}
case types.SideTypeSell:
if bestBid, ok := sourceBook.BestBid(); ok {
lastPrice = bestBid.Price
}
}
notional := quantity.Mul(lastPrice)
if notional.Compare(s.hedgeMarket.MinNotional) <= 0 {
log.Warnf("%s %v less than min notional, skipping hedge", s.Symbol, notional)
return
}
// adjust quantity according to the balances
account := s.hedgeSession.GetAccount()
switch side {
case types.SideTypeBuy:
// check quote quantity
if quote, ok := account.Balance(s.hedgeMarket.QuoteCurrency); ok {
if quote.Available.Compare(notional) < 0 {
// adjust price to higher 0.1%, so that we can ensure that the order can be executed
quantity = bbgo.AdjustQuantityByMaxAmount(quantity, lastPrice.Mul(lastPriceModifier), quote.Available)
quantity = s.hedgeMarket.TruncateQuantity(quantity)
}
}
case types.SideTypeSell:
// check quote quantity
if base, ok := account.Balance(s.hedgeMarket.BaseCurrency); ok {
if base.Available.Compare(quantity) < 0 {
quantity = base.Available
}
}
}
// truncate quantity for the supported precision
quantity = s.hedgeMarket.TruncateQuantity(quantity)
if notional.Compare(s.hedgeMarket.MinNotional.Mul(minGap)) <= 0 {
log.Warnf("the adjusted amount %v is less than minimal notional %v, skipping hedge", notional, s.hedgeMarket.MinNotional)
return
}
if quantity.Compare(s.hedgeMarket.MinQuantity.Mul(minGap)) <= 0 {
log.Warnf("the adjusted quantity %v is less than minimal quantity %v, skipping hedge", quantity, s.hedgeMarket.MinQuantity)
return
}
if s.hedgeErrorRateReservation != nil {
if !s.hedgeErrorRateReservation.OK() {
return
}
bbgo.Notify("Hit hedge error rate limit, waiting...")
time.Sleep(s.hedgeErrorRateReservation.Delay())
s.hedgeErrorRateReservation = nil
}
log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
createdOrders, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Market: s.hedgeMarket,
Symbol: s.Symbol,
Type: types.OrderTypeMarket,
Side: side,
Quantity: quantity,
})
if err != nil {
s.hedgeErrorRateReservation = s.hedgeErrorLimiter.Reserve()
log.WithError(err).Errorf("market order submit error: %s", err.Error())
return
}
s.orderStore.Add(createdOrders...)
// if the hedge is on sell side, then we should add positive position
switch side {
case types.SideTypeSell:
s.CoveredPosition = s.CoveredPosition.Add(quantity)
case types.SideTypeBuy:
s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
}
}
func (s *Strategy) runTradeRecover(ctx context.Context) {
tradeScanInterval := s.RecoverTradeScanPeriod.Duration()
if tradeScanInterval == 0 {
tradeScanInterval = 30 * time.Minute
}
tradeScanOverlapBufferPeriod := 5 * time.Minute
tradeScanTicker := time.NewTicker(tradeScanInterval)
defer tradeScanTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-tradeScanTicker.C:
log.Infof("scanning trades from %s ago...", tradeScanInterval)
if s.RecoverTrade {
startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod)
if err := s.tradeCollector.Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
if err := s.tradeCollector.Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
}
}
}
}
func (s *Strategy) generateMakerOrders(pricingBook *types.StreamOrderBook, maxLayer int) ([]types.SubmitOrder, error) {
bestBid, bestAsk, hasPrice := pricingBook.BestBidAndAsk()
if !hasPrice {
return nil, nil
}
bestBidPrice := bestBid.Price
bestAskPrice := bestAsk.Price
lastMidPrice := bestBidPrice.Add(bestAskPrice).Div(Two)
_ = lastMidPrice
var submitOrders []types.SubmitOrder
var accumulatedBidQuantity = fixedpoint.Zero
var accumulatedAskQuantity = fixedpoint.Zero
var accumulatedBidQuoteQuantity = fixedpoint.Zero
dupPricingBook := pricingBook.CopyDepth(0)
if maxLayer == 0 || maxLayer > s.NumLayers {
maxLayer = s.NumLayers
}
for _, side := range []types.SideType{types.SideTypeBuy, types.SideTypeSell} {
for i := 1; i <= maxLayer; i++ {
requiredDepthFloat, err := s.DepthScale.Scale(i)
if err != nil {
return nil, errors.Wrapf(err, "depthScale scale error")
}
// requiredDepth is the required depth in quote currency
requiredDepth := fixedpoint.NewFromFloat(requiredDepthFloat)
sideBook := dupPricingBook.SideBook(side)
index := sideBook.IndexByQuoteVolumeDepth(requiredDepth)
pvs := types.PriceVolumeSlice{}
if index == -1 {
pvs = sideBook[:]
} else {
pvs = sideBook[0 : index+1]
}
log.Infof("required depth: %f, pvs: %+v", requiredDepth.Float64(), pvs)
depthPrice, err := averageDepthPrice(pvs)
if err != nil {
log.WithError(err).Errorf("error aggregating depth price")
continue
}
switch side {
case types.SideTypeBuy:
if s.BidMargin.Sign() > 0 {
depthPrice = depthPrice.Mul(fixedpoint.One.Sub(s.BidMargin))
}
depthPrice = depthPrice.Round(s.makerMarket.PricePrecision+1, fixedpoint.Down)
case types.SideTypeSell:
if s.AskMargin.Sign() > 0 {
depthPrice = depthPrice.Mul(fixedpoint.One.Add(s.AskMargin))
}
depthPrice = depthPrice.Round(s.makerMarket.PricePrecision+1, fixedpoint.Up)
}
depthPrice = s.makerMarket.TruncatePrice(depthPrice)
quantity := requiredDepth.Div(depthPrice)
quantity = s.makerMarket.TruncateQuantity(quantity)
log.Infof("side: %s required depth: %f price: %f quantity: %f", side, requiredDepth.Float64(), depthPrice.Float64(), quantity.Float64())
switch side {
case types.SideTypeBuy:
quantity = quantity.Sub(accumulatedBidQuantity)
accumulatedBidQuantity = accumulatedBidQuantity.Add(quantity)
quoteQuantity := fixedpoint.Mul(quantity, depthPrice)
quoteQuantity = quoteQuantity.Round(s.makerMarket.PricePrecision, fixedpoint.Up)
accumulatedBidQuoteQuantity = accumulatedBidQuoteQuantity.Add(quoteQuantity)
case types.SideTypeSell:
quantity = quantity.Sub(accumulatedAskQuantity)
accumulatedAskQuantity = accumulatedAskQuantity.Add(quantity)
}
submitOrders = append(submitOrders, types.SubmitOrder{
Symbol: s.Symbol,
Type: types.OrderTypeLimitMaker,
Market: s.makerMarket,
Side: side,
Price: depthPrice,
Quantity: quantity,
})
}
}
return submitOrders, nil
}
func (s *Strategy) partiallyCancelOrders(ctx context.Context, maxLayer int) error {
buyOrders, sellOrders := s.MakerOrderExecutor.ActiveMakerOrders().Orders().SeparateBySide()
buyOrders = types.SortOrdersByPrice(buyOrders, true)
sellOrders = types.SortOrdersByPrice(sellOrders, false)
buyOrdersToCancel := buyOrders[0:min(maxLayer, len(buyOrders))]
sellOrdersToCancel := sellOrders[0:min(maxLayer, len(sellOrders))]
err1 := s.MakerOrderExecutor.GracefulCancel(ctx, buyOrdersToCancel...)
err2 := s.MakerOrderExecutor.GracefulCancel(ctx, sellOrdersToCancel...)
return stderrors.Join(err1, err2)
}
func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
if maxLayer == 0 {
if err := s.MakerOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Warnf("there are some %s orders not canceled, skipping placing maker orders", s.Symbol)
s.MakerOrderExecutor.ActiveMakerOrders().Print()
return
}
} else {
if err := s.partiallyCancelOrders(ctx, maxLayer); err != nil {
log.WithError(err).Warnf("%s partial order cancel failed", s.Symbol)
return
}
}
numOfMakerOrders := s.MakerOrderExecutor.ActiveMakerOrders().NumOfOrders()
if numOfMakerOrders > 0 {
log.Warnf("maker orders are not all canceled")
return
}
bestBid, bestAsk, hasPrice := s.pricingBook.BestBidAndAsk()
if !hasPrice {
return
}
bestBidPrice := bestBid.Price
bestAskPrice := bestAsk.Price
log.Infof("%s book ticker: best ask / best bid = %v / %v", s.Symbol, bestAskPrice, bestBidPrice)
s.lastPrice = bestBidPrice.Add(bestAskPrice).Div(Two)
bookLastUpdateTime := s.pricingBook.LastUpdateTime()
if _, err := s.bidPriceHeartBeat.Update(bestBid); err != nil {
log.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago",
s.Symbol,
time.Since(bookLastUpdateTime))
return
}
if _, err := s.askPriceHeartBeat.Update(bestAsk); err != nil {
log.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago",
s.Symbol,
time.Since(bookLastUpdateTime))
return
}
submitOrders, err := s.generateMakerOrders(s.pricingBook, maxLayer)
if err != nil {
log.WithError(err).Errorf("generate order error")
return
}
if len(submitOrders) == 0 {
log.Warnf("no orders are generated")
return
}
createdOrders, err := s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
if err != nil {
log.WithError(err).Errorf("order error: %s", err.Error())
return
}
s.orderStore.Add(createdOrders...)
}
func selectSessions2(
sessions map[string]*bbgo.ExchangeSession, n1, n2 string,
) (s1, s2 *bbgo.ExchangeSession, err error) {
for _, n := range []string{n1, n2} {
if _, ok := sessions[n]; !ok {
return nil, nil, fmt.Errorf("session %s is not defined", n)
}
}
s1 = sessions[n1]
s2 = sessions[n2]
return s1, s2, nil
}
func averageDepthPrice(pvs types.PriceVolumeSlice) (price fixedpoint.Value, err error) {
if len(pvs) == 0 {
return fixedpoint.Zero, fmt.Errorf("empty pv slice")
}
totalQuoteAmount := fixedpoint.Zero
totalQuantity := fixedpoint.Zero
for i := 0; i < len(pvs); i++ {
pv := pvs[i]
quoteAmount := fixedpoint.Mul(pv.Volume, pv.Price)
totalQuoteAmount = totalQuoteAmount.Add(quoteAmount)
totalQuantity = totalQuantity.Add(pv.Volume)
}
price = totalQuoteAmount.Div(totalQuantity)
return price, nil
}
func min(a, b int) int {
if a < b {
return a
}
return b
}

View File

@ -0,0 +1,74 @@
//go:build !dnum
package xdepthmaker
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/bbgo"
. "github.com/c9s/bbgo/pkg/testing/testhelper"
"github.com/c9s/bbgo/pkg/types"
)
func newTestBTCUSDTMarket() types.Market {
return types.Market{
BaseCurrency: "BTC",
QuoteCurrency: "USDT",
TickSize: Number(0.01),
StepSize: Number(0.000001),
PricePrecision: 2,
VolumePrecision: 8,
MinNotional: Number(8.0),
MinQuantity: Number(0.0003),
}
}
func TestStrategy_generateMakerOrders(t *testing.T) {
s := &Strategy{
Symbol: "BTCUSDT",
NumLayers: 3,
DepthScale: &bbgo.LayerScale{
LayerRule: &bbgo.SlideRule{
LinearScale: &bbgo.LinearScale{
Domain: [2]float64{1.0, 3.0},
Range: [2]float64{1000.0, 15000.0},
},
},
},
CrossExchangeMarketMakingStrategy: &CrossExchangeMarketMakingStrategy{
makerMarket: newTestBTCUSDTMarket(),
},
}
pricingBook := types.NewStreamBook("BTCUSDT")
pricingBook.OrderBook.Load(types.SliceOrderBook{
Symbol: "BTCUSDT",
Bids: types.PriceVolumeSlice{
{Price: Number("25000.00"), Volume: Number("0.1")},
{Price: Number("24900.00"), Volume: Number("0.2")},
{Price: Number("24800.00"), Volume: Number("0.3")},
{Price: Number("24700.00"), Volume: Number("0.4")},
},
Asks: types.PriceVolumeSlice{
{Price: Number("25100.00"), Volume: Number("0.1")},
{Price: Number("25200.00"), Volume: Number("0.2")},
{Price: Number("25300.00"), Volume: Number("0.3")},
{Price: Number("25400.00"), Volume: Number("0.4")},
},
Time: time.Now(),
})
orders, err := s.generateMakerOrders(pricingBook, 0)
assert.NoError(t, err)
AssertOrdersPriceSideQuantity(t, []PriceSideQuantityAssert{
{Side: types.SideTypeBuy, Price: Number("25000"), Quantity: Number("0.04")}, // =~ $1000.00
{Side: types.SideTypeBuy, Price: Number("24866.66"), Quantity: Number("0.281715")}, // =~ $7005.3111219, accumulated amount =~ $1000.00 + $7005.3111219 = $8005.3111219
{Side: types.SideTypeBuy, Price: Number("24800"), Quantity: Number("0.283123")}, // =~ $7021.4504, accumulated amount =~ $1000.00 + $7005.3111219 + $7021.4504 = $8005.3111219 + $7021.4504 =~ $15026.7615219
{Side: types.SideTypeSell, Price: Number("25100"), Quantity: Number("0.03984")},
{Side: types.SideTypeSell, Price: Number("25233.33"), Quantity: Number("0.2772")},
{Side: types.SideTypeSell, Price: Number("25233.33"), Quantity: Number("0.277411")},
}, orders)
}

View File

@ -112,7 +112,7 @@ type Strategy struct {
orderStore *core.OrderStore
tradeCollector *core.TradeCollector
askPriceHeartBeat, bidPriceHeartBeat types.PriceHeartBeat
askPriceHeartBeat, bidPriceHeartBeat *types.PriceHeartBeat
lastPrice fixedpoint.Value
groupID uint32
@ -170,6 +170,12 @@ func aggregatePrice(pvs types.PriceVolumeSlice, requiredQuantity fixedpoint.Valu
return price
}
func (s *Strategy) Initialize() error {
s.bidPriceHeartBeat = types.NewPriceHeartBeat(priceUpdateTimeout)
s.askPriceHeartBeat = types.NewPriceHeartBeat(priceUpdateTimeout)
return nil
}
func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter) {
if err := s.activeMakerOrders.GracefulCancel(ctx, s.makerSession.Exchange); err != nil {
log.Warnf("there are some %s orders not canceled, skipping placing maker orders", s.Symbol)
@ -191,14 +197,14 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
bookLastUpdateTime := s.book.LastUpdateTime()
if _, err := s.bidPriceHeartBeat.Update(bestBid, priceUpdateTimeout); err != nil {
if _, err := s.bidPriceHeartBeat.Update(bestBid); err != nil {
log.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago",
s.Symbol,
time.Since(bookLastUpdateTime))
return
}
if _, err := s.askPriceHeartBeat.Update(bestAsk, priceUpdateTimeout); err != nil {
if _, err := s.askPriceHeartBeat.Update(bestAsk); err != nil {
log.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago",
s.Symbol,
time.Since(bookLastUpdateTime))
@ -639,7 +645,9 @@ func (s *Strategy) Validate() error {
return nil
}
func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
func (s *Strategy) CrossRun(
ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession,
) error {
if s.BollBandInterval == "" {
s.BollBandInterval = types.Interval1m
}

View File

@ -1,197 +0,0 @@
package xpuremaker
import (
"context"
"math"
"time"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
const ID = "xpuremaker"
var Ten = fixedpoint.NewFromInt(10)
func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
type Strategy struct {
Symbol string `json:"symbol"`
Side string `json:"side"`
NumOrders int `json:"numOrders"`
BehindVolume fixedpoint.Value `json:"behindVolume"`
PriceTick fixedpoint.Value `json:"priceTick"`
BaseQuantity fixedpoint.Value `json:"baseQuantity"`
BuySellRatio float64 `json:"buySellRatio"`
book *types.StreamOrderBook
activeOrders map[string]types.Order
}
func (s *Strategy) ID() string {
return ID
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{})
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
s.book = types.NewStreamBook(s.Symbol)
s.book.BindStream(session.UserDataStream)
s.activeOrders = make(map[string]types.Order)
// We can move the go routine to the parent level.
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
s.update(orderExecutor, session)
for {
select {
case <-ctx.Done():
return
case <-s.book.C:
s.update(orderExecutor, session)
case <-ticker.C:
s.update(orderExecutor, session)
}
}
}()
return nil
}
func (s *Strategy) cancelOrders(session *bbgo.ExchangeSession) {
var deletedIDs []string
for clientOrderID, o := range s.activeOrders {
log.Infof("canceling order: %+v", o)
if err := session.Exchange.CancelOrders(context.Background(), o); err != nil {
log.WithError(err).Error("cancel order error")
continue
}
deletedIDs = append(deletedIDs, clientOrderID)
}
s.book.C.Drain(1*time.Second, 3*time.Second)
for _, id := range deletedIDs {
delete(s.activeOrders, id)
}
}
func (s *Strategy) update(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) {
s.cancelOrders(session)
switch s.Side {
case "buy":
s.updateOrders(orderExecutor, session, types.SideTypeBuy)
case "sell":
s.updateOrders(orderExecutor, session, types.SideTypeSell)
case "both":
s.updateOrders(orderExecutor, session, types.SideTypeBuy)
s.updateOrders(orderExecutor, session, types.SideTypeSell)
default:
log.Panicf("undefined side: %s", s.Side)
}
s.book.C.Drain(1*time.Second, 3*time.Second)
}
func (s *Strategy) updateOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession, side types.SideType) {
var book = s.book.Copy()
var pvs = book.SideBook(side)
if len(pvs) == 0 {
log.Warnf("empty side: %s", side)
return
}
log.Infof("placing order behind volume: %f", s.BehindVolume.Float64())
idx := pvs.IndexByVolumeDepth(s.BehindVolume)
if idx == -1 || idx > len(pvs)-1 {
// do not place orders
log.Warn("depth is not enough")
return
}
var depthPrice = pvs[idx].Price
var orders = s.generateOrders(s.Symbol, side, depthPrice, s.PriceTick, s.BaseQuantity, s.NumOrders)
if len(orders) == 0 {
log.Warn("empty orders")
return
}
createdOrders, err := orderExecutor.SubmitOrders(context.Background(), orders...)
if err != nil {
log.WithError(err).Errorf("order submit error")
return
}
// add created orders to the list
for i, o := range createdOrders {
s.activeOrders[o.ClientOrderID] = createdOrders[i]
}
}
func (s *Strategy) generateOrders(symbol string, side types.SideType, price, priceTick, baseQuantity fixedpoint.Value, numOrders int) (orders []types.SubmitOrder) {
var expBase = fixedpoint.Zero
switch side {
case types.SideTypeBuy:
if priceTick.Sign() > 0 {
priceTick = priceTick.Neg()
}
case types.SideTypeSell:
if priceTick.Sign() < 0 {
priceTick = priceTick.Neg()
}
}
decdigits := priceTick.Abs().NumIntDigits()
step := priceTick.Abs().MulExp(-decdigits + 1)
for i := 0; i < numOrders; i++ {
quantityExp := fixedpoint.NewFromFloat(math.Exp(expBase.Float64()))
volume := baseQuantity.Mul(quantityExp)
amount := volume.Mul(price)
// skip order less than 10usd
if amount.Compare(Ten) < 0 {
log.Warnf("amount too small (< 10usd). price=%s volume=%s amount=%s",
price.String(), volume.String(), amount.String())
continue
}
orders = append(orders, types.SubmitOrder{
Symbol: symbol,
Side: side,
Type: types.OrderTypeLimit,
Price: price,
Quantity: volume,
})
log.Infof("%s order: %s @ %s", side, volume.String(), price.String())
if len(orders) >= numOrders {
break
}
price = price.Add(priceTick)
expBase = expBase.Add(step)
}
return orders
}

View File

@ -32,7 +32,7 @@ type PriceSideQuantityAssert struct {
func AssertOrdersPriceSideQuantity(
t *testing.T, asserts []PriceSideQuantityAssert, orders []types.SubmitOrder,
) {
assert.Equalf(t, len(orders), len(asserts), "expecting %d orders", len(asserts))
assert.Equalf(t, len(asserts), len(orders), "expecting %d orders", len(asserts))
var assertPrices, orderPrices fixedpoint.Slice
var assertPricesFloat, orderPricesFloat []float64

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/sigchan"
)
type OrderBook interface {
@ -114,13 +113,26 @@ func (b *MutexOrderBook) Update(update SliceOrderBook) {
b.Unlock()
}
//go:generate callbackgen -type StreamOrderBook
type BookSignalType string
const (
BookSignalSnapshot BookSignalType = "snapshot"
BookSignalUpdate BookSignalType = "update"
)
type BookSignal struct {
Type BookSignalType
Book SliceOrderBook
}
// StreamOrderBook receives streaming data from websocket connection and
// update the order book with mutex lock, so you can safely access it.
//
//go:generate callbackgen -type StreamOrderBook
type StreamOrderBook struct {
*MutexOrderBook
C sigchan.Chan
C chan BookSignal
updateCallbacks []func(update SliceOrderBook)
snapshotCallbacks []func(snapshot SliceOrderBook)
@ -129,7 +141,7 @@ type StreamOrderBook struct {
func NewStreamBook(symbol string) *StreamOrderBook {
return &StreamOrderBook{
MutexOrderBook: NewMutexOrderBook(symbol),
C: sigchan.New(60),
C: make(chan BookSignal, 1),
}
}
@ -141,7 +153,9 @@ func (sb *StreamOrderBook) BindStream(stream Stream) {
sb.Load(book)
sb.EmitSnapshot(book)
sb.C.Emit()
// when it's snapshot, it's very important to push the snapshot signal to the caller
sb.C <- BookSignal{Type: BookSignalSnapshot, Book: book}
})
stream.OnBookUpdate(func(book SliceOrderBook) {
@ -151,6 +165,10 @@ func (sb *StreamOrderBook) BindStream(stream Stream) {
sb.Update(book)
sb.EmitUpdate(book)
sb.C.Emit()
select {
case sb.C <- BookSignal{Type: BookSignalUpdate, Book: book}:
default:
}
})
}

View File

@ -8,6 +8,14 @@ import (
// OrderMap is used for storing orders by their order id
type OrderMap map[uint64]Order
func NewOrderMap(os ...Order) OrderMap {
m := OrderMap{}
if len(os) > 0 {
m.Add(os...)
}
return m
}
func (m OrderMap) Backup() (orderForms []SubmitOrder) {
for _, order := range m {
orderForms = append(orderForms, order.Backup())
@ -17,8 +25,10 @@ func (m OrderMap) Backup() (orderForms []SubmitOrder) {
}
// Add the order the the map
func (m OrderMap) Add(o Order) {
m[o.OrderID] = o
func (m OrderMap) Add(os ...Order) {
for _, o := range os {
m[o.OrderID] = o
}
}
// Update only updates the order when the order ID exists in the map
@ -243,3 +253,16 @@ func (m *SyncOrderMap) Orders() (slice OrderSlice) {
}
type OrderSlice []Order
func (s OrderSlice) SeparateBySide() (buyOrders, sellOrders []Order) {
for _, o := range s {
switch o.Side {
case SideTypeBuy:
buyOrders = append(buyOrders, o)
case SideTypeSell:
sellOrders = append(sellOrders, o)
}
}
return buyOrders, sellOrders
}

View File

@ -7,24 +7,38 @@ import (
// PriceHeartBeat is used for monitoring the price volume update.
type PriceHeartBeat struct {
PriceVolume PriceVolume
LastTime time.Time
last PriceVolume
lastUpdatedTime time.Time
timeout time.Duration
}
func NewPriceHeartBeat(timeout time.Duration) *PriceHeartBeat {
return &PriceHeartBeat{
timeout: timeout,
}
}
func (b *PriceHeartBeat) Last() PriceVolume {
return b.last
}
// Update updates the price volume object and the last update time
// It returns (bool, error), when the price is successfully updated, it returns true.
// If the price is not updated (same price) and the last time exceeded the timeout,
// Then false, and an error will be returned
func (b *PriceHeartBeat) Update(pv PriceVolume, timeout time.Duration) (bool, error) {
if b.PriceVolume.Price.IsZero() || b.PriceVolume != pv {
b.PriceVolume = pv
b.LastTime = time.Now()
func (b *PriceHeartBeat) Update(current PriceVolume) (bool, error) {
if b.last.Price.IsZero() || b.last != current {
b.last = current
b.lastUpdatedTime = time.Now()
return true, nil // successfully updated
} else if time.Since(b.LastTime) > timeout {
return false, fmt.Errorf("price %s has not been updating for %s, last update: %s, skip quoting",
b.PriceVolume.String(),
time.Since(b.LastTime),
b.LastTime)
} else {
// if price and volume is not changed
if b.last.Equals(current) && time.Since(b.lastUpdatedTime) > b.timeout {
return false, fmt.Errorf("price %s has not been updating for %s, last update: %s, skip quoting",
b.last.String(),
time.Since(b.lastUpdatedTime),
b.lastUpdatedTime)
}
}
return false, nil

View File

@ -10,20 +10,21 @@ import (
)
func TestPriceHeartBeat_Update(t *testing.T) {
hb := PriceHeartBeat{}
updated, err := hb.Update(PriceVolume{Price: fixedpoint.NewFromFloat(22.0), Volume: fixedpoint.NewFromFloat(100.0)}, time.Minute)
hb := NewPriceHeartBeat(time.Minute)
updated, err := hb.Update(PriceVolume{Price: fixedpoint.NewFromFloat(22.0), Volume: fixedpoint.NewFromFloat(100.0)})
assert.NoError(t, err)
assert.True(t, updated)
updated, err = hb.Update(PriceVolume{Price: fixedpoint.NewFromFloat(22.0), Volume: fixedpoint.NewFromFloat(100.0)}, time.Minute)
updated, err = hb.Update(PriceVolume{Price: fixedpoint.NewFromFloat(22.0), Volume: fixedpoint.NewFromFloat(100.0)})
assert.NoError(t, err)
assert.False(t, updated, "should not be updated when pv is not changed")
updated, err = hb.Update(PriceVolume{Price: fixedpoint.NewFromFloat(23.0), Volume: fixedpoint.NewFromFloat(100.0)}, time.Minute)
updated, err = hb.Update(PriceVolume{Price: fixedpoint.NewFromFloat(23.0), Volume: fixedpoint.NewFromFloat(100.0)})
assert.NoError(t, err)
assert.True(t, updated, "should be updated when the price is changed")
updated, err = hb.Update(PriceVolume{Price: fixedpoint.NewFromFloat(23.0), Volume: fixedpoint.NewFromFloat(200.0)}, time.Minute)
updated, err = hb.Update(PriceVolume{Price: fixedpoint.NewFromFloat(23.0), Volume: fixedpoint.NewFromFloat(200.0)})
assert.NoError(t, err)
assert.True(t, updated, "should be updated when the volume is changed")
}

View File

@ -38,7 +38,7 @@ func (slice PriceVolumeSlice) Trim() (pvs PriceVolumeSlice) {
}
func (slice PriceVolumeSlice) CopyDepth(depth int) PriceVolumeSlice {
if depth > len(slice) {
if depth == 0 || depth > len(slice) {
return slice.Copy()
}
@ -67,8 +67,43 @@ func (slice PriceVolumeSlice) First() (PriceVolume, bool) {
return PriceVolume{}, false
}
func (slice PriceVolumeSlice) IndexByQuoteVolumeDepth(requiredQuoteVolume fixedpoint.Value) int {
var totalQuoteVolume = fixedpoint.Zero
for x, pv := range slice {
// this should use float64 multiply
quoteVolume := fixedpoint.Mul(pv.Volume, pv.Price)
totalQuoteVolume = totalQuoteVolume.Add(quoteVolume)
if totalQuoteVolume.Compare(requiredQuoteVolume) >= 0 {
return x
}
}
// depth not enough
return -1
}
func (slice PriceVolumeSlice) SumDepth() fixedpoint.Value {
var total = fixedpoint.Zero
for _, pv := range slice {
total = total.Add(pv.Volume)
}
return total
}
func (slice PriceVolumeSlice) SumDepthInQuote() fixedpoint.Value {
var total = fixedpoint.Zero
for _, pv := range slice {
quoteVolume := fixedpoint.Mul(pv.Price, pv.Volume)
total = total.Add(quoteVolume)
}
return total
}
func (slice PriceVolumeSlice) IndexByVolumeDepth(requiredVolume fixedpoint.Value) int {
var tv fixedpoint.Value = fixedpoint.Zero
var tv = fixedpoint.Zero
for x, el := range slice {
tv = tv.Add(el.Volume)
if tv.Compare(requiredVolume) >= 0 {
@ -76,7 +111,7 @@ func (slice PriceVolumeSlice) IndexByVolumeDepth(requiredVolume fixedpoint.Value
}
}
// not deep enough
// depth not enough
return -1
}

View File

@ -147,6 +147,24 @@ func (p *Profit) PlainText() string {
)
}
// PeriodProfitStats defined the profit stats for a period
// TODO: replace AccumulatedPnL and TodayPnL fields from the ProfitStats struct
type PeriodProfitStats struct {
PnL fixedpoint.Value `json:"pnl,omitempty"`
NetProfit fixedpoint.Value `json:"netProfit,omitempty"`
GrossProfit fixedpoint.Value `json:"grossProfit,omitempty"`
GrossLoss fixedpoint.Value `json:"grossLoss,omitempty"`
Volume fixedpoint.Value `json:"volume,omitempty"`
VolumeInQuote fixedpoint.Value `json:"volumeInQuote,omitempty"`
MakerVolume fixedpoint.Value `json:"makerVolume,omitempty"`
TakerVolume fixedpoint.Value `json:"takerVolume,omitempty"`
// time fields
LastTradeTime time.Time `json:"lastTradeTime,omitempty"`
StartTime time.Time `json:"startTime,omitempty"`
EndTime time.Time `json:"endTime,omitempty"`
}
type ProfitStats struct {
Symbol string `json:"symbol"`
QuoteCurrency string `json:"quoteCurrency"`
@ -164,9 +182,6 @@ type ProfitStats struct {
TodayGrossProfit fixedpoint.Value `json:"todayGrossProfit,omitempty"`
TodayGrossLoss fixedpoint.Value `json:"todayGrossLoss,omitempty"`
TodaySince int64 `json:"todaySince,omitempty"`
//StartTime time.Time
//EndTime time.Time
}
func NewProfitStats(market Market) *ProfitStats {
@ -185,8 +200,8 @@ func NewProfitStats(market Market) *ProfitStats {
TodayGrossProfit: fixedpoint.Zero,
TodayGrossLoss: fixedpoint.Zero,
TodaySince: 0,
//StartTime: time.Now().UTC(),
//EndTime: time.Now().UTC(),
// StartTime: time.Now().UTC(),
// EndTime: time.Now().UTC(),
}
}
@ -229,7 +244,7 @@ func (s *ProfitStats) AddProfit(profit Profit) {
s.TodayGrossLoss = s.TodayGrossLoss.Add(profit.Profit)
}
//s.EndTime = profit.TradedAt.UTC()
// s.EndTime = profit.TradedAt.UTC()
}
func (s *ProfitStats) AddTrade(trade Trade) {

View File

@ -20,6 +20,24 @@ func SortOrdersAscending(orders []Order) []Order {
return orders
}
// SortOrdersByPrice sorts by creation time ascending-ly
func SortOrdersByPrice(orders []Order, descending bool) []Order {
var f func(i, j int) bool
if descending {
f = func(i, j int) bool {
return orders[i].Price.Compare(orders[j].Price) > 0
}
} else {
f = func(i, j int) bool {
return orders[i].Price.Compare(orders[j].Price) < 0
}
}
sort.Slice(orders, f)
return orders
}
// SortOrdersAscending sorts by update time ascending-ly
func SortOrdersUpdateTimeAscending(orders []Order) []Order {
sort.Slice(orders, func(i, j int) bool {

View File

@ -5,6 +5,8 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
func TestSortTradesAscending(t *testing.T) {
@ -29,3 +31,52 @@ func TestSortTradesAscending(t *testing.T) {
trades = SortTradesAscending(trades)
assert.True(t, trades[0].Time.Before(trades[1].Time.Time()))
}
func getOrderPrices(orders []Order) (prices fixedpoint.Slice) {
for _, o := range orders {
prices = append(prices, o.Price)
}
return prices
}
func TestSortOrdersByPrice(t *testing.T) {
t.Run("ascending", func(t *testing.T) {
orders := []Order{
{SubmitOrder: SubmitOrder{Price: number("10.0")}},
{SubmitOrder: SubmitOrder{Price: number("30.0")}},
{SubmitOrder: SubmitOrder{Price: number("20.0")}},
{SubmitOrder: SubmitOrder{Price: number("25.0")}},
{SubmitOrder: SubmitOrder{Price: number("15.0")}},
}
orders = SortOrdersByPrice(orders, false)
prices := getOrderPrices(orders)
assert.Equal(t, fixedpoint.Slice{
number(10.0),
number(15.0),
number(20.0),
number(25.0),
number(30.0),
}, prices)
})
t.Run("descending", func(t *testing.T) {
orders := []Order{
{SubmitOrder: SubmitOrder{Price: number("10.0")}},
{SubmitOrder: SubmitOrder{Price: number("30.0")}},
{SubmitOrder: SubmitOrder{Price: number("20.0")}},
{SubmitOrder: SubmitOrder{Price: number("25.0")}},
{SubmitOrder: SubmitOrder{Price: number("15.0")}},
}
orders = SortOrdersByPrice(orders, true)
prices := getOrderPrices(orders)
assert.Equal(t, fixedpoint.Slice{
number(30.0),
number(25.0),
number(20.0),
number(15.0),
number(10.0),
}, prices)
})
}

View File

@ -120,6 +120,9 @@ func (trade Trade) CsvRecords() [][]string {
}
}
// PositionChange returns the position delta of this trade
// BUY trade -> positive quantity
// SELL trade -> negative quantity
func (trade Trade) PositionChange() fixedpoint.Value {
q := trade.Quantity
switch trade.Side {