bbgo/pkg/strategy/tri/strategy.go

966 lines
24 KiB
Go

package tri
import (
"context"
"errors"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
"git.qtrade.icu/lychiyu/bbgo/pkg/bbgo"
"git.qtrade.icu/lychiyu/bbgo/pkg/core"
"git.qtrade.icu/lychiyu/bbgo/pkg/exchange/retry"
"git.qtrade.icu/lychiyu/bbgo/pkg/fixedpoint"
"git.qtrade.icu/lychiyu/bbgo/pkg/sigchan"
"git.qtrade.icu/lychiyu/bbgo/pkg/style"
"git.qtrade.icu/lychiyu/bbgo/pkg/types"
"git.qtrade.icu/lychiyu/bbgo/pkg/util"
)
//go:generate bash symbols.sh

// ID is the name under which the strategy is registered with bbgo and the
// value of the "strategy" field attached to every log entry of this package.
const ID = "tri"

var (
	// log is the package logger, tagged with the strategy ID.
	log = logrus.WithField("strategy", ID)

	// one is a shorthand for the fixed-point value 1.
	one = fixedpoint.One

	// marketOrderProtectiveRatio (0.008) is the fallback Run applies when the
	// MarketOrderProtectiveRatio setting is zero.
	marketOrderProtectiveRatio = fixedpoint.NewFromFloat(0.008)

	// balanceBufferRatio (0.002) is the fraction addBalanceBuffer removes from
	// every available balance.
	balanceBufferRatio = fixedpoint.NewFromFloat(0.002)
)
// init registers the strategy with bbgo under ID, using an empty Strategy
// value as the registered instance.
func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
// Side is a signed trade direction: +1 for buying and -1 for selling.
type Side int

