Merge pull request #1221 from c9s/feature/tri

FEATURE: add triangular arbitrate strategy as an example
This commit is contained in:
c9s 2023-07-05 16:24:29 +08:00 committed by GitHub
commit 05a8a7442c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 3850 additions and 39 deletions

35
config/tri.yaml Normal file
View File

@ -0,0 +1,35 @@
---
notifications:
slack:
defaultChannel: "dev-bbgo"
errorChannel: "bbgo-error"
sessions:
binance:
exchange: binance
envVarPrefix: binance
exchangeStrategies:
## triangular arbitrage strategy
- on: binance
tri:
minSpreadRatio: 1.0011
separateStream: true
# resetPosition: true
limits:
BTC: 0.001
ETH: 0.01
USDT: 20.0
symbols:
- BNBUSDT
- BNBBTC
- BNBETH
- BTCUSDT
- ETHUSDT
- ETHBTC
paths:
- [ BTCUSDT, ETHBTC, ETHUSDT ]
- [ BNBBTC, BNBUSDT, BTCUSDT ]
- [ BNBETH, BNBUSDT, ETHUSDT ]

View File

@ -5,12 +5,13 @@
package mocks package mocks
import ( import (
context "context" "context"
reflect "reflect" "reflect"
bbgo "github.com/c9s/bbgo/pkg/bbgo" "github.com/golang/mock/gomock"
types "github.com/c9s/bbgo/pkg/types"
gomock "github.com/golang/mock/gomock" "github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/types"
) )
// MockOrderExecutorExtended is a mock of OrderExecutorExtended interface. // MockOrderExecutorExtended is a mock of OrderExecutorExtended interface.
@ -90,10 +91,10 @@ func (mr *MockOrderExecutorExtendedMockRecorder) SubmitOrders(arg0 interface{},
} }
// TradeCollector mocks base method. // TradeCollector mocks base method.
func (m *MockOrderExecutorExtended) TradeCollector() *bbgo.TradeCollector { func (m *MockOrderExecutorExtended) TradeCollector() *core.TradeCollector {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TradeCollector") ret := m.ctrl.Call(m, "TradeCollector")
ret0, _ := ret[0].(*bbgo.TradeCollector) ret0, _ := ret[0].(*core.TradeCollector)
return ret0 return ret0
} }

View File

