Fix trade sync for self trades

MAX uses one single trade for presenting self trade.

BINANCE uses two trade records for presenting self trade. the trade
creation time are the same.
This commit is contained in:
c9s 2021-02-18 17:37:49 +08:00
parent c3dbb1b204
commit 0ba595bd55
9 changed files with 322 additions and 215 deletions

View File

@ -4,28 +4,26 @@ import (
"context"
"fmt"
"math"
"os"
"strings"
"syscall"
"time"
"github.com/joho/godotenv"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/exchange/binance"
"github.com/c9s/bbgo/pkg/types"
)
func init() {
rootCmd.PersistentFlags().String("exchange", "binance", "exchange name")
rootCmd.PersistentFlags().String("symbol", "SANDUSDT", "symbol")
}
var rootCmd = &cobra.Command{
Use: "binance-create-self-trade",
Use: "create-self-trade",
Short: "this program creates the self trade by getting the market ticker",
// SilenceUsage is an option to silence usage when an error occurs.
@ -35,21 +33,29 @@ var rootCmd = &cobra.Command{
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := godotenv.Load(".env.local") ; err != nil {
if err := godotenv.Load(".env.local"); err != nil {
log.Fatal(err)
}
key, secret := os.Getenv("BINANCE_API_KEY"), os.Getenv("BINANCE_API_SECRET")
if len(key) == 0 || len(secret) == 0 {
return errors.New("empty key or secret")
}
symbol, err := cmd.Flags().GetString("symbol")
if err != nil {
return err
}
var exchange = binance.New(key, secret)
exchangeNameStr, err := cmd.Flags().GetString("exchange")
if err != nil {
return err
}
exchangeName, err := types.ValidExchangeName(exchangeNameStr)
if err != nil {
return err
}
exchange, err := cmdutil.NewExchange(exchangeName)
if err != nil {
return err
}
markets, err := exchange.QueryMarkets(ctx)
if err != nil {
@ -80,32 +86,32 @@ var rootCmd = &cobra.Command{
price := ticker.Buy + market.TickSize
if int64(ticker.Sell * 1e8) == int64(price * 1e8) {
if int64(ticker.Sell*1e8) == int64(price*1e8) {
log.Fatal("zero spread, can not continue")
}
quantity := math.Max(market.MinNotional / price, market.MinQuantity) * 1.1
quantity := math.Max(market.MinNotional/price, market.MinQuantity) * 1.1
log.Infof("submiting order using quantity %f at price %f", quantity, price)
createdOrders, err := exchange.SubmitOrders(ctx, []types.SubmitOrder{
{
Symbol: symbol,
Market: market,
Side: types.SideTypeBuy,
Type: types.OrderTypeLimit,
Price: price,
Quantity: quantity,
TimeInForce: "GTC",
Symbol: symbol,
Market: market,
Side: types.SideTypeBuy,
Type: types.OrderTypeLimit,
Price: price,
Quantity: quantity,
TimeInForce: "GTC",
},
{
Symbol: symbol,
Market: market,
Side: types.SideTypeSell,
Type: types.OrderTypeLimit,
Price: price,
Quantity: quantity,
TimeInForce: "GTC",
Symbol: symbol,
Market: market,
Side: types.SideTypeSell,
Type: types.OrderTypeLimit,
Price: price,
Quantity: quantity,
TimeInForce: "GTC",
},
}...)

View File

@ -14,6 +14,8 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
var ErrUnimplemented = errors.New("unimplemented method")
type Exchange struct {
sourceName types.ExchangeName
publicExchange types.Exchange
@ -208,9 +210,14 @@ func (e Exchange) QueryTrades(ctx context.Context, symbol string, options *types
return nil, nil
}
func (e Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticker, error) {
// Not using Tickers in back test (yet)
return nil, ErrUnimplemented
}
func (e Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) {
// Not using Tickers in back test (yet)
return nil, nil
return nil, ErrUnimplemented
}
func (e Exchange) Name() types.ExchangeName {

188
pkg/exchange/batch/batch.go Normal file
View File

@ -0,0 +1,188 @@
package batch
import (
"context"
"time"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/exchange/binance"
"github.com/c9s/bbgo/pkg/types"
)
type ExchangeBatchProcessor struct {
types.Exchange
}
func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan types.Order, errC chan error) {
c = make(chan types.Order, 500)
errC = make(chan error, 1)
go func() {
limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
defer close(c)
defer close(errC)
orderIDs := make(map[uint64]struct{}, 500)
if lastOrderID > 0 {
orderIDs[lastOrderID] = struct{}{}
}
for startTime.Before(endTime) {
if err := limiter.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error")
}
logrus.Infof("batch querying %s closed orders %s <=> %s", symbol, startTime, endTime)
orders, err := e.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID)
if err != nil {
errC <- err
return
}
if len(orders) == 0 || (len(orders) == 1 && orders[0].OrderID == lastOrderID) {
return
}
for _, o := range orders {
if _, ok := orderIDs[o.OrderID]; ok {
logrus.Infof("skipping duplicated order id: %d", o.OrderID)
continue
}
c <- o
startTime = o.CreationTime.Time()
lastOrderID = o.OrderID
orderIDs[o.OrderID] = struct{}{}
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) (c chan types.KLine, errC chan error) {
c = make(chan types.KLine, 1000)
errC = make(chan error, 1)
go func() {
limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
defer close(c)
defer close(errC)
for startTime.Before(endTime) {
if err := limiter.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error")
}
kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
StartTime: &startTime,
Limit: 1000,
})
if err != nil {
errC <- err
return
}
if len(kLines) == 0 {
return
}
for _, kline := range kLines {
// ignore any kline before the given start time
if kline.StartTime.Before(startTime) {
continue
}
if kline.EndTime.After(endTime) {
return
}
c <- kline
startTime = kline.EndTime
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (c chan types.Trade, errC chan error) {
c = make(chan types.Trade, 500)
errC = make(chan error, 1)
var lastTradeID = options.LastTradeID
// last 7 days
var startTime = time.Now().Add(-7 * 24 * time.Hour)
if options.StartTime != nil {
startTime = *options.StartTime
}
go func() {
limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
defer close(c)
defer close(errC)
for {
if err := limiter.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error")
}
logrus.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit)
var err error
var trades []types.Trade
switch ex := e.Exchange.(type) {
case *binance.Exchange:
trades, err = ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
Limit: options.Limit,
// only for MAX right now. binance is not using it, since we need to handle self-trade for binance
LastTradeID: lastTradeID,
})
default:
trades, err = ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
Limit: options.Limit,
// only for MAX right now. binance is not using it, since we need to handle self-trade for binance
LastTradeID: lastTradeID,
})
}
if err != nil {
errC <- err
return
}
if len(trades) == 0 {
break
}
logrus.Infof("returned %d trades", len(trades))
// increase the window to the next time frame by adding 1 millisecond
startTime = time.Time(trades[len(trades)-1].Time)
for _, t := range trades {
lastTradeID = t.ID
// ignore the first trade if last TradeID is given
c <- t
}
}
}()
return c, errC
}

View File

@ -62,14 +62,20 @@ func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticke
}
func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) {
var ret = make(map[string]types.Ticker)
listPriceChangeStatsService := e.Client.NewListPriceChangeStatsService()
var tickers = make(map[string]types.Ticker)
if len(symbol) == 1 {
listPriceChangeStatsService.Symbol(symbol[0])
ticker, err := e.QueryTicker(ctx, symbol[0])
if err != nil {
return nil, err
}
tickers[strings.ToUpper(symbol[0])] = *ticker
return tickers, nil
}
changeStats, err := listPriceChangeStatsService.Do(ctx)
var req = e.Client.NewListPriceChangeStatsService()
changeStats, err := req.Do(ctx)
if err != nil {
return nil, err
}
@ -97,10 +103,10 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri
Time: time.Unix(0, stats.CloseTime*int64(time.Millisecond)),
}
ret[stats.Symbol] = tick
tickers[stats.Symbol] = tick
}
return ret, nil
return tickers, nil
}
func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
@ -564,7 +570,6 @@ func (e *Exchange) submitSpotOrder(ctx context.Context, order types.SubmitOrder)
}
}
switch order.Type {
case types.OrderTypeStopLimit, types.OrderTypeStopMarket:
if len(order.StopPriceString) == 0 {
@ -695,17 +700,21 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
if options.Limit > 0 {
req.Limit(int(options.Limit))
} else {
req.Limit(1000)
}
if options.StartTime != nil {
req.StartTime(options.StartTime.UnixNano() / int64(time.Millisecond))
}
if options.EndTime != nil {
req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond))
}
// BINANCE uses inclusive last trade ID, so we need to add by 1
if options.LastTradeID > 0 {
req.FromID(options.LastTradeID)
req.FromID(options.LastTradeID + 1)
}
remoteTrades, err = req.Do(ctx)
@ -714,11 +723,12 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
}
} else {
req := e.Client.NewListTradesService().
Limit(1000).
Symbol(symbol)
if options.Limit > 0 {
req.Limit(int(options.Limit))
} else {
req.Limit(1000)
}
if options.StartTime != nil {
@ -727,8 +737,10 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
if options.EndTime != nil {
req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond))
}
// BINANCE uses inclusive last trade ID, so we need to add by 1
if options.LastTradeID > 0 {
req.FromID(options.LastTradeID)
req.FromID(options.LastTradeID + 1)
}
remoteTrades, err = req.Do(ctx)

