Merge pull request #488 from c9s/improve/mysql-index

improve: improve trades table index
This commit is contained in:
Yo-An Lin 2022-03-18 15:51:04 +08:00 committed by GitHub
commit 71af9961a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 251 additions and 13 deletions

View File

@ -0,0 +1,19 @@
-- +up
DROP INDEX trades_symbol ON trades;
DROP INDEX trades_symbol_fee_currency ON trades;
DROP INDEX trades_traded_at_symbol ON trades;
-- this index is used for general trade query
CREATE INDEX trades_traded_at ON trades (traded_at, symbol, exchange, id, fee_currency, fee);
-- this index is used for join clause by trade_id
CREATE INDEX trades_id_traded_at ON trades (id, traded_at);
-- this index is used for join clause by order id
CREATE INDEX trades_order_id_traded_at ON trades (order_id, traded_at);
-- +down
DROP INDEX trades_traded_at ON trades;
DROP INDEX trades_id_traded_at ON trades;
DROP INDEX trades_order_id_traded_at ON trades;
CREATE INDEX trades_symbol ON trades (exchange, symbol);
CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);

View File

@ -0,0 +1,19 @@
-- +up
DROP INDEX IF EXISTS trades_symbol;
DROP INDEX IF EXISTS trades_symbol_fee_currency;
DROP INDEX IF EXISTS trades_traded_at_symbol;
-- this index is used for general trade query
CREATE INDEX trades_traded_at ON trades (traded_at, symbol, exchange, id, fee_currency, fee);
-- this index is used for join clause by trade_id
CREATE INDEX trades_id_traded_at ON trades (id, traded_at);
-- this index is used for join clause by order id
CREATE INDEX trades_order_id_traded_at ON trades (order_id, traded_at);
-- +down
DROP INDEX IF EXISTS trades_traded_at;
DROP INDEX IF EXISTS trades_id_traded_at;
DROP INDEX IF EXISTS trades_order_id_traded_at;
CREATE INDEX trades_symbol ON trades (exchange, symbol);
CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);

View File

@ -0,0 +1,84 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upFixTradeIndexes, downFixTradeIndexes)
}
func upFixTradeIndexes(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol_fee_currency ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at ON trades (traded_at, symbol, exchange, id, fee_currency, fee);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_id_traded_at ON trades (id, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_order_id_traded_at ON trades (order_id, traded_at);")
if err != nil {
return err
}
return err
}
func downFixTradeIndexes(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_id_traded_at ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_order_id_traded_at ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,84 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upFixTradeIndexes, downFixTradeIndexes)
}
func upFixTradeIndexes(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol_fee_currency;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_traded_at_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at ON trades (traded_at, symbol, exchange, id, fee_currency, fee);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_id_traded_at ON trades (id, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_order_id_traded_at ON trades (order_id, traded_at);")
if err != nil {
return err
}
return err
}
func downFixTradeIndexes(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_traded_at;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_id_traded_at;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_order_id_traded_at;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);")
if err != nil {
return err
}
return err
}

View File

@ -320,7 +320,7 @@ func (s *TradeService) QueryForTradingFeeCurrency(ex types.ExchangeName, symbol
func (s *TradeService) Query(options QueryTradesOptions) ([]types.Trade, error) {
sql := queryTradesSQL(options)
log.Info(sql)
log.Debug(sql)
args := map[string]interface{}{
"exchange": options.Exchange,
@ -408,14 +408,6 @@ func queryTradesSQL(options QueryTradesOptions) string {
var where []string
if len(options.Exchange) > 0 {
where = append(where, `exchange = :exchange`)
}
if len(options.Symbol) > 0 {
where = append(where, `symbol = :symbol`)
}
if options.LastGID > 0 {
switch ordering {
case "ASC":
@ -425,8 +417,15 @@ func queryTradesSQL(options QueryTradesOptions) string {
}
}
sql := `SELECT * FROM trades`
if len(options.Symbol) > 0 {
where = append(where, `symbol = :symbol`)
}
if len(options.Exchange) > 0 {
where = append(where, `exchange = :exchange`)
}
sql := `SELECT * FROM trades`
if len(where) > 0 {
sql += ` WHERE ` + strings.Join(where, " AND ")
}
@ -455,8 +454,41 @@ func (s *TradeService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err erro
func (s *TradeService) Insert(trade types.Trade) error {
_, err := s.DB.NamedExec(`
INSERT INTO trades (id, exchange, order_id, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at, is_margin, is_futures, is_isolated)
VALUES (:id, :exchange, :order_id, :symbol, :price, :quantity, :quote_quantity, :side, :is_buyer, :is_maker, :fee, :fee_currency, :traded_at, :is_margin, :is_futures, :is_isolated)`,
INSERT INTO trades (
id,
exchange,
order_id,
symbol,
price,
quantity,
quote_quantity,
side,
is_buyer,
is_maker,
fee,
fee_currency,
traded_at,
is_margin,
is_futures,
is_isolated)
VALUES (
:id,
:exchange,
:order_id,
:symbol,
:price,
:quantity,
:quote_quantity,
:side,
:is_buyer,
:is_maker,
:fee,
:fee_currency,
:traded_at,
:is_margin,
:is_futures,
:is_isolated
)`,
trade)
return err
}

View File

@ -99,7 +99,7 @@ func Test_queryTradesSQL(t *testing.T) {
})
t.Run("convert all options", func(t *testing.T) {
assert.Equal(t, "SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND gid < :gid ORDER BY gid DESC LIMIT 500", queryTradesSQL(QueryTradesOptions{
assert.Equal(t, "SELECT * FROM trades WHERE gid < :gid AND symbol = :symbol AND exchange = :exchange ORDER BY gid DESC LIMIT 500", queryTradesSQL(QueryTradesOptions{
Exchange: "max",
Symbol: "btc",
LastGID: 123,