@ -10,6 +10,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.uber.org/multierr" "go.uber.org/multierr"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
@ -32,7 +33,7 @@ type OrderExecutor interface {
type OrderExecutorExtended interface { type OrderExecutorExtended interface {
SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
CancelOrders(ctx context.Context, orders ...types.Order) error CancelOrders(ctx context.Context, orders ...types.Order) error
TradeCollector() *TradeCollector TradeCollector() *core.TradeCollector
Position() *types.Position Position() *types.Position
} }

View File

@ -35,7 +35,7 @@ type GeneralOrderExecutor struct {
position *types.Position position *types.Position
activeMakerOrders *ActiveOrderBook activeMakerOrders *ActiveOrderBook
orderStore *core.OrderStore orderStore *core.OrderStore
tradeCollector *TradeCollector tradeCollector *core.TradeCollector
logger log.FieldLogger logger log.FieldLogger
@ -60,7 +60,7 @@ func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strateg
position: position, position: position,
activeMakerOrders: NewActiveOrderBook(symbol), activeMakerOrders: NewActiveOrderBook(symbol),
orderStore: orderStore, orderStore: orderStore,
tradeCollector: NewTradeCollector(symbol, position, orderStore), tradeCollector: core.NewTradeCollector(symbol, position, orderStore),
} }
if session != nil && session.Margin { if session != nil && session.Margin {
@ -517,7 +517,7 @@ func (e *GeneralOrderExecutor) ClosePosition(ctx context.Context, percentage fix
return nil return nil
} }
func (e *GeneralOrderExecutor) TradeCollector() *TradeCollector { func (e *GeneralOrderExecutor) TradeCollector() *core.TradeCollector {
return e.tradeCollector return e.tradeCollector
} }

View File

@ -308,7 +308,7 @@ var BacktestCmd = &cobra.Command{
var reportDir = outputDirectory var reportDir = outputDirectory
var sessionTradeStats = make(map[string]map[string]*types.TradeStats) var sessionTradeStats = make(map[string]map[string]*types.TradeStats)
var tradeCollectorList []*bbgo.TradeCollector var tradeCollectorList []*core.TradeCollector
for _, exSource := range exchangeSources { for _, exSource := range exchangeSources {
sessionName := exSource.Session.Name sessionName := exSource.Session.Name
tradeStatsMap := make(map[string]*types.TradeStats) tradeStatsMap := make(map[string]*types.TradeStats)
@ -317,7 +317,7 @@ var BacktestCmd = &cobra.Command{
position := types.NewPositionFromMarket(market) position := types.NewPositionFromMarket(market)
orderStore := core.NewOrderStore(usedSymbol) orderStore := core.NewOrderStore(usedSymbol)
orderStore.AddOrderUpdate = true orderStore.AddOrderUpdate = true
tradeCollector := bbgo.NewTradeCollector(usedSymbol, position, orderStore) tradeCollector := core.NewTradeCollector(usedSymbol, position, orderStore)
tradeStats := types.NewTradeStats(usedSymbol) tradeStats := types.NewTradeStats(usedSymbol)
tradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime)) tradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime))

View File

@ -15,12 +15,14 @@ type OrderStore struct {
RemoveCancelled bool RemoveCancelled bool
RemoveFilled bool RemoveFilled bool
AddOrderUpdate bool AddOrderUpdate bool
C chan types.Order
} }
func NewOrderStore(symbol string) *OrderStore { func NewOrderStore(symbol string) *OrderStore {
return &OrderStore{ return &OrderStore{
Symbol: symbol, Symbol: symbol,
orders: make(map[uint64]types.Order), orders: make(map[uint64]types.Order),
C: make(chan types.Order),
} }
} }
@ -129,6 +131,7 @@ func (s *OrderStore) BindStream(stream types.Stream) {
} }
func (s *OrderStore) HandleOrderUpdate(order types.Order) { func (s *OrderStore) HandleOrderUpdate(order types.Order) {
switch order.Status { switch order.Status {
case types.OrderStatusNew, types.OrderStatusPartiallyFilled, types.OrderStatusFilled: case types.OrderStatusNew, types.OrderStatusPartiallyFilled, types.OrderStatusFilled:
@ -152,4 +155,9 @@ func (s *OrderStore) HandleOrderUpdate(order types.Order) {
case types.OrderStatusRejected: case types.OrderStatusRejected:
s.Remove(order) s.Remove(order)
} }
select {
case s.C <- order:
default:
}
} }

View File

@ -1,13 +1,12 @@
package bbgo package core
import ( import (
"context" "context"
"sync" "sync"
"time" "time"
log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/sigchan" "github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
@ -18,10 +17,10 @@ type TradeCollector struct {
Symbol string Symbol string
orderSig sigchan.Chan orderSig sigchan.Chan
tradeStore *core.TradeStore tradeStore *TradeStore
tradeC chan types.Trade tradeC chan types.Trade
position *types.Position position *types.Position
orderStore *core.OrderStore orderStore *OrderStore
doneTrades map[types.TradeKey]struct{} doneTrades map[types.TradeKey]struct{}
mu sync.Mutex mu sync.Mutex
@ -34,13 +33,13 @@ type TradeCollector struct {
profitCallbacks []func(trade types.Trade, profit *types.Profit) profitCallbacks []func(trade types.Trade, profit *types.Profit)
} }
func NewTradeCollector(symbol string, position *types.Position, orderStore *core.OrderStore) *TradeCollector { func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector {
return &TradeCollector{ return &TradeCollector{
Symbol: symbol, Symbol: symbol,
orderSig: sigchan.New(1), orderSig: sigchan.New(1),
tradeC: make(chan types.Trade, 100), tradeC: make(chan types.Trade, 100),
tradeStore: core.NewTradeStore(), tradeStore: NewTradeStore(),
doneTrades: make(map[types.TradeKey]struct{}), doneTrades: make(map[types.TradeKey]struct{}),
position: position, position: position,
orderStore: orderStore, orderStore: orderStore,
@ -48,7 +47,7 @@ func NewTradeCollector(symbol string, position *types.Position, orderStore *core
} }
// OrderStore returns the order store used by the trade collector // OrderStore returns the order store used by the trade collector
func (c *TradeCollector) OrderStore() *core.OrderStore { func (c *TradeCollector) OrderStore() *OrderStore {
return c.orderStore return c.orderStore
} }
@ -57,7 +56,7 @@ func (c *TradeCollector) Position() *types.Position {
return c.position return c.position
} }
func (c *TradeCollector) TradeStore() *core.TradeStore { func (c *TradeCollector) TradeStore() *TradeStore {
return c.tradeStore return c.tradeStore
} }
@ -99,9 +98,9 @@ func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHist
} }
for _, td := range trades { for _, td := range trades {
log.Debugf("processing trade: %s", td.String()) logrus.Debugf("processing trade: %s", td.String())
if c.ProcessTrade(td) { if c.ProcessTrade(td) {
log.Infof("recovered trade: %s", td.String()) logrus.Infof("recovered trade: %s", td.String())
c.EmitRecover(td) c.EmitRecover(td)
} }
} }

View File

@ -1,6 +1,6 @@
// Code generated by "callbackgen -type TradeCollector"; DO NOT EDIT. // Code generated by "callbackgen -type TradeCollector"; DO NOT EDIT.
package bbgo package core
import ( import (
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"

View File

@ -1,11 +1,10 @@
package bbgo package core
import ( import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -13,7 +12,7 @@ import (
func TestTradeCollector_ShouldNotCountDuplicatedTrade(t *testing.T) { func TestTradeCollector_ShouldNotCountDuplicatedTrade(t *testing.T) {
symbol := "BTCUSDT" symbol := "BTCUSDT"
position := types.NewPosition(symbol, "BTC", "USDT") position := types.NewPosition(symbol, "BTC", "USDT")
orderStore := core.NewOrderStore(symbol) orderStore := NewOrderStore(symbol)
collector := NewTradeCollector(symbol, position, orderStore) collector := NewTradeCollector(symbol, position, orderStore)
assert.NotNil(t, collector) assert.NotNil(t, collector)

View File

@ -8,6 +8,7 @@ import (
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/bbgo/mocks" "github.com/c9s/bbgo/pkg/bbgo/mocks"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -93,7 +94,7 @@ func TestReleasePositionCallbacks(t *testing.T) {
}, },
} }
tradeCollector := &bbgo.TradeCollector{} tradeCollector := &core.TradeCollector{}
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()
orderExecutor := mocks.NewMockOrderExecutorExtended(mockCtrl) orderExecutor := mocks.NewMockOrderExecutorExtended(mockCtrl)

View File

@ -49,7 +49,7 @@ type Strategy struct {
// closePositionOrders *bbgo.LocalActiveOrderBook // closePositionOrders *bbgo.LocalActiveOrderBook
orderStore *core.OrderStore orderStore *core.OrderStore
tradeCollector *bbgo.TradeCollector tradeCollector *core.TradeCollector
session *bbgo.ExchangeSession session *bbgo.ExchangeSession
@ -174,7 +174,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.Position.Strategy = ID s.Position.Strategy = ID
s.Position.StrategyInstanceID = instanceID s.Position.StrategyInstanceID = instanceID
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore) s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
// StrategyController // StrategyController
if s.Status != types.StrategyStatusRunning { if s.Status != types.StrategyStatusRunning {

View File

@ -95,7 +95,7 @@ type Strategy struct {
// activeOrders is the locally maintained active order book of the maker orders. // activeOrders is the locally maintained active order book of the maker orders.
activeOrders *bbgo.ActiveOrderBook activeOrders *bbgo.ActiveOrderBook
tradeCollector *bbgo.TradeCollector tradeCollector *core.TradeCollector
// groupID is the group ID used for the strategy instance for canceling orders // groupID is the group ID used for the strategy instance for canceling orders
groupID uint32 groupID uint32
@ -571,7 +571,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.activeOrders.OnFilled(s.handleFilledOrder) s.activeOrders.OnFilled(s.handleFilledOrder)
s.activeOrders.BindStream(session.UserDataStream) s.activeOrders.BindStream(session.UserDataStream)
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.State.Position, s.orderStore) s.tradeCollector = core.NewTradeCollector(s.Symbol, s.State.Position, s.orderStore)
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
bbgo.Notify(trade) bbgo.Notify(trade)

104
pkg/strategy/tri/market.go Normal file
View File

@ -0,0 +1,104 @@
package tri
import (
"fmt"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/types"
)
type ArbMarket struct {
Symbol string
BaseCurrency, QuoteCurrency string
market types.Market
stream types.Stream
book *types.StreamOrderBook
bestBid, bestAsk types.PriceVolume
buyRate, sellRate float64
sigC sigchan.Chan
}
func (m *ArbMarket) String() string {
return m.Symbol
}
func (m *ArbMarket) getInitialBalance(balances types.BalanceMap, dir int) (fixedpoint.Value, string) {
if dir == 1 { // sell 1 BTC -> 19000 USDT
b, ok := balances[m.BaseCurrency]
if !ok {
return fixedpoint.Zero, m.BaseCurrency
}
return m.market.TruncateQuantity(b.Available), m.BaseCurrency
} else if dir == -1 {
b, ok := balances[m.QuoteCurrency]
if !ok {
return fixedpoint.Zero, m.QuoteCurrency
}
return m.market.TruncateQuantity(b.Available), m.QuoteCurrency
}
return fixedpoint.Zero, ""
}
func (m *ArbMarket) calculateRatio(dir int) float64 {
if dir == 1 { // direct 1 = sell
if m.bestBid.Price.IsZero() || m.bestBid.Volume.Compare(m.market.MinQuantity) <= 0 {
return 0.0
}
return m.sellRate
} else if dir == -1 {
if m.bestAsk.Price.IsZero() || m.bestAsk.Volume.Compare(m.market.MinQuantity) <= 0 {
return 0.0
}
return m.buyRate
}
return 0.0
}
func (m *ArbMarket) updateRate() {
m.buyRate = 1.0 / m.bestAsk.Price.Float64()
m.sellRate = m.bestBid.Price.Float64()
if m.bestBid.Volume.Compare(m.market.MinQuantity) <= 0 && m.bestAsk.Volume.Compare(m.market.MinQuantity) <= 0 {
return
}
m.sigC.Emit()
}
func (m *ArbMarket) newOrder(dir int, transitingQuantity float64) (types.SubmitOrder, float64) {
if dir == 1 { // sell ETH -> BTC, sell USDT -> TWD
q, r := fitQuantityByBase(m.market.TruncateQuantity(m.bestBid.Volume).Float64(), transitingQuantity)
fq := fixedpoint.NewFromFloat(q)
return types.SubmitOrder{
Symbol: m.Symbol,
Side: types.SideTypeSell,
Type: types.OrderTypeLimit,
Quantity: fq,
Price: m.bestBid.Price,
Market: m.market,
}, r
} else if dir == -1 { // use 1 BTC to buy X ETH
q, r := fitQuantityByQuote(m.bestAsk.Price.Float64(), m.market.TruncateQuantity(m.bestAsk.Volume).Float64(), transitingQuantity)
fq := fixedpoint.NewFromFloat(q)
return types.SubmitOrder{
Symbol: m.Symbol,
Side: types.SideTypeBuy,
Type: types.OrderTypeLimit,
Quantity: fq,
Price: m.bestAsk.Price,
Market: m.market,
}, r
} else {
panic(fmt.Errorf("unexpected direction: %v, valid values are (1, -1)", dir))
}
return types.SubmitOrder{}, 0.0
}

83
pkg/strategy/tri/path.go Normal file
View File

@ -0,0 +1,83 @@
package tri
import (
"fmt"
"github.com/c9s/bbgo/pkg/types"
)
type Path struct {
marketA, marketB, marketC *ArbMarket
dirA, dirB, dirC int
}
func (p *Path) solveDirection() error {
// check if we should reverse the rate
// ETHUSDT -> ETHBTC
if p.marketA.QuoteCurrency == p.marketB.BaseCurrency || p.marketA.QuoteCurrency == p.marketB.QuoteCurrency {
p.dirA = 1
} else if p.marketA.BaseCurrency == p.marketB.BaseCurrency || p.marketA.BaseCurrency == p.marketB.QuoteCurrency {
p.dirA = -1
} else {
return fmt.Errorf("marketA and marketB is not related")
}
if p.marketB.QuoteCurrency == p.marketC.BaseCurrency || p.marketB.QuoteCurrency == p.marketC.QuoteCurrency {
p.dirB = 1
} else if p.marketB.BaseCurrency == p.marketC.BaseCurrency || p.marketB.BaseCurrency == p.marketC.QuoteCurrency {
p.dirB = -1
} else {
return fmt.Errorf("marketB and marketC is not related")
}
if p.marketC.QuoteCurrency == p.marketA.BaseCurrency || p.marketC.QuoteCurrency == p.marketA.QuoteCurrency {
p.dirC = 1
} else if p.marketC.BaseCurrency == p.marketA.BaseCurrency || p.marketC.BaseCurrency == p.marketA.QuoteCurrency {
p.dirC = -1
} else {
return fmt.Errorf("marketC and marketA is not related")
}
return nil
}
func (p *Path) Ready() bool {
return !(p.marketA.bestAsk.Price.IsZero() || p.marketA.bestBid.Price.IsZero() ||
p.marketB.bestAsk.Price.IsZero() || p.marketB.bestBid.Price.IsZero() ||
p.marketC.bestAsk.Price.IsZero() || p.marketC.bestBid.Price.IsZero())
}
func (p *Path) String() string {
return p.marketA.String() + " " + p.marketB.String() + " " + p.marketC.String()
}
func (p *Path) newOrders(balances types.BalanceMap, sign int) [3]types.SubmitOrder {
var orders [3]types.SubmitOrder
var transitingQuantity float64
initialBalance, _ := p.marketA.getInitialBalance(balances, p.dirA*sign)
orderA, _ := p.marketA.newOrder(p.dirA*sign, initialBalance.Float64())
orders[0] = orderA
q, _ := orderA.Out()
transitingQuantity = q.Float64()
// orderB
orderB, rateB := p.marketB.newOrder(p.dirB*sign, transitingQuantity)
orders = adjustOrderQuantityByRate(orders, rateB)
q, _ = orderB.Out()
transitingQuantity = q.Float64()
orders[1] = orderB
orderC, rateC := p.marketC.newOrder(p.dirC*sign, transitingQuantity)
orders = adjustOrderQuantityByRate(orders, rateC)
q, _ = orderC.Out()
orders[2] = orderC
orders[0].Quantity = p.marketA.market.TruncateQuantity(orders[0].Quantity)
orders[1].Quantity = p.marketB.market.TruncateQuantity(orders[1].Quantity)
orders[2].Quantity = p.marketC.market.TruncateQuantity(orders[2].Quantity)
return orders
}

View File

@ -0,0 +1,139 @@
package tri
import (
"fmt"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type MultiCurrencyPosition struct {
Currencies map[string]fixedpoint.Value `json:"currencies"`
Markets map[string]types.Market `json:"markets"`
TotalProfits map[string]fixedpoint.Value `json:"totalProfits"`
Fees map[string]fixedpoint.Value `json:"fees"`
TradePrices map[string]fixedpoint.Value `json:"prices"`
}
func NewMultiCurrencyPosition(markets map[string]types.Market) *MultiCurrencyPosition {
p := &MultiCurrencyPosition{
Currencies: make(map[string]fixedpoint.Value),
Markets: make(map[string]types.Market),
TotalProfits: make(map[string]fixedpoint.Value),
TradePrices: make(map[string]fixedpoint.Value),
Fees: make(map[string]fixedpoint.Value),
}
for _, market := range markets {
p.Markets[market.Symbol] = market
p.Currencies[market.BaseCurrency] = fixedpoint.Zero
p.Currencies[market.QuoteCurrency] = fixedpoint.Zero
p.TotalProfits[market.QuoteCurrency] = fixedpoint.Zero
p.TotalProfits[market.BaseCurrency] = fixedpoint.Zero
p.Fees[market.QuoteCurrency] = fixedpoint.Zero
p.Fees[market.BaseCurrency] = fixedpoint.Zero
p.TradePrices[market.QuoteCurrency] = fixedpoint.Zero
p.TradePrices[market.BaseCurrency] = fixedpoint.Zero
}
return p
}
func (p *MultiCurrencyPosition) handleTrade(trade types.Trade) {
market := p.Markets[trade.Symbol]
switch trade.Side {
case types.SideTypeBuy:
p.Currencies[market.BaseCurrency] = p.Currencies[market.BaseCurrency].Add(trade.Quantity)
p.Currencies[market.QuoteCurrency] = p.Currencies[market.QuoteCurrency].Sub(trade.QuoteQuantity)
case types.SideTypeSell:
p.Currencies[market.BaseCurrency] = p.Currencies[market.BaseCurrency].Sub(trade.Quantity)
p.Currencies[market.QuoteCurrency] = p.Currencies[market.QuoteCurrency].Add(trade.QuoteQuantity)
}
if types.IsUSDFiatCurrency(market.QuoteCurrency) {
p.TradePrices[market.BaseCurrency] = trade.Price
} else if types.IsUSDFiatCurrency(market.BaseCurrency) { // For USDT/TWD pair, convert USDT/TWD price to TWD/USDT
p.TradePrices[market.QuoteCurrency] = one.Div(trade.Price)
}
if !trade.Fee.IsZero() {
if f, ok := p.Fees[trade.FeeCurrency]; ok {
p.Fees[trade.FeeCurrency] = f.Add(trade.Fee)
} else {
p.Fees[trade.FeeCurrency] = trade.Fee
}
}
}
func (p *MultiCurrencyPosition) CollectProfits() []Profit {
var profits []Profit
for currency, base := range p.Currencies {
if base.IsZero() {
continue
}
profit := Profit{
Asset: currency,
Profit: base,
ProfitInUSD: fixedpoint.Zero,
}
if price, ok := p.TradePrices[currency]; ok && !price.IsZero() {
profit.ProfitInUSD = base.Mul(price)
} else if types.IsUSDFiatCurrency(currency) {
profit.ProfitInUSD = base
}
profits = append(profits, profit)
if total, ok := p.TotalProfits[currency]; ok {
p.TotalProfits[currency] = total.Add(base)
} else {
p.TotalProfits[currency] = base
}
}
p.Reset()
return profits
}
func (p *MultiCurrencyPosition) Reset() {
for currency := range p.Currencies {
p.Currencies[currency] = fixedpoint.Zero
}
}
func (p *MultiCurrencyPosition) String() (o string) {
o += "position: \n"
for currency, base := range p.Currencies {
if base.IsZero() {
continue
}
o += fmt.Sprintf("- %s: %f\n", currency, base.Float64())
}
o += "totalProfits: \n"
for currency, total := range p.TotalProfits {
if total.IsZero() {
continue
}
o += fmt.Sprintf("- %s: %f\n", currency, total.Float64())
}
o += "fees: \n"
for currency, fee := range p.Fees {
if fee.IsZero() {
continue
}
o += fmt.Sprintf("- %s: %f\n", currency, fee.Float64())
}
return o
}

View File

@ -0,0 +1,62 @@
package tri
import (
"fmt"
"github.com/slack-go/slack"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/style"
)
type Profit struct {
Asset string `json:"asset"`
Profit fixedpoint.Value `json:"profit"`
ProfitInUSD fixedpoint.Value `json:"profitInUSD"`
}
func (p *Profit) PlainText() string {
var title = fmt.Sprintf("Arbitrage Profit ")
title += style.PnLEmojiSimple(p.Profit) + " "
title += style.PnLSignString(p.Profit) + " " + p.Asset
if !p.ProfitInUSD.IsZero() {
title += " ~= " + style.PnLSignString(p.ProfitInUSD) + " USD"
}
return title
}
func (p *Profit) SlackAttachment() slack.Attachment {
var color = style.PnLColor(p.Profit)
var title = fmt.Sprintf("Triangular PnL ")
title += style.PnLEmojiSimple(p.Profit) + " "
title += style.PnLSignString(p.Profit) + " " + p.Asset
if !p.ProfitInUSD.IsZero() {
title += " ~= " + style.PnLSignString(p.ProfitInUSD) + " USD"
}
var fields []slack.AttachmentField
if !p.Profit.IsZero() {
fields = append(fields, slack.AttachmentField{
Title: "Profit",
Value: style.PnLSignString(p.Profit) + " " + p.Asset,
Short: true,
})
}
if !p.ProfitInUSD.IsZero() {
fields = append(fields, slack.AttachmentField{
Title: "Profit (~= USD)",
Value: style.PnLSignString(p.ProfitInUSD) + " USD",
Short: true,
})
}
return slack.Attachment{
Color: color,
Title: title,
Fields: fields,
// Footer: "",
}
}

View File

@ -0,0 +1,880 @@
package tri
import (
"context"
"errors"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/style"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
//go:generate bash symbols.sh
const ID = "tri"
var log = logrus.WithField("strategy", ID)
var one = fixedpoint.One
var marketOrderProtectiveRatio = fixedpoint.NewFromFloat(0.008)
var balanceBufferRatio = fixedpoint.NewFromFloat(0.005)
func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
type Side int
const Buy Side = 1
const Sell Side = -1
func (s Side) String() string {
return s.SideType().String()
}
func (s Side) SideType() types.SideType {
if s == 1 {
return types.SideTypeBuy
}
return types.SideTypeSell
}
type PathRank struct {
Path *Path
Ratio float64
}
// backward buy -> buy -> sell
func calculateBackwardRate(p *Path) float64 {
var ratio = 1.0
ratio *= p.marketA.calculateRatio(-p.dirA)
ratio *= p.marketB.calculateRatio(-p.dirB)
ratio *= p.marketC.calculateRatio(-p.dirC)
return ratio
}
// calculateForwardRatio
// path: BTCUSDT (0.000044 / 22830.410000) => USDTTWD (0.033220 / 30.101000) => BTCTWD (0.000001 / 687500.000000) <= -> 0.9995899221105569 <- 1.0000373943873788
// 1.0 * 22830 * 30.101000 / 687500.000
// BTCUSDT (0.000044 / 22856.910000) => USDTTWD (0.033217 / 30.104000) => BTCTWD (0.000001 / 688002.100000)
// sell -> rate * 22856
// sell -> rate * 30.104
// buy -> rate / 688002.1
// 1.0000798312
func calculateForwardRatio(p *Path) float64 {
var ratio = 1.0
ratio *= p.marketA.calculateRatio(p.dirA)
ratio *= p.marketB.calculateRatio(p.dirB)
ratio *= p.marketC.calculateRatio(p.dirC)
return ratio
}
func adjustOrderQuantityByRate(orders [3]types.SubmitOrder, rate float64) [3]types.SubmitOrder {
if rate == 1.0 || math.IsNaN(rate) {
return orders
}
for i, o := range orders {
orders[i].Quantity = o.Quantity.Mul(fixedpoint.NewFromFloat(rate))
}
return orders
}
type State struct {
IOCWinTimes int `json:"iocWinningTimes"`
IOCLossTimes int `json:"iocLossTimes"`
IOCWinningRatio float64 `json:"iocWinningRatio"`
}
type Strategy struct {
Symbols []string `json:"symbols"`
Paths [][]string `json:"paths"`
MinSpreadRatio fixedpoint.Value `json:"minSpreadRatio"`
SeparateStream bool `json:"separateStream"`
Limits map[string]fixedpoint.Value `json:"limits"`
CoolingDownTime types.Duration `json:"coolingDownTime"`
NotifyTrade bool `json:"notifyTrade"`
ResetPosition bool `json:"resetPosition"`
MarketOrderProtectiveRatio fixedpoint.Value `json:"marketOrderProtectiveRatio"`
IocOrderRatio fixedpoint.Value `json:"iocOrderRatio"`
DryRun bool `json:"dryRun"`
markets map[string]types.Market
arbMarkets map[string]*ArbMarket
paths []*Path
session *bbgo.ExchangeSession
activeOrders *bbgo.ActiveOrderBook
orderStore *core.OrderStore
tradeCollector *core.TradeCollector
Position *MultiCurrencyPosition `persistence:"position"`
State *State `persistence:"state"`
TradeState *types.TradeStats `persistence:"trade_stats"`
sigC sigchan.Chan
}
func (s *Strategy) ID() string {
return ID
}
func (s *Strategy) InstanceID() string {
return ID + strings.Join(s.Symbols, "-")
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
if !s.SeparateStream {
for _, symbol := range s.Symbols {
session.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
Depth: types.DepthLevelFull,
})
}
}
}
func (s *Strategy) executeOrder(ctx context.Context, order types.SubmitOrder) *types.Order {
waitTime := 100 * time.Millisecond
for maxTries := 100; maxTries >= 0; maxTries-- {
createdOrder, err := s.session.Exchange.SubmitOrder(ctx, order)
if err != nil {
log.WithError(err).Errorf("can not submit orders")
time.Sleep(waitTime)
waitTime *= 2
continue
}
s.orderStore.Add(*createdOrder)
s.activeOrders.Add(*createdOrder)
return createdOrder
}
return nil
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
if s.TradeState == nil {
s.TradeState = types.NewTradeStats("")
}
s.Symbols = compileSymbols(s.Symbols)
if s.MarketOrderProtectiveRatio.IsZero() {
s.MarketOrderProtectiveRatio = marketOrderProtectiveRatio
}
if s.MinSpreadRatio.IsZero() {
s.MinSpreadRatio = fixedpoint.NewFromFloat(1.002)
}
if s.State == nil {
s.State = &State{}
}
s.markets = make(map[string]types.Market)
s.sigC = sigchan.New(10)
s.session = session
s.orderStore = core.NewOrderStore("")
s.orderStore.AddOrderUpdate = true
s.orderStore.BindStream(session.UserDataStream)
s.activeOrders = bbgo.NewActiveOrderBook("")
s.activeOrders.BindStream(session.UserDataStream)
s.tradeCollector = core.NewTradeCollector("", nil, s.orderStore)
for _, symbol := range s.Symbols {
market, ok := session.Market(symbol)
if !ok {
return fmt.Errorf("market not found: %s", symbol)
}
s.markets[symbol] = market
}
s.optimizeMarketQuantityPrecision()
arbMarkets, err := s.buildArbMarkets(session, s.Symbols, s.SeparateStream, s.sigC)
if err != nil {
return err
}
s.arbMarkets = arbMarkets
if s.Position == nil {
s.Position = NewMultiCurrencyPosition(s.markets)
}
if s.ResetPosition {
s.Position = NewMultiCurrencyPosition(s.markets)
}
s.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
s.Position.handleTrade(trade)
})
if s.NotifyTrade {
s.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
bbgo.Notify(trade)
})
}
s.tradeCollector.BindStream(session.UserDataStream)
for _, market := range s.arbMarkets {
m := market
if s.SeparateStream {
log.Infof("connecting %s market stream...", m.Symbol)
if err := m.stream.Connect(ctx); err != nil {
return err
}
}
}
// build paths
// rate update and check paths
for _, pathSymbols := range s.Paths {
if len(pathSymbols) != 3 {
return errors.New("a path must contains 3 symbols")
}
p := &Path{
marketA: s.arbMarkets[pathSymbols[0]],
marketB: s.arbMarkets[pathSymbols[1]],
marketC: s.arbMarkets[pathSymbols[2]],
}
if p.marketA == nil {
return fmt.Errorf("market object of %s is missing", pathSymbols[0])
}
if p.marketB == nil {
return fmt.Errorf("market object of %s is missing", pathSymbols[1])
}
if p.marketC == nil {
return fmt.Errorf("market object of %s is missing", pathSymbols[2])
}
if err := p.solveDirection(); err != nil {
return err
}
s.paths = append(s.paths, p)
}
go func() {
fs := []ratioFunction{calculateForwardRatio, calculateBackwardRate}
log.Infof("waiting for market prices ready...")
wait := true
for wait {
wait = false
for _, p := range s.paths {
if !p.Ready() {
wait = true
break
}
}
}
log.Infof("all markets ready")
for {
select {
case <-ctx.Done():
return
case <-s.sigC:
minRatio := s.MinSpreadRatio.Float64()
for side, f := range fs {
ranks := s.calculateRanks(minRatio, f)
if len(ranks) == 0 {
break
}
forward := side == 0
bestRank := ranks[0]
if forward {
log.Infof("%d paths elected, found best forward path %s profit %.5f%%", len(ranks), bestRank.Path, (bestRank.Ratio-1.0)*100.0)
} else {
log.Infof("%d paths elected, found best backward path %s profit %.5f%%", len(ranks), bestRank.Path, (bestRank.Ratio-1.0)*100.0)
}
s.executePath(ctx, session, bestRank.Path, bestRank.Ratio, forward)
}
}
}
}()
return nil
}
type ratioFunction func(p *Path) float64
func (s *Strategy) checkMinimalOrderQuantity(orders [3]types.SubmitOrder) error {
for _, order := range orders {
market := s.arbMarkets[order.Symbol]
if order.Quantity.Compare(market.market.MinQuantity) < 0 {
return fmt.Errorf("order quantity is too small: %f < %f", order.Quantity.Float64(), market.market.MinQuantity.Float64())
}
if order.Quantity.Mul(order.Price).Compare(market.market.MinNotional) < 0 {
return fmt.Errorf("order min notional is too small: %f < %f", order.Quantity.Mul(order.Price).Float64(), market.market.MinNotional.Float64())
}
}
return nil
}
func (s *Strategy) optimizeMarketQuantityPrecision() {
var baseMarkets = make(map[string][]types.Market)
for _, m := range s.markets {
baseMarkets[m.BaseCurrency] = append(baseMarkets[m.BaseCurrency], m)
}
for _, markets := range baseMarkets {
var prec = -1
for _, m := range markets {
if prec == -1 || m.VolumePrecision < prec {
prec = m.VolumePrecision
}
}
if prec == -1 {
continue
}
for _, m := range markets {
m.VolumePrecision = prec
s.markets[m.Symbol] = m
}
}
}
func (s *Strategy) applyBalanceMaxQuantity(balances types.BalanceMap) types.BalanceMap {
if s.Limits == nil {
return balances
}
for c, b := range balances {
if limit, ok := s.Limits[c]; ok {
b.Available = fixedpoint.Min(b.Available, limit)
balances[c] = b
}
}
return balances
}
func (s *Strategy) addBalanceBuffer(balances types.BalanceMap) (out types.BalanceMap) {
out = types.BalanceMap{}
for c, b := range balances {
ab := b
ab.Available = ab.Available.Mul(one.Sub(balanceBufferRatio))
out[c] = ab
}
return out
}
func (s *Strategy) toProtectiveMarketOrder(order types.SubmitOrder, ratio fixedpoint.Value) types.SubmitOrder {
sellRatio := one.Sub(ratio)
buyRatio := one.Add(ratio)
switch order.Side {
case types.SideTypeSell:
order.Price = order.Price.Mul(sellRatio)
case types.SideTypeBuy:
order.Price = order.Price.Mul(buyRatio)
}
return order
}
func (s *Strategy) toProtectiveMarketOrders(orders [3]types.SubmitOrder, ratio fixedpoint.Value) [3]types.SubmitOrder {
sellRatio := one.Sub(ratio)
buyRatio := one.Add(ratio)
for i, order := range orders {
switch order.Side {
case types.SideTypeSell:
order.Price = order.Price.Mul(sellRatio)
case types.SideTypeBuy:
order.Price = order.Price.Mul(buyRatio)
}
// order.Quantity = order.Market.TruncateQuantity(order.Quantity)
// order.Type = types.OrderTypeMarket
orders[i] = order
}
return orders
}
func (s *Strategy) executePath(ctx context.Context, session *bbgo.ExchangeSession, p *Path, ratio float64, dir bool) {
balances := session.Account.Balances()
balances = s.addBalanceBuffer(balances)
balances = s.applyBalanceMaxQuantity(balances)
var orders [3]types.SubmitOrder
if dir {
orders = p.newOrders(balances, 1)
} else {
orders = p.newOrders(balances, -1)
}
if err := s.checkMinimalOrderQuantity(orders); err != nil {
log.WithError(err).Warnf("order quantity too small, skip")
return
}
if s.DryRun {
logSubmitOrders(orders)
return
}
createdOrders, err := s.iocOrderExecution(ctx, session, orders, ratio)
if err != nil {
log.WithError(err).Errorf("order execute error")
return
}
if len(createdOrders) == 0 {
return
}
log.Info(s.Position.String())
profits := s.Position.CollectProfits()
profitInUSD := fixedpoint.Zero
for _, profit := range profits {
bbgo.Notify(&profit)
log.Info(profit.PlainText())
profitInUSD = profitInUSD.Add(profit.ProfitInUSD)
// FIXME:
// s.TradeState.Add(&profit)
}
notifyUsdPnL(profitInUSD)
log.Info(s.TradeState.BriefString())
bbgo.Sync(ctx, s)
if s.CoolingDownTime > 0 {
log.Infof("cooling down for %s", s.CoolingDownTime.Duration().String())
time.Sleep(s.CoolingDownTime.Duration())
}
}
func notifyUsdPnL(profit fixedpoint.Value) {
var title = fmt.Sprintf("Triangular Sum PnL ~= ")
title += style.PnLEmojiSimple(profit) + " "
title += style.PnLSignString(profit) + " USD"
bbgo.Notify(title)
}
func (s *Strategy) iocOrderExecution(ctx context.Context, session *bbgo.ExchangeSession, orders [3]types.SubmitOrder, ratio float64) (types.OrderSlice, error) {
service, ok := session.Exchange.(types.ExchangeOrderQueryService)
if !ok {
return nil, errors.New("exchange does not support ExchangeOrderQueryService")
}
var filledQuantity = fixedpoint.Zero
// Change the first order to IOC
orders[0].Type = types.OrderTypeLimit
orders[0].TimeInForce = types.TimeInForceIOC
var originalOrders [3]types.SubmitOrder
originalOrders[0] = orders[0]
originalOrders[1] = orders[1]
originalOrders[2] = orders[2]
logSubmitOrders(orders)
if !s.IocOrderRatio.IsZero() {
orders[0] = s.toProtectiveMarketOrder(orders[0], s.IocOrderRatio)
}
iocOrder := s.executeOrder(ctx, orders[0])
if iocOrder == nil {
return nil, errors.New("ioc order submit error")
}
iocOrderC := make(chan types.Order, 2)
defer func() {
close(iocOrderC)
}()
go func() {
o, err := s.waitWebSocketOrderDone(ctx, iocOrder.OrderID, 300*time.Millisecond)
if err != nil {
// log.WithError(err).Errorf("ioc order wait error")
return
} else if o != nil {
select {
case iocOrderC <- *o:
default:
}
}
}()
go func() {
o, err := waitForOrderFilled(ctx, service, *iocOrder, 3*time.Second)
if err != nil {
log.WithError(err).Errorf("ioc order restful wait error")
return
} else if o != nil {
select {
case iocOrderC <- *o:
default:
}
}
}()
o := <-iocOrderC
filledQuantity = o.ExecutedQuantity
if filledQuantity.IsZero() {
s.State.IOCLossTimes++
// we didn't get filled
log.Infof("%s %s IOC order did not get filled, skip: %+v", o.Symbol, o.Side, o)
return nil, nil
}
filledRatio := filledQuantity.Div(iocOrder.Quantity)
bbgo.Notify("%s %s IOC order got filled %f/%f (%s)", iocOrder.Symbol, iocOrder.Side, filledQuantity.Float64(), iocOrder.Quantity.Float64(), filledRatio.Percentage())
log.Infof("%s %s IOC order got filled %f/%f", iocOrder.Symbol, iocOrder.Side, filledQuantity.Float64(), iocOrder.Quantity.Float64())
orders[1].Quantity = orders[1].Quantity.Mul(filledRatio)
orders[2].Quantity = orders[2].Quantity.Mul(filledRatio)
if orders[1].Quantity.Compare(orders[1].Market.MinQuantity) <= 0 {
log.Warnf("order #2 quantity %f is less than min quantity %f, skip", orders[1].Quantity.Float64(), orders[1].Market.MinQuantity.Float64())
return nil, nil
}
if orders[2].Quantity.Compare(orders[2].Market.MinQuantity) <= 0 {
log.Warnf("order #3 quantity %f is less than min quantity %f, skip", orders[2].Quantity.Float64(), orders[2].Market.MinQuantity.Float64())
return nil, nil
}
orders[1] = s.toProtectiveMarketOrder(orders[1], s.MarketOrderProtectiveRatio)
orders[2] = s.toProtectiveMarketOrder(orders[2], s.MarketOrderProtectiveRatio)
var orderC = make(chan types.Order, 2)
var wg sync.WaitGroup
wg.Add(2)
go func() {
o := s.executeOrder(ctx, orders[1])
orderC <- *o
wg.Done()
}()
go func() {
o := s.executeOrder(ctx, orders[2])
orderC <- *o
wg.Done()
}()
wg.Wait()
var createdOrders = make(types.OrderSlice, 3)
createdOrders[0] = *iocOrder
createdOrders[1] = <-orderC
createdOrders[2] = <-orderC
close(orderC)
orderTrades, updatedOrders, err := s.waitOrdersAndCollectTrades(ctx, service, createdOrders)
if err != nil {
log.WithError(err).Errorf("trade collecting error")
} else {
for i, order := range updatedOrders {
trades, hasTrades := orderTrades[order.OrderID]
if !hasTrades {
continue
}
averagePrice := tradeAveragePrice(trades, order.OrderID)
updatedOrders[i].AveragePrice = averagePrice
if market, hasMarket := s.markets[order.Symbol]; hasMarket {
updatedOrders[i].Market = market
}
for _, originalOrder := range originalOrders {
if originalOrder.Symbol == updatedOrders[i].Symbol {
updatedOrders[i].Price = originalOrder.Price
}
}
}
s.analyzeOrders(updatedOrders)
}
// update ioc winning ratio
s.State.IOCWinTimes++
if s.State.IOCLossTimes == 0 {
s.State.IOCWinningRatio = 999.0
} else {
s.State.IOCWinningRatio = float64(s.State.IOCWinTimes) / float64(s.State.IOCLossTimes)
}
log.Infof("ioc winning ratio update: %f", s.State.IOCWinningRatio)
return createdOrders, nil
}
func (s *Strategy) waitWebSocketOrderDone(ctx context.Context, orderID uint64, timeoutDuration time.Duration) (*types.Order, error) {
prof := util.StartTimeProfile("waitWebSocketOrderDone")
defer prof.StopAndLog(log.Infof)
if order, ok := s.orderStore.Get(orderID); ok {
if order.Status == types.OrderStatusFilled || order.Status == types.OrderStatusCanceled {
return &order, nil
}
}
timeoutC := time.After(timeoutDuration)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timeoutC:
return nil, fmt.Errorf("order wait time timeout %s", timeoutDuration)
case order := <-s.orderStore.C:
if orderID == order.OrderID && (order.Status == types.OrderStatusFilled || order.Status == types.OrderStatusCanceled) {
return &order, nil
}
}
}
}
func (s *Strategy) waitOrdersAndCollectTrades(ctx context.Context, service types.ExchangeOrderQueryService, createdOrders types.OrderSlice) (map[uint64][]types.Trade, types.OrderSlice, error) {
var err error
var orderTrades = make(map[uint64][]types.Trade)
var updatedOrders types.OrderSlice
for _, o := range createdOrders {
updatedOrder, err2 := waitForOrderFilled(ctx, service, o, time.Second)
if err2 != nil {
err = multierr.Append(err, err2)
continue
}
trades, err3 := service.QueryOrderTrades(ctx, types.OrderQuery{
Symbol: o.Symbol,
OrderID: strconv.FormatUint(o.OrderID, 10),
})
if err3 != nil {
err = multierr.Append(err, err3)
continue
}
for _, t := range trades {
s.tradeCollector.ProcessTrade(t)
}
orderTrades[o.OrderID] = trades
updatedOrders = append(updatedOrders, *updatedOrder)
}
/*
*/
return orderTrades, updatedOrders, nil
}
func (s *Strategy) analyzeOrders(orders types.OrderSlice) {
sort.Slice(orders, func(i, j int) bool {
// o1 < o2 -- earlier first
return orders[i].CreationTime.Before(orders[i].CreationTime.Time())
})
log.Infof("ANALYZING ORDERS (Earlier First)")
for i, o := range orders {
in, inCurrency := o.In()
out, outCurrency := o.Out()
log.Infof("#%d %s IN %f %s -> OUT %f %s", i, o.String(), in.Float64(), inCurrency, out.Float64(), outCurrency)
}
for _, o := range orders {
switch o.Side {
case types.SideTypeSell:
price := o.Price
priceDiff := o.AveragePrice.Sub(price)
slippage := priceDiff.Div(price)
log.Infof("%-8s %-4s %-10s AVG PRICE %f PRICE %f Q %f SLIPPAGE %.3f%%", o.Symbol, o.Side, o.Type, o.AveragePrice.Float64(), price.Float64(), o.Quantity.Float64(), slippage.Float64()*100.0)
case types.SideTypeBuy:
price := o.Price
priceDiff := price.Sub(o.AveragePrice)
slippage := priceDiff.Div(price)
log.Infof("%-8s %-4s %-10s AVG PRICE %f PRICE %f Q %f SLIPPAGE %.3f%%", o.Symbol, o.Side, o.Type, o.AveragePrice.Float64(), price.Float64(), o.Quantity.Float64(), slippage.Float64()*100.0)
}
}
}
func (s *Strategy) buildArbMarkets(session *bbgo.ExchangeSession, symbols []string, separateStream bool, sigC sigchan.Chan) (map[string]*ArbMarket, error) {
markets := make(map[string]*ArbMarket)
// build market object
for _, symbol := range symbols {
market, ok := s.markets[symbol]
if !ok {
return nil, fmt.Errorf("market not found: %s", symbol)
}
m := &ArbMarket{
Symbol: symbol,
market: market,
BaseCurrency: market.BaseCurrency,
QuoteCurrency: market.QuoteCurrency,
sigC: sigC,
}
if separateStream {
stream := session.Exchange.NewStream()
stream.SetPublicOnly()
stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
Depth: types.DepthLevelFull,
Speed: types.SpeedHigh,
})
book := types.NewStreamBook(symbol)
priceUpdater := func(_ types.SliceOrderBook) {
bestAsk, bestBid, _ := book.BestBidAndAsk()
if bestAsk.Equals(m.bestAsk) && bestBid.Equals(m.bestBid) {
return
}
m.bestBid = bestBid
m.bestAsk = bestAsk
m.updateRate()
}
book.OnUpdate(priceUpdater)
book.OnSnapshot(priceUpdater)
book.BindStream(stream)
stream.OnDisconnect(func() {
// reset price and volume
m.bestBid = types.PriceVolume{}
m.bestAsk = types.PriceVolume{}
})
m.book = book
m.stream = stream
} else {
book, _ := session.OrderBook(symbol)
priceUpdater := func(_ types.SliceOrderBook) {
bestAsk, bestBid, _ := book.BestBidAndAsk()
if bestAsk.Equals(m.bestAsk) && bestBid.Equals(m.bestBid) {
return
}
m.bestBid = bestBid
m.bestAsk = bestAsk
m.updateRate()
}
book.OnUpdate(priceUpdater)
book.OnSnapshot(priceUpdater)
m.book = book
m.stream = session.MarketDataStream
}
markets[symbol] = m
}
return markets, nil
}
func (s *Strategy) calculateRanks(minRatio float64, method func(p *Path) float64) []PathRank {
ranks := make([]PathRank, 0, len(s.paths))
// ranking paths here
for _, path := range s.paths {
ratio := method(path)
if ratio < minRatio {
continue
}
p := path
ranks = append(ranks, PathRank{Path: p, Ratio: ratio})
}
// sort and pick up the top rank path
sort.Slice(ranks, func(i, j int) bool {
return ranks[i].Ratio > ranks[j].Ratio
})
return ranks
}
func waitForOrderFilled(ctx context.Context, ex types.ExchangeOrderQueryService, order types.Order, timeout time.Duration) (*types.Order, error) {
prof := util.StartTimeProfile("waitForOrderFilled")
defer prof.StopAndLog(log.Infof)
timeoutC := time.After(timeout)
for {
select {
case <-timeoutC:
return nil, fmt.Errorf("order wait timeout %s", timeout)
default:
p := util.StartTimeProfile("queryOrder")
remoteOrder, err2 := ex.QueryOrder(ctx, types.OrderQuery{
Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
})
p.StopAndLog(log.Infof)
if err2 != nil {
log.WithError(err2).Errorf("order query error")
time.Sleep(100 * time.Millisecond)
continue
}
switch remoteOrder.Status {
case types.OrderStatusFilled, types.OrderStatusCanceled:
return remoteOrder, nil
default:
log.Infof("WAITING: %s", remoteOrder.String())
time.Sleep(5 * time.Millisecond)
}
}
}
}
func tradeAveragePrice(trades []types.Trade, orderID uint64) fixedpoint.Value {
totalAmount := fixedpoint.Zero
totalQuantity := fixedpoint.Zero
for _, trade := range trades {
if trade.OrderID != orderID {
continue
}
totalAmount = totalAmount.Add(trade.Price.Mul(trade.Quantity))
totalQuantity = totalQuantity.Add(trade.Quantity)
}
return totalAmount.Div(totalQuantity)
}