View File

@ -5,6 +5,7 @@ import (
"fmt"
"math"
"os"
"strconv"
"time"
"github.com/google/uuid"
@ -43,24 +44,34 @@ func (e *Exchange) Name() types.ExchangeName {
return types.ExchangeMax
}
func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticker, error) {
ticker, err := e.client.PublicService.Ticker(toLocalSymbol(symbol))
if err != nil {
return nil, err
}
return &types.Ticker{
Time: ticker.Time,
Volume: util.MustParseFloat(ticker.Volume),
Last: util.MustParseFloat(ticker.Last),
Open: util.MustParseFloat(ticker.Open),
High: util.MustParseFloat(ticker.High),
Low: util.MustParseFloat(ticker.Low),
Buy: util.MustParseFloat(ticker.Buy),
Sell: util.MustParseFloat(ticker.Sell),
}, nil
}
func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) {
var ret = make(map[string]types.Ticker)
if len(symbol) == 1 {
ticker, err := e.client.PublicService.Ticker(toLocalSymbol(symbol[0]))
ticker, err := e.QueryTicker(ctx, symbol[0])
if err != nil {
return nil, err
}
ret[toGlobalSymbol(symbol[0])] = types.Ticker{
Time: ticker.Time,
Volume: util.MustParseFloat(ticker.Volume),
Last: util.MustParseFloat(ticker.Last),
Open: util.MustParseFloat(ticker.Open),
High: util.MustParseFloat(ticker.High),
Low: util.MustParseFloat(ticker.Low),
Buy: util.MustParseFloat(ticker.Buy),
Sell: util.MustParseFloat(ticker.Sell),
}
ret[toGlobalSymbol(symbol[0])] = *ticker
} else {
tickers, err := e.client.PublicService.Tickers()
if err != nil {
@ -285,8 +296,7 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder
req := e.client.OrderService.NewCreateOrderRequest().
Market(toLocalSymbol(order.Symbol)).
OrderType(string(orderType)).
Side(toLocalSideType(order.Side)).
Volume(order.QuantityString)
Side(toLocalSideType(order.Side))
if len(order.ClientOrderID) > 0 {
req.ClientOrderID(order.ClientOrderID)
@ -295,6 +305,25 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder
req.ClientOrderID(clientOrderID)
}
if len(order.QuantityString) > 0 {
req.Volume(order.QuantityString)
} else if order.Market.Symbol != "" {
req.Volume(order.Market.FormatQuantity(order.Quantity))
} else {
req.Volume(strconv.FormatFloat(order.Quantity, 'f', 8, 64))
}
// set price field for limit orders
switch order.Type {
case types.OrderTypeStopLimit, types.OrderTypeLimit:
if len(order.PriceString) > 0 {
req.Price(order.PriceString)
} else if order.Market.Symbol != "" {
req.Price(order.Market.FormatPrice(order.Price))
}
}
// set stop price field for limit orders
switch order.Type {
case types.OrderTypeStopLimit, types.OrderTypeStopMarket:
if len(order.StopPriceString) == 0 {
@ -502,6 +531,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
req.Limit(500)
}
// MAX uses exclusive last trade ID
if options.LastTradeID > 0 {
req.From(options.LastTradeID)
}

View File

@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
@ -31,7 +32,7 @@ func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, sym
startTime = lastKLine.StartTime.Add(time.Minute)
}
batch := &types.ExchangeBatchProcessor{Exchange: exchange}
batch := &batch2.ExchangeBatchProcessor{Exchange: exchange}
// should use channel here
klineC, errC := batch.BatchQueryKLines(ctx, symbol, interval, startTime, now)

View File

@ -6,6 +6,7 @@ import (
"github.com/sirupsen/logrus"
batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
@ -39,7 +40,7 @@ func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, s
logrus.Infof("found last order, start from lastID = %d since %s", lastID, startTime)
}
batch := &types.ExchangeBatchProcessor{Exchange: exchange}
batch := &batch2.ExchangeBatchProcessor{Exchange: exchange}
ordersC, errC := batch.BatchQueryClosedOrders(ctx, symbol, startTime, time.Now(), lastID)
for order := range ordersC {
select {
@ -81,14 +82,17 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
return err
}
var lastTradeID int64 = 0
if lastTrade != nil {
startTime = time.Time(lastTrade.Time).Add(time.Millisecond)
lastTradeID = lastTrade.ID
logrus.Infof("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime)
}
batch := &types.ExchangeBatchProcessor{Exchange: exchange}
batch := &batch2.ExchangeBatchProcessor{Exchange: exchange}
tradeC, errC := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
LastTradeID: lastTradeID,
})
for trade := range tradeC {
@ -107,7 +111,6 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
if err := s.TradeService.Insert(trade); err != nil {
return err
}
}
return <-errC

View File

@ -1,162 +1,2 @@
package types
import (
"context"
"time"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)
type ExchangeBatchProcessor struct {
Exchange
}
func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan Order, errC chan error) {
c = make(chan Order, 500)
errC = make(chan error, 1)
go func() {
limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
defer close(c)
defer close(errC)
orderIDs := make(map[uint64]struct{}, 500)
if lastOrderID > 0 {
orderIDs[lastOrderID] = struct{}{}
}
for startTime.Before(endTime) {
if err := limiter.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error")
}
logrus.Infof("batch querying %s closed orders %s <=> %s", symbol, startTime, endTime)
orders, err := e.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID)
if err != nil {
errC <- err
return
}
if len(orders) == 0 || (len(orders) == 1 && orders[0].OrderID == lastOrderID) {
return
}
for _, o := range orders {
if _, ok := orderIDs[o.OrderID]; ok {
logrus.Infof("skipping duplicated order id: %d", o.OrderID)
continue
}
c <- o
startTime = o.CreationTime.Time()
lastOrderID = o.OrderID
orderIDs[o.OrderID] = struct{}{}
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol string, interval Interval, startTime, endTime time.Time) (c chan KLine, errC chan error) {
c = make(chan KLine, 1000)
errC = make(chan error, 1)
go func() {
limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
defer close(c)
defer close(errC)
for startTime.Before(endTime) {
if err := limiter.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error")
}
kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
StartTime: &startTime,
Limit: 1000,
})
if err != nil {
errC <- err
return
}
if len(kLines) == 0 {
return
}
for _, kline := range kLines {
// ignore any kline before the given start time
if kline.StartTime.Before(startTime) {
continue
}
if kline.EndTime.After(endTime) {
return
}
c <- kline
startTime = kline.EndTime
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) (c chan Trade, errC chan error) {
c = make(chan Trade, 500)
errC = make(chan error, 1)
// last 7 days
var startTime = time.Now().Add(-7 * 24 * time.Hour)
if options.StartTime != nil {
startTime = *options.StartTime
}
go func() {
limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
defer close(c)
defer close(errC)
for {
if err := limiter.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error")
}
logrus.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit)
trades, err := e.QueryTrades(ctx, symbol, &TradeQueryOptions{
StartTime: &startTime,
Limit: options.Limit,
// LastTradeID: lastTradeID,
})
if err != nil {
errC <- err
return
}
if len(trades) == 0 {
break
}
logrus.Infof("returned %d trades", len(trades))
// increase the window to the next time frame by adding 1 millisecond
startTime = time.Time(trades[len(trades)-1].Time).Add(time.Millisecond)
for _, t := range trades {
// ignore the first trade if last TradeID is given
c <- t
}
}
}()
return c, errC
}

View File

@ -2,6 +2,8 @@ package types
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
@ -12,6 +14,22 @@ const DateFormat = "2006-01-02"
type ExchangeName string
func (n *ExchangeName) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s) ; err != nil {
return err
}
switch s {
case "max", "binance", "ftx":
*n = ExchangeName(s)
return nil
}
return fmt.Errorf("unknown or unsupported exchange name: %s, valid names are: max, binance, ftx", s)
}
func (n ExchangeName) String() string {
return string(n)
}
@ -46,6 +64,8 @@ type Exchange interface {
QueryAccountBalances(ctx context.Context) (BalanceMap, error)
QueryTicker(ctx context.Context, symbol string) (*Ticker, error)
QueryTickers(ctx context.Context, symbol ...string) (map[string]Ticker, error)
QueryKLines(ctx context.Context, symbol string, interval Interval, options KLineQueryOptions) ([]KLine, error)