use channel to sync trades

This commit is contained in:
c9s 2020-11-05 13:35:04 +08:00
parent 9f532be5a1
commit 7e47f754c5
7 changed files with 73 additions and 49 deletions

View File

@ -17,8 +17,8 @@ CREATE TABLE `orders`
`executed_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0, `executed_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,
`side` VARCHAR(4) NOT NULL DEFAULT '', `side` VARCHAR(4) NOT NULL DEFAULT '',
`is_working` BOOL NOT NULL DEFAULT FALSE, `is_working` BOOL NOT NULL DEFAULT FALSE,
`created_at` DATETIME(6) NOT NULL, `created_at` DATETIME(3) NOT NULL,
`updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
PRIMARY KEY (`gid`) PRIMARY KEY (`gid`)
) ENGINE = InnoDB; ) ENGINE = InnoDB;

View File

@ -45,10 +45,15 @@ func toGlobalOrder(binanceOrder *binance.Order) (*types.Order, error) {
OrderID: uint64(binanceOrder.OrderID), OrderID: uint64(binanceOrder.OrderID),
Status: toGlobalOrderStatus(binanceOrder.Status), Status: toGlobalOrderStatus(binanceOrder.Status),
ExecutedQuantity: util.MustParseFloat(binanceOrder.ExecutedQuantity), ExecutedQuantity: util.MustParseFloat(binanceOrder.ExecutedQuantity),
CreationTime: time.Unix(0, binanceOrder.Time*int64(time.Millisecond)), CreationTime: millisecondTime(binanceOrder.Time),
UpdateTime: millisecondTime(binanceOrder.UpdateTime),
}, nil }, nil
} }
func millisecondTime(t int64) time.Time {
return time.Unix(0, t*int64(time.Millisecond))
}
func toGlobalTrade(t binance.TradeV3) (*types.Trade, error) { func toGlobalTrade(t binance.TradeV3) (*types.Trade, error) {
// skip trade ID that is the same. however this should not happen // skip trade ID that is the same. however this should not happen
var side types.SideType var side types.SideType
@ -58,9 +63,6 @@ func toGlobalTrade(t binance.TradeV3) (*types.Trade, error) {
side = types.SideTypeSell side = types.SideTypeSell
} }
// trade time
mts := time.Unix(0, t.Time*int64(time.Millisecond))
price, err := strconv.ParseFloat(t.Price, 64) price, err := strconv.ParseFloat(t.Price, 64)
if err != nil { if err != nil {
return nil, err return nil, err
@ -94,7 +96,7 @@ func toGlobalTrade(t binance.TradeV3) (*types.Trade, error) {
Fee: fee, Fee: fee,
FeeCurrency: t.CommissionAsset, FeeCurrency: t.CommissionAsset,
QuoteQuantity: quoteQuantity, QuoteQuantity: quoteQuantity,
Time: mts, Time: millisecondTime(t.Time),
}, nil }, nil
} }

View File

@ -73,8 +73,8 @@ func (s *OrderService) scanRows(rows *sqlx.Rows) (orders []types.Order, err erro
func (s *OrderService) Insert(order types.Order) error { func (s *OrderService) Insert(order types.Order) error {
_, err := s.DB.NamedExec(` _, err := s.DB.NamedExec(`
INSERT INTO orders (exchange, order_id, client_order_id, order_type, status, symbol, price, stop_price, quantity, executed_quantity, side, is_working, time_in_force, created_at) INSERT INTO orders (exchange, order_id, client_order_id, order_type, status, symbol, price, stop_price, quantity, executed_quantity, side, is_working, time_in_force, created_at, updated_at)
VALUES (:exchange, :order_id, :client_order_id, :order_type, :status, :symbol, :price, :stop_price, :quantity, :executed_quantity, :side, :is_working, :time_in_force, :created_at)`, VALUES (:exchange, :order_id, :client_order_id, :order_type, :status, :symbol, :price, :stop_price, :quantity, :executed_quantity, :side, :is_working, :time_in_force, :created_at, :updated_at)
order) ON DUPLICATE KEY UPDATE status=:status, executed_quantity=:executed_quantity, is_working=:is_working, updated_at=:updated_at`, order)
return err return err
} }

View File

@ -66,19 +66,27 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
} }
batch := &types.ExchangeBatchProcessor{Exchange: exchange} batch := &types.ExchangeBatchProcessor{Exchange: exchange}
trades, err := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{ tradeC, errC := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime, StartTime: &startTime,
Limit: 200, Limit: 200,
LastTradeID: lastID, LastTradeID: lastID,
}) })
if err != nil {
for trade := range tradeC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
return err return err
default:
} }
for _, trade := range trades {
if err := s.TradeService.Insert(trade); err != nil { if err := s.TradeService.Insert(trade); err != nil {
return err return err
} }
} }
return nil return nil

View File

@ -87,8 +87,8 @@ func (s *TradeService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err erro
func (s *TradeService) Insert(trade types.Trade) error { func (s *TradeService) Insert(trade types.Trade) error {
_, err := s.DB.NamedExec(` _, err := s.DB.NamedExec(`
INSERT INTO trades (id, exchange, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at) INSERT INTO trades (id, exchange, order_id, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at)
VALUES (:id, :exchange, :symbol, :price, :quantity, :quote_quantity, :side, :is_buyer, :is_maker, :fee, :fee_currency, :traded_at)`, VALUES (:id, :exchange, :order_id, :symbol, :price, :quantity, :quote_quantity, :side, :is_buyer, :is_maker, :fee, :fee_currency, :traded_at)`,
trade) trade)
return err return err
} }

View File

@ -83,6 +83,10 @@ func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symb
defer close(errC) defer close(errC)
orderIDs := make(map[uint64]struct{}, 500) orderIDs := make(map[uint64]struct{}, 500)
if lastOrderID > 0 {
orderIDs[lastOrderID] = struct{}{}
}
for startTime.Before(endTime) { for startTime.Before(endTime) {
limitedEndTime := startTime.Add(24 * time.Hour) limitedEndTime := startTime.Add(24 * time.Hour)
orders, err := e.QueryClosedOrders(ctx, symbol, startTime, limitedEndTime, lastOrderID) orders, err := e.QueryClosedOrders(ctx, symbol, startTime, limitedEndTime, lastOrderID)
@ -138,7 +142,10 @@ func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol, in
return allKLines, err return allKLines, err
} }
func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) (allTrades []Trade, err error) { func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) (c chan Trade, errC chan error) {
c = make(chan Trade, 500)
errC = make(chan error, 1)
// last 7 days // last 7 days
var startTime = time.Now().Add(-7 * 24 * time.Hour) var startTime = time.Now().Add(-7 * 24 * time.Hour)
if options.StartTime != nil { if options.StartTime != nil {
@ -146,6 +153,11 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
} }
var lastTradeID = options.LastTradeID var lastTradeID = options.LastTradeID
go func() {
defer close(c)
defer close(errC)
for { for {
log.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit) log.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit)
@ -155,7 +167,8 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
LastTradeID: lastTradeID, LastTradeID: lastTradeID,
}) })
if err != nil { if err != nil {
return allTrades, err errC <- err
return
} }
if len(trades) == 0 { if len(trades) == 0 {
@ -169,17 +182,17 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
log.Infof("returned %d trades", len(trades)) log.Infof("returned %d trades", len(trades))
startTime = trades[len(trades)-1].Time startTime = trades[len(trades)-1].Time
for _, t := range trades { for _, t := range trades {
// ignore the first trade if last TradeID is given // ignore the first trade if last TradeID is given
if t.ID == lastTradeID { if t.ID == lastTradeID {
continue continue
} }
allTrades = append(allTrades, t) c <- t
lastTradeID = t.ID lastTradeID = t.ID
} }
} }
}()
return allTrades, nil return c, errC
} }

View File

@ -28,7 +28,7 @@ func (t *OrderType) Scan(v interface{}) error {
} }
return nil return nil
} }
*/ */
type OrderStatus string type OrderStatus string
@ -71,6 +71,7 @@ type Order struct {
ExecutedQuantity float64 `json:"executedQuantity" db:"executed_quantity"` ExecutedQuantity float64 `json:"executedQuantity" db:"executed_quantity"`
IsWorking bool `json:"isWorking" db:"is_working"` IsWorking bool `json:"isWorking" db:"is_working"`
CreationTime time.Time `json:"creationTime" db:"created_at"` CreationTime time.Time `json:"creationTime" db:"created_at"`
UpdateTime time.Time `json:"updateTime" db:"updated_at"`
} }
func (o *SubmitOrder) SlackAttachment() slack.Attachment { func (o *SubmitOrder) SlackAttachment() slack.Attachment {