implement QueryCh on kline service

This commit is contained in:
c9s 2020-11-06 20:58:45 +08:00
parent 78d7c71ecc
commit f78fefb3b0

View File

@ -16,10 +16,11 @@ type KLineService struct {
// QueryLast queries the last order from the database
func (s *KLineService) QueryLast(ex types.ExchangeName, symbol, interval string) (*types.KLine, error) {
log.Infof("querying last kline exchange = %s AND symbol = %s", ex, symbol)
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
table := ex.String() + "_klines"
// make the SQL syntax IDE friendly, so that it can analyze it.
table := ex.String() + "_klines"
sql := "SELECT * FROM binance_klines WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY gid DESC LIMIT 1"
sql = strings.ReplaceAll(sql, "binance_klines", table)
@ -43,24 +44,50 @@ func (s *KLineService) QueryLast(ex types.ExchangeName, symbol, interval string)
if rows.Next() {
var kline types.KLine
err = rows.StructScan(&kline)
return &order, err
return &kline, err
}
return nil, rows.Err()
}
func (s *KLineService) Query(ex types.ExchangeName, symbol string) ([]types.KLine, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM orders WHERE exchange = :exchange AND symbol = :symbol ORDER BY gid ASC`, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
func (s *KLineService) QueryCh(ex types.ExchangeName, symbol string, intervals ...string) (chan types.KLine, error) {
sql := "SELECT * FROM `binance_klines` WHERE `symbol` = :symbol AND `interval` IN (:intervals) ORDER BY start_time ASC"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
"intervals": intervals,
})
if err != nil {
return nil, err
}
defer rows.Close()
c := s.scanRowsCh(rows)
return c, nil
}
return s.scanRows(rows)
// scanRowsCh scan rows into channel
func (s *KLineService) scanRowsCh(rows *sqlx.Rows) chan types.KLine {
ch := make(chan types.KLine, 100)
go func() {
defer rows.Close()
for rows.Next() {
var kline types.KLine
if err := rows.StructScan(&kline); err != nil {
log.WithError(err).Error("kline scan error")
continue
}
ch <- kline
}
if err := rows.Err(); err != nil {
log.WithError(err).Error("kline scan error")
}
}()
return ch
}
func (s *KLineService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err error) {