improve order persistence and support order data sync

This commit is contained in:
c9s 2020-11-05 11:00:51 +08:00
parent a4555a2b7b
commit 7fab2e24de
15 changed files with 294 additions and 119 deletions

View File

@ -3,17 +3,18 @@ CREATE TABLE `orders`
(
`gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
`order_id` BIGINT UNSIGNED,
`client_order_id` VARCHAR(32) NOT NULL DEFAULT '',
`exchange` VARCHAR(24) NOT NULL DEFAULT '',
-- order_id is the order id returned from the exchange
`order_id` BIGINT UNSIGNED NOT NULL,
`client_order_id` VARCHAR(42) NOT NULL DEFAULT '',
`order_type` VARCHAR(16) NOT NULL,
`symbol` VARCHAR(7) NOT NULL,
`status` VARCHAR(12) NOT NULL,
`time_in_force` VARCHAR(4) NOT NULL,
`price` DECIMAL(16, 8) UNSIGNED NOT NULL,
`stop_price` DECIMAL(16, 8) UNSIGNED NOT NULL,
`quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,
`executed_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,
`fee` DECIMAL(16, 8) UNSIGNED NOT NULL,
`fee_currency` VARCHAR(4) NOT NULL,
`executed_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,
`side` VARCHAR(4) NOT NULL DEFAULT '',
`is_working` BOOL NOT NULL DEFAULT FALSE,
`created_at` DATETIME(6) NOT NULL,
@ -21,5 +22,6 @@ CREATE TABLE `orders`
PRIMARY KEY (`gid`)
) ENGINE = InnoDB;
-- +goose Down
DROP TABLE `orders`;

View File

@ -1,5 +1,7 @@
-- +goose Up
ALTER TABLE `trades` ADD COLUMN `order_id` BIGINT UNSIGNED;
ALTER TABLE `trades`
ADD COLUMN `order_id` BIGINT UNSIGNED NOT NULL;
-- +goose Down
ALTER TABLE `trades` DROP COLUMN `order_id`;
ALTER TABLE `trades`
DROP COLUMN `order_id`;

View File