const (
	// Buy is the positive direction.
	Buy Side = 1
	// Sell is the negative direction.
	Sell Side = -1
)
// String returns the textual form of the side, i.e. whatever the
// corresponding types.SideType prints as.
func (s Side) String() string {
	sideType := s.SideType()
	return sideType.String()
}
// SideType converts the signed side into a types.SideType. The value 1 maps
// to types.SideTypeBuy; every other value (not only -1) maps to
// types.SideTypeSell.
func (s Side) SideType() types.SideType {
	switch s {
	case 1:
		return types.SideTypeBuy
	default:
		return types.SideTypeSell
	}
}
// PathRank pairs a path with the ratio computed for it, so that candidate
// paths can be sorted by ratio (see calculateRanks).
type PathRank struct {
// Path is the ranked path.
Path *Path
// Ratio is the value the ratio function returned for Path.
Ratio float64
}
// calculateBackwardRate returns the product of the three market ratios of the
// path when every leg is traversed in the opposite of its solved direction
// (-dirA, -dirB, -dirC).
//
// backward: buy -> buy -> sell
func calculateBackwardRate(p *Path) float64 {
	rateA := p.marketA.calculateRatio(-p.dirA)
	rateB := p.marketB.calculateRatio(-p.dirB)
	rateC := p.marketC.calculateRatio(-p.dirC)
	return rateA * rateB * rateC
}
// calculateForwardRatio returns the product of the three market ratios of the
// path when every leg is traversed in its solved direction (dirA, dirB, dirC).
//
// Sample log output of a path with its forward (->) and backward (<-) ratios:
//
//	path: BTCUSDT (0.000044 / 22830.410000) => USDTTWD (0.033220 / 30.101000) => BTCTWD (0.000001 / 687500.000000) <= -> 0.9995899221105569 <- 1.0000373943873788
//	1.0 * 22830 * 30.101000 / 687500.000
//
// Worked example:
//
//	BTCUSDT (0.000044 / 22856.910000) => USDTTWD (0.033217 / 30.104000) => BTCTWD (0.000001 / 688002.100000)
//	sell -> rate * 22856
//	sell -> rate * 30.104
//	buy  -> rate / 688002.1
//	= 1.0000798312
func calculateForwardRatio(p *Path) float64 {
	rateA := p.marketA.calculateRatio(p.dirA)
	rateB := p.marketB.calculateRatio(p.dirB)
	rateC := p.marketC.calculateRatio(p.dirC)
	return rateA * rateB * rateC
}
// adjustOrderQuantityByRate returns a copy of orders in which every quantity
// is multiplied by rate. The orders are returned unchanged when rate is NaN
// or exactly 1.0.
func adjustOrderQuantityByRate(orders [3]types.SubmitOrder, rate float64) [3]types.SubmitOrder {
	if math.IsNaN(rate) || rate == 1.0 {
		return orders
	}

	// orders is an array passed by value, so scaling it in place does not
	// touch the caller's copy.
	for idx := range orders {
		orders[idx].Quantity = orders[idx].Quantity.Mul(fixedpoint.NewFromFloat(rate))
	}
	return orders
}
// State holds the outcome counters of the IOC (first leg) orders. It is kept
// in the Strategy field tagged `persistence:"state"`.
type State struct {
// IOCWinTimes counts the executions that got past a filled IOC order and
// submitted the remaining two orders.
IOCWinTimes int `json:"iocWinningTimes"`
// IOCLossTimes counts the IOC orders that ended with zero executed quantity.
IOCLossTimes int `json:"iocLossTimes"`
// IOCWinningRatio is IOCWinTimes / IOCLossTimes, or 999.0 as long as
// IOCLossTimes is zero.
IOCWinningRatio float64 `json:"iocWinningRatio"`
}
// Strategy is the triangular arbitrage strategy: it watches the best bid/ask
// of the configured markets, ranks the configured 3-market paths by their
// forward and backward ratios, and executes the best path whose ratio reaches
// MinSpreadRatio.
type Strategy struct {
*bbgo.Environment
// Symbols are the markets used by the strategy. When empty, Defaults fills
// it with the symbols collected from Paths.
Symbols []string `json:"symbols"`
// Paths lists the arbitrage paths; Run rejects any path that does not
// contain exactly 3 symbols.
Paths [][]string `json:"paths"`
// MinSpreadRatio is the minimal path ratio required to trade; Run defaults
// it to 1.002 when zero.
MinSpreadRatio fixedpoint.Value `json:"minSpreadRatio"`
// SeparateStream makes every market use its own public stream instead of
// the session's market data stream.
SeparateStream bool `json:"separateStream"`
// Limits caps the available balance per currency (see
// applyBalanceMaxQuantity).
Limits map[string]fixedpoint.Value `json:"limits"`
// CoolingDownTime is how long executePath pauses after executing a path.
CoolingDownTime types.Duration `json:"coolingDownTime"`
// NotifyTrade enables a notification for every collected trade.
NotifyTrade bool `json:"notifyTrade"`
// ResetPosition makes Run replace Position with a fresh one.
ResetPosition bool `json:"resetPosition"`
// MarketOrderProtectiveRatio is the price offset applied to the second and
// third orders; Run defaults it to marketOrderProtectiveRatio when zero.
MarketOrderProtectiveRatio fixedpoint.Value `json:"marketOrderProtectiveRatio"`
// IocOrderRatio, when non-zero, is the price offset applied to the first
// (IOC) order.
IocOrderRatio fixedpoint.Value `json:"iocOrderRatio"`
// DryRun makes executePath log the orders instead of submitting them.
DryRun bool `json:"dryRun"`
// markets maps symbol -> market info, filled by Run.
markets map[string]types.Market
// arbMarkets maps symbol -> arbitrage market, built by buildArbMarkets.
arbMarkets map[string]*ArbMarket
// paths are the solved paths built from Paths.
paths []*Path
session *bbgo.ExchangeSession
activeOrders *bbgo.ActiveOrderBook
orderStore *core.OrderStore
tradeCollector *core.TradeCollector
Position *MultiCurrencyPosition `persistence:"position"`
State *State `persistence:"state"`
TradeState *types.TradeStats `persistence:"trade_stats"`
// sigC is handed to every ArbMarket; a receive on it triggers a path
// ranking round in the loop started by Run.
sigC sigchan.Chan
}
// ID returns the strategy identifier, the package constant ID.
func (s *Strategy) ID() string {
return ID
}
// InstanceID returns ID immediately followed by the configured symbols joined
// with "-". There is no separator between ID and the first symbol.
func (s *Strategy) InstanceID() string {
	joined := strings.Join(s.Symbols, "-")
	return ID + joined
}
// Subscribe subscribes the session to the full-depth order book of every
// symbol. Nothing is subscribed when SeparateStream is set.
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
	if s.SeparateStream {
		return
	}

	for idx := range s.Symbols {
		session.Subscribe(types.BookChannel, s.Symbols[idx], types.SubscribeOptions{
			Depth: types.DepthLevelFull,
		})
	}
}
// executeOrder submits order to the session exchange, retrying on error with
// an exponential backoff that starts at 100ms. On success the created order is
// added to the order store and the active order book and returned.
//
// It returns nil when all 101 attempts failed or when ctx is cancelled while
// waiting for the next attempt; callers must check for nil.
func (s *Strategy) executeOrder(ctx context.Context, order types.SubmitOrder) *types.Order {
	// maxWaitTime bounds the backoff: unbounded doubling reaches hours after
	// a few attempts and would eventually overflow time.Duration.
	const maxWaitTime = time.Minute

	waitTime := 100 * time.Millisecond
	for maxTries := 100; maxTries >= 0; maxTries-- {
		createdOrder, err := s.session.Exchange.SubmitOrder(ctx, order)
		if err == nil {
			s.orderStore.Add(*createdOrder)
			s.activeOrders.Add(*createdOrder)
			return createdOrder
		}

		log.WithError(err).Errorf("can not submit orders")
		if maxTries == 0 {
			// last attempt, nothing left to wait for
			break
		}

		// wait before the next attempt, but give up as soon as ctx is done
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(waitTime):
		}

		if waitTime < maxWaitTime {
			waitTime *= 2
		}
	}
	return nil
}
// Defaults fills in the settings that were left unset: Symbols is derived
// from Paths when empty, and TradeState is created when nil. It always
// returns nil.
func (s *Strategy) Defaults() error {
	if len(s.Symbols) == 0 {
		s.Symbols = collectSymbols(s.Paths)
	}

	if s.TradeState == nil {
		s.TradeState = types.NewTradeStats("")
	}

	return nil
}
// Initialize does nothing and always returns nil.
func (s *Strategy) Initialize() error {
return nil
}
// Run wires the strategy to session and installs the arbitrage loop.
//
// It applies the defaults for MarketOrderProtectiveRatio and MinSpreadRatio,
// creates the order store, active order book and trade collector bound to the
// user data stream, resolves the market of every symbol, builds the arbitrage
// markets (connecting their own streams when SeparateStream is set) and solves
// the direction of every configured path.
//
// The loop itself is started from the user data stream's OnAuth callback: it
// waits until every path is ready, then on every signal from sigC ranks the
// paths by forward and backward ratio and executes the best one.
//
// An error is returned when a market is missing, a stream cannot connect, a
// path does not have exactly 3 symbols, or a path direction cannot be solved.
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
	// readyPollInterval is how often the readiness of the paths is re-checked
	// while waiting for the first prices.
	const readyPollInterval = 50 * time.Millisecond

	s.Symbols = compileSymbols(s.Symbols)

	if s.MarketOrderProtectiveRatio.IsZero() {
		s.MarketOrderProtectiveRatio = marketOrderProtectiveRatio
	}

	if s.MinSpreadRatio.IsZero() {
		s.MinSpreadRatio = fixedpoint.NewFromFloat(1.002)
	}

	if s.State == nil {
		s.State = &State{}
	}

	s.markets = make(map[string]types.Market)
	s.sigC = sigchan.New(10)
	s.session = session

	s.orderStore = core.NewOrderStore("")
	s.orderStore.AddOrderUpdate = true
	s.orderStore.BindStream(session.UserDataStream)

	s.activeOrders = bbgo.NewActiveOrderBook("")
	s.activeOrders.BindStream(session.UserDataStream)

	s.tradeCollector = core.NewTradeCollector("", nil, s.orderStore)

	for _, symbol := range s.Symbols {
		market, ok := session.Market(symbol)
		if !ok {
			return fmt.Errorf("market not found: %s", symbol)
		}
		s.markets[symbol] = market
	}

	s.optimizeMarketQuantityPrecision()

	arbMarkets, err := s.buildArbMarkets(session, s.Symbols, s.SeparateStream, s.sigC)
	if err != nil {
		return err
	}
	s.arbMarkets = arbMarkets

	if s.Position == nil {
		s.Position = NewMultiCurrencyPosition(s.markets)
	}

	if s.ResetPosition {
		s.Position = NewMultiCurrencyPosition(s.markets)
	}

	s.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
		s.Position.handleTrade(trade)
	})

	if s.NotifyTrade {
		s.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
			bbgo.Notify(trade)
		})
	}

	s.tradeCollector.BindStream(session.UserDataStream)

	// with separate streams every market owns its stream and must connect it
	if s.SeparateStream {
		for _, m := range s.arbMarkets {
			log.Infof("connecting %s market stream...", m.Symbol)
			if err := m.stream.Connect(ctx); err != nil {
				return err
			}
		}
	}

	// build paths
	// rate update and check paths
	for _, pathSymbols := range s.Paths {
		if len(pathSymbols) != 3 {
			return errors.New("a path must contains 3 symbols")
		}

		p := &Path{
			marketA: s.arbMarkets[pathSymbols[0]],
			marketB: s.arbMarkets[pathSymbols[1]],
			marketC: s.arbMarkets[pathSymbols[2]],
		}

		if p.marketA == nil {
			return fmt.Errorf("market object of %s is missing", pathSymbols[0])
		}

		if p.marketB == nil {
			return fmt.Errorf("market object of %s is missing", pathSymbols[1])
		}

		if p.marketC == nil {
			return fmt.Errorf("market object of %s is missing", pathSymbols[2])
		}

		if err := p.solveDirection(); err != nil {
			return err
		}

		s.paths = append(s.paths, p)
	}

	// NOTE(review): the callback starts a new loop goroutine on every OnAuth
	// event; confirm whether a re-authentication can fire it more than once.
	session.UserDataStream.OnAuth(func() {
		go func() {
			// index 0 is the forward ratio, index 1 the backward ratio
			fs := []ratioFunction{calculateForwardRatio, calculateBackwardRate}

			allReady := func() bool {
				for _, p := range s.paths {
					if !p.Ready() {
						return false
					}
				}
				return true
			}

			log.Infof("waiting for market prices ready...")
			// poll instead of spinning, and stop waiting when ctx is done
			for !allReady() {
				select {
				case <-ctx.Done():
					return
				case <-time.After(readyPollInterval):
				}
			}

			log.Infof("all markets ready")

			for {
				select {
				case <-ctx.Done():
					return

				case <-s.sigC:
					minRatio := s.MinSpreadRatio.Float64()
					for side, f := range fs {
						ranks := s.calculateRanks(minRatio, f)
						if len(ranks) == 0 {
							// NOTE(review): this leaves the loop, so the backward
							// ratio is never evaluated when no forward path
							// qualifies — confirm that `continue` is not intended.
							break
						}

						forward := side == 0
						bestRank := ranks[0]
						if forward {
							debug("%d paths elected, found best forward path %s profit %.5f%%", len(ranks), bestRank.Path, (bestRank.Ratio-1.0)*100.0)
						} else {
							debug("%d paths elected, found best backward path %s profit %.5f%%", len(ranks), bestRank.Path, (bestRank.Ratio-1.0)*100.0)
						}

						logMarketPath(bestRank.Path)
						s.executePath(ctx, session, bestRank.Path, bestRank.Ratio, forward)
					}
				}
			}
		}()
	})

	return nil
}
// ratioFunction computes a ratio for a path; calculateForwardRatio and
// calculateBackwardRate are the two implementations used by Run.
type ratioFunction func(p *Path) float64
// checkMinimalOrderQuantity verifies that every order is above its market's
// minimal quantity and minimal notional (quantity * price). A value equal to
// the minimum is rejected as well. The first violation is returned as an
// error; nil means all three orders passed.
func (s *Strategy) checkMinimalOrderQuantity(orders [3]types.SubmitOrder) error {
	for idx := range orders {
		o := orders[idx]
		market := o.Market

		if o.Quantity.Compare(market.MinQuantity) <= 0 {
			return fmt.Errorf("%s order quantity is too small: %f < %f",
				o.Symbol,
				o.Quantity.Float64(), market.MinQuantity.Float64())
		}

		notional := o.Quantity.Mul(o.Price)
		if notional.Compare(market.MinNotional) <= 0 {
			return fmt.Errorf("%s order min notional is too small: %f < %f",
				o.Symbol,
				notional.Float64(), market.MinNotional.Float64())
		}
	}
	return nil
}
// optimizeMarketQuantityPrecision aligns the volume precision of all markets
// that share a base currency: each of them gets the smallest VolumePrecision
// found in its group, and the adjusted market is written back to s.markets.
func (s *Strategy) optimizeMarketQuantityPrecision() {
	// group the markets by base currency
	groups := make(map[string][]types.Market)
	for _, market := range s.markets {
		groups[market.BaseCurrency] = append(groups[market.BaseCurrency], market)
	}

	for _, group := range groups {
		// -1 marks "no precision picked yet"
		lowest := -1
		for idx := range group {
			if p := group[idx].VolumePrecision; lowest == -1 || p < lowest {
				lowest = p
			}
		}

		if lowest == -1 {
			continue
		}

		for idx := range group {
			group[idx].VolumePrecision = lowest
			s.markets[group[idx].Symbol] = group[idx]
		}
	}
}
// applyBalanceMaxQuantity caps the available amount of every balance that has
// an entry in s.Limits to that limit. The given map is modified in place and
// returned; without limits it is returned untouched.
func (s *Strategy) applyBalanceMaxQuantity(balances types.BalanceMap) types.BalanceMap {
	if len(s.Limits) == 0 {
		return balances
	}

	for currency, limit := range s.Limits {
		balance, found := balances[currency]
		if !found {
			continue
		}

		balance.Available = fixedpoint.Min(balance.Available, limit)
		balances[currency] = balance
	}
	return balances
}
// addBalanceBuffer returns a new balance map in which every available amount
// is reduced by balanceBufferRatio (multiplied by 1 - balanceBufferRatio). The
// input map is not modified.
func (s *Strategy) addBalanceBuffer(balances types.BalanceMap) (out types.BalanceMap) {
	keep := one.Sub(balanceBufferRatio)

	out = types.BalanceMap{}
	for currency, balance := range balances {
		balance.Available = balance.Available.Mul(keep)
		out[currency] = balance
	}
	return out
}
// toProtectiveMarketOrder returns order with its price moved by ratio in the
// direction that makes a fill more likely: a sell price is multiplied by
// (1 - ratio), a buy price by (1 + ratio). Any other side is left unchanged.
func (s *Strategy) toProtectiveMarketOrder(order types.SubmitOrder, ratio fixedpoint.Value) types.SubmitOrder {
	switch order.Side {
	case types.SideTypeBuy:
		order.Price = order.Price.Mul(one.Add(ratio))

	case types.SideTypeSell:
		order.Price = order.Price.Mul(one.Sub(ratio))
	}
	return order
}
// toProtectiveMarketOrders applies the protective price offset to all three
// orders: a sell price is multiplied by (1 - ratio), a buy price by
// (1 + ratio). Quantity and order type are left as they are.
func (s *Strategy) toProtectiveMarketOrders(orders [3]types.SubmitOrder, ratio fixedpoint.Value) [3]types.SubmitOrder {
	discount := one.Sub(ratio)
	premium := one.Add(ratio)

	// orders is an array passed by value, so it can be adjusted in place
	for idx := range orders {
		side := orders[idx].Side
		if side == types.SideTypeSell {
			orders[idx].Price = orders[idx].Price.Mul(discount)
		} else if side == types.SideTypeBuy {
			orders[idx].Price = orders[idx].Price.Mul(premium)
		}
	}
	return orders
}
// executePath trades path p once.
//
// It sizes the three orders from the account balances (after the balance
// buffer and the configured limits), skips the path when an order is below the
// market minimums, and in DryRun mode only logs the orders. Otherwise the
// orders are executed through iocOrderExecution; afterwards the collected
// profits are notified and logged, the strategy is synced, and the configured
// cooling down time is observed.
//
// dir selects the direction: true builds the orders with direction 1, false
// with direction -1. ratio is the path ratio that triggered the execution.
func (s *Strategy) executePath(ctx context.Context, session *bbgo.ExchangeSession, p *Path, ratio float64, dir bool) {
	balances := session.Account.Balances()
	balances = s.addBalanceBuffer(balances)
	balances = s.applyBalanceMaxQuantity(balances)

	var orders [3]types.SubmitOrder
	if dir {
		orders = p.newOrders(balances, 1)
	} else {
		orders = p.newOrders(balances, -1)
	}

	if err := s.checkMinimalOrderQuantity(orders); err != nil {
		log.WithError(err).Warnf("order quantity too small, skip")
		return
	}

	if s.DryRun {
		logSubmitOrders(orders)
		return
	}

	createdOrders, err := s.iocOrderExecution(ctx, session, orders, ratio)
	if err != nil {
		log.WithError(err).Errorf("order execute error")
		return
	}

	// nothing was traded (e.g. the IOC order was not filled)
	if len(createdOrders) == 0 {
		return
	}

	log.Info(s.Position.String())

	profits := s.Position.CollectProfits()
	profitInUSD := fixedpoint.Zero
	for _, profit := range profits {
		bbgo.Notify(&profit)
		log.Info(profit.PlainText())

		profitInUSD = profitInUSD.Add(profit.ProfitInUSD)

		// FIXME:
		// s.TradeState.Add(&profit)
	}

	notifyUsdPnL(profitInUSD)

	log.Info(s.TradeState.BriefString())

	bbgo.Sync(ctx, s)

	if s.CoolingDownTime > 0 {
		cooldown := s.CoolingDownTime.Duration()
		log.Infof("cooling down for %s", cooldown.String())

		// cool down, but do not hold up a shutdown
		select {
		case <-ctx.Done():
		case <-time.After(cooldown):
		}
	}
}
// notifyUsdPnL sends a notification with the summed profit and loss in USD,
// prefixed with the matching PnL emoji.
func notifyUsdPnL(profit fixedpoint.Value) {
	emoji := style.PnLEmojiSimple(profit)
	signed := style.PnLSignString(profit)
	bbgo.Notify("Triangular Sum PnL ~= " + emoji + " " + signed + " USD")
}
// iocOrderExecution executes the three orders of a path.
//
// The first order is submitted as a limit IOC order (with the IocOrderRatio
// price offset when configured). Its final state is awaited through both the
// websocket order updates and the REST order query, whichever reports first.
// When nothing was filled, the loss counter is increased and (nil, nil) is
// returned. Otherwise the quantities of the second and third orders are scaled
// by the filled ratio, truncated, given the protective price offset and
// submitted concurrently; then the trades of all created orders are collected
// and analyzed, and the IOC winning ratio is updated.
//
// The returned slice holds the IOC order followed by the follow-up orders that
// were actually created. An error is returned when the exchange cannot query
// orders, the IOC order cannot be submitted, or its final state cannot be
// determined. ratio is currently unused.
func (s *Strategy) iocOrderExecution(
	ctx context.Context, session *bbgo.ExchangeSession, orders [3]types.SubmitOrder, ratio float64,
) (types.OrderSlice, error) {
	service, ok := session.Exchange.(types.ExchangeOrderQueryService)
	if !ok {
		return nil, errors.New("exchange does not support ExchangeOrderQueryService")
	}

	// Change the first order to IOC
	orders[0].Type = types.OrderTypeLimit
	orders[0].TimeInForce = types.TimeInForceIOC

	// remember the orders as planned, before any protective price offset, so
	// the analysis can compare the planned prices with the executed ones
	originalOrders := orders

	logSubmitOrders(orders)

	if !s.IocOrderRatio.IsZero() {
		orders[0] = s.toProtectiveMarketOrder(orders[0], s.IocOrderRatio)
	}

	iocOrder := s.executeOrder(ctx, orders[0])
	if iocOrder == nil {
		return nil, errors.New("ioc order submit error")
	}

	// Two watchers race to report the final state of the IOC order. The
	// channel has room for both results so that neither sender can block, and
	// it is closed only after both watchers are done: closing it while a
	// watcher may still send would panic.
	iocOrderC := make(chan types.Order, 2)
	var watchers sync.WaitGroup
	watchers.Add(2)

	go func() {
		defer watchers.Done()

		o, err := s.waitWebSocketOrderDone(ctx, iocOrder.OrderID, 300*time.Millisecond)
		if err != nil || o == nil {
			// the REST watcher is the fallback
			return
		}
		iocOrderC <- *o
	}()

	go func() {
		defer watchers.Done()

		o, err := retry.QueryOrderUntilFilled(ctx, service, iocOrder.Symbol, iocOrder.OrderID)
		if err != nil {
			log.WithError(err).Errorf("ioc order restful wait error")
			return
		}
		if o != nil {
			iocOrderC <- *o
		}
	}()

	go func() {
		watchers.Wait()
		close(iocOrderC)
	}()

	var o types.Order
	select {
	case <-ctx.Done():
		return nil, ctx.Err()

	case result, open := <-iocOrderC:
		if !open {
			// both watchers gave up without a result
			return nil, fmt.Errorf("%s ioc order #%d: unable to determine the order state", iocOrder.Symbol, iocOrder.OrderID)
		}
		o = result
	}

	filledQuantity := o.ExecutedQuantity
	if filledQuantity.IsZero() {
		s.State.IOCLossTimes++

		// we didn't get filled
		log.Infof("%s %s IOC order did not get filled, skip: %+v", o.Symbol, o.Side, o)
		return nil, nil
	}

	filledRatio := filledQuantity.Div(iocOrder.Quantity)
	bbgo.Notify("%s %s IOC order got filled %f/%f (%s)", iocOrder.Symbol, iocOrder.Side, filledQuantity.Float64(), iocOrder.Quantity.Float64(), filledRatio.Percentage())
	log.Infof("%s %s IOC order got filled %f/%f", iocOrder.Symbol, iocOrder.Side, filledQuantity.Float64(), iocOrder.Quantity.Float64())

	// scale the follow-up orders to what the IOC order actually filled
	orders[1].Quantity = orders[1].Quantity.Mul(filledRatio)
	orders[2].Quantity = orders[2].Quantity.Mul(filledRatio)

	orders[1].Quantity = orders[1].Market.TruncateQuantity(orders[1].Quantity)
	orders[2].Quantity = orders[2].Market.TruncateQuantity(orders[2].Quantity)

	if orders[1].Quantity.Compare(orders[1].Market.MinQuantity) <= 0 {
		log.Warnf("order #2 quantity %f is less than min quantity %f, skip", orders[1].Quantity.Float64(), orders[1].Market.MinQuantity.Float64())
		return nil, nil
	}

	if orders[2].Quantity.Compare(orders[2].Market.MinQuantity) <= 0 {
		log.Warnf("order #3 quantity %f is less than min quantity %f, skip", orders[2].Quantity.Float64(), orders[2].Market.MinQuantity.Float64())
		return nil, nil
	}

	orders[1] = s.toProtectiveMarketOrder(orders[1], s.MarketOrderProtectiveRatio)
	orders[2] = s.toProtectiveMarketOrder(orders[2], s.MarketOrderProtectiveRatio)

	// submit the second and third orders concurrently; each goroutine writes
	// only its own slot
	var followUps [2]*types.Order
	var wg sync.WaitGroup
	wg.Add(len(followUps))
	for i := range followUps {
		go func(i int) {
			defer wg.Done()
			followUps[i] = s.executeOrder(ctx, orders[i+1])
		}(i)
	}
	wg.Wait()

	createdOrders := make(types.OrderSlice, 0, 3)
	createdOrders = append(createdOrders, *iocOrder)
	for i, created := range followUps {
		if created == nil {
			// executeOrder gave up; keep going so that the trades of the
			// orders that did get created are still collected
			log.Errorf("order #%d %s submit failed, the position may be left unbalanced", i+2, orders[i+1].Symbol)
			continue
		}
		createdOrders = append(createdOrders, *created)
	}

	orderTrades, updatedOrders, err := s.waitOrdersAndCollectTrades(ctx, service, createdOrders)
	if err != nil {
		log.WithError(err).Errorf("trade collecting error")
	} else {
		for i, order := range updatedOrders {
			trades, hasTrades := orderTrades[order.OrderID]
			if !hasTrades {
				continue
			}

			averagePrice := tradeAveragePrice(trades, order.OrderID)
			updatedOrders[i].AveragePrice = averagePrice

			if market, hasMarket := s.markets[order.Symbol]; hasMarket {
				updatedOrders[i].Market = market
			}

			// restore the planned price so slippage is measured against it
			for _, originalOrder := range originalOrders {
				if originalOrder.Symbol == updatedOrders[i].Symbol {
					updatedOrders[i].Price = originalOrder.Price
				}
			}
		}
		s.analyzeOrders(updatedOrders)
	}

	// update ioc winning ratio
	s.State.IOCWinTimes++
	if s.State.IOCLossTimes == 0 {
		s.State.IOCWinningRatio = 999.0
	} else {
		s.State.IOCWinningRatio = float64(s.State.IOCWinTimes) / float64(s.State.IOCLossTimes)
	}

	log.Infof("ioc winning ratio update: %f", s.State.IOCWinningRatio)

	return createdOrders, nil
}
// waitWebSocketOrderDone waits until the order with the given ID is reported
// as filled or canceled, first checking the order store and then consuming the
// updates from the order store channel.
//
// It returns the finished order, ctx.Err() when ctx is done, or a timeout
// error once timeoutDuration has elapsed. Updates of other orders received in
// the meantime are consumed and dropped.
func (s *Strategy) waitWebSocketOrderDone(ctx context.Context, orderID uint64, timeoutDuration time.Duration) (*types.Order, error) {
	prof := util.StartTimeProfile("waitWebSocketOrderDone")
	defer prof.StopAndLog(log.Infof)

	// finished reports whether the order reached a final state
	finished := func(o types.Order) bool {
		switch o.Status {
		case types.OrderStatusFilled, types.OrderStatusCanceled:
			return true
		}
		return false
	}

	// the order may already be done by the time we start waiting
	if stored, found := s.orderStore.Get(orderID); found && finished(stored) {
		return &stored, nil
	}

	deadline := time.After(timeoutDuration)
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()

		case <-deadline:
			return nil, fmt.Errorf("order wait time timeout %s", timeoutDuration)

		case update := <-s.orderStore.C:
			if update.OrderID == orderID && finished(update) {
				return &update, nil
			}
		}
	}
}
// waitOrdersAndCollectTrades waits (up to one second each) for every created
// order to be filled or canceled, queries its trades and feeds them to the
// trade collector.
//
// It returns the trades keyed by order ID and the refreshed orders of all
// orders that were processed successfully. Orders that failed are skipped and
// their errors are combined into the returned error, so a non-nil error means
// the results are incomplete.
func (s *Strategy) waitOrdersAndCollectTrades(
	ctx context.Context, service types.ExchangeOrderQueryService, createdOrders types.OrderSlice,
) (map[uint64][]types.Trade, types.OrderSlice, error) {
	var err error
	var orderTrades = make(map[uint64][]types.Trade)
	var updatedOrders types.OrderSlice

	for _, o := range createdOrders {
		updatedOrder, err2 := waitForOrderFilled(ctx, service, o, time.Second)
		if err2 != nil {
			err = multierr.Append(err, err2)
			continue
		}

		trades, err3 := service.QueryOrderTrades(ctx, types.OrderQuery{
			Symbol:  o.Symbol,
			OrderID: strconv.FormatUint(o.OrderID, 10),
		})
		if err3 != nil {
			err = multierr.Append(err, err3)
			continue
		}

		for _, t := range trades {
			s.tradeCollector.ProcessTrade(t)
		}

		orderTrades[o.OrderID] = trades
		updatedOrders = append(updatedOrders, *updatedOrder)
	}

	// report the collected errors instead of silently dropping them
	return orderTrades, updatedOrders, err
}
// analyzeOrders logs a summary of the executed orders: it sorts them in place
// by creation time (earliest first), logs what went in and out of each order,
// and then logs the slippage of the average execution price against the order
// price. A positive slippage is favorable: sold above, or bought below, the
// order price.
func (s *Strategy) analyzeOrders(orders types.OrderSlice) {
	sort.Slice(orders, func(i, j int) bool {
		// o1 < o2 -- earlier first
		return orders[i].CreationTime.Before(orders[j].CreationTime.Time())
	})

	log.Infof("ANALYZING ORDERS (Earlier First)")
	for i, o := range orders {
		in, inCurrency := o.In()
		out, outCurrency := o.Out()
		log.Infof("#%d %s IN %f %s -> OUT %f %s", i, o.String(), in.Float64(), inCurrency, out.Float64(), outCurrency)
	}

	for _, o := range orders {
		switch o.Side {
		case types.SideTypeSell:
			price := o.Price
			priceDiff := o.AveragePrice.Sub(price)
			slippage := priceDiff.Div(price)
			log.Infof("%-8s %-4s %-10s AVG PRICE %f PRICE %f Q %f SLIPPAGE %.3f%%", o.Symbol, o.Side, o.Type, o.AveragePrice.Float64(), price.Float64(), o.Quantity.Float64(), slippage.Float64()*100.0)

		case types.SideTypeBuy:
			price := o.Price
			priceDiff := price.Sub(o.AveragePrice)
			slippage := priceDiff.Div(price)
			log.Infof("%-8s %-4s %-10s AVG PRICE %f PRICE %f Q %f SLIPPAGE %.3f%%", o.Symbol, o.Side, o.Type, o.AveragePrice.Float64(), price.Float64(), o.Quantity.Float64(), slippage.Float64()*100.0)
		}
	}
}
// buildArbMarkets creates one ArbMarket per symbol and hooks it up to an order
// book, so that its best bid/ask and rate are refreshed on every book update
// or snapshot that changes them.
//
// With separateStream each market gets its own public stream subscribed to the
// full-depth, high-speed book channel (the stream is not connected here), and
// its best prices are reset when that stream disconnects. Otherwise the
// session's order book and market data stream are used.
//
// An error is returned when a symbol has no entry in s.markets or, without
// separateStream, when the session has no order book for it.
func (s *Strategy) buildArbMarkets(
	session *bbgo.ExchangeSession, symbols []string, separateStream bool, sigC sigchan.Chan,
) (map[string]*ArbMarket, error) {
	markets := make(map[string]*ArbMarket)
	// build market object
	for _, symbol := range symbols {
		market, ok := s.markets[symbol]
		if !ok {
			return nil, fmt.Errorf("market not found: %s", symbol)
		}

		m := &ArbMarket{
			Symbol:                symbol,
			market:                market,
			BaseCurrency:          market.BaseCurrency,
			QuoteCurrency:         market.QuoteCurrency,
			sigC:                  sigC,
			truncateBaseQuantity:  createBaseQuantityTruncator(market),
			truncateQuoteQuantity: createPricePrecisionBasedQuoteQuantityTruncator(market),
		}

		if separateStream {
			stream := session.Exchange.NewStream()
			stream.SetPublicOnly()
			stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
				Depth: types.DepthLevelFull,
				Speed: types.SpeedHigh,
			})

			book := types.NewStreamBook(symbol, session.ExchangeName)
			priceUpdater := func(_ types.SliceOrderBook) {
				bestBid, bestAsk, _ := book.BestBidAndAsk()
				if bestAsk.Equals(m.bestAsk) && bestBid.Equals(m.bestBid) {
					return
				}

				m.bestBid = bestBid
				m.bestAsk = bestAsk
				m.updateRate()
			}
			book.OnUpdate(priceUpdater)
			book.OnSnapshot(priceUpdater)
			book.BindStream(stream)

			stream.OnDisconnect(func() {
				// reset price and volume
				m.bestBid = types.PriceVolume{}
				m.bestAsk = types.PriceVolume{}
			})

			m.book = book
			m.stream = stream
		} else {
			book, ok := session.OrderBook(symbol)
			if !ok {
				return nil, fmt.Errorf("order book not found: %s", symbol)
			}

			priceUpdater := func(_ types.SliceOrderBook) {
				// BestBidAndAsk returns the bid first, then the ask
				bestBid, bestAsk, _ := book.BestBidAndAsk()
				if bestAsk.Equals(m.bestAsk) && bestBid.Equals(m.bestBid) {
					return
				}

				m.bestBid = bestBid
				m.bestAsk = bestAsk
				m.updateRate()
			}
			book.OnUpdate(priceUpdater)
			book.OnSnapshot(priceUpdater)

			m.book = book
			m.stream = session.MarketDataStream
		}

		markets[symbol] = m
	}

	return markets, nil
}
// calculateRanks evaluates method on every path, drops the paths whose ratio
// is below minRatio, and returns the remaining ones sorted by ratio, highest
// first. The result is empty (not nil) when no path qualifies.
func (s *Strategy) calculateRanks(minRatio float64, method func(p *Path) float64) []PathRank {
	candidates := make([]PathRank, 0, len(s.paths))

	for idx := range s.paths {
		candidate := s.paths[idx]
		score := method(candidate)
		if score < minRatio {
			continue
		}
		candidates = append(candidates, PathRank{Path: candidate, Ratio: score})
	}

	// best ratio first
	sort.Slice(candidates, func(a, b int) bool {
		return candidates[a].Ratio > candidates[b].Ratio
	})
	return candidates
}
// waitForOrderFilled polls the order through ex until it is reported as filled
// or canceled and returns the order as reported by the exchange.
//
// A failed query is logged and retried after 100ms; an unfinished order is
// polled again after 5ms. The function gives up with an error once timeout has
// elapsed, and returns ctx.Err() when ctx is done. Both conditions are checked
// between queries, so a query in flight is not interrupted by the timeout.
func waitForOrderFilled(
	ctx context.Context, ex types.ExchangeOrderQueryService, order types.Order, timeout time.Duration,
) (*types.Order, error) {
	prof := util.StartTimeProfile("waitForOrderFilled")
	defer prof.StopAndLog(log.Infof)

	timeoutC := time.After(timeout)

	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()

		case <-timeoutC:
			return nil, fmt.Errorf("order wait timeout %s", timeout)

		default:
			p := util.StartTimeProfile("queryOrder")
			remoteOrder, err2 := ex.QueryOrder(ctx, types.OrderQuery{
				Symbol:  order.Symbol,
				OrderID: strconv.FormatUint(order.OrderID, 10),
			})
			p.StopAndLog(log.Infof)

			if err2 != nil {
				log.WithError(err2).Errorf("order query error")
				time.Sleep(100 * time.Millisecond)
				continue
			}

			switch remoteOrder.Status {
			case types.OrderStatusFilled, types.OrderStatusCanceled:
				return remoteOrder, nil

			default:
				log.Infof("WAITING: %s", remoteOrder.String())
				time.Sleep(5 * time.Millisecond)
			}
		}
	}
}
// tradeAveragePrice returns the quantity-weighted average price of the trades
// that belong to orderID; trades of other orders are ignored. It returns zero
// when no quantity was traded for the order, instead of dividing by zero.
func tradeAveragePrice(trades []types.Trade, orderID uint64) fixedpoint.Value {
	totalAmount := fixedpoint.Zero
	totalQuantity := fixedpoint.Zero
	for _, trade := range trades {
		if trade.OrderID != orderID {
			continue
		}

		totalAmount = totalAmount.Add(trade.Price.Mul(trade.Quantity))
		totalQuantity = totalQuantity.Add(trade.Quantity)
	}

	if totalQuantity.IsZero() {
		return fixedpoint.Zero
	}

	return totalAmount.Div(totalQuantity)
}
// displayBook writes the best ask and best bid (price x volume) of market to
// the debug log, headed by id and the market symbol. It does nothing unless
// debugMode is enabled.
func displayBook(id string, market *ArbMarket) {
	if !debugMode {
		return
	}

	ask, bid := market.bestAsk, market.bestBid

	var sb strings.Builder
	sb.WriteString(id + ") " + market.market.Symbol + "\n")
	fmt.Fprintf(&sb, "bestAsk: %-12s x %s\n", ask.Price.String(), ask.Volume.String())
	fmt.Fprintf(&sb, "bestBid: %-12s x %s\n", bid.Price.String(), bid.Volume.String())

	debug(sb.String())
}
// logMarketPath writes the best prices of the three markets of path, labelled
// A, B and C, to the debug log. It does nothing unless debugMode is enabled.
func logMarketPath(path *Path) {
	if debugMode {
		displayBook("A", path.marketA)
		displayBook("B", path.marketB)
		displayBook("C", path.marketC)
	}
}
// collectSymbols returns the distinct symbols used by paths in ascending
// order. The result is sorted so that it does not depend on map iteration
// order and stays the same from run to run. It returns nil when paths contain
// no symbol.
func collectSymbols(paths [][]string) (symbols []string) {
	seen := make(map[string]struct{})
	for _, path := range paths {
		for _, symbol := range path {
			seen[symbol] = struct{}{}
		}
	}

	if len(seen) == 0 {
		return nil
	}

	symbols = make([]string, 0, len(seen))
	for symbol := range seen {
		symbols = append(symbols, symbol)
	}
	sort.Strings(symbols)
	return symbols
}