diff --git a/pkg/service/trade.go b/pkg/service/trade.go index 525cc892b..26601ac70 100644 --- a/pkg/service/trade.go +++ b/pkg/service/trade.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -49,6 +50,78 @@ func NewTradeService(db *sqlx.DB) *TradeService { return &TradeService{db} } +func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string) error { + isMargin := false + isIsolated := false + if marginExchange, ok := exchange.(types.MarginExchange); ok { + marginSettings := marginExchange.GetMarginSettings() + isMargin = marginSettings.IsMargin + isIsolated = marginSettings.IsIsolatedMargin + if marginSettings.IsIsolatedMargin { + symbol = marginSettings.IsolatedMarginSymbol + } + } + + // records descending ordered + records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) + if err != nil { + return err + } + + var tradeKeys = map[types.TradeKey]struct{}{} + var lastTradeID int64 = 1 + if len(records) > 0 { + for _, record := range records { + tradeKeys[record.Key()] = struct{}{} + } + + lastTradeID = records[0].ID + } + + b := &batch.TradeBatchQuery{Exchange: exchange} + tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{ + LastTradeID: lastTradeID, + }) + + for trade := range tradeC { + select { + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + if err != nil { + return err + } + + default: + } + + key := trade.Key() + if _, exists := tradeKeys[key]; exists { + continue + } + + tradeKeys[key] = struct{}{} + + log.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s", + trade.Exchange, + trade.ID, + trade.Symbol, + trade.Side, + trade.Price, + trade.Quantity, + trade.MakerOrTakerLabel(), + trade.Time.String()) + + if err := s.Insert(trade); err != nil { + return err + } + } + + return <-errC +} + + func (s *TradeService) QueryTradingVolume(startTime time.Time, options TradingVolumeQueryOptions) ([]TradingVolume, error) { args := map[string]interface{}{ // "symbol": symbol,