diff --git a/migrations/20201102222546_orders.sql b/migrations/20201102222546_orders.sql index eb09898bd..2e005cc93 100644 --- a/migrations/20201102222546_orders.sql +++ b/migrations/20201102222546_orders.sql @@ -17,8 +17,8 @@ CREATE TABLE `orders` `executed_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0, `side` VARCHAR(4) NOT NULL DEFAULT '', `is_working` BOOL NOT NULL DEFAULT FALSE, - `created_at` DATETIME(6) NOT NULL, - + `created_at` DATETIME(3) NOT NULL, + `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3), PRIMARY KEY (`gid`) ) ENGINE = InnoDB; diff --git a/pkg/exchange/binance/convert.go b/pkg/exchange/binance/convert.go index 032a65fda..3e5621f6f 100644 --- a/pkg/exchange/binance/convert.go +++ b/pkg/exchange/binance/convert.go @@ -45,10 +45,15 @@ func toGlobalOrder(binanceOrder *binance.Order) (*types.Order, error) { OrderID: uint64(binanceOrder.OrderID), Status: toGlobalOrderStatus(binanceOrder.Status), ExecutedQuantity: util.MustParseFloat(binanceOrder.ExecutedQuantity), - CreationTime: time.Unix(0, binanceOrder.Time*int64(time.Millisecond)), + CreationTime: millisecondTime(binanceOrder.Time), + UpdateTime: millisecondTime(binanceOrder.UpdateTime), }, nil } +func millisecondTime(t int64) time.Time { + return time.Unix(0, t*int64(time.Millisecond)) +} + func toGlobalTrade(t binance.TradeV3) (*types.Trade, error) { // skip trade ID that is the same. however this should not happen var side types.SideType @@ -58,9 +63,6 @@ func toGlobalTrade(t binance.TradeV3) (*types.Trade, error) { side = types.SideTypeSell } - // trade time - mts := time.Unix(0, t.Time*int64(time.Millisecond)) - price, err := strconv.ParseFloat(t.Price, 64) if err != nil { return nil, err @@ -94,7 +96,7 @@ func toGlobalTrade(t binance.TradeV3) (*types.Trade, error) { Fee: fee, FeeCurrency: t.CommissionAsset, QuoteQuantity: quoteQuantity, - Time: mts, + Time: millisecondTime(t.Time), }, nil } diff --git a/pkg/service/order.go b/pkg/service/order.go index 575b163d5..8ff2b78c1 100644 --- a/pkg/service/order.go +++ b/pkg/service/order.go @@ -73,8 +73,8 @@ func (s *OrderService) scanRows(rows *sqlx.Rows) (orders []types.Order, err erro func (s *OrderService) Insert(order types.Order) error { _, err := s.DB.NamedExec(` - INSERT INTO orders (exchange, order_id, client_order_id, order_type, status, symbol, price, stop_price, quantity, executed_quantity, side, is_working, time_in_force, created_at) - VALUES (:exchange, :order_id, :client_order_id, :order_type, :status, :symbol, :price, :stop_price, :quantity, :executed_quantity, :side, :is_working, :time_in_force, :created_at)`, - order) + INSERT INTO orders (exchange, order_id, client_order_id, order_type, status, symbol, price, stop_price, quantity, executed_quantity, side, is_working, time_in_force, created_at, updated_at) + VALUES (:exchange, :order_id, :client_order_id, :order_type, :status, :symbol, :price, :stop_price, :quantity, :executed_quantity, :side, :is_working, :time_in_force, :created_at, :updated_at) + ON DUPLICATE KEY UPDATE status=:status, executed_quantity=:executed_quantity, is_working=:is_working, updated_at=:updated_at`, order) return err } diff --git a/pkg/service/sync.go b/pkg/service/sync.go index d388c3856..32ec803a5 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -66,19 +66,27 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s } batch := &types.ExchangeBatchProcessor{Exchange: exchange} - trades, err := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{ + tradeC, errC := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{ StartTime: &startTime, Limit: 200, LastTradeID: lastID, }) - if err != nil { - return err - } - for _, trade := range trades { + for trade := range tradeC { + select { + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + return err + + default: + } + if err := s.TradeService.Insert(trade); err != nil { return err } + } return nil diff --git a/pkg/service/trade.go b/pkg/service/trade.go index 5ebf90a8a..836535447 100644 --- a/pkg/service/trade.go +++ b/pkg/service/trade.go @@ -87,8 +87,8 @@ func (s *TradeService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err erro func (s *TradeService) Insert(trade types.Trade) error { _, err := s.DB.NamedExec(` - INSERT INTO trades (id, exchange, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at) - VALUES (:id, :exchange, :symbol, :price, :quantity, :quote_quantity, :side, :is_buyer, :is_maker, :fee, :fee_currency, :traded_at)`, + INSERT INTO trades (id, exchange, order_id, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at) + VALUES (:id, :exchange, :order_id, :symbol, :price, :quantity, :quote_quantity, :side, :is_buyer, :is_maker, :fee, :fee_currency, :traded_at)`, trade) return err } diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index 6fd5527e0..5e4f55d95 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -83,6 +83,10 @@ func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symb defer close(errC) orderIDs := make(map[uint64]struct{}, 500) + if lastOrderID > 0 { + orderIDs[lastOrderID] = struct{}{} + } + for startTime.Before(endTime) { limitedEndTime := startTime.Add(24 * time.Hour) orders, err := e.QueryClosedOrders(ctx, symbol, startTime, limitedEndTime, lastOrderID) @@ -138,7 +142,10 @@ func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol, in return allKLines, err } -func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) (allTrades []Trade, err error) { +func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) (c chan Trade, errC chan error) { + c = make(chan Trade, 500) + errC = make(chan error, 1) + // last 7 days var startTime = time.Now().Add(-7 * 24 * time.Hour) if options.StartTime != nil { @@ -146,40 +153,46 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str } var lastTradeID = options.LastTradeID - for { - log.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit) - trades, err := e.QueryTrades(ctx, symbol, &TradeQueryOptions{ - StartTime: &startTime, - Limit: options.Limit, - LastTradeID: lastTradeID, - }) - if err != nil { - return allTrades, err - } + go func() { + defer close(c) + defer close(errC) - if len(trades) == 0 { - break - } + for { + log.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit) - if len(trades) == 1 && trades[0].ID == lastTradeID { - break - } - - log.Infof("returned %d trades", len(trades)) - - startTime = trades[len(trades)-1].Time - - for _, t := range trades { - // ignore the first trade if last TradeID is given - if t.ID == lastTradeID { - continue + trades, err := e.QueryTrades(ctx, symbol, &TradeQueryOptions{ + StartTime: &startTime, + Limit: options.Limit, + LastTradeID: lastTradeID, + }) + if err != nil { + errC <- err + return } - allTrades = append(allTrades, t) - lastTradeID = t.ID - } - } + if len(trades) == 0 { + break + } - return allTrades, nil + if len(trades) == 1 && trades[0].ID == lastTradeID { + break + } + + log.Infof("returned %d trades", len(trades)) + + startTime = trades[len(trades)-1].Time + for _, t := range trades { + // ignore the first trade if last TradeID is given + if t.ID == lastTradeID { + continue + } + + c <- t + lastTradeID = t.ID + } + } + }() + + return c, errC } diff --git a/pkg/types/order.go b/pkg/types/order.go index 77116a665..3b8e0e71f 100644 --- a/pkg/types/order.go +++ b/pkg/types/order.go @@ -28,7 +28,7 @@ func (t *OrderType) Scan(v interface{}) error { } return nil } - */ +*/ type OrderStatus string @@ -71,6 +71,7 @@ type Order struct { ExecutedQuantity float64 `json:"executedQuantity" db:"executed_quantity"` IsWorking bool `json:"isWorking" db:"is_working"` CreationTime time.Time `json:"creationTime" db:"created_at"` + UpdateTime time.Time `json:"updateTime" db:"updated_at"` } func (o *SubmitOrder) SlackAttachment() slack.Attachment {