mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-15 03:23:52 +00:00
966 lines
24 KiB
Go
966 lines
24 KiB
Go
package tri
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
"go.uber.org/multierr"
|
|
|
|
"github.com/c9s/bbgo/pkg/bbgo"
|
|
"github.com/c9s/bbgo/pkg/core"
|
|
"github.com/c9s/bbgo/pkg/exchange/retry"
|
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
|
"github.com/c9s/bbgo/pkg/sigchan"
|
|
"github.com/c9s/bbgo/pkg/style"
|
|
"github.com/c9s/bbgo/pkg/types"
|
|
"github.com/c9s/bbgo/pkg/util"
|
|
)
|
|
|
|
//go:generate bash symbols.sh
|
|
|
|
const ID = "tri"
|
|
|
|
var log = logrus.WithField("strategy", ID)
|
|
|
|
var one = fixedpoint.One
|
|
var marketOrderProtectiveRatio = fixedpoint.NewFromFloat(0.008)
|
|
var balanceBufferRatio = fixedpoint.NewFromFloat(0.002)
|
|
|
|
func init() {
|
|
bbgo.RegisterStrategy(ID, &Strategy{})
|
|
}
|
|
|
|
type Side int
|
|
|
|
const Buy Side = 1
|
|
const Sell Side = -1
|
|
|
|
func (s Side) String() string {
|
|
return s.SideType().String()
|
|
}
|
|
|
|
func (s Side) SideType() types.SideType {
|
|
if s == 1 {
|
|
return types.SideTypeBuy
|
|
}
|
|
|
|
return types.SideTypeSell
|
|
}
|
|
|
|
type PathRank struct {
|
|
Path *Path
|
|
Ratio float64
|
|
}
|
|
|
|
// backward buy -> buy -> sell
|
|
func calculateBackwardRate(p *Path) float64 {
|
|
var ratio = 1.0
|
|
ratio *= p.marketA.calculateRatio(-p.dirA)
|
|
ratio *= p.marketB.calculateRatio(-p.dirB)
|
|
ratio *= p.marketC.calculateRatio(-p.dirC)
|
|
return ratio
|
|
}
|
|
|
|
// calculateForwardRatio
|
|
// path: BTCUSDT (0.000044 / 22830.410000) => USDTTWD (0.033220 / 30.101000) => BTCTWD (0.000001 / 687500.000000) <= -> 0.9995899221105569 <- 1.0000373943873788
|
|
// 1.0 * 22830 * 30.101000 / 687500.000
|
|
|
|
// BTCUSDT (0.000044 / 22856.910000) => USDTTWD (0.033217 / 30.104000) => BTCTWD (0.000001 / 688002.100000)
|
|
// sell -> rate * 22856
|
|
// sell -> rate * 30.104
|
|
// buy -> rate / 688002.1
|
|
// 1.0000798312
|
|
func calculateForwardRatio(p *Path) float64 {
|
|
var ratio = 1.0
|
|
ratio *= p.marketA.calculateRatio(p.dirA)
|
|
ratio *= p.marketB.calculateRatio(p.dirB)
|
|
ratio *= p.marketC.calculateRatio(p.dirC)
|
|
return ratio
|
|
}
|
|
|
|
func adjustOrderQuantityByRate(orders [3]types.SubmitOrder, rate float64) [3]types.SubmitOrder {
|
|
if rate == 1.0 || math.IsNaN(rate) {
|
|
return orders
|
|
}
|
|
|
|
for i, o := range orders {
|
|
orders[i].Quantity = o.Quantity.Mul(fixedpoint.NewFromFloat(rate))
|
|
}
|
|
|
|
return orders
|
|
}
|
|
|
|
type State struct {
|
|
IOCWinTimes int `json:"iocWinningTimes"`
|
|
IOCLossTimes int `json:"iocLossTimes"`
|
|
IOCWinningRatio float64 `json:"iocWinningRatio"`
|
|
}
|
|
|
|
type Strategy struct {
|
|
*bbgo.Environment
|
|
|
|
Symbols []string `json:"symbols"`
|
|
Paths [][]string `json:"paths"`
|
|
MinSpreadRatio fixedpoint.Value `json:"minSpreadRatio"`
|
|
SeparateStream bool `json:"separateStream"`
|
|
Limits map[string]fixedpoint.Value `json:"limits"`
|
|
CoolingDownTime types.Duration `json:"coolingDownTime"`
|
|
NotifyTrade bool `json:"notifyTrade"`
|
|
ResetPosition bool `json:"resetPosition"`
|
|
MarketOrderProtectiveRatio fixedpoint.Value `json:"marketOrderProtectiveRatio"`
|
|
IocOrderRatio fixedpoint.Value `json:"iocOrderRatio"`
|
|
DryRun bool `json:"dryRun"`
|
|
|
|
markets map[string]types.Market
|
|
arbMarkets map[string]*ArbMarket
|
|
paths []*Path
|
|
|
|
session *bbgo.ExchangeSession
|
|
|
|
activeOrders *bbgo.ActiveOrderBook
|
|
orderStore *core.OrderStore
|
|
tradeCollector *core.TradeCollector
|
|
Position *MultiCurrencyPosition `persistence:"position"`
|
|
State *State `persistence:"state"`
|
|
TradeState *types.TradeStats `persistence:"trade_stats"`
|
|
sigC sigchan.Chan
|
|
}
|
|
|
|
func (s *Strategy) ID() string {
|
|
return ID
|
|
}
|
|
|
|
func (s *Strategy) InstanceID() string {
|
|
return ID + strings.Join(s.Symbols, "-")
|
|
}
|
|
|
|
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
|
|
if !s.SeparateStream {
|
|
for _, symbol := range s.Symbols {
|
|
session.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
|
|
Depth: types.DepthLevelFull,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Strategy) executeOrder(ctx context.Context, order types.SubmitOrder) *types.Order {
|
|
waitTime := 100 * time.Millisecond
|
|
for maxTries := 100; maxTries >= 0; maxTries-- {
|
|
createdOrder, err := s.session.Exchange.SubmitOrder(ctx, order)
|
|
if err != nil {
|
|
log.WithError(err).Errorf("can not submit orders")
|
|
time.Sleep(waitTime)
|
|
waitTime *= 2
|
|
continue
|
|
}
|
|
|
|
s.orderStore.Add(*createdOrder)
|
|
s.activeOrders.Add(*createdOrder)
|
|
return createdOrder
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Strategy) Defaults() error {
|
|
if s.TradeState == nil {
|
|
s.TradeState = types.NewTradeStats("")
|
|
}
|
|
|
|
if len(s.Symbols) == 0 {
|
|
s.Symbols = collectSymbols(s.Paths)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Strategy) Initialize() error {
|
|
return nil
|
|
}
|
|
|
|
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
|
|
|
|
s.Symbols = compileSymbols(s.Symbols)
|
|
|
|
if s.MarketOrderProtectiveRatio.IsZero() {
|
|
s.MarketOrderProtectiveRatio = marketOrderProtectiveRatio
|
|
}
|
|
|
|
if s.MinSpreadRatio.IsZero() {
|
|
s.MinSpreadRatio = fixedpoint.NewFromFloat(1.002)
|
|
}
|
|
|
|
if s.State == nil {
|
|
s.State = &State{}
|
|
}
|
|
|
|
s.markets = make(map[string]types.Market)
|
|
s.sigC = sigchan.New(10)
|
|
|
|
s.session = session
|
|
s.orderStore = core.NewOrderStore("")
|
|
s.orderStore.AddOrderUpdate = true
|
|
s.orderStore.BindStream(session.UserDataStream)
|
|
|
|
s.activeOrders = bbgo.NewActiveOrderBook("")
|
|
s.activeOrders.BindStream(session.UserDataStream)
|
|
s.tradeCollector = core.NewTradeCollector("", nil, s.orderStore)
|
|
|
|
for _, symbol := range s.Symbols {
|
|
market, ok := session.Market(symbol)
|
|
if !ok {
|
|
return fmt.Errorf("market not found: %s", symbol)
|
|
}
|
|
s.markets[symbol] = market
|
|
}
|
|
s.optimizeMarketQuantityPrecision()
|
|
|
|
arbMarkets, err := s.buildArbMarkets(session, s.Symbols, s.SeparateStream, s.sigC)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.arbMarkets = arbMarkets
|
|
|
|
if s.Position == nil {
|
|
s.Position = NewMultiCurrencyPosition(s.markets)
|
|
}
|
|
|
|
if s.ResetPosition {
|
|
s.Position = NewMultiCurrencyPosition(s.markets)
|
|
}
|
|
|
|
s.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
|
|
s.Position.handleTrade(trade)
|
|
})
|
|
|
|
if s.NotifyTrade {
|
|
s.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
|
|
bbgo.Notify(trade)
|
|
})
|
|
}
|
|
|
|
s.tradeCollector.BindStream(session.UserDataStream)
|
|
|
|
for _, market := range s.arbMarkets {
|
|
m := market
|
|
if s.SeparateStream {
|
|
log.Infof("connecting %s market stream...", m.Symbol)
|
|
if err := m.stream.Connect(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// build paths
|
|
// rate update and check paths
|
|
for _, pathSymbols := range s.Paths {
|
|
if len(pathSymbols) != 3 {
|
|
return errors.New("a path must contains 3 symbols")
|
|
}
|
|
|
|
p := &Path{
|
|
marketA: s.arbMarkets[pathSymbols[0]],
|
|
marketB: s.arbMarkets[pathSymbols[1]],
|
|
marketC: s.arbMarkets[pathSymbols[2]],
|
|
}
|
|
|
|
if p.marketA == nil {
|
|
return fmt.Errorf("market object of %s is missing", pathSymbols[0])
|
|
}
|
|
|
|
if p.marketB == nil {
|
|
return fmt.Errorf("market object of %s is missing", pathSymbols[1])
|
|
}
|
|
|
|
if p.marketC == nil {
|
|
return fmt.Errorf("market object of %s is missing", pathSymbols[2])
|
|
}
|
|
|
|
if err := p.solveDirection(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.paths = append(s.paths, p)
|
|
}
|
|
|
|
session.UserDataStream.OnAuth(func() {
|
|
go func() {
|
|
fs := []ratioFunction{calculateForwardRatio, calculateBackwardRate}
|
|
log.Infof("waiting for market prices ready...")
|
|
wait := true
|
|
for wait {
|
|
wait = false
|
|
for _, p := range s.paths {
|
|
if !p.Ready() {
|
|
wait = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Infof("all markets ready")
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-s.sigC:
|
|
minRatio := s.MinSpreadRatio.Float64()
|
|
for side, f := range fs {
|
|
ranks := s.calculateRanks(minRatio, f)
|
|
if len(ranks) == 0 {
|
|
break
|
|
}
|
|
|
|
forward := side == 0
|
|
bestRank := ranks[0]
|
|
if forward {
|
|
debug("%d paths elected, found best forward path %s profit %.5f%%", len(ranks), bestRank.Path, (bestRank.Ratio-1.0)*100.0)
|
|
} else {
|
|
debug("%d paths elected, found best backward path %s profit %.5f%%", len(ranks), bestRank.Path, (bestRank.Ratio-1.0)*100.0)
|
|
}
|
|
|
|
logMarketPath(bestRank.Path)
|
|
|
|
s.executePath(ctx, session, bestRank.Path, bestRank.Ratio, forward)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
type ratioFunction func(p *Path) float64
|
|
|
|
func (s *Strategy) checkMinimalOrderQuantity(orders [3]types.SubmitOrder) error {
|
|
for _, order := range orders {
|
|
if order.Quantity.Compare(order.Market.MinQuantity) <= 0 {
|
|
return fmt.Errorf("%s order quantity is too small: %f < %f",
|
|
order.Symbol,
|
|
order.Quantity.Float64(), order.Market.MinQuantity.Float64())
|
|
}
|
|
|
|
if order.Quantity.Mul(order.Price).Compare(order.Market.MinNotional) <= 0 {
|
|
return fmt.Errorf("%s order min notional is too small: %f < %f",
|
|
order.Symbol,
|
|
order.Quantity.Mul(order.Price).Float64(), order.Market.MinNotional.Float64())
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Strategy) optimizeMarketQuantityPrecision() {
|
|
var baseMarkets = make(map[string][]types.Market)
|
|
for _, m := range s.markets {
|
|
baseMarkets[m.BaseCurrency] = append(baseMarkets[m.BaseCurrency], m)
|
|
}
|
|
|
|
for _, markets := range baseMarkets {
|
|
var prec = -1
|
|
for _, m := range markets {
|
|
if prec == -1 || m.VolumePrecision < prec {
|
|
prec = m.VolumePrecision
|
|
}
|
|
}
|
|
|
|
if prec == -1 {
|
|
continue
|
|
}
|
|
|
|
for _, m := range markets {
|
|
m.VolumePrecision = prec
|
|
s.markets[m.Symbol] = m
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Strategy) applyBalanceMaxQuantity(balances types.BalanceMap) types.BalanceMap {
|
|
if s.Limits == nil || len(s.Limits) == 0 {
|
|
return balances
|
|
}
|
|
|
|
for c, b := range balances {
|
|
if limit, ok := s.Limits[c]; ok {
|
|
b.Available = fixedpoint.Min(b.Available, limit)
|
|
balances[c] = b
|
|
}
|
|
}
|
|
|
|
return balances
|
|
}
|
|
|
|
func (s *Strategy) addBalanceBuffer(balances types.BalanceMap) (out types.BalanceMap) {
|
|
out = types.BalanceMap{}
|
|
for c, b := range balances {
|
|
ab := b
|
|
ab.Available = ab.Available.Mul(one.Sub(balanceBufferRatio))
|
|
out[c] = ab
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
func (s *Strategy) toProtectiveMarketOrder(order types.SubmitOrder, ratio fixedpoint.Value) types.SubmitOrder {
|
|
sellRatio := one.Sub(ratio)
|
|
buyRatio := one.Add(ratio)
|
|
|
|
switch order.Side {
|
|
case types.SideTypeSell:
|
|
order.Price = order.Price.Mul(sellRatio)
|
|
|
|
case types.SideTypeBuy:
|
|
order.Price = order.Price.Mul(buyRatio)
|
|
}
|
|
|
|
return order
|
|
}
|
|
|
|
func (s *Strategy) toProtectiveMarketOrders(orders [3]types.SubmitOrder, ratio fixedpoint.Value) [3]types.SubmitOrder {
|
|
sellRatio := one.Sub(ratio)
|
|
buyRatio := one.Add(ratio)
|
|
for i, order := range orders {
|
|
switch order.Side {
|
|
case types.SideTypeSell:
|
|
order.Price = order.Price.Mul(sellRatio)
|
|
|
|
case types.SideTypeBuy:
|
|
order.Price = order.Price.Mul(buyRatio)
|
|
}
|
|
|
|
// order.Quantity = order.Market.TruncateQuantity(order.Quantity)
|
|
// order.Type = types.OrderTypeMarket
|
|
orders[i] = order
|
|
}
|
|
|
|
return orders
|
|
}
|
|
|
|
func (s *Strategy) executePath(ctx context.Context, session *bbgo.ExchangeSession, p *Path, ratio float64, dir bool) {
|
|
balances := session.Account.Balances()
|
|
balances = s.addBalanceBuffer(balances)
|
|
balances = s.applyBalanceMaxQuantity(balances)
|
|
|
|
var orders [3]types.SubmitOrder
|
|
if dir {
|
|
orders = p.newOrders(balances, 1)
|
|
} else {
|
|
orders = p.newOrders(balances, -1)
|
|
}
|
|
|
|
if err := s.checkMinimalOrderQuantity(orders); err != nil {
|
|
log.WithError(err).Warnf("order quantity too small, skip")
|
|
return
|
|
}
|
|
|
|
if s.DryRun {
|
|
logSubmitOrders(orders)
|
|
return
|
|
}
|
|
|
|
createdOrders, err := s.iocOrderExecution(ctx, session, orders, ratio)
|
|
if err != nil {
|
|
log.WithError(err).Errorf("order execute error")
|
|
return
|
|
}
|
|
|
|
if len(createdOrders) == 0 {
|
|
return
|
|
}
|
|
|
|
log.Info(s.Position.String())
|
|
|
|
profits := s.Position.CollectProfits()
|
|
profitInUSD := fixedpoint.Zero
|
|
for _, profit := range profits {
|
|
bbgo.Notify(&profit)
|
|
log.Info(profit.PlainText())
|
|
|
|
profitInUSD = profitInUSD.Add(profit.ProfitInUSD)
|
|
|
|
// FIXME:
|
|
// s.TradeState.Add(&profit)
|
|
}
|
|
|
|
notifyUsdPnL(profitInUSD)
|
|
|
|
log.Info(s.TradeState.BriefString())
|
|
|
|
bbgo.Sync(ctx, s)
|
|
|
|
if s.CoolingDownTime > 0 {
|
|
log.Infof("cooling down for %s", s.CoolingDownTime.Duration().String())
|
|
time.Sleep(s.CoolingDownTime.Duration())
|
|
}
|
|
}
|
|
|
|
func notifyUsdPnL(profit fixedpoint.Value) {
|
|
var title = "Triangular Sum PnL ~= "
|
|
title += style.PnLEmojiSimple(profit) + " "
|
|
title += style.PnLSignString(profit) + " USD"
|
|
bbgo.Notify(title)
|
|
}
|
|
|
|
func (s *Strategy) iocOrderExecution(
|
|
ctx context.Context, session *bbgo.ExchangeSession, orders [3]types.SubmitOrder, ratio float64,
|
|
) (types.OrderSlice, error) {
|
|
service, ok := session.Exchange.(types.ExchangeOrderQueryService)
|
|
if !ok {
|
|
return nil, errors.New("exchange does not support ExchangeOrderQueryService")
|
|
}
|
|
|
|
var filledQuantity = fixedpoint.Zero
|
|
|
|
// Change the first order to IOC
|
|
orders[0].Type = types.OrderTypeLimit
|
|
orders[0].TimeInForce = types.TimeInForceIOC
|
|
|
|
var originalOrders [3]types.SubmitOrder
|
|
originalOrders[0] = orders[0]
|
|
originalOrders[1] = orders[1]
|
|
originalOrders[2] = orders[2]
|
|
logSubmitOrders(orders)
|
|
|
|
if !s.IocOrderRatio.IsZero() {
|
|
orders[0] = s.toProtectiveMarketOrder(orders[0], s.IocOrderRatio)
|
|
}
|
|
|
|
iocOrder := s.executeOrder(ctx, orders[0])
|
|
if iocOrder == nil {
|
|
return nil, errors.New("ioc order submit error")
|
|
}
|
|
|
|
iocOrderC := make(chan types.Order, 2)
|
|
defer func() {
|
|
close(iocOrderC)
|
|
}()
|
|
|
|
go func() {
|
|
o, err := s.waitWebSocketOrderDone(ctx, iocOrder.OrderID, 300*time.Millisecond)
|
|
if err != nil {
|
|
// log.WithError(err).Errorf("ioc order wait error")
|
|
return
|
|
} else if o != nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case iocOrderC <- *o:
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
o, err := retry.QueryOrderUntilFilled(ctx, service, iocOrder.Symbol, iocOrder.OrderID)
|
|
if err != nil {
|
|
log.WithError(err).Errorf("ioc order restful wait error")
|
|
return
|
|
} else if o != nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case iocOrderC <- *o:
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
o := <-iocOrderC
|
|
|
|
filledQuantity = o.ExecutedQuantity
|
|
|
|
if filledQuantity.IsZero() {
|
|
s.State.IOCLossTimes++
|
|
|
|
// we didn't get filled
|
|
log.Infof("%s %s IOC order did not get filled, skip: %+v", o.Symbol, o.Side, o)
|
|
return nil, nil
|
|
}
|
|
|
|
filledRatio := filledQuantity.Div(iocOrder.Quantity)
|
|
bbgo.Notify("%s %s IOC order got filled %f/%f (%s)", iocOrder.Symbol, iocOrder.Side, filledQuantity.Float64(), iocOrder.Quantity.Float64(), filledRatio.Percentage())
|
|
log.Infof("%s %s IOC order got filled %f/%f", iocOrder.Symbol, iocOrder.Side, filledQuantity.Float64(), iocOrder.Quantity.Float64())
|
|
|
|
orders[1].Quantity = orders[1].Quantity.Mul(filledRatio)
|
|
orders[2].Quantity = orders[2].Quantity.Mul(filledRatio)
|
|
|
|
orders[1].Quantity = orders[1].Market.TruncateQuantity(orders[1].Quantity)
|
|
orders[2].Quantity = orders[2].Market.TruncateQuantity(orders[2].Quantity)
|
|
|
|
if orders[1].Quantity.Compare(orders[1].Market.MinQuantity) <= 0 {
|
|
log.Warnf("order #2 quantity %f is less than min quantity %f, skip", orders[1].Quantity.Float64(), orders[1].Market.MinQuantity.Float64())
|
|
return nil, nil
|
|
}
|
|
|
|
if orders[2].Quantity.Compare(orders[2].Market.MinQuantity) <= 0 {
|
|
log.Warnf("order #3 quantity %f is less than min quantity %f, skip", orders[2].Quantity.Float64(), orders[2].Market.MinQuantity.Float64())
|
|
return nil, nil
|
|
}
|
|
|
|
orders[1] = s.toProtectiveMarketOrder(orders[1], s.MarketOrderProtectiveRatio)
|
|
orders[2] = s.toProtectiveMarketOrder(orders[2], s.MarketOrderProtectiveRatio)
|
|
|
|
var orderC = make(chan types.Order, 2)
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
|
|
go func() {
|
|
o := s.executeOrder(ctx, orders[1])
|
|
orderC <- *o
|
|
wg.Done()
|
|
}()
|
|
|
|
go func() {
|
|
o := s.executeOrder(ctx, orders[2])
|
|
orderC <- *o
|
|
wg.Done()
|
|
}()
|
|
|
|
wg.Wait()
|
|
|
|
var createdOrders = make(types.OrderSlice, 3)
|
|
createdOrders[0] = *iocOrder
|
|
createdOrders[1] = <-orderC
|
|
createdOrders[2] = <-orderC
|
|
close(orderC)
|
|
|
|
orderTrades, updatedOrders, err := s.waitOrdersAndCollectTrades(ctx, service, createdOrders)
|
|
if err != nil {
|
|
log.WithError(err).Errorf("trade collecting error")
|
|
} else {
|
|
for i, order := range updatedOrders {
|
|
trades, hasTrades := orderTrades[order.OrderID]
|
|
if !hasTrades {
|
|
continue
|
|
}
|
|
averagePrice := tradeAveragePrice(trades, order.OrderID)
|
|
updatedOrders[i].AveragePrice = averagePrice
|
|
|
|
if market, hasMarket := s.markets[order.Symbol]; hasMarket {
|
|
updatedOrders[i].Market = market
|
|
}
|
|
|
|
for _, originalOrder := range originalOrders {
|
|
if originalOrder.Symbol == updatedOrders[i].Symbol {
|
|
updatedOrders[i].Price = originalOrder.Price
|
|
}
|
|
}
|
|
}
|
|
s.analyzeOrders(updatedOrders)
|
|
}
|
|
|
|
// update ioc winning ratio
|
|
s.State.IOCWinTimes++
|
|
if s.State.IOCLossTimes == 0 {
|
|
s.State.IOCWinningRatio = 999.0
|
|
} else {
|
|
s.State.IOCWinningRatio = float64(s.State.IOCWinTimes) / float64(s.State.IOCLossTimes)
|
|
}
|
|
|
|
log.Infof("ioc winning ratio update: %f", s.State.IOCWinningRatio)
|
|
|
|
return createdOrders, nil
|
|
}
|
|
|
|
func (s *Strategy) waitWebSocketOrderDone(ctx context.Context, orderID uint64, timeoutDuration time.Duration) (*types.Order, error) {
|
|
prof := util.StartTimeProfile("waitWebSocketOrderDone")
|
|
defer prof.StopAndLog(log.Infof)
|
|
|
|
if order, ok := s.orderStore.Get(orderID); ok {
|
|
if order.Status == types.OrderStatusFilled || order.Status == types.OrderStatusCanceled {
|
|
return &order, nil
|
|
}
|
|
}
|
|
|
|
timeoutC := time.After(timeoutDuration)
|
|
for {
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
|
|
case <-timeoutC:
|
|
return nil, fmt.Errorf("order wait time timeout %s", timeoutDuration)
|
|
|
|
case order := <-s.orderStore.C:
|
|
if orderID == order.OrderID && (order.Status == types.OrderStatusFilled || order.Status == types.OrderStatusCanceled) {
|
|
return &order, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Strategy) waitOrdersAndCollectTrades(
|
|
ctx context.Context, service types.ExchangeOrderQueryService, createdOrders types.OrderSlice,
|
|
) (map[uint64][]types.Trade, types.OrderSlice, error) {
|
|
var err error
|
|
var orderTrades = make(map[uint64][]types.Trade)
|
|
var updatedOrders types.OrderSlice
|
|
for _, o := range createdOrders {
|
|
updatedOrder, err2 := waitForOrderFilled(ctx, service, o, time.Second)
|
|
if err2 != nil {
|
|
err = multierr.Append(err, err2)
|
|
continue
|
|
}
|
|
|
|
trades, err3 := service.QueryOrderTrades(ctx, types.OrderQuery{
|
|
Symbol: o.Symbol,
|
|
OrderID: strconv.FormatUint(o.OrderID, 10),
|
|
})
|
|
if err3 != nil {
|
|
err = multierr.Append(err, err3)
|
|
continue
|
|
}
|
|
|
|
for _, t := range trades {
|
|
s.tradeCollector.ProcessTrade(t)
|
|
}
|
|
|
|
orderTrades[o.OrderID] = trades
|
|
updatedOrders = append(updatedOrders, *updatedOrder)
|
|
}
|
|
|
|
/*
|
|
*/
|
|
return orderTrades, updatedOrders, nil
|
|
}
|
|
|
|
func (s *Strategy) analyzeOrders(orders types.OrderSlice) {
|
|
sort.Slice(orders, func(i, j int) bool {
|
|
// o1 < o2 -- earlier first
|
|
return orders[i].CreationTime.Before(orders[i].CreationTime.Time())
|
|
})
|
|
|
|
log.Infof("ANALYZING ORDERS (Earlier First)")
|
|
for i, o := range orders {
|
|
in, inCurrency := o.In()
|
|
out, outCurrency := o.Out()
|
|
log.Infof("#%d %s IN %f %s -> OUT %f %s", i, o.String(), in.Float64(), inCurrency, out.Float64(), outCurrency)
|
|
}
|
|
|
|
for _, o := range orders {
|
|
switch o.Side {
|
|
case types.SideTypeSell:
|
|
price := o.Price
|
|
priceDiff := o.AveragePrice.Sub(price)
|
|
slippage := priceDiff.Div(price)
|
|
log.Infof("%-8s %-4s %-10s AVG PRICE %f PRICE %f Q %f SLIPPAGE %.3f%%", o.Symbol, o.Side, o.Type, o.AveragePrice.Float64(), price.Float64(), o.Quantity.Float64(), slippage.Float64()*100.0)
|
|
|
|
case types.SideTypeBuy:
|
|
price := o.Price
|
|
priceDiff := price.Sub(o.AveragePrice)
|
|
slippage := priceDiff.Div(price)
|
|
log.Infof("%-8s %-4s %-10s AVG PRICE %f PRICE %f Q %f SLIPPAGE %.3f%%", o.Symbol, o.Side, o.Type, o.AveragePrice.Float64(), price.Float64(), o.Quantity.Float64(), slippage.Float64()*100.0)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Strategy) buildArbMarkets(
|
|
session *bbgo.ExchangeSession, symbols []string, separateStream bool, sigC sigchan.Chan,
|
|
) (map[string]*ArbMarket, error) {
|
|
markets := make(map[string]*ArbMarket)
|
|
// build market object
|
|
for _, symbol := range symbols {
|
|
market, ok := s.markets[symbol]
|
|
if !ok {
|
|
return nil, fmt.Errorf("market not found: %s", symbol)
|
|
}
|
|
|
|
m := &ArbMarket{
|
|
Symbol: symbol,
|
|
market: market,
|
|
BaseCurrency: market.BaseCurrency,
|
|
QuoteCurrency: market.QuoteCurrency,
|
|
sigC: sigC,
|
|
truncateBaseQuantity: createBaseQuantityTruncator(market),
|
|
truncateQuoteQuantity: createPricePrecisionBasedQuoteQuantityTruncator(market),
|
|
}
|
|
|
|
if separateStream {
|
|
stream := session.Exchange.NewStream()
|
|
stream.SetPublicOnly()
|
|
stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
|
|
Depth: types.DepthLevelFull,
|
|
Speed: types.SpeedHigh,
|
|
})
|
|
|
|
book := types.NewStreamBook(symbol, session.ExchangeName)
|
|
priceUpdater := func(_ types.SliceOrderBook) {
|
|
bestBid, bestAsk, _ := book.BestBidAndAsk()
|
|
if bestAsk.Equals(m.bestAsk) && bestBid.Equals(m.bestBid) {
|
|
return
|
|
}
|
|
|
|
m.bestBid = bestBid
|
|
m.bestAsk = bestAsk
|
|
m.updateRate()
|
|
}
|
|
book.OnUpdate(priceUpdater)
|
|
book.OnSnapshot(priceUpdater)
|
|
book.BindStream(stream)
|
|
|
|
stream.OnDisconnect(func() {
|
|
// reset price and volume
|
|
m.bestBid = types.PriceVolume{}
|
|
m.bestAsk = types.PriceVolume{}
|
|
})
|
|
|
|
m.book = book
|
|
m.stream = stream
|
|
} else {
|
|
book, _ := session.OrderBook(symbol)
|
|
priceUpdater := func(_ types.SliceOrderBook) {
|
|
bestAsk, bestBid, _ := book.BestBidAndAsk()
|
|
if bestAsk.Equals(m.bestAsk) && bestBid.Equals(m.bestBid) {
|
|
return
|
|
}
|
|
|
|
m.bestBid = bestBid
|
|
m.bestAsk = bestAsk
|
|
m.updateRate()
|
|
}
|
|
book.OnUpdate(priceUpdater)
|
|
book.OnSnapshot(priceUpdater)
|
|
|
|
m.book = book
|
|
m.stream = session.MarketDataStream
|
|
}
|
|
|
|
markets[symbol] = m
|
|
}
|
|
|
|
return markets, nil
|
|
}
|
|
|
|
func (s *Strategy) calculateRanks(minRatio float64, method func(p *Path) float64) []PathRank {
|
|
ranks := make([]PathRank, 0, len(s.paths))
|
|
|
|
// ranking paths here
|
|
for _, path := range s.paths {
|
|
ratio := method(path)
|
|
if ratio < minRatio {
|
|
continue
|
|
}
|
|
|
|
p := path
|
|
ranks = append(ranks, PathRank{Path: p, Ratio: ratio})
|
|
}
|
|
|
|
// sort and pick up the top rank path
|
|
sort.Slice(ranks, func(i, j int) bool {
|
|
return ranks[i].Ratio > ranks[j].Ratio
|
|
})
|
|
|
|
return ranks
|
|
}
|
|
|
|
func waitForOrderFilled(
|
|
ctx context.Context, ex types.ExchangeOrderQueryService, order types.Order, timeout time.Duration,
|
|
) (*types.Order, error) {
|
|
prof := util.StartTimeProfile("waitForOrderFilled")
|
|
defer prof.StopAndLog(log.Infof)
|
|
|
|
timeoutC := time.After(timeout)
|
|
|
|
for {
|
|
select {
|
|
case <-timeoutC:
|
|
return nil, fmt.Errorf("order wait timeout %s", timeout)
|
|
|
|
default:
|
|
p := util.StartTimeProfile("queryOrder")
|
|
remoteOrder, err2 := ex.QueryOrder(ctx, types.OrderQuery{
|
|
Symbol: order.Symbol,
|
|
OrderID: strconv.FormatUint(order.OrderID, 10),
|
|
})
|
|
p.StopAndLog(log.Infof)
|
|
|
|
if err2 != nil {
|
|
log.WithError(err2).Errorf("order query error")
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue
|
|
}
|
|
|
|
switch remoteOrder.Status {
|
|
case types.OrderStatusFilled, types.OrderStatusCanceled:
|
|
return remoteOrder, nil
|
|
default:
|
|
log.Infof("WAITING: %s", remoteOrder.String())
|
|
time.Sleep(5 * time.Millisecond)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func tradeAveragePrice(trades []types.Trade, orderID uint64) fixedpoint.Value {
|
|
totalAmount := fixedpoint.Zero
|
|
totalQuantity := fixedpoint.Zero
|
|
for _, trade := range trades {
|
|
if trade.OrderID != orderID {
|
|
continue
|
|
}
|
|
|
|
totalAmount = totalAmount.Add(trade.Price.Mul(trade.Quantity))
|
|
totalQuantity = totalQuantity.Add(trade.Quantity)
|
|
}
|
|
|
|
return totalAmount.Div(totalQuantity)
|
|
}
|
|
|
|
func displayBook(id string, market *ArbMarket) {
|
|
if !debugMode {
|
|
return
|
|
}
|
|
|
|
var s strings.Builder
|
|
|
|
s.WriteString(id + ") " + market.market.Symbol + "\n")
|
|
s.WriteString(fmt.Sprintf("bestAsk: %-12s x %s\n",
|
|
market.bestAsk.Price.String(),
|
|
market.bestAsk.Volume.String(),
|
|
))
|
|
s.WriteString(fmt.Sprintf("bestBid: %-12s x %s\n",
|
|
market.bestBid.Price.String(),
|
|
market.bestBid.Volume.String(),
|
|
))
|
|
|
|
debug(s.String())
|
|
}
|
|
|
|
func logMarketPath(path *Path) {
|
|
if !debugMode {
|
|
return
|
|
}
|
|
|
|
displayBook("A", path.marketA)
|
|
displayBook("B", path.marketB)
|
|
displayBook("C", path.marketC)
|
|
}
|
|
|
|
func collectSymbols(paths [][]string) (symbols []string) {
|
|
symbolMap := make(map[string]struct{})
|
|
for _, path := range paths {
|
|
for _, s := range path {
|
|
symbolMap[s] = struct{}{}
|
|
}
|
|
}
|
|
|
|
for s := range symbolMap {
|
|
symbols = append(symbols, s)
|
|
}
|
|
|
|
return symbols
|
|
}
|