View File

@ -0,0 +1,222 @@
//go:build !dnum
package tri
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/cache"
"github.com/c9s/bbgo/pkg/exchange/binance"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
var markets = make(types.MarketMap)
func init() {
var err error
markets, err = cache.LoadExchangeMarketsWithCache(context.Background(), &binance.Exchange{})
if err != nil {
panic(err)
}
}
func loadMarket(symbol string) types.Market {
if market, ok := markets[symbol]; ok {
return market
}
panic(fmt.Errorf("market %s not found", symbol))
}
func newArbMarket(symbol, base, quote string, askPrice, askVolume, bidPrice, bidVolume float64) *ArbMarket {
return &ArbMarket{
Symbol: symbol,
BaseCurrency: base,
QuoteCurrency: quote,
market: loadMarket(symbol),
book: nil,
bestBid: types.PriceVolume{
Price: fixedpoint.NewFromFloat(bidPrice),
Volume: fixedpoint.NewFromFloat(bidVolume),
},
bestAsk: types.PriceVolume{
Price: fixedpoint.NewFromFloat(askPrice),
Volume: fixedpoint.NewFromFloat(askVolume),
},
buyRate: 1.0 / askPrice,
sellRate: bidPrice,
}
}
func TestPath_calculateBackwardRatio(t *testing.T) {
// BTCUSDT 22800.0 22700.0
// ETHBTC 0.074, 0.073
// ETHUSDT 1630.0 1620.0
// sell BTCUSDT @ 22700 ( 0.1 BTC => 2270 USDT)
// buy ETHUSDT @ 1630 ( 2270 USDT => 1.3926380368 ETH)
// sell ETHBTC @ 0.073 (1.3926380368 ETH => 0.1016625767 BTC)
marketA := newArbMarket("BTCUSDT", "BTC", "USDT", 22800.0, 1.0, 22700.0, 1.0)
marketB := newArbMarket("ETHBTC", "ETH", "BTC", 0.074, 2.0, 0.073, 2.0)
marketC := newArbMarket("ETHUSDT", "ETH", "USDT", 1630.0, 2.0, 1620.0, 2.0)
path := &Path{
marketA: marketA,
marketB: marketB,
marketC: marketC,
dirA: -1,
dirB: -1,
dirC: 1,
}
ratio := calculateForwardRatio(path)
assert.Equal(t, 0.9601706970128022, ratio)
ratio = calculateBackwardRate(path)
assert.Equal(t, 1.0166257668711656, ratio)
}
func TestPath_CalculateForwardRatio(t *testing.T) {
// BTCUSDT 22800.0 22700.0
// ETHBTC 0.070, 0.069
// ETHUSDT 1630.0 1620.0
// buy BTCUSDT @ 22800 ( 2280 usdt => 0.1 BTC)
// buy ETHBTC @ 0.070 ( 0.1 BTC => 1.4285714286 ETH)
// sell ETHUSDT @ 1620 ( 1.4285714286 ETH => 2,314.285714332 USDT)
marketA := newArbMarket("BTCUSDT", "BTC", "USDT", 22800.0, 1.0, 22700.0, 1.0)
marketB := newArbMarket("ETHBTC", "ETH", "BTC", 0.070, 2.0, 0.069, 2.0)
marketC := newArbMarket("ETHUSDT", "ETH", "USDT", 1630.0, 2.0, 1620.0, 2.0)
path := &Path{
marketA: marketA,
marketB: marketB,
marketC: marketC,
dirA: -1,
dirB: -1,
dirC: 1,
}
ratio := calculateForwardRatio(path)
assert.Equal(t, 1.015037593984962, ratio)
ratio = calculateBackwardRate(path)
assert.Equal(t, 0.9609202453987732, ratio)
}
func TestPath_newForwardOrders(t *testing.T) {
// BTCUSDT 22800.0 22700.0
// ETHBTC 0.070, 0.069
// ETHUSDT 1630.0 1620.0
// buy BTCUSDT @ 22800 ( 2280 usdt => 0.1 BTC)
// buy ETHBTC @ 0.070 ( 0.1 BTC => 1.4285714286 ETH)
// sell ETHUSDT @ 1620 ( 1.4285714286 ETH => 2,314.285714332 USDT)
marketA := newArbMarket("BTCUSDT", "BTC", "USDT", 22800.0, 1.0, 22700.0, 1.0)
marketB := newArbMarket("ETHBTC", "ETH", "BTC", 0.070, 2.0, 0.069, 2.0)
marketC := newArbMarket("ETHUSDT", "ETH", "USDT", 1630.0, 2.0, 1620.0, 2.0)
path := &Path{
marketA: marketA,
marketB: marketB,
marketC: marketC,
dirA: -1,
dirB: -1,
dirC: 1,
}
orders := path.newOrders(types.BalanceMap{
"USDT": {
Currency: "USDT",
Available: fixedpoint.NewFromFloat(2280.0),
Locked: fixedpoint.Zero,
Borrowed: fixedpoint.Zero,
Interest: fixedpoint.Zero,
NetAsset: fixedpoint.Zero,
},
}, 1)
for i, order := range orders {
t.Logf("order #%d: %+v", i, order.String())
}
assert.InDelta(t, 2314.17, orders[2].Price.Mul(orders[2].Quantity).Float64(), 0.01)
}
func TestPath_newForwardOrdersWithAdjustRate(t *testing.T) {
// BTCUSDT 22800.0 22700.0
// ETHBTC 0.070, 0.069
// ETHUSDT 1630.0 1620.0
// buy BTCUSDT @ 22800 (2280 usdt => 0.1 BTC)
// buy ETHBTC @ 0.070 (0.1 BTC => 1.4285714286 ETH)
// APPLY ADJUST RATE B: 0.7 = 1 ETH / 1.4285714286 ETH
// buy BTCUSDT @ 22800 ( 1596 usdt => 0.07 BTC)
// buy ETHBTC @ 0.070 (0.07 BTC => 1 ETH)
// sell ETHUSDT @ 1620.0 (1 ETH => 1620 USDT)
// APPLY ADJUST RATE C: 0.5 = 0.5 ETH / 1 ETH
// buy BTCUSDT @ 22800 ( 798 usdt => 0.0035 BTC)
// buy ETHBTC @ 0.070 (0.035 BTC => 0.5 ETH)
// sell ETHUSDT @ 1620.0 (0.5 ETH => 1620 USDT)
// sell ETHUSDT @ 1620 ( 1.4285714286 ETH => 2,314.285714332 USDT)
marketA := newArbMarket("BTCUSDT", "BTC", "USDT", 22800.0, 1.0, 22700.0, 1.0)
marketB := newArbMarket("ETHBTC", "ETH", "BTC", 0.070, 1.0, 0.069, 2.0)
marketC := newArbMarket("ETHUSDT", "ETH", "USDT", 1630.0, 0.5, 1620.0, 0.5)
path := &Path{
marketA: marketA,
marketB: marketB,
marketC: marketC,
dirA: -1,
dirB: -1,
dirC: 1,
}
orders := path.newOrders(types.BalanceMap{
"USDT": {
Currency: "USDT",
Available: fixedpoint.NewFromFloat(2280.0),
Locked: fixedpoint.Zero,
Borrowed: fixedpoint.Zero,
Interest: fixedpoint.Zero,
NetAsset: fixedpoint.Zero,
},
}, 1)
for i, order := range orders {
t.Logf("order #%d: %+v", i, order.String())
}
assert.Equal(t, "0.03499", orders[0].Quantity.String())
assert.Equal(t, "0.5", orders[1].Quantity.String())
assert.Equal(t, "0.5", orders[2].Quantity.String())
}
func Test_fitQuantityByQuote(t *testing.T) {
type args struct {
price float64
quantity float64
quoteBalance float64
}
tests := []struct {
name string
args args
want float64
}{
{
name: "simple",
args: args{
price: 1630.0,
quantity: 2.0,
quoteBalance: 1000,
},
want: 0.6134969325153374,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := fitQuantityByQuote(tt.args.price, tt.args.quantity, tt.args.quoteBalance)
if !assert.Equal(t, got, tt.want) {
t.Errorf("fitQuantityByQuote() got = %v, want %v", got, tt.want)
}
})
}
}

