2021-03-21 02:44:06 +00:00
package xmaker
import (
"context"
"fmt"
"math"
2021-05-09 12:03:06 +00:00
"math/rand"
2021-03-21 02:44:06 +00:00
"sync"
"time"
2021-05-09 18:52:41 +00:00
"github.com/pkg/errors"
2021-03-21 02:44:06 +00:00
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
2021-03-22 09:27:07 +00:00
"github.com/c9s/bbgo/pkg/exchange/max"
2021-03-21 02:44:06 +00:00
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
)
var defaultMargin = fixedpoint . NewFromFloat ( 0.01 )
2021-05-10 15:27:08 +00:00
var localTimeZone * time . Location
2021-03-21 02:44:06 +00:00
const ID = "xmaker"
const stateKey = "state-v1"
var log = logrus . WithField ( "strategy" , ID )
func init ( ) {
bbgo . RegisterStrategy ( ID , & Strategy { } )
2021-05-10 15:27:08 +00:00
var err error
localTimeZone , err = time . LoadLocation ( "Local" )
if err != nil {
panic ( err )
}
2021-03-21 02:44:06 +00:00
}
type State struct {
2021-05-09 15:56:54 +00:00
HedgePosition fixedpoint . Value ` json:"hedgePosition" `
Position * bbgo . Position ` json:"position,omitempty" `
AccumulatedVolume fixedpoint . Value ` json:"accumulatedVolume,omitempty" `
2021-05-10 12:22:33 +00:00
AccumulatedPnL fixedpoint . Value ` json:"accumulatedPnL,omitempty" `
2021-05-09 15:56:54 +00:00
AccumulatedProfit fixedpoint . Value ` json:"accumulatedProfit,omitempty" `
2021-05-10 12:22:33 +00:00
AccumulatedLoss fixedpoint . Value ` json:"accumulatedLoss,omitempty" `
AccumulatedSince int64 ` json:"accumulatedSince,omitempty" `
2021-03-21 02:44:06 +00:00
}
type Strategy struct {
* bbgo . Graceful
* bbgo . Notifiability
* bbgo . Persistence
Symbol string ` json:"symbol" `
SourceExchange string ` json:"sourceExchange" `
MakerExchange string ` json:"makerExchange" `
2021-05-09 13:14:51 +00:00
UpdateInterval types . Duration ` json:"updateInterval" `
HedgeInterval types . Duration ` json:"hedgeInterval" `
2021-05-09 10:55:56 +00:00
OrderCancelWaitTime types . Duration ` json:"orderCancelWaitTime" `
2021-03-21 02:44:06 +00:00
2021-05-09 18:52:41 +00:00
Margin fixedpoint . Value ` json:"margin" `
BidMargin fixedpoint . Value ` json:"bidMargin" `
AskMargin fixedpoint . Value ` json:"askMargin" `
2021-05-11 04:47:45 +00:00
StopHedgeQuoteBalance fixedpoint . Value ` json:"stopHedgeQuoteBalance" `
StopHedgeBaseBalance fixedpoint . Value ` json:"stopHedgeBaseBalance" `
2021-05-09 18:52:41 +00:00
// Quantity is used for fixed quantity of the first layer
Quantity fixedpoint . Value ` json:"quantity" `
// QuantityMultiplier is the factor that multiplies the quantity of the previous layer
QuantityMultiplier fixedpoint . Value ` json:"quantityMultiplier" `
// QuantityScale helps user to define the quantity by layer scale
QuantityScale * bbgo . LayerScale ` json:"quantityScale,omitempty" `
2021-05-11 04:47:45 +00:00
// MaxExposurePosition defines the unhedged quantity of stop
2021-04-04 03:14:09 +00:00
MaxExposurePosition fixedpoint . Value ` json:"maxExposurePosition" `
2021-05-11 04:47:45 +00:00
DisableHedge bool ` json:"disableHedge" `
2021-03-21 02:44:06 +00:00
NumLayers int ` json:"numLayers" `
2021-05-11 04:47:45 +00:00
// Pips is the pips of the layer prices
Pips int ` json:"pips" `
// --------------------------------
// private field
2021-03-21 02:44:06 +00:00
makerSession * bbgo . ExchangeSession
sourceSession * bbgo . ExchangeSession
sourceMarket types . Market
makerMarket types . Market
state * State
book * types . StreamOrderBook
activeMakerOrders * bbgo . LocalActiveOrderBook
orderStore * bbgo . OrderStore
lastPrice float64
2021-03-22 09:27:07 +00:00
groupID uint32
2021-03-21 02:44:06 +00:00
stopC chan struct { }
}
2021-05-09 12:03:06 +00:00
func ( s * Strategy ) ID ( ) string {
return ID
}
2021-03-21 02:44:06 +00:00
func ( s * Strategy ) CrossSubscribe ( sessions map [ string ] * bbgo . ExchangeSession ) {
sourceSession , ok := sessions [ s . SourceExchange ]
if ! ok {
2021-03-21 04:43:41 +00:00
panic ( fmt . Errorf ( "source session %s is not defined" , s . SourceExchange ) )
2021-03-21 02:44:06 +00:00
}
sourceSession . Subscribe ( types . BookChannel , s . Symbol , types . SubscribeOptions { } )
2021-03-21 04:43:41 +00:00
makerSession , ok := sessions [ s . MakerExchange ]
if ! ok {
panic ( fmt . Errorf ( "maker session %s is not defined" , s . MakerExchange ) )
}
makerSession . Subscribe ( types . KLineChannel , s . Symbol , types . SubscribeOptions { Interval : "1m" } )
2021-03-21 02:44:06 +00:00
}
2021-05-10 16:58:11 +00:00
func aggregatePrice ( pvs types . PriceVolumeSlice , requiredQuantity fixedpoint . Value ) ( price fixedpoint . Value ) {
q := requiredQuantity
totalAmount := fixedpoint . Value ( 0 )
if len ( pvs ) == 0 {
price = 0
return price
} else if pvs [ 0 ] . Volume >= requiredQuantity {
return pvs [ 0 ] . Price
}
for i := 0 ; i < len ( pvs ) ; i ++ {
pv := pvs [ i ]
if pv . Volume >= q {
totalAmount += q . Mul ( pv . Price )
break
}
2021-05-10 16:10:49 +00:00
2021-05-10 16:58:11 +00:00
q -= pv . Volume
totalAmount += pv . Volume . Mul ( pv . Price )
}
price = totalAmount . Div ( requiredQuantity )
return price
2021-05-10 16:10:49 +00:00
}
2021-05-12 10:58:20 +00:00
func ( s * Strategy ) updateQuote ( ctx context . Context , orderExecutionRouter bbgo . OrderExecutionRouter ) {
2021-03-21 02:44:06 +00:00
if err := s . makerSession . Exchange . CancelOrders ( ctx , s . activeMakerOrders . Orders ( ) ... ) ; err != nil {
2021-05-10 16:10:49 +00:00
log . WithError ( err ) . Errorf ( "can not cancel %s orders" , s . Symbol )
2021-03-21 02:44:06 +00:00
return
}
2021-05-10 16:10:49 +00:00
// avoid unlock issue and wait for the balance update
2021-05-09 10:55:56 +00:00
if s . OrderCancelWaitTime > 0 {
time . Sleep ( s . OrderCancelWaitTime . Duration ( ) )
} else {
// use the default wait time
time . Sleep ( 500 * time . Millisecond )
}
2021-05-09 10:32:05 +00:00
if s . activeMakerOrders . NumOfAsks ( ) > 0 || s . activeMakerOrders . NumOfBids ( ) > 0 {
2021-05-09 10:55:56 +00:00
log . Warnf ( "there are some %s orders not canceled, skipping placing maker orders" , s . Symbol )
2021-05-09 12:03:06 +00:00
s . activeMakerOrders . Print ( )
2021-05-09 10:32:05 +00:00
return
}
2021-03-21 02:44:06 +00:00
sourceBook := s . book . Get ( )
if len ( sourceBook . Bids ) == 0 || len ( sourceBook . Asks ) == 0 {
return
}
if valid , err := sourceBook . IsValid ( ) ; ! valid {
2021-05-10 16:10:49 +00:00
log . WithError ( err ) . Errorf ( "%s invalid order book, skip quoting: %v" , s . Symbol , err )
2021-03-21 02:44:06 +00:00
return
}
2021-03-21 03:16:15 +00:00
var disableMakerBid = false
var disableMakerAsk = false
2021-03-21 02:44:06 +00:00
2021-05-10 16:10:49 +00:00
// check maker's balance quota
// we load the balances from the account while we're generating the orders,
2021-03-21 03:16:15 +00:00
// the balance may have a chance to be deducted by other strategies or manual orders submitted by the user
makerBalances := s . makerSession . Account . Balances ( )
2021-03-21 02:44:06 +00:00
makerQuota := & bbgo . QuotaTransaction { }
2021-03-21 03:16:15 +00:00
if b , ok := makerBalances [ s . makerMarket . BaseCurrency ] ; ok {
2021-05-10 16:10:49 +00:00
if b . Available . Float64 ( ) > s . makerMarket . MinQuantity {
makerQuota . BaseAsset . Add ( b . Available )
} else {
2021-03-21 04:43:41 +00:00
disableMakerAsk = true
}
2021-03-21 02:44:06 +00:00
}
2021-03-21 04:43:41 +00:00
2021-03-21 03:16:15 +00:00
if b , ok := makerBalances [ s . makerMarket . QuoteCurrency ] ; ok {
2021-05-10 16:10:49 +00:00
if b . Available . Float64 ( ) > s . makerMarket . MinNotional {
makerQuota . QuoteAsset . Add ( b . Available )
} else {
2021-04-04 03:14:09 +00:00
disableMakerBid = true
}
}
2021-03-21 02:44:06 +00:00
hedgeBalances := s . sourceSession . Account . Balances ( )
hedgeQuota := & bbgo . QuotaTransaction { }
if b , ok := hedgeBalances [ s . sourceMarket . BaseCurrency ] ; ok {
2021-03-21 04:43:41 +00:00
// to make bid orders, we need enough base asset in the foreign exchange,
2021-03-21 03:16:15 +00:00
// if the base asset balance is not enough for selling
2021-05-11 04:47:45 +00:00
if s . StopHedgeBaseBalance > 0 && b . Available > ( s . StopHedgeBaseBalance + fixedpoint . NewFromFloat ( s . sourceMarket . MinQuantity ) ) {
hedgeQuota . BaseAsset . Add ( b . Available - s . StopHedgeBaseBalance - fixedpoint . NewFromFloat ( s . sourceMarket . MinQuantity ) )
} else if b . Available . Float64 ( ) > s . sourceMarket . MinQuantity {
2021-05-10 16:10:49 +00:00
hedgeQuota . BaseAsset . Add ( b . Available )
} else {
2021-05-11 04:53:32 +00:00
log . Warnf ( "%s maker bid disabled: insufficient base balance %s" , s . Symbol , b . String ( ) )
2021-03-21 03:16:15 +00:00
disableMakerBid = true
}
2021-03-21 02:44:06 +00:00
}
2021-03-21 03:16:15 +00:00
2021-03-21 02:44:06 +00:00
if b , ok := hedgeBalances [ s . sourceMarket . QuoteCurrency ] ; ok {
2021-03-21 04:43:41 +00:00
// to make ask orders, we need enough quote asset in the foreign exchange,
2021-03-21 03:16:15 +00:00
// if the quote asset balance is not enough for buying
2021-05-11 04:47:45 +00:00
if s . StopHedgeQuoteBalance > 0 && b . Available > ( s . StopHedgeQuoteBalance + fixedpoint . NewFromFloat ( s . sourceMarket . MinNotional ) ) {
hedgeQuota . QuoteAsset . Add ( b . Available - s . StopHedgeQuoteBalance - fixedpoint . NewFromFloat ( s . sourceMarket . MinNotional ) )
} else if b . Available . Float64 ( ) > s . sourceMarket . MinNotional {
2021-05-10 16:10:49 +00:00
hedgeQuota . QuoteAsset . Add ( b . Available )
} else {
2021-05-11 04:53:32 +00:00
log . Warnf ( "%s maker ask disabled: insufficient quote balance %s" , s . Symbol , b . String ( ) )
2021-03-21 03:16:15 +00:00
disableMakerAsk = true
}
}
2021-03-21 02:44:06 +00:00
2021-05-10 16:10:49 +00:00
// if max exposure position is configured, we should not:
// 1. place bid orders when we already bought too much
// 2. place ask orders when we already sold too much
if s . MaxExposurePosition > 0 {
pos := s . state . HedgePosition . AtomicLoad ( )
if pos < - s . MaxExposurePosition {
// stop sell if we over-sell
disableMakerAsk = true
} else if pos > s . MaxExposurePosition {
// stop buy if we over buy
disableMakerBid = true
}
}
2021-03-21 04:43:41 +00:00
if disableMakerAsk && disableMakerBid {
2021-05-11 07:57:44 +00:00
log . Warn ( "bid/ask maker is disabled due to insufficient balances" )
2021-03-21 04:43:41 +00:00
return
}
2021-05-10 16:10:49 +00:00
bestBidPrice := sourceBook . Bids [ 0 ] . Price
bestAskPrice := sourceBook . Asks [ 0 ] . Price
log . Infof ( "%s best bid price %f, best ask price: %f" , s . Symbol , bestBidPrice . Float64 ( ) , bestAskPrice . Float64 ( ) )
var submitOrders [ ] types . SubmitOrder
2021-05-10 16:58:11 +00:00
var accumulativeBidQuantity , accumulativeAskQuantity fixedpoint . Value
2021-05-10 16:10:49 +00:00
var bidQuantity = s . Quantity
var askQuantity = s . Quantity
2021-03-21 02:44:06 +00:00
for i := 0 ; i < s . NumLayers ; i ++ {
2021-03-21 03:16:15 +00:00
// for maker bid orders
if ! disableMakerBid {
2021-05-09 18:52:41 +00:00
if s . QuantityScale != nil {
qf , err := s . QuantityScale . Scale ( i + 1 )
if err != nil {
log . WithError ( err ) . Errorf ( "quantityScale error" )
return
}
log . Infof ( "scaling quantity to %f by layer: %d" , qf , i + 1 )
// override the default bid quantity
bidQuantity = fixedpoint . NewFromFloat ( qf )
}
2021-05-10 16:58:11 +00:00
accumulativeBidQuantity += bidQuantity
bidPrice := aggregatePrice ( sourceBook . Bids , accumulativeBidQuantity )
bidPrice = bidPrice . MulFloat64 ( 1.0 - s . BidMargin . Float64 ( ) )
2021-05-10 17:06:39 +00:00
if i > 0 && s . Pips > 0 {
bidPrice -= fixedpoint . NewFromFloat ( s . makerMarket . TickSize * float64 ( s . Pips ) )
}
2021-05-10 16:58:11 +00:00
2021-03-21 03:16:15 +00:00
if makerQuota . QuoteAsset . Lock ( bidQuantity . Mul ( bidPrice ) ) && hedgeQuota . BaseAsset . Lock ( bidQuantity ) {
// if we bought, then we need to sell the base from the hedge session
submitOrders = append ( submitOrders , types . SubmitOrder {
Symbol : s . Symbol ,
Type : types . OrderTypeLimit ,
Side : types . SideTypeBuy ,
Price : bidPrice . Float64 ( ) ,
Quantity : bidQuantity . Float64 ( ) ,
TimeInForce : "GTC" ,
GroupID : s . groupID ,
} )
makerQuota . Commit ( )
hedgeQuota . Commit ( )
} else {
makerQuota . Rollback ( )
hedgeQuota . Rollback ( )
}
2021-05-09 18:52:41 +00:00
if s . QuantityMultiplier > 0 {
bidQuantity = bidQuantity . Mul ( s . QuantityMultiplier )
}
2021-03-21 02:44:06 +00:00
}
2021-03-21 03:16:15 +00:00
// for maker ask orders
if ! disableMakerAsk {
2021-05-09 18:52:41 +00:00
if s . QuantityScale != nil {
qf , err := s . QuantityScale . Scale ( i + 1 )
if err != nil {
log . WithError ( err ) . Errorf ( "quantityScale error" )
return
}
// override the default bid quantity
askQuantity = fixedpoint . NewFromFloat ( qf )
}
2021-05-10 16:58:11 +00:00
accumulativeAskQuantity += askQuantity
2021-05-13 01:55:53 +00:00
askPrice := aggregatePrice ( sourceBook . Asks , accumulativeAskQuantity )
2021-05-10 16:58:11 +00:00
askPrice = askPrice . MulFloat64 ( 1.0 + s . AskMargin . Float64 ( ) )
2021-05-10 17:06:39 +00:00
if i > 0 && s . Pips > 0 {
askPrice += fixedpoint . NewFromFloat ( s . makerMarket . TickSize * float64 ( s . Pips ) )
}
2021-05-09 18:52:41 +00:00
2021-03-21 03:16:15 +00:00
if makerQuota . BaseAsset . Lock ( askQuantity ) && hedgeQuota . QuoteAsset . Lock ( askQuantity . Mul ( askPrice ) ) {
// if we bought, then we need to sell the base from the hedge session
submitOrders = append ( submitOrders , types . SubmitOrder {
Symbol : s . Symbol ,
Type : types . OrderTypeLimit ,
Side : types . SideTypeSell ,
Price : askPrice . Float64 ( ) ,
Quantity : askQuantity . Float64 ( ) ,
TimeInForce : "GTC" ,
GroupID : s . groupID ,
} )
makerQuota . Commit ( )
hedgeQuota . Commit ( )
} else {
makerQuota . Rollback ( )
hedgeQuota . Rollback ( )
}
2021-05-09 18:52:41 +00:00
if s . QuantityMultiplier > 0 {
askQuantity = askQuantity . Mul ( s . QuantityMultiplier )
}
2021-03-21 02:44:06 +00:00
}
}
if len ( submitOrders ) == 0 {
return
}
2021-05-12 10:58:20 +00:00
makerOrders , err := orderExecutionRouter . SubmitOrdersTo ( ctx , s . MakerExchange , submitOrders ... )
2021-03-21 02:44:06 +00:00
if err != nil {
log . WithError ( err ) . Errorf ( "order error: %s" , err . Error ( ) )
return
}
s . activeMakerOrders . Add ( makerOrders ... )
s . orderStore . Add ( makerOrders ... )
}
func ( s * Strategy ) Hedge ( ctx context . Context , pos fixedpoint . Value ) {
side := types . SideTypeBuy
if pos == 0 {
return
}
quantity := pos
if pos < 0 {
side = types . SideTypeSell
2021-05-10 15:49:25 +00:00
// quantity must be a positive number
2021-03-21 02:44:06 +00:00
quantity = - pos
}
lastPrice := s . lastPrice
sourceBook := s . book . Get ( )
switch side {
case types . SideTypeBuy :
if len ( sourceBook . Asks ) > 0 {
if pv , ok := sourceBook . Asks . First ( ) ; ok {
lastPrice = pv . Price . Float64 ( )
}
}
case types . SideTypeSell :
if len ( sourceBook . Bids ) > 0 {
if pv , ok := sourceBook . Bids . First ( ) ; ok {
lastPrice = pv . Price . Float64 ( )
}
}
}
notional := quantity . MulFloat64 ( lastPrice )
if notional . Float64 ( ) <= s . sourceMarket . MinNotional {
2021-05-10 15:52:17 +00:00
log . Warnf ( "%s %f less than min notional, skipping" , s . Symbol , notional . Float64 ( ) )
2021-03-21 02:44:06 +00:00
return
}
2021-05-10 12:13:23 +00:00
// adjust quantity according to the balances
account := s . sourceSession . Account
switch side {
case types . SideTypeBuy :
// check quote quantity
2021-05-10 12:22:33 +00:00
if quote , ok := account . Balance ( s . sourceMarket . QuoteCurrency ) ; ok {
2021-05-10 12:13:23 +00:00
if quote . Available < notional {
2021-05-10 16:10:49 +00:00
// qf := bbgo.AdjustQuantityByMaxAmount(quantity.Float64(), lastPrice, quote.Available.Float64())
// quantity = fixedpoint.NewFromFloat(qf)
2021-05-10 12:13:23 +00:00
}
}
case types . SideTypeSell :
// check quote quantity
2021-05-10 12:22:33 +00:00
if base , ok := account . Balance ( s . sourceMarket . BaseCurrency ) ; ok {
2021-05-10 12:13:23 +00:00
if base . Available < quantity {
quantity = base . Available
}
}
}
2021-05-09 13:14:51 +00:00
s . Notifiability . Notify ( "Submitting hedge order: %s %s %f" , s . Symbol , side , quantity . Float64 ( ) )
2021-03-21 02:44:06 +00:00
orderExecutor := & bbgo . ExchangeOrderExecutor { Session : s . sourceSession }
returnOrders , err := orderExecutor . SubmitOrders ( ctx , types . SubmitOrder {
Symbol : s . Symbol ,
Type : types . OrderTypeMarket ,
Side : side ,
Quantity : quantity . Float64 ( ) ,
} )
if err != nil {
log . WithError ( err ) . Errorf ( "market order submit error: %s" , err . Error ( ) )
return
}
s . orderStore . Add ( returnOrders ... )
}
func ( s * Strategy ) handleTradeUpdate ( trade types . Trade ) {
log . Infof ( "received trade %+v" , trade )
if trade . Symbol != s . Symbol {
return
}
if ! s . orderStore . Exists ( trade . OrderID ) {
return
}
q := fixedpoint . NewFromFloat ( trade . Quantity )
switch trade . Side {
case types . SideTypeSell :
q = - q
case types . SideTypeBuy :
case types . SideTypeSelf :
// ignore self trades
2021-05-11 07:56:46 +00:00
log . Warnf ( "ignore self trade" )
return
2021-03-21 02:44:06 +00:00
default :
log . Infof ( "ignore non sell/buy side trades, got: %v" , trade . Side )
return
}
2021-05-09 13:14:51 +00:00
log . Infof ( "identified %s trade %d with an existing order: %d" , trade . Symbol , trade . ID , trade . OrderID )
2021-05-09 10:46:09 +00:00
2021-05-09 15:56:54 +00:00
s . state . HedgePosition . AtomicAdd ( q )
s . state . AccumulatedVolume . AtomicAdd ( fixedpoint . NewFromFloat ( trade . Quantity ) )
2021-05-09 13:14:51 +00:00
if profit , madeProfit := s . state . Position . AddTrade ( trade ) ; madeProfit {
2021-05-10 12:22:33 +00:00
s . state . AccumulatedPnL . AtomicAdd ( profit )
if profit < 0 {
s . state . AccumulatedLoss . AtomicAdd ( profit )
} else if profit > 0 {
s . state . AccumulatedProfit . AtomicAdd ( profit )
}
var since time . Time
if s . state . AccumulatedSince > 0 {
2021-05-10 15:27:08 +00:00
since = time . Unix ( s . state . AccumulatedSince , 0 ) . In ( localTimeZone )
2021-05-10 12:22:33 +00:00
}
2021-05-09 15:56:54 +00:00
2021-05-10 16:58:11 +00:00
s . Notify ( "%s trade just made profit %f %s, since %s accumulated net profit %f %s, accumulated loss %f %s" , s . Symbol ,
2021-05-09 15:56:54 +00:00
profit . Float64 ( ) , s . state . Position . QuoteCurrency ,
2021-05-10 16:58:11 +00:00
since . Format ( time . RFC822 ) ,
2021-05-10 12:22:33 +00:00
s . state . AccumulatedPnL . Float64 ( ) , s . state . Position . QuoteCurrency ,
2021-05-10 16:58:11 +00:00
s . state . AccumulatedLoss . Float64 ( ) , s . state . Position . QuoteCurrency )
2021-05-10 12:22:33 +00:00
2021-05-09 10:46:09 +00:00
} else {
2021-05-09 13:14:51 +00:00
s . Notify ( "%s trade modified the position: average cost = %f %s, base = %f" , s . Symbol , s . state . Position . AverageCost . Float64 ( ) , s . state . Position . QuoteCurrency , s . state . Position . Base . Float64 ( ) )
2021-05-09 10:46:09 +00:00
}
2021-03-21 02:44:06 +00:00
s . lastPrice = trade . Price
}
2021-05-09 18:52:41 +00:00
func ( s * Strategy ) Validate ( ) error {
if s . Quantity == 0 || s . QuantityScale == nil {
return errors . New ( "quantity or quantityScale can not be empty" )
}
if s . QuantityMultiplier != 0 && s . QuantityMultiplier < 0 {
return errors . New ( "quantityMultiplier can not be a negative number" )
}
if len ( s . Symbol ) == 0 {
return errors . New ( "symbol is required" )
}
return nil
}
2021-05-12 10:58:20 +00:00
func ( s * Strategy ) CrossRun ( ctx context . Context , orderExecutionRouter bbgo . OrderExecutionRouter , sessions map [ string ] * bbgo . ExchangeSession ) error {
2021-03-21 02:44:06 +00:00
// configure default values
if s . UpdateInterval == 0 {
s . UpdateInterval = types . Duration ( time . Second )
}
if s . HedgeInterval == 0 {
s . HedgeInterval = types . Duration ( 10 * time . Second )
}
if s . NumLayers == 0 {
s . NumLayers = 1
}
if s . BidMargin == 0 {
if s . Margin != 0 {
s . BidMargin = s . Margin
} else {
s . BidMargin = defaultMargin
}
}
if s . AskMargin == 0 {
if s . Margin != 0 {
s . AskMargin = s . Margin
} else {
s . AskMargin = defaultMargin
}
}
// configure sessions
sourceSession , ok := sessions [ s . SourceExchange ]
if ! ok {
return fmt . Errorf ( "source exchange session %s is not defined" , s . SourceExchange )
}
s . sourceSession = sourceSession
makerSession , ok := sessions [ s . MakerExchange ]
if ! ok {
return fmt . Errorf ( "maker exchange session %s is not defined" , s . MakerExchange )
}
s . makerSession = makerSession
s . sourceMarket , ok = s . sourceSession . Market ( s . Symbol )
if ! ok {
return fmt . Errorf ( "source session market %s is not defined" , s . Symbol )
}
s . makerMarket , ok = s . makerSession . Market ( s . Symbol )
if ! ok {
return fmt . Errorf ( "maker session market %s is not defined" , s . Symbol )
}
// restore state
2021-03-21 04:43:41 +00:00
instanceID := fmt . Sprintf ( "%s-%s" , ID , s . Symbol )
2021-03-22 09:27:07 +00:00
s . groupID = max . GenerateGroupID ( instanceID )
2021-03-21 02:44:06 +00:00
log . Infof ( "using group id %d from fnv(%s)" , s . groupID , instanceID )
var state State
// load position
2021-03-25 05:18:38 +00:00
if err := s . Persistence . Load ( & state , ID , s . Symbol , stateKey ) ; err != nil {
2021-03-21 02:44:06 +00:00
if err != service . ErrPersistenceNotExists {
return err
}
s . state = & State { }
} else {
// loaded successfully
s . state = & state
log . Infof ( "state is restored: %+v" , s . state )
2021-05-09 13:14:51 +00:00
s . Notify ( "%s position is restored => %f" , s . Symbol , s . state . HedgePosition . Float64 ( ) )
2021-03-21 02:44:06 +00:00
}
2021-05-09 10:46:09 +00:00
// if position is nil, we need to allocate a new position for calculation
if s . state . Position == nil {
s . state . Position = & bbgo . Position {
2021-05-09 13:14:51 +00:00
Symbol : s . Symbol ,
BaseCurrency : s . makerMarket . BaseCurrency ,
2021-05-09 10:46:09 +00:00
QuoteCurrency : s . makerMarket . QuoteCurrency ,
}
}
2021-05-10 12:22:33 +00:00
if s . state . AccumulatedSince == 0 {
s . state . AccumulatedSince = time . Now ( ) . Unix ( )
}
2021-03-21 02:44:06 +00:00
s . book = types . NewStreamBook ( s . Symbol )
s . book . BindStream ( s . sourceSession . Stream )
s . sourceSession . Stream . OnTradeUpdate ( s . handleTradeUpdate )
s . makerSession . Stream . OnTradeUpdate ( s . handleTradeUpdate )
s . activeMakerOrders = bbgo . NewLocalActiveOrderBook ( )
s . activeMakerOrders . BindStream ( s . makerSession . Stream )
s . orderStore = bbgo . NewOrderStore ( s . Symbol )
s . orderStore . BindStream ( s . sourceSession . Stream )
s . orderStore . BindStream ( s . makerSession . Stream )
s . stopC = make ( chan struct { } )
go func ( ) {
2021-05-09 12:03:06 +00:00
posTicker := time . NewTicker ( durationJitter ( s . HedgeInterval . Duration ( ) , 200 ) )
2021-03-21 02:44:06 +00:00
defer posTicker . Stop ( )
2021-05-09 12:03:06 +00:00
quoteTicker := time . NewTicker ( durationJitter ( s . UpdateInterval . Duration ( ) , 200 ) )
defer quoteTicker . Stop ( )
2021-05-10 05:18:57 +00:00
defer func ( ) {
if err := s . makerSession . Exchange . CancelOrders ( context . Background ( ) , s . activeMakerOrders . Orders ( ) ... ) ; err != nil {
log . WithError ( err ) . Errorf ( "can not cancel %s orders" , s . Symbol )
}
} ( )
2021-03-21 02:44:06 +00:00
for {
select {
case <- s . stopC :
2021-05-09 18:17:19 +00:00
log . Warnf ( "%s maker goroutine stopped, due to the stop signal" , s . Symbol )
2021-03-21 02:44:06 +00:00
return
case <- ctx . Done ( ) :
2021-05-09 18:17:19 +00:00
log . Warnf ( "%s maker goroutine stopped, due to the cancelled context" , s . Symbol )
2021-03-21 02:44:06 +00:00
return
2021-05-09 12:03:06 +00:00
case <- quoteTicker . C :
2021-05-12 10:58:20 +00:00
s . updateQuote ( ctx , orderExecutionRouter )
2021-03-21 02:44:06 +00:00
case <- posTicker . C :
position := s . state . HedgePosition . AtomicLoad ( )
abspos := math . Abs ( position . Float64 ( ) )
if ! s . DisableHedge && abspos > s . sourceMarket . MinQuantity {
s . Hedge ( ctx , - position )
}
}
}
} ( )
s . Graceful . OnShutdown ( func ( ctx context . Context , wg * sync . WaitGroup ) {
defer wg . Done ( )
close ( s . stopC )
2021-05-09 18:17:19 +00:00
time . Sleep ( s . UpdateInterval . Duration ( ) )
2021-05-09 11:04:44 +00:00
2021-05-14 03:53:07 +00:00
for s . activeMakerOrders . NumOfOrders ( ) > 0 {
2021-05-09 10:48:25 +00:00
orders := s . activeMakerOrders . Orders ( )
2021-05-14 03:53:07 +00:00
log . Warnf ( "%d orders are not cancelled yet:" , len ( orders ) )
2021-05-09 12:03:06 +00:00
s . activeMakerOrders . Print ( )
2021-05-10 05:18:57 +00:00
if err := s . makerSession . Exchange . CancelOrders ( ctx , s . activeMakerOrders . Orders ( ) ... ) ; err != nil {
log . WithError ( err ) . Errorf ( "can not cancel %s orders" , s . Symbol )
}
log . Warnf ( "waiting for orders to be cancelled..." )
time . Sleep ( 3 * time . Second )
2021-05-09 10:48:25 +00:00
}
2021-05-14 03:53:07 +00:00
log . Info ( "all orders are cancelled successfully" )
2021-05-09 10:48:25 +00:00
2021-03-25 05:18:38 +00:00
if err := s . Persistence . Save ( s . state , ID , s . Symbol , stateKey ) ; err != nil {
2021-03-21 02:44:06 +00:00
log . WithError ( err ) . Errorf ( "can not save state: %+v" , s . state )
} else {
log . Infof ( "state is saved => %+v" , s . state )
2021-05-09 18:17:19 +00:00
s . Notify ( "%s position is saved: position = %f" , s . Symbol , s . state . HedgePosition . Float64 ( ) )
2021-03-21 02:44:06 +00:00
}
} )
return nil
}
2021-05-09 12:03:06 +00:00
func durationJitter ( d time . Duration , jitterInMilliseconds int ) time . Duration {
n := rand . Intn ( jitterInMilliseconds )
2021-05-09 13:14:51 +00:00
return d + time . Duration ( n ) * time . Millisecond
2021-05-09 12:03:06 +00:00
}