@ -0,0 +1,19 @@
-- +goose Up
DROP INDEX trades_symbol ON trades;
DROP INDEX trades_symbol_fee_currency ON trades;
DROP INDEX trades_traded_at_symbol ON trades;
CREATE INDEX trades_symbol ON trades (exchange, symbol);
CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);
-- +goose Down
DROP INDEX trades_symbol ON trades;
DROP INDEX trades_symbol_fee_currency ON trades;
DROP INDEX trades_traded_at_symbol ON trades;
CREATE INDEX trades_symbol ON trades (symbol);
CREATE INDEX trades_symbol_fee_currency ON trades (symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades (traded_at, symbol);

View File

@ -0,0 +1,7 @@
-- +goose Up
CREATE INDEX orders_symbol ON orders (exchange, symbol);
CREATE INDEX orders_id_symbol ON orders(exchange, order_id, symbol);
-- +goose Down
DROP INDEX orders_symbol ON orders;
DROP INDEX orders_id_symbol ON orders;

View File

@ -36,7 +36,7 @@ type Environment struct {
Notifiability
TradeService *service.TradeService
TradeSync *service.TradeSync
TradeSync *service.SyncService
tradeScanTime time.Time
sessions map[string]*ExchangeSession
@ -52,8 +52,8 @@ func NewEnvironment() *Environment {
func (environ *Environment) SyncTrades(db *sqlx.DB) *Environment {
environ.TradeService = &service.TradeService{DB: db}
environ.TradeSync = &service.TradeSync{
Service: environ.TradeService,
environ.TradeSync = &service.SyncService{
TradeService: environ.TradeService,
}
return environ
@ -90,15 +90,15 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
if environ.TradeSync != nil {
log.Infof("syncing trades from %s for symbol %s...", session.Exchange.Name(), symbol)
if err := environ.TradeSync.Sync(ctx, session.Exchange, symbol, environ.tradeScanTime); err != nil {
if err := environ.TradeSync.SyncTrades(ctx, session.Exchange, symbol, environ.tradeScanTime); err != nil {
return err
}
tradingFeeCurrency := session.Exchange.PlatformFeeCurrency()
if strings.HasPrefix(symbol, tradingFeeCurrency) {
trades, err = environ.TradeService.QueryForTradingFeeCurrency(symbol, tradingFeeCurrency)
trades, err = environ.TradeService.QueryForTradingFeeCurrency(session.Exchange.Name(), symbol, tradingFeeCurrency)
} else {
trades, err = environ.TradeService.Query(symbol)
trades, err = environ.TradeService.Query(session.Exchange.Name(), symbol)
}
if err != nil {

View File

@ -59,7 +59,8 @@ var PnLCmd = &cobra.Command{
return err
}
var startTime = time.Now().AddDate(-2, 0, 0)
// default start time
var startTime = time.Now().AddDate(0, -3, 0)
if len(since) > 0 {
loc, err := time.LoadLocation("Asia/Taipei")
if err != nil {
@ -73,10 +74,19 @@ var PnLCmd = &cobra.Command{
}
tradeService := &service.TradeService{DB: db}
tradeSync := &service.TradeSync{Service: tradeService}
orderService := &service.OrderService{DB: db}
syncService := &service.SyncService{
TradeService: tradeService,
OrderService: orderService,
}
logrus.Info("syncing trades from exchange...")
if err := tradeSync.Sync(ctx, exchange, symbol, startTime); err != nil {
if err := syncService.SyncTrades(ctx, exchange, symbol, startTime); err != nil {
return err
}
logrus.Info("syncing orders from exchange...")
if err := syncService.SyncOrders(ctx, exchange, symbol, startTime); err != nil {
return err
}
@ -84,9 +94,9 @@ var PnLCmd = &cobra.Command{
tradingFeeCurrency := exchange.PlatformFeeCurrency()
if strings.HasPrefix(symbol, tradingFeeCurrency) {
logrus.Infof("loading all trading fee currency related trades: %s", symbol)
trades, err = tradeService.QueryForTradingFeeCurrency(symbol, tradingFeeCurrency)
trades, err = tradeService.QueryForTradingFeeCurrency(exchange.Name(), symbol, tradingFeeCurrency)
} else {
trades, err = tradeService.Query(symbol)
trades, err = tradeService.Query(exchange.Name(), symbol)
}
if err != nil {

View File

@ -40,10 +40,12 @@ func toGlobalOrder(binanceOrder *binance.Order) (*types.Order, error) {
Price: util.MustParseFloat(binanceOrder.Price),
TimeInForce: string(binanceOrder.TimeInForce),
},
Exchange: types.ExchangeBinance.String(),
IsWorking: binanceOrder.IsWorking,
OrderID: uint64(binanceOrder.OrderID),
Status: toGlobalOrderStatus(binanceOrder.Status),
ExecutedQuantity: util.MustParseFloat(binanceOrder.ExecutedQuantity),
CreationTime: time.Unix(0, binanceOrder.Time*int64(time.Millisecond)),
}, nil
}

View File

@ -287,17 +287,22 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [
return orders, err
}
func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) {
if until.Sub(since) >= 24*time.Hour {
until = since.Add(24*time.Hour - time.Millisecond)
}
func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time) (orders []types.Order, err error) {
var lastOrderID int64 = 0
for {
time.Sleep(3 * time.Second)
log.Infof("querying closed orders %s from %s <=> %s ...", symbol, since, until)
req := e.Client.NewListOrdersService().
Symbol(symbol).
StartTime(since.Unix()).
EndTime(until.Unix())
Symbol(symbol)
if lastOrderID > 0 {
req.OrderID(lastOrderID)
req.OrderID(int64(lastOrderID))
} else {
req.StartTime(since.UnixNano() / int64(time.Millisecond)).
EndTime(until.UnixNano() / int64(time.Millisecond))
}
binanceOrders, err := req.Do(ctx)
@ -306,7 +311,7 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
}
if len(binanceOrders) == 0 {
break
return orders, nil
}
for _, binanceOrder := range binanceOrders {
@ -314,18 +319,12 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
if err != nil {
return orders, err
}
lastOrderID = binanceOrder.OrderID
orders = append(orders, *order)
}
}
return orders, err
return orders, nil
}
func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) (err2 error) {
for _, o := range orders {
var req = e.Client.NewCancelOrderService()

View File

@ -91,13 +91,13 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [
return orders, err
}
func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time) (orders []types.Order, err error) {
// lastOrderID is not supported on MAX
func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) {
offset := 0
limit := 500
orderIDs := make(map[uint64]struct{}, 500)
orderIDs := make(map[uint64]struct{}, limit * 2)
for {
log.Infof("querying closed orders from %s <=> %s", since, until)
log.Infof("querying closed orders offset %d ~ %d + %d", offset, offset, limit)
maxOrders, err := e.client.OrderService.Closed(toLocalSymbol(symbol), maxapi.QueryOrderOptions{
Offset: offset,

View File

@ -2,6 +2,8 @@ package service
import (
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
)
@ -14,8 +16,37 @@ func NewOrderService(db *sqlx.DB) *OrderService {
return &OrderService{db}
}
func (s *OrderService) Query(symbol string) ([]types.Order, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM orders WHERE symbol = :symbol ORDER BY gid ASC`, map[string]interface{}{
// QueryLast queries the last order from the database
func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string) (*types.Order, error) {
log.Infof("querying last order exchange = %s AND symbol = %s", ex, symbol)
rows, err := s.DB.NamedQuery(`SELECT * FROM orders WHERE exchange = :exchange AND symbol = :symbol ORDER BY gid DESC LIMIT 1`, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
})
if err != nil {
return nil, errors.Wrap(err, "query last order error")
}
if rows.Err() != nil {
return nil, rows.Err()
}
defer rows.Close()
if rows.Next() {
var order types.Order
err = rows.StructScan(&order)
return &order, err
}
return nil, rows.Err()
}
func (s *OrderService) Query(ex types.ExchangeName, symbol string) ([]types.Order, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM orders WHERE exchange = :exchange AND symbol = :symbol ORDER BY gid ASC`, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
})
if err != nil {
@ -42,8 +73,8 @@ func (s *OrderService) scanRows(rows *sqlx.Rows) (orders []types.Order, err erro
func (s *OrderService) Insert(order types.Order) error {
_, err := s.DB.NamedExec(`
INSERT INTO orders (id, exchange, order_type, symbol, price, stop_price, quantity, side, :is_working, time_in_force, created_at)
VALUES (:id, :exchange, :order_type, :symbol, :price, :stop_price, :quantity, :side, :is_working, :time_in_force, :created_at)`,
INSERT INTO orders (exchange, order_id, client_order_id, order_type, status, symbol, price, stop_price, quantity, executed_quantity, side, is_working, time_in_force, created_at)
VALUES (:exchange, :order_id, :client_order_id, :order_type, :status, :symbol, :price, :stop_price, :quantity, :executed_quantity, :side, :is_working, :time_in_force, :created_at)`,
order)
return err
}

85
pkg/service/sync.go Normal file
View File

@ -0,0 +1,85 @@
package service
import (
"context"
"time"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
)
type SyncService struct {
TradeService *TradeService
OrderService *OrderService
}
func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
lastOrder, err := s.OrderService.QueryLast(exchange.Name(), symbol)
if err != nil {
return err
}
var lastID uint64 = 0
if lastOrder != nil {
lastID = lastOrder.OrderID
startTime = lastOrder.CreationTime
logrus.Infof("found last order, start from lastID = %d since %s", lastID, startTime)
}
batch := &types.ExchangeBatchProcessor{Exchange: exchange}
ordersC, errC := batch.BatchQueryClosedOrders(ctx, symbol, startTime, time.Now(), lastID)
for order := range ordersC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
return err
default:
}
if err := s.OrderService.Insert(order); err != nil {
return err
}
}
return nil
}
func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
lastTrade, err := s.TradeService.QueryLast(exchange.Name(), symbol)
if err != nil {
return err
}
var lastID int64 = 0
if lastTrade != nil {
lastID = lastTrade.ID
startTime = lastTrade.Time
logrus.Infof("found last trade, start from lastID = %d since %s", lastID, startTime)
}
batch := &types.ExchangeBatchProcessor{Exchange: exchange}
trades, err := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
Limit: 200,
LastTradeID: lastID,
})
if err != nil {
return err
}
for _, trade := range trades {
if err := s.TradeService.Insert(trade); err != nil {
return err
}
}
return nil
}

View File

@ -1,9 +1,6 @@
package service
import (
"context"
"time"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@ -11,43 +8,6 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
type TradeSync struct {
Service *TradeService
}
func (s *TradeSync) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
lastTrade, err := s.Service.QueryLast(symbol)
if err != nil {
return err
}
var lastID int64 = 0
if lastTrade != nil {
lastID = lastTrade.ID
startTime = lastTrade.Time
log.Infof("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime)
}
batch := &types.ExchangeBatchProcessor{Exchange: exchange}
trades, err := batch.BatchQueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
Limit: 200,
LastTradeID: lastID,
})
if err != nil {
return err
}
for _, trade := range trades {
if err := s.Service.Insert(trade); err != nil {
return err
}
}
return nil
}
type TradeService struct {
DB *sqlx.DB
}
@ -57,11 +17,12 @@ func NewTradeService(db *sqlx.DB) *TradeService {
}
// QueryLast queries the last trade from the database
func (s *TradeService) QueryLast(symbol string) (*types.Trade, error) {
log.Infof("querying last trade symbol = %s", symbol)
func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string) (*types.Trade, error) {
log.Infof("querying last trade exchange = %s AND symbol = %s", ex, symbol)
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE symbol = :symbol ORDER BY gid DESC LIMIT 1`, map[string]interface{}{
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol ORDER BY gid DESC LIMIT 1`, map[string]interface{}{
"symbol": symbol,
"exchange": ex,
})
if err != nil {
return nil, errors.Wrap(err, "query last trade error")
@ -82,8 +43,9 @@ func (s *TradeService) QueryLast(symbol string) (*types.Trade, error) {
return nil, rows.Err()
}
func (s *TradeService) QueryForTradingFeeCurrency(symbol string, feeCurrency string) ([]types.Trade, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE symbol = :symbol OR fee_currency = :fee_currency ORDER BY traded_at ASC`, map[string]interface{}{
func (s *TradeService) QueryForTradingFeeCurrency(ex types.ExchangeName, symbol string, feeCurrency string) ([]types.Trade, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE exchange = :exchange AND (symbol = :symbol OR fee_currency = :fee_currency) ORDER BY traded_at ASC`, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
"fee_currency": feeCurrency,
})
@ -96,8 +58,9 @@ func (s *TradeService) QueryForTradingFeeCurrency(symbol string, feeCurrency str
return s.scanRows(rows)
}
func (s *TradeService) Query(symbol string) ([]types.Trade, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE symbol = :symbol ORDER BY gid ASC`, map[string]interface{}{
func (s *TradeService) Query(ex types.ExchangeName, symbol string) ([]types.Trade, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol ORDER BY gid ASC`, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
})
if err != nil {
@ -113,7 +76,7 @@ func (s *TradeService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err erro
for rows.Next() {
var trade types.Trade
if err := rows.StructScan(&trade); err != nil {
return nil, err
return trades, err
}
trades = append(trades, trade)

View File

@ -58,7 +58,7 @@ type Exchange interface {
QueryOpenOrders(ctx context.Context, symbol string) (orders []Order, err error)
QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time) (orders []Order, err error)
QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []Order, err error)
CancelOrders(ctx context.Context, orders ...Order) error
}
@ -74,6 +74,45 @@ type ExchangeBatchProcessor struct {
Exchange
}
func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan Order, errC chan error) {
c = make(chan Order, 500)
errC = make(chan error, 1)
go func() {
defer close(c)
defer close(errC)
orderIDs := make(map[uint64]struct{}, 500)
for startTime.Before(endTime) {
limitedEndTime := startTime.Add(24 * time.Hour)
orders, err := e.QueryClosedOrders(ctx, symbol, startTime, limitedEndTime, lastOrderID)
if err != nil {
errC <- err
return
}
if len(orders) == 0 {
startTime = limitedEndTime
continue
}
for _, o := range orders {
if _, ok := orderIDs[o.OrderID]; ok {
log.Infof("skipping duplicated order id: %d", o.OrderID)
continue
}
c <- o
startTime = o.CreationTime
lastOrderID = o.OrderID
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol, interval string, startTime, endTime time.Time) (allKLines []KLine, err error) {
for startTime.Before(endTime) {
kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
@ -82,7 +121,7 @@ func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol, in
})
if err != nil {
return nil, err
return allKLines, err
}
for _, kline := range kLines {

View File

@ -16,6 +16,20 @@ const (
OrderTypeStopMarket OrderType = "STOP_MARKET"
)
/*
func (t *OrderType) Scan(v interface{}) error {
switch d := v.(type) {
case string:
*t = OrderType(d)
default:
return errors.New("order type scan error, type unsupported")
}
return nil
}
*/
type OrderStatus string
const (
@ -26,16 +40,6 @@ const (
OrderStatusRejected OrderStatus = "REJECTED"
)
type Order struct {
SubmitOrder
OrderID uint64 `json:"orderID" db:"order_id"` // order id
Status OrderStatus `json:"status" db:"status"`
ExecutedQuantity float64 `json:"executedQuantity" db:"executed_quantity"`
IsWorking bool `json:"isWorking" db:"is_working"`
CreationTime time.Time `json:"creationTime" db:"created_at"`
}
type SubmitOrder struct {
ClientOrderID string `json:"clientOrderID" db:"client_order_id"`
@ -57,6 +61,18 @@ type SubmitOrder struct {
TimeInForce string `json:"timeInForce" db:"time_in_force"` // GTC, IOC, FOK
}
type Order struct {
SubmitOrder
Exchange string `json:"exchange" db:"exchange"`
GID uint64 `json:"gid" db:"gid"`
OrderID uint64 `json:"orderID" db:"order_id"` // order id
Status OrderStatus `json:"status" db:"status"`
ExecutedQuantity float64 `json:"executedQuantity" db:"executed_quantity"`
IsWorking bool `json:"isWorking" db:"is_working"`
CreationTime time.Time `json:"creationTime" db:"created_at"`
}
func (o *SubmitOrder) SlackAttachment() slack.Attachment {
var fields = []slack.AttachmentField{
{Title: "Symbol", Value: o.Symbol, Short: true},