diff --git a/examples/binance-create-self-trade/main.go b/examples/create-self-trade/main.go similarity index 68% rename from examples/binance-create-self-trade/main.go rename to examples/create-self-trade/main.go index 70a2d8f33..42c3d236c 100644 --- a/examples/binance-create-self-trade/main.go +++ b/examples/create-self-trade/main.go @@ -4,28 +4,26 @@ import ( "context" "fmt" "math" - "os" "strings" "syscall" "time" "github.com/joho/godotenv" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/c9s/bbgo/pkg/cmd/cmdutil" - "github.com/c9s/bbgo/pkg/exchange/binance" "github.com/c9s/bbgo/pkg/types" ) func init() { + rootCmd.PersistentFlags().String("exchange", "binance", "exchange name") rootCmd.PersistentFlags().String("symbol", "SANDUSDT", "symbol") } var rootCmd = &cobra.Command{ - Use: "binance-create-self-trade", + Use: "create-self-trade", Short: "this program creates the self trade by getting the market ticker", // SilenceUsage is an option to silence usage when an error occurs. @@ -35,21 +33,29 @@ var rootCmd = &cobra.Command{ ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if err := godotenv.Load(".env.local") ; err != nil { + if err := godotenv.Load(".env.local"); err != nil { log.Fatal(err) } - key, secret := os.Getenv("BINANCE_API_KEY"), os.Getenv("BINANCE_API_SECRET") - if len(key) == 0 || len(secret) == 0 { - return errors.New("empty key or secret") - } - symbol, err := cmd.Flags().GetString("symbol") if err != nil { return err } - var exchange = binance.New(key, secret) + exchangeNameStr, err := cmd.Flags().GetString("exchange") + if err != nil { + return err + } + + exchangeName, err := types.ValidExchangeName(exchangeNameStr) + if err != nil { + return err + } + + exchange, err := cmdutil.NewExchange(exchangeName) + if err != nil { + return err + } markets, err := exchange.QueryMarkets(ctx) if err != nil { @@ -80,32 +86,32 @@ var rootCmd = &cobra.Command{ price := ticker.Buy + market.TickSize - if int64(ticker.Sell * 1e8) == int64(price * 1e8) { + if int64(ticker.Sell*1e8) == int64(price*1e8) { log.Fatal("zero spread, can not continue") } - quantity := math.Max(market.MinNotional / price, market.MinQuantity) * 1.1 + quantity := math.Max(market.MinNotional/price, market.MinQuantity) * 1.1 log.Infof("submiting order using quantity %f at price %f", quantity, price) createdOrders, err := exchange.SubmitOrders(ctx, []types.SubmitOrder{ { - Symbol: symbol, - Market: market, - Side: types.SideTypeBuy, - Type: types.OrderTypeLimit, - Price: price, - Quantity: quantity, - TimeInForce: "GTC", + Symbol: symbol, + Market: market, + Side: types.SideTypeBuy, + Type: types.OrderTypeLimit, + Price: price, + Quantity: quantity, + TimeInForce: "GTC", }, { - Symbol: symbol, - Market: market, - Side: types.SideTypeSell, - Type: types.OrderTypeLimit, - Price: price, - Quantity: quantity, - TimeInForce: "GTC", + Symbol: symbol, + Market: market, + Side: types.SideTypeSell, + Type: types.OrderTypeLimit, + Price: price, + Quantity: quantity, + TimeInForce: "GTC", }, }...) diff --git a/pkg/backtest/exchange.go b/pkg/backtest/exchange.go index 9c3b4b533..6689f9fef 100644 --- a/pkg/backtest/exchange.go +++ b/pkg/backtest/exchange.go @@ -14,6 +14,8 @@ import ( "github.com/c9s/bbgo/pkg/types" ) +var ErrUnimplemented = errors.New("unimplemented method") + type Exchange struct { sourceName types.ExchangeName publicExchange types.Exchange @@ -208,9 +210,14 @@ func (e Exchange) QueryTrades(ctx context.Context, symbol string, options *types return nil, nil } +func (e Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticker, error) { + // Not using Tickers in back test (yet) + return nil, ErrUnimplemented +} + func (e Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) { // Not using Tickers in back test (yet) - return nil, nil + return nil, ErrUnimplemented } func (e Exchange) Name() types.ExchangeName { diff --git a/pkg/exchange/batch/batch.go b/pkg/exchange/batch/batch.go new file mode 100644 index 000000000..07eb4855a --- /dev/null +++ b/pkg/exchange/batch/batch.go @@ -0,0 +1,188 @@ +package batch + +import ( + "context" + "time" + + "github.com/sirupsen/logrus" + "golang.org/x/time/rate" + + "github.com/c9s/bbgo/pkg/exchange/binance" + "github.com/c9s/bbgo/pkg/types" +) + +type ExchangeBatchProcessor struct { + types.Exchange +} + +func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan types.Order, errC chan error) { + c = make(chan types.Order, 500) + errC = make(chan error, 1) + + go func() { + limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) + + defer close(c) + defer close(errC) + + orderIDs := make(map[uint64]struct{}, 500) + if lastOrderID > 0 { + orderIDs[lastOrderID] = struct{}{} + } + + for startTime.Before(endTime) { + if err := limiter.Wait(ctx); err != nil { + logrus.WithError(err).Error("rate limit error") + } + + logrus.Infof("batch querying %s closed orders %s <=> %s", symbol, startTime, endTime) + + orders, err := e.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID) + if err != nil { + errC <- err + return + } + + if len(orders) == 0 || (len(orders) == 1 && orders[0].OrderID == lastOrderID) { + return + } + + for _, o := range orders { + if _, ok := orderIDs[o.OrderID]; ok { + logrus.Infof("skipping duplicated order id: %d", o.OrderID) + continue + } + + c <- o + startTime = o.CreationTime.Time() + lastOrderID = o.OrderID + orderIDs[o.OrderID] = struct{}{} + } + } + + }() + + return c, errC +} + +func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) (c chan types.KLine, errC chan error) { + c = make(chan types.KLine, 1000) + errC = make(chan error, 1) + + go func() { + limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) + + defer close(c) + defer close(errC) + + for startTime.Before(endTime) { + if err := limiter.Wait(ctx); err != nil { + logrus.WithError(err).Error("rate limit error") + } + + kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ + StartTime: &startTime, + Limit: 1000, + }) + + if err != nil { + errC <- err + return + } + + if len(kLines) == 0 { + return + } + + for _, kline := range kLines { + // ignore any kline before the given start time + if kline.StartTime.Before(startTime) { + continue + } + + if kline.EndTime.After(endTime) { + return + } + + c <- kline + startTime = kline.EndTime + } + } + }() + + return c, errC +} + +func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (c chan types.Trade, errC chan error) { + c = make(chan types.Trade, 500) + errC = make(chan error, 1) + + var lastTradeID = options.LastTradeID + + // last 7 days + var startTime = time.Now().Add(-7 * 24 * time.Hour) + if options.StartTime != nil { + startTime = *options.StartTime + } + + go func() { + limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) + + defer close(c) + defer close(errC) + + for { + if err := limiter.Wait(ctx); err != nil { + logrus.WithError(err).Error("rate limit error") + } + + logrus.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit) + + var err error + var trades []types.Trade + + switch ex := e.Exchange.(type) { + + case *binance.Exchange: + trades, err = ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ + StartTime: &startTime, + Limit: options.Limit, + + // only for MAX right now. binance is not using it, since we need to handle self-trade for binance + LastTradeID: lastTradeID, + }) + + default: + trades, err = ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ + StartTime: &startTime, + Limit: options.Limit, + + // only for MAX right now. binance is not using it, since we need to handle self-trade for binance + LastTradeID: lastTradeID, + }) + } + + if err != nil { + errC <- err + return + } + + if len(trades) == 0 { + break + } + + logrus.Infof("returned %d trades", len(trades)) + + // increase the window to the next time frame by adding 1 millisecond + startTime = time.Time(trades[len(trades)-1].Time) + for _, t := range trades { + lastTradeID = t.ID + + // ignore the first trade if last TradeID is given + c <- t + } + } + }() + + return c, errC +} diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index e9751a586..b62b8eeae 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -62,14 +62,20 @@ func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticke } func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) { - var ret = make(map[string]types.Ticker) - listPriceChangeStatsService := e.Client.NewListPriceChangeStatsService() + var tickers = make(map[string]types.Ticker) if len(symbol) == 1 { - listPriceChangeStatsService.Symbol(symbol[0]) + ticker, err := e.QueryTicker(ctx, symbol[0]) + if err != nil { + return nil, err + } + + tickers[strings.ToUpper(symbol[0])] = *ticker + return tickers, nil } - changeStats, err := listPriceChangeStatsService.Do(ctx) + var req = e.Client.NewListPriceChangeStatsService() + changeStats, err := req.Do(ctx) if err != nil { return nil, err } @@ -97,10 +103,10 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri Time: time.Unix(0, stats.CloseTime*int64(time.Millisecond)), } - ret[stats.Symbol] = tick + tickers[stats.Symbol] = tick } - return ret, nil + return tickers, nil } func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) { @@ -564,7 +570,6 @@ func (e *Exchange) submitSpotOrder(ctx context.Context, order types.SubmitOrder) } } - switch order.Type { case types.OrderTypeStopLimit, types.OrderTypeStopMarket: if len(order.StopPriceString) == 0 { @@ -695,17 +700,21 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type if options.Limit > 0 { req.Limit(int(options.Limit)) + } else { + req.Limit(1000) } if options.StartTime != nil { req.StartTime(options.StartTime.UnixNano() / int64(time.Millisecond)) } + if options.EndTime != nil { req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond)) } + // BINANCE uses inclusive last trade ID, so we need to add by 1 if options.LastTradeID > 0 { - req.FromID(options.LastTradeID) + req.FromID(options.LastTradeID + 1) } remoteTrades, err = req.Do(ctx) @@ -714,11 +723,12 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type } } else { req := e.Client.NewListTradesService(). - Limit(1000). Symbol(symbol) if options.Limit > 0 { req.Limit(int(options.Limit)) + } else { + req.Limit(1000) } if options.StartTime != nil { @@ -727,8 +737,10 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type if options.EndTime != nil { req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond)) } + + // BINANCE uses inclusive last trade ID, so we need to add by 1 if options.LastTradeID > 0 { - req.FromID(options.LastTradeID) + req.FromID(options.LastTradeID + 1) } remoteTrades, err = req.Do(ctx) diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index 9d2ede655..da4de1e9c 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "os" + "strconv" "time" "github.com/google/uuid" @@ -43,24 +44,34 @@ func (e *Exchange) Name() types.ExchangeName { return types.ExchangeMax } +func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticker, error) { + ticker, err := e.client.PublicService.Ticker(toLocalSymbol(symbol)) + if err != nil { + return nil, err + } + + return &types.Ticker{ + Time: ticker.Time, + Volume: util.MustParseFloat(ticker.Volume), + Last: util.MustParseFloat(ticker.Last), + Open: util.MustParseFloat(ticker.Open), + High: util.MustParseFloat(ticker.High), + Low: util.MustParseFloat(ticker.Low), + Buy: util.MustParseFloat(ticker.Buy), + Sell: util.MustParseFloat(ticker.Sell), + }, nil +} + func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) { var ret = make(map[string]types.Ticker) if len(symbol) == 1 { - ticker, err := e.client.PublicService.Ticker(toLocalSymbol(symbol[0])) + ticker, err := e.QueryTicker(ctx, symbol[0]) if err != nil { return nil, err } - ret[toGlobalSymbol(symbol[0])] = types.Ticker{ - Time: ticker.Time, - Volume: util.MustParseFloat(ticker.Volume), - Last: util.MustParseFloat(ticker.Last), - Open: util.MustParseFloat(ticker.Open), - High: util.MustParseFloat(ticker.High), - Low: util.MustParseFloat(ticker.Low), - Buy: util.MustParseFloat(ticker.Buy), - Sell: util.MustParseFloat(ticker.Sell), - } + + ret[toGlobalSymbol(symbol[0])] = *ticker } else { tickers, err := e.client.PublicService.Tickers() if err != nil { @@ -285,8 +296,7 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder req := e.client.OrderService.NewCreateOrderRequest(). Market(toLocalSymbol(order.Symbol)). OrderType(string(orderType)). - Side(toLocalSideType(order.Side)). - Volume(order.QuantityString) + Side(toLocalSideType(order.Side)) if len(order.ClientOrderID) > 0 { req.ClientOrderID(order.ClientOrderID) @@ -295,6 +305,25 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder req.ClientOrderID(clientOrderID) } + if len(order.QuantityString) > 0 { + req.Volume(order.QuantityString) + } else if order.Market.Symbol != "" { + req.Volume(order.Market.FormatQuantity(order.Quantity)) + } else { + req.Volume(strconv.FormatFloat(order.Quantity, 'f', 8, 64)) + } + + // set price field for limit orders + switch order.Type { + case types.OrderTypeStopLimit, types.OrderTypeLimit: + if len(order.PriceString) > 0 { + req.Price(order.PriceString) + } else if order.Market.Symbol != "" { + req.Price(order.Market.FormatPrice(order.Price)) + } + } + + // set stop price field for limit orders switch order.Type { case types.OrderTypeStopLimit, types.OrderTypeStopMarket: if len(order.StopPriceString) == 0 { @@ -502,6 +531,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type req.Limit(500) } + // MAX uses exclusive last trade ID if options.LastTradeID > 0 { req.From(options.LastTradeID) } diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index c44d99298..4acbfca52 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + batch2 "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -31,7 +32,7 @@ func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, sym startTime = lastKLine.StartTime.Add(time.Minute) } - batch := &types.ExchangeBatchProcessor{Exchange: exchange} + batch := &batch2.ExchangeBatchProcessor{Exchange: exchange} // should use channel here klineC, errC := batch.BatchQueryKLines(ctx, symbol, interval, startTime, now) diff --git a/pkg/service/sync.go b/pkg/service/sync.go index ed5ff342d..fd8432216 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -6,6 +6,7 @@ import ( "github.com/sirupsen/logrus" + batch2 "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -39,7 +40,7 @@ func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, s logrus.Infof("found last order, start from lastID = %d since %s", lastID, startTime) } - batch := &types.ExchangeBatchProcessor{Exchange: exchange} + batch := &batch2.ExchangeBatchProcessor{Exchange: exchange} ordersC, errC := batch.BatchQueryClosedOrders(ctx, symbol, startTime, time.Now(), lastID) for order := range ordersC { select { @@ -81,14 +82,17 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s return err } + var lastTradeID int64 = 0 if lastTrade != nil { startTime = time.Time(lastTrade.Time).Add(time.Millisecond) + lastTradeID = lastTrade.ID logrus.Infof("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime) } - batch := &types.ExchangeBatchProcessor{Exchange: exchange} + batch := &batch2.ExchangeBatchProcessor{Exchange: exchange} tradeC, errC := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{ StartTime: &startTime, + LastTradeID: lastTradeID, }) for trade := range tradeC { @@ -107,7 +111,6 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s if err := s.TradeService.Insert(trade); err != nil { return err } - } return <-errC diff --git a/pkg/types/batch.go b/pkg/types/batch.go index 2b371cd51..5bb13ecf9 100644 --- a/pkg/types/batch.go +++ b/pkg/types/batch.go @@ -1,162 +1,2 @@ package types -import ( - "context" - "time" - - "github.com/sirupsen/logrus" - "golang.org/x/time/rate" -) - -type ExchangeBatchProcessor struct { - Exchange -} - -func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan Order, errC chan error) { - c = make(chan Order, 500) - errC = make(chan error, 1) - - go func() { - limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) - - defer close(c) - defer close(errC) - - orderIDs := make(map[uint64]struct{}, 500) - if lastOrderID > 0 { - orderIDs[lastOrderID] = struct{}{} - } - - for startTime.Before(endTime) { - if err := limiter.Wait(ctx); err != nil { - logrus.WithError(err).Error("rate limit error") - } - - logrus.Infof("batch querying %s closed orders %s <=> %s", symbol, startTime, endTime) - - orders, err := e.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID) - if err != nil { - errC <- err - return - } - - if len(orders) == 0 || (len(orders) == 1 && orders[0].OrderID == lastOrderID) { - return - } - - for _, o := range orders { - if _, ok := orderIDs[o.OrderID]; ok { - logrus.Infof("skipping duplicated order id: %d", o.OrderID) - continue - } - - c <- o - startTime = o.CreationTime.Time() - lastOrderID = o.OrderID - orderIDs[o.OrderID] = struct{}{} - } - } - - }() - - return c, errC -} - -func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol string, interval Interval, startTime, endTime time.Time) (c chan KLine, errC chan error) { - c = make(chan KLine, 1000) - errC = make(chan error, 1) - - go func() { - limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) - - defer close(c) - defer close(errC) - - for startTime.Before(endTime) { - if err := limiter.Wait(ctx); err != nil { - logrus.WithError(err).Error("rate limit error") - } - - kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{ - StartTime: &startTime, - Limit: 1000, - }) - - if err != nil { - errC <- err - return - } - - if len(kLines) == 0 { - return - } - - for _, kline := range kLines { - // ignore any kline before the given start time - if kline.StartTime.Before(startTime) { - continue - } - - if kline.EndTime.After(endTime) { - return - } - - c <- kline - startTime = kline.EndTime - } - } - }() - - return c, errC -} - -func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) (c chan Trade, errC chan error) { - c = make(chan Trade, 500) - errC = make(chan error, 1) - - // last 7 days - var startTime = time.Now().Add(-7 * 24 * time.Hour) - if options.StartTime != nil { - startTime = *options.StartTime - } - - go func() { - limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) - - defer close(c) - defer close(errC) - - for { - if err := limiter.Wait(ctx); err != nil { - logrus.WithError(err).Error("rate limit error") - } - - logrus.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit) - - trades, err := e.QueryTrades(ctx, symbol, &TradeQueryOptions{ - StartTime: &startTime, - Limit: options.Limit, - // LastTradeID: lastTradeID, - }) - if err != nil { - errC <- err - return - } - - if len(trades) == 0 { - break - } - - logrus.Infof("returned %d trades", len(trades)) - - // increase the window to the next time frame by adding 1 millisecond - startTime = time.Time(trades[len(trades)-1].Time).Add(time.Millisecond) - for _, t := range trades { - // ignore the first trade if last TradeID is given - c <- t - } - } - }() - - return c, errC -} diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index d692f6002..e65f5b57f 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -2,6 +2,8 @@ package types import ( "context" + "encoding/json" + "fmt" "strings" "time" @@ -12,6 +14,22 @@ const DateFormat = "2006-01-02" type ExchangeName string +func (n *ExchangeName) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s) ; err != nil { + return err + } + + switch s { + case "max", "binance", "ftx": + *n = ExchangeName(s) + return nil + + } + + return fmt.Errorf("unknown or unsupported exchange name: %s, valid names are: max, binance, ftx", s) +} + func (n ExchangeName) String() string { return string(n) } @@ -46,6 +64,8 @@ type Exchange interface { QueryAccountBalances(ctx context.Context) (BalanceMap, error) + QueryTicker(ctx context.Context, symbol string) (*Ticker, error) + QueryTickers(ctx context.Context, symbol ...string) (map[string]Ticker, error) QueryKLines(ctx context.Context, symbol string, interval Interval, options KLineQueryOptions) ([]KLine, error)