2020-08-03 05:17:17 +00:00
package service
import (
2021-02-15 12:51:25 +00:00
"context"
2024-07-09 08:12:35 +00:00
"fmt"
2021-01-29 10:48:00 +00:00
"strconv"
2021-01-26 09:21:18 +00:00
"strings"
"time"
2022-06-01 10:30:24 +00:00
sq "github.com/Masterminds/squirrel"
2020-08-03 05:17:17 +00:00
"github.com/jmoiron/sqlx"
2020-08-03 07:25:06 +00:00
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2020-09-05 08:22:46 +00:00
2022-07-14 09:36:16 +00:00
exchange2 "github.com/c9s/bbgo/pkg/exchange"
2021-03-14 03:03:45 +00:00
"github.com/c9s/bbgo/pkg/exchange/batch"
2020-10-11 08:46:15 +00:00
"github.com/c9s/bbgo/pkg/types"
2020-08-03 05:17:17 +00:00
)
2021-02-16 07:34:01 +00:00
// ErrTradeNotFound is the sentinel error that TradeService.Load wraps and
// returns when its query yields no row for the requested id.
var ErrTradeNotFound = errors . New ( "trade not found" )
2021-02-15 12:51:25 +00:00
type QueryTradesOptions struct {
Exchange types . ExchangeName
2022-06-22 10:19:11 +00:00
Sessions [ ] string
2021-02-15 12:51:25 +00:00
Symbol string
LastGID int64
2023-10-31 04:36:59 +00:00
// inclusive
Since * time . Time
// exclusive
Until * time . Time
2021-02-15 12:51:25 +00:00
// ASC or DESC
Ordering string
2024-07-09 08:12:35 +00:00
// OrderByColumn is the column name to order by
// Currently we only support traded_at and gid column.
OrderByColumn string
Limit uint64
2021-02-15 12:51:25 +00:00
}
2021-01-26 09:21:18 +00:00
// TradingVolume is one aggregated row produced by TradeService.QueryTradingVolume:
// the summed quote volume of the trades falling into one time bucket and,
// optionally, one symbol or exchange segment.
type TradingVolume struct {
// Year, Month and Day identify the time bucket. The SQL generators in this
// file select month only for month/day grouping and day only for day
// grouping, so the finer-grained fields stay zero for coarser periods.
Year int ` db:"year" json:"year" `
Month int ` db:"month" json:"month,omitempty" `
Day int ` db:"day" json:"day,omitempty" `
// Time has no db tag; QueryTradingVolume fills it in from Year, Month and
// Day at midnight in the local time zone after scanning the row.
Time time . Time ` json:"time,omitempty" `
// Exchange and Symbol are only selected when the query is segmented by
// "exchange" or "symbol" respectively.
Exchange string ` db:"exchange" json:"exchange,omitempty" `
Symbol string ` db:"symbol" json:"symbol,omitempty" `
// QuoteVolume is SUM(quantity * price) over the trades in the bucket.
QuoteVolume float64 ` db:"quote_volume" json:"quoteVolume" `
}
// TradingVolumeQueryOptions selects how TradeService.QueryTradingVolume
// buckets and segments the aggregated trading volume.
type TradingVolumeQueryOptions struct {
	// GroupByPeriod is the time bucket size: "year", "month" or "day".
	// Any other value (including the empty string) is treated as "day".
	GroupByPeriod string

	// SegmentBy optionally splits every bucket by "symbol" or "exchange".
	// Any other value adds no segmentation.
	SegmentBy string
}
2020-08-03 05:17:17 +00:00
type TradeService struct {
2020-08-03 07:25:06 +00:00
DB * sqlx . DB
2020-08-03 05:17:17 +00:00
}
2020-08-03 07:25:06 +00:00
func NewTradeService ( db * sqlx . DB ) * TradeService {
2020-08-03 05:17:17 +00:00
return & TradeService { db }
}
2024-06-19 06:18:21 +00:00
func ( s * TradeService ) Sync (
ctx context . Context ,
exchange types . Exchange , symbol string ,
startTime , endTime time . Time ,
) error {
2022-07-14 09:36:16 +00:00
isMargin , isFutures , isIsolated , isolatedSymbol := exchange2 . GetSessionAttributes ( exchange )
2022-06-01 10:30:24 +00:00
// override symbol if isolatedSymbol is not empty
if isIsolated && len ( isolatedSymbol ) > 0 {
symbol = isolatedSymbol
2021-03-14 03:03:45 +00:00
}
2022-06-01 10:30:24 +00:00
api , ok := exchange . ( types . ExchangeTradeHistoryService )
2022-05-30 16:59:33 +00:00
if ! ok {
return nil
}
2022-06-01 10:30:24 +00:00
lastTradeID := uint64 ( 1 )
tasks := [ ] SyncTask {
{
Type : types . Trade { } ,
Select : SelectLastTrades ( exchange . Name ( ) , symbol , isMargin , isFutures , isIsolated , 100 ) ,
OnLoad : func ( objs interface { } ) {
// update last trade ID
trades := objs . ( [ ] types . Trade )
if len ( trades ) > 0 {
end := len ( trades ) - 1
last := trades [ end ]
lastTradeID = last . ID
}
} ,
BatchQuery : func ( ctx context . Context , startTime , endTime time . Time ) ( interface { } , chan error ) {
query := & batch . TradeBatchQuery {
ExchangeTradeHistoryService : api ,
}
return query . Query ( ctx , symbol , & types . TradeQueryOptions {
StartTime : & startTime ,
EndTime : & endTime ,
LastTradeID : lastTradeID ,
} )
} ,
Time : func ( obj interface { } ) time . Time {
return obj . ( types . Trade ) . Time . Time ( )
} ,
ID : func ( obj interface { } ) string {
trade := obj . ( types . Trade )
return strconv . FormatUint ( trade . ID , 10 ) + trade . Side . String ( )
} ,
2022-06-06 04:24:18 +00:00
LogInsert : true ,
2022-06-01 10:30:24 +00:00
} ,
}
for _ , sel := range tasks {
2024-06-19 06:18:21 +00:00
if err := sel . execute ( ctx , s . DB , startTime , endTime ) ; err != nil {
2022-06-01 10:30:24 +00:00
return err
2021-03-14 03:03:45 +00:00
}
2022-06-01 10:30:24 +00:00
}
2021-03-14 03:03:45 +00:00
2022-06-01 10:30:24 +00:00
return nil
2021-03-14 03:03:45 +00:00
}
2021-01-26 09:21:18 +00:00
func ( s * TradeService ) QueryTradingVolume ( startTime time . Time , options TradingVolumeQueryOptions ) ( [ ] TradingVolume , error ) {
args := map [ string ] interface { } {
// "symbol": symbol,
// "exchange": ex,
// "is_margin": isMargin,
// "is_isolated": isIsolated,
"start_time" : startTime ,
}
2021-02-06 08:05:21 +00:00
sql := ""
driverName := s . DB . DriverName ( )
if driverName == "mysql" {
sql = generateMysqlTradingVolumeQuerySQL ( options )
} else {
sql = generateSqliteTradingVolumeSQL ( options )
}
log . Info ( sql )
2021-02-03 08:51:02 +00:00
rows , err := s . DB . NamedQuery ( sql , args )
if err != nil {
return nil , errors . Wrap ( err , "query last trade error" )
}
if rows . Err ( ) != nil {
return nil , rows . Err ( )
}
defer rows . Close ( )
var records [ ] TradingVolume
for rows . Next ( ) {
var record TradingVolume
err = rows . StructScan ( & record )
if err != nil {
return records , err
}
2022-06-24 07:42:30 +00:00
record . Time = time . Date ( record . Year , time . Month ( record . Month ) , record . Day , 0 , 0 , 0 , 0 , time . Local )
2021-02-03 08:51:02 +00:00
records = append ( records , record )
}
return records , rows . Err ( )
}
2021-02-06 08:05:21 +00:00
func generateSqliteTradingVolumeSQL ( options TradingVolumeQueryOptions ) string {
2021-02-26 09:22:08 +00:00
timeRangeColumn := "traded_at"
sel , groupBys , orderBys := generateSqlite3TimeRangeClauses ( timeRangeColumn , options . GroupByPeriod )
2021-02-06 08:05:21 +00:00
switch options . SegmentBy {
case "symbol" :
sel = append ( sel , "symbol" )
groupBys = append ( [ ] string { "symbol" } , groupBys ... )
orderBys = append ( orderBys , "symbol" )
case "exchange" :
sel = append ( sel , "exchange" )
groupBys = append ( [ ] string { "exchange" } , groupBys ... )
orderBys = append ( orderBys , "exchange" )
}
sel = append ( sel , "SUM(quantity * price) AS quote_volume" )
2021-02-26 09:22:08 +00:00
where := [ ] string { timeRangeColumn + " > :start_time" }
2021-02-06 08:05:21 +00:00
sql := ` SELECT ` + strings . Join ( sel , ", " ) + ` FROM trades ` +
` WHERE ` + strings . Join ( where , " AND " ) +
` GROUP BY ` + strings . Join ( groupBys , ", " ) +
` ORDER BY ` + strings . Join ( orderBys , ", " )
return sql
}
2021-02-26 09:22:08 +00:00
// generateSqlite3TimeRangeClauses returns the SELECT expressions, GROUP BY
// terms and ORDER BY terms that bucket timeRangeColumn by period for SQLite.
//
// period may be "year", "month" or "day"; any other value is handled like
// "day". The buckets are exposed under the aliases year, month and day, the
// GROUP BY terms run from the finest to the coarsest alias, and the ORDER BY
// terms run from the coarsest to the finest, all ascending.
func generateSqlite3TimeRangeClauses(timeRangeColumn, period string) (selectors []string, groupBys []string, orderBys []string) {
	yearExpr := "strftime('%Y'," + timeRangeColumn + ") AS year"
	monthExpr := "strftime('%m'," + timeRangeColumn + ") AS month"
	dayExpr := "strftime('%d'," + timeRangeColumn + ") AS day"

	if period == "year" {
		return []string{yearExpr},
			[]string{"year"},
			[]string{"year ASC"}
	}

	if period == "month" {
		return []string{yearExpr, monthExpr},
			[]string{"month", "year"},
			[]string{"year ASC", "month ASC"}
	}

	// "day" and every unrecognized period
	return []string{yearExpr, monthExpr, dayExpr},
		[]string{"day", "month", "year"},
		[]string{"year ASC", "month ASC", "day ASC"}
}
// generateMysqlTimeRangeClauses returns the SELECT expressions, GROUP BY terms
// and ORDER BY terms that bucket timeRangeColumn by period for MySQL.
//
// period may be "year", "month" or "day"; any other value is handled like
// "day". The SELECT expressions are aliased year, month and day, the GROUP BY
// terms repeat the YEAR()/MONTH()/DAY() calls from the finest to the coarsest
// unit, and the ORDER BY terms use the aliases from the coarsest to the
// finest, all ascending.
func generateMysqlTimeRangeClauses(timeRangeColumn, period string) (selectors []string, groupBys []string, orderBys []string) {
	yearFn := "YEAR(" + timeRangeColumn + ")"
	monthFn := "MONTH(" + timeRangeColumn + ")"
	dayFn := "DAY(" + timeRangeColumn + ")"

	if period == "year" {
		return []string{yearFn + " AS year"},
			[]string{yearFn},
			[]string{"year ASC"}
	}

	if period == "month" {
		return []string{yearFn + " AS year", monthFn + " AS month"},
			[]string{monthFn, yearFn},
			[]string{"year ASC", "month ASC"}
	}

	// "day" and every unrecognized period
	return []string{yearFn + " AS year", monthFn + " AS month", dayFn + " AS day"},
		[]string{dayFn, monthFn, yearFn},
		[]string{"year ASC", "month ASC", "day ASC"}
}
func generateMysqlTradingVolumeQuerySQL ( options TradingVolumeQueryOptions ) string {
timeRangeColumn := "traded_at"
sel , groupBys , orderBys := generateMysqlTimeRangeClauses ( timeRangeColumn , options . GroupByPeriod )
2021-01-28 10:51:35 +00:00
switch options . SegmentBy {
case "symbol" :
sel = append ( sel , "symbol" )
groupBys = append ( [ ] string { "symbol" } , groupBys ... )
orderBys = append ( orderBys , "symbol" )
case "exchange" :
2021-01-26 09:21:18 +00:00
sel = append ( sel , "exchange" )
groupBys = append ( [ ] string { "exchange" } , groupBys ... )
orderBys = append ( orderBys , "exchange" )
}
sel = append ( sel , "SUM(quantity * price) AS quote_volume" )
2021-02-26 09:22:08 +00:00
where := [ ] string { timeRangeColumn + " > :start_time" }
2021-01-26 09:21:18 +00:00
sql := ` SELECT ` + strings . Join ( sel , ", " ) + ` FROM trades ` +
` WHERE ` + strings . Join ( where , " AND " ) +
` GROUP BY ` + strings . Join ( groupBys , ", " ) +
` ORDER BY ` + strings . Join ( orderBys , ", " )
2021-02-03 08:51:02 +00:00
return sql
2021-01-26 09:21:18 +00:00
}
2020-11-05 03:00:51 +00:00
func ( s * TradeService ) QueryForTradingFeeCurrency ( ex types . ExchangeName , symbol string , feeCurrency string ) ( [ ] types . Trade , error ) {
2021-02-25 05:47:59 +00:00
sql := "SELECT * FROM trades WHERE exchange = :exchange AND (symbol = :symbol OR fee_currency = :fee_currency) ORDER BY traded_at ASC"
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2020-11-05 03:00:51 +00:00
"exchange" : ex ,
2020-10-09 05:21:42 +00:00
"symbol" : symbol ,
2020-08-04 01:47:54 +00:00
"fee_currency" : feeCurrency ,
2020-08-03 12:06:33 +00:00
} )
if err != nil {
return nil , err
}
defer rows . Close ( )
return s . scanRows ( rows )
}
2021-01-29 10:48:00 +00:00
func ( s * TradeService ) Query ( options QueryTradesOptions ) ( [ ] types . Trade , error ) {
2022-06-22 10:19:11 +00:00
sel := sq . Select ( "*" ) .
From ( "trades" )
2023-10-31 04:36:59 +00:00
if options . LastGID != 0 {
sel = sel . Where ( sq . Gt { "gid" : options . LastGID } )
}
2022-06-22 10:19:11 +00:00
if options . Since != nil {
sel = sel . Where ( sq . GtOrEq { "traded_at" : options . Since } )
2021-02-03 07:44:02 +00:00
}
2023-10-31 04:36:59 +00:00
if options . Until != nil {
sel = sel . Where ( sq . Lt { "traded_at" : options . Until } )
}
2022-06-02 12:02:32 +00:00
2023-10-31 04:36:59 +00:00
if options . Symbol != "" {
sel = sel . Where ( sq . Eq { "symbol" : options . Symbol } )
}
2022-06-22 10:19:11 +00:00
if options . Exchange != "" {
sel = sel . Where ( sq . Eq { "exchange" : options . Exchange } )
}
if len ( options . Sessions ) > 0 {
// FIXME: right now we only have the exchange field in the db, we might need to add the session field too.
sel = sel . Where ( sq . Eq { "exchange" : options . Sessions } )
}
2024-07-09 08:12:35 +00:00
var orderByColumn string
switch options . OrderByColumn {
case "" :
orderByColumn = "traded_at"
case "traded_at" , "gid" :
orderByColumn = options . OrderByColumn
default :
return nil , fmt . Errorf ( "invalid order by column: %s" , options . OrderByColumn )
}
var ordering string
switch strings . ToUpper ( options . Ordering ) {
case "" :
ordering = "ASC"
case "ASC" , "DESC" :
ordering = strings . ToUpper ( options . Ordering )
default :
return nil , fmt . Errorf ( "invalid ordering: %s" , options . Ordering )
2022-06-22 10:19:11 +00:00
}
2024-07-09 08:12:35 +00:00
sel = sel . OrderBy ( orderByColumn + " " + ordering )
2022-06-22 10:19:11 +00:00
if options . Limit > 0 {
sel = sel . Limit ( options . Limit )
}
sql , args , err := sel . ToSql ( )
if err != nil {
return nil , err
}
log . Debug ( sql )
log . Debug ( args )
rows , err := s . DB . Queryx ( sql , args ... )
2021-02-03 07:44:02 +00:00
if err != nil {
return nil , err
}
defer rows . Close ( )
return s . scanRows ( rows )
}
2021-02-16 07:34:01 +00:00
func ( s * TradeService ) Load ( ctx context . Context , id int64 ) ( * types . Trade , error ) {
var trade types . Trade
2024-06-19 07:51:16 +00:00
rows , err := s . DB . NamedQueryContext ( ctx , "SELECT * FROM trades WHERE id = :id" , map [ string ] interface { } {
2021-02-16 07:34:01 +00:00
"id" : id ,
} )
if err != nil {
return nil , err
}
defer rows . Close ( )
if rows . Next ( ) {
err = rows . StructScan ( & trade )
return & trade , err
}
2021-02-16 08:39:56 +00:00
return nil , errors . Wrapf ( ErrTradeNotFound , "trade id:%d not found" , id )
2021-02-16 07:34:01 +00:00
}
2021-02-03 07:44:02 +00:00
func queryTradesSQL ( options QueryTradesOptions ) string {
2021-01-29 10:48:00 +00:00
ordering := "ASC"
switch v := strings . ToUpper ( options . Ordering ) ; v {
case "DESC" , "ASC" :
ordering = v
}
var where [ ] string
if options . LastGID > 0 {
switch ordering {
case "ASC" :
where = append ( where , "gid > :gid" )
case "DESC" :
where = append ( where , "gid < :gid" )
}
}
2022-03-17 05:46:19 +00:00
if len ( options . Symbol ) > 0 {
where = append ( where , ` symbol = :symbol ` )
}
2021-01-29 10:48:00 +00:00
2022-03-17 05:46:19 +00:00
if len ( options . Exchange ) > 0 {
where = append ( where , ` exchange = :exchange ` )
}
sql := ` SELECT * FROM trades `
2021-01-29 10:48:00 +00:00
if len ( where ) > 0 {
sql += ` WHERE ` + strings . Join ( where , " AND " )
}
sql += ` ORDER BY gid ` + ordering
2021-02-16 08:39:56 +00:00
if options . Limit > 0 {
2022-06-22 10:19:11 +00:00
sql += ` LIMIT ` + strconv . FormatUint ( options . Limit , 10 )
2021-02-16 08:39:56 +00:00
}
2021-02-03 07:44:02 +00:00
return sql
2020-08-03 12:06:33 +00:00
}
2020-10-09 05:21:42 +00:00
func ( s * TradeService ) scanRows ( rows * sqlx . Rows ) ( trades [ ] types . Trade , err error ) {
2020-08-03 05:17:17 +00:00
for rows . Next ( ) {
var trade types . Trade
if err := rows . StructScan ( & trade ) ; err != nil {
2020-11-05 03:00:51 +00:00
return trades , err
2020-08-03 05:17:17 +00:00
}
2020-08-03 07:25:06 +00:00
2020-08-03 05:17:17 +00:00
trades = append ( trades , trade )
}
2020-08-03 07:25:06 +00:00
return trades , rows . Err ( )
2020-08-03 05:17:17 +00:00
}
func ( s * TradeService ) Insert ( trade types . Trade ) error {
2022-06-01 10:30:24 +00:00
sql := dbCache . InsertSqlOf ( trade )
_ , err := s . DB . NamedExec ( sql , trade )
2020-08-03 05:17:17 +00:00
return err
}
2021-05-30 07:25:00 +00:00
// DeleteAll removes every row from the trades table.
func (s *TradeService) DeleteAll() error {
	if _, err := s.DB.Exec(` DELETE FROM trades `); err != nil {
		return err
	}

	return nil
}
2022-06-01 10:30:24 +00:00
// SelectLastTrades builds a SELECT over the trades table for the rows that
// match symbol, exchange ex and the three is_margin / is_futures /
// is_isolated flags, ordered by traded_at descending and capped at limit rows.
func SelectLastTrades(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit uint64) sq.SelectBuilder {
	filters := sq.And{
		sq.Eq{"symbol": symbol},
		sq.Eq{"exchange": ex},
		sq.Eq{"is_margin": isMargin},
		sq.Eq{"is_futures": isFutures},
		sq.Eq{"is_isolated": isIsolated},
	}

	builder := sq.Select("*").From("trades")
	builder = builder.Where(filters)
	builder = builder.OrderBy("traded_at DESC")

	return builder.Limit(limit)
}