2212
pkg/strategy/tri/symbols.go Normal file

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,33 @@
#!/bin/bash
echo '// Code generated by "bash symbols.sh"; DO NOT EDIT.' > symbols.go
echo 'package tri' >> symbols.go
max_symbols=$(curl -s https://max-api.maicoin.com/api/v2/markets | jq -r '.[].id | ascii_upcase')
binance_symbols=$(curl -s https://api.binance.com/api/v3/exchangeInfo | jq -r '.symbols[].symbol | ascii_upcase')
symbols=$(echo "$max_symbols$binance_symbols" | sort | uniq | grep -v -E '^[0-9]' | grep -v "DOWNUSDT" | grep -v "UPUSDT")
echo "$symbols" | perl -l -n -e 'BEGIN { print "const (" } END { print ")" } print qq{\t$_ = "$_"}' >> symbols.go
cat <<DOC >> symbols.go
var symbols = []string{
$(echo -e "$symbols" | tr '\n' ',')
}
func toSymbol(s string) string {
for _, symbol := range symbols {
if s == symbol {
return symbol
}
}
return s
}
func compileSymbols(symbols []string) []string {
var ss = make([]string, len(symbols))
for i, s := range symbols {
ss[i] = toSymbol(s)
}
return ss
}
DOC

28
pkg/strategy/tri/utils.go Normal file
View File

@ -0,0 +1,28 @@
package tri
import (
"math"
"github.com/c9s/bbgo/pkg/types"
)
func fitQuantityByBase(quantity, balance float64) (float64, float64) {
q := math.Min(quantity, balance)
r := q / balance
return q, r
}
// 1620 x 2 , quote balance = 1000 => rate = 1000/(1620*2) = 0.3086419753, quantity = 0.61728395
func fitQuantityByQuote(price, quantity, quoteBalance float64) (float64, float64) {
quote := quantity * price
minQuote := math.Min(quote, quoteBalance)
q := minQuote / price
r := minQuote / quoteBalance
return q, r
}
func logSubmitOrders(orders [3]types.SubmitOrder) {
for i, order := range orders {
log.Infof("SUBMIT ORDER #%d: %s", i, order.String())
}
}

View File

@ -67,7 +67,7 @@ type Strategy struct {
activeAdjustmentOrders *bbgo.ActiveOrderBook activeAdjustmentOrders *bbgo.ActiveOrderBook
activeWallOrders *bbgo.ActiveOrderBook activeWallOrders *bbgo.ActiveOrderBook
orderStore *core.OrderStore orderStore *core.OrderStore
tradeCollector *bbgo.TradeCollector tradeCollector *core.TradeCollector
groupID uint32 groupID uint32
@ -277,7 +277,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderStore = core.NewOrderStore(s.Symbol) s.orderStore = core.NewOrderStore(s.Symbol)
s.orderStore.BindStream(session.UserDataStream) s.orderStore.BindStream(session.UserDataStream)
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore) s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
bbgo.Notify(trade) bbgo.Notify(trade)

View File

@ -105,7 +105,7 @@ type Strategy struct {
hedgeErrorRateReservation *rate.Reservation hedgeErrorRateReservation *rate.Reservation
orderStore *core.OrderStore orderStore *core.OrderStore
tradeCollector *bbgo.TradeCollector tradeCollector *core.TradeCollector
askPriceHeartBeat, bidPriceHeartBeat types.PriceHeartBeat askPriceHeartBeat, bidPriceHeartBeat types.PriceHeartBeat
@ -737,7 +737,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
s.orderStore.BindStream(s.sourceSession.UserDataStream) s.orderStore.BindStream(s.sourceSession.UserDataStream)
s.orderStore.BindStream(s.makerSession.UserDataStream) s.orderStore.BindStream(s.makerSession.UserDataStream)
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore) s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
if s.NotifyTrade { if s.NotifyTrade {
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {

View File

@ -66,6 +66,7 @@ func (m Market) TruncateQuantity(quantity fixedpoint.Value) fixedpoint.Value {
qf := math.Trunc(quantity.Float64() * pow10) qf := math.Trunc(quantity.Float64() * pow10)
qf = qf / pow10 qf = qf / pow10
qs := strconv.FormatFloat(qf, 'f', prec, 64) qs := strconv.FormatFloat(qf, 'f', prec, 64)
return fixedpoint.MustNewFromString(qs) return fixedpoint.MustNewFromString(qs)
} }

View File

@ -12,6 +12,10 @@ type PriceVolume struct {
Price, Volume fixedpoint.Value Price, Volume fixedpoint.Value
} }
func (p PriceVolume) Equals(b PriceVolume) bool {
return p.Price.Eq(b.Price) && p.Volume.Eq(b.Volume)
}
func (p PriceVolume) String() string { func (p PriceVolume) String() string {
return fmt.Sprintf("PriceVolume{ price: %s, volume: %s }", p.Price.String(), p.Volume.String()) return fmt.Sprintf("PriceVolume{ price: %s, volume: %s }", p.Price.String(), p.Volume.String())
} }
@ -139,8 +143,7 @@ func (slice *PriceVolumeSlice) UnmarshalJSON(b []byte) error {
// ParsePriceVolumeSliceJSON tries to parse a 2 dimensional string array into a PriceVolumeSlice // ParsePriceVolumeSliceJSON tries to parse a 2 dimensional string array into a PriceVolumeSlice
// //
// [["9000", "10"], ["9900", "10"], ... ] // [["9000", "10"], ["9900", "10"], ... ]
//
func ParsePriceVolumeSliceJSON(b []byte) (slice PriceVolumeSlice, err error) { func ParsePriceVolumeSliceJSON(b []byte) (slice PriceVolumeSlice, err error) {
var as [][]fixedpoint.Value var as [][]fixedpoint.Value