2020-08-03 05:17:17 +00:00
package service
import (
2021-02-15 12:51:25 +00:00
"context"
"fmt"
2021-01-29 10:48:00 +00:00
"strconv"
2021-01-26 09:21:18 +00:00
"strings"
"time"
2020-08-03 05:17:17 +00:00
"github.com/jmoiron/sqlx"
2020-08-03 07:25:06 +00:00
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2020-09-05 08:22:46 +00:00
2021-03-14 03:03:45 +00:00
"github.com/c9s/bbgo/pkg/exchange/batch"
2020-10-11 08:46:15 +00:00
"github.com/c9s/bbgo/pkg/types"
2020-08-03 05:17:17 +00:00
)
2021-02-16 07:34:01 +00:00
// ErrTradeNotFound is the sentinel error that Load wraps and returns when no
// row in the trades table matches the requested id.
var ErrTradeNotFound = errors . New ( "trade not found" )
2021-02-15 12:51:25 +00:00
// QueryTradesOptions carries the filters, ordering and paging settings that
// queryTradesSQL turns into a SELECT statement on the trades table.
type QueryTradesOptions struct {
// Exchange restricts the result to one exchange; an empty value adds no filter.
Exchange types . ExchangeName
// Symbol restricts the result to one symbol; an empty value adds no filter.
Symbol string
// LastGID, when greater than zero, adds a gid cursor condition:
// "gid > :gid" for ascending order and "gid < :gid" for descending order.
LastGID int64
// Ordering is "ASC" or "DESC" (matched case-insensitively); any other value
// falls back to "ASC".
Ordering string
// Limit caps the number of rows when greater than zero; otherwise no LIMIT
// clause is emitted.
Limit int
}
2021-01-26 09:21:18 +00:00
// TradingVolume is one aggregated row returned by QueryTradingVolume: the
// summed quote volume of the trades that fall into one time bucket and,
// optionally, one exchange or symbol segment.
type TradingVolume struct {
// Year is scanned from the "year" column of the aggregate query.
Year int ` db:"year" json:"year" `
// Month is scanned from the "month" column; it stays zero when the query
// groups by year only and therefore does not select that column.
Month int ` db:"month" json:"month,omitempty" `
// Day is scanned from the "day" column; it stays zero when the query groups
// by year or month and therefore does not select that column.
Day int ` db:"day" json:"day,omitempty" `
// Time is not read from the database; QueryTradingVolume derives it from
// Year, Month and Day as a midnight UTC timestamp.
Time time . Time ` json:"time,omitempty" `
// Exchange is only selected when the query is segmented by exchange.
Exchange string ` db:"exchange" json:"exchange,omitempty" `
// Symbol is only selected when the query is segmented by symbol.
Symbol string ` db:"symbol" json:"symbol,omitempty" `
// QuoteVolume is SUM(quantity * price) over the trades of the bucket.
QuoteVolume float64 ` db:"quote_volume" json:"quoteVolume" `
}
// TradingVolumeQueryOptions controls how QueryTradingVolume buckets and
// segments the aggregated trades.
type TradingVolumeQueryOptions struct {
	// GroupByPeriod selects the time bucket: "year", "month" or "day".
	// Any other value, including the empty string, is treated as "day".
	GroupByPeriod string

	// SegmentBy optionally splits every bucket by "symbol" or "exchange".
	// Any other value, including the empty string, disables segmentation.
	SegmentBy string
}
2020-08-03 05:17:17 +00:00
type TradeService struct {
2020-08-03 07:25:06 +00:00
DB * sqlx . DB
2020-08-03 05:17:17 +00:00
}
2020-08-03 07:25:06 +00:00
func NewTradeService ( db * sqlx . DB ) * TradeService {
2020-08-03 05:17:17 +00:00
return & TradeService { db }
}
2021-12-31 18:51:58 +00:00
func ( s * TradeService ) Sync ( ctx context . Context , exchange types . Exchange , symbol string , startTime time . Time ) error {
2021-03-14 03:03:45 +00:00
isMargin := false
2021-12-05 07:59:42 +00:00
isFutures := false
2021-03-14 03:03:45 +00:00
isIsolated := false
2021-12-05 07:59:42 +00:00
2021-03-14 03:03:45 +00:00
if marginExchange , ok := exchange . ( types . MarginExchange ) ; ok {
marginSettings := marginExchange . GetMarginSettings ( )
isMargin = marginSettings . IsMargin
isIsolated = marginSettings . IsIsolatedMargin
if marginSettings . IsIsolatedMargin {
symbol = marginSettings . IsolatedMarginSymbol
}
}
2021-12-05 07:59:42 +00:00
if futuresExchange , ok := exchange . ( types . FuturesExchange ) ; ok {
futuresSettings := futuresExchange . GetFuturesSettings ( )
isFutures = futuresSettings . IsFutures
isIsolated = futuresSettings . IsIsolatedFutures
if futuresSettings . IsIsolatedFutures {
symbol = futuresSettings . IsolatedFuturesSymbol
}
}
2021-12-31 06:12:41 +00:00
// records descending ordered, buffer 50 trades and use the trades ID to scan if the new trades are duplicated
2021-12-05 07:59:42 +00:00
records , err := s . QueryLast ( exchange . Name ( ) , symbol , isMargin , isFutures , isIsolated , 50 )
2021-03-14 03:03:45 +00:00
if err != nil {
return err
}
var tradeKeys = map [ types . TradeKey ] struct { } { }
2021-12-23 05:15:27 +00:00
var lastTradeID uint64 = 1
2021-12-31 05:52:16 +00:00
var now = time . Now ( )
2021-03-14 03:03:45 +00:00
if len ( records ) > 0 {
for _ , record := range records {
tradeKeys [ record . Key ( ) ] = struct { } { }
}
lastTradeID = records [ 0 ] . ID
2021-12-31 18:51:58 +00:00
startTime = time . Time ( records [ 0 ] . Time )
2021-03-14 03:03:45 +00:00
}
b := & batch . TradeBatchQuery { Exchange : exchange }
tradeC , errC := b . Query ( ctx , symbol , & types . TradeQueryOptions {
LastTradeID : lastTradeID ,
2021-12-31 18:51:58 +00:00
StartTime : & startTime ,
2021-12-31 18:43:08 +00:00
EndTime : & now ,
2021-03-14 03:03:45 +00:00
} )
for trade := range tradeC {
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
case err := <- errC :
if err != nil {
return err
}
default :
}
key := trade . Key ( )
if _ , exists := tradeKeys [ key ] ; exists {
continue
}
tradeKeys [ key ] = struct { } { }
log . Infof ( "inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s" ,
trade . Exchange ,
trade . ID ,
trade . Symbol ,
trade . Side ,
trade . Price ,
trade . Quantity ,
2021-05-12 04:37:48 +00:00
trade . Liquidity ( ) ,
2021-03-14 03:03:45 +00:00
trade . Time . String ( ) )
if err := s . Insert ( trade ) ; err != nil {
return err
}
}
return <- errC
}
2021-01-26 09:21:18 +00:00
func ( s * TradeService ) QueryTradingVolume ( startTime time . Time , options TradingVolumeQueryOptions ) ( [ ] TradingVolume , error ) {
args := map [ string ] interface { } {
// "symbol": symbol,
// "exchange": ex,
// "is_margin": isMargin,
// "is_isolated": isIsolated,
"start_time" : startTime ,
}
2021-02-06 08:05:21 +00:00
sql := ""
driverName := s . DB . DriverName ( )
if driverName == "mysql" {
sql = generateMysqlTradingVolumeQuerySQL ( options )
} else {
sql = generateSqliteTradingVolumeSQL ( options )
}
log . Info ( sql )
2021-02-03 08:51:02 +00:00
rows , err := s . DB . NamedQuery ( sql , args )
if err != nil {
return nil , errors . Wrap ( err , "query last trade error" )
}
if rows . Err ( ) != nil {
return nil , rows . Err ( )
}
defer rows . Close ( )
var records [ ] TradingVolume
for rows . Next ( ) {
var record TradingVolume
err = rows . StructScan ( & record )
if err != nil {
return records , err
}
record . Time = time . Date ( record . Year , time . Month ( record . Month ) , record . Day , 0 , 0 , 0 , 0 , time . UTC )
records = append ( records , record )
}
return records , rows . Err ( )
}
2021-02-06 08:05:21 +00:00
func generateSqliteTradingVolumeSQL ( options TradingVolumeQueryOptions ) string {
2021-02-26 09:22:08 +00:00
timeRangeColumn := "traded_at"
sel , groupBys , orderBys := generateSqlite3TimeRangeClauses ( timeRangeColumn , options . GroupByPeriod )
2021-02-06 08:05:21 +00:00
switch options . SegmentBy {
case "symbol" :
sel = append ( sel , "symbol" )
groupBys = append ( [ ] string { "symbol" } , groupBys ... )
orderBys = append ( orderBys , "symbol" )
case "exchange" :
sel = append ( sel , "exchange" )
groupBys = append ( [ ] string { "exchange" } , groupBys ... )
orderBys = append ( orderBys , "exchange" )
}
sel = append ( sel , "SUM(quantity * price) AS quote_volume" )
2021-02-26 09:22:08 +00:00
where := [ ] string { timeRangeColumn + " > :start_time" }
2021-02-06 08:05:21 +00:00
sql := ` SELECT ` + strings . Join ( sel , ", " ) + ` FROM trades ` +
` WHERE ` + strings . Join ( where , " AND " ) +
` GROUP BY ` + strings . Join ( groupBys , ", " ) +
` ORDER BY ` + strings . Join ( orderBys , ", " )
return sql
}
2021-02-26 09:22:08 +00:00
// generateSqlite3TimeRangeClauses returns the SELECT expressions, GROUP BY
// terms and ORDER BY terms that bucket rows by the given period using SQLite's
// strftime on timeRangeColumn.
//
// period is "year", "month" or "day"; any other value is treated as "day".
// GROUP BY terms use the selected aliases and are listed from the finest to
// the coarsest unit; ORDER BY terms are listed from the coarsest to the finest.
func generateSqlite3TimeRangeClauses(timeRangeColumn, period string) (selectors []string, groupBys []string, orderBys []string) {
	year := "strftime('%Y'," + timeRangeColumn + ") AS year"
	month := "strftime('%m'," + timeRangeColumn + ") AS month"
	day := "strftime('%d'," + timeRangeColumn + ") AS day"

	switch period {
	case "year":
		return []string{year},
			[]string{"year"},
			[]string{"year ASC"}

	case "month":
		return []string{year, month},
			[]string{"month", "year"},
			[]string{"year ASC", "month ASC"}

	default: // "day" and every unrecognised period
		return []string{year, month, day},
			[]string{"day", "month", "year"},
			[]string{"year ASC", "month ASC", "day ASC"}
	}
}
// generateMysqlTimeRangeClauses returns the SELECT expressions, GROUP BY terms
// and ORDER BY terms that bucket rows by the given period using MySQL's
// YEAR(), MONTH() and DAY() on timeRangeColumn.
//
// period is "year", "month" or "day"; any other value is treated as "day".
// GROUP BY terms repeat the function calls (not the aliases) and are listed
// from the finest to the coarsest unit; ORDER BY terms use the aliases and are
// listed from the coarsest to the finest.
func generateMysqlTimeRangeClauses(timeRangeColumn, period string) (selectors []string, groupBys []string, orderBys []string) {
	yearExpr := "YEAR(" + timeRangeColumn + ")"
	monthExpr := "MONTH(" + timeRangeColumn + ")"
	dayExpr := "DAY(" + timeRangeColumn + ")"

	switch period {
	case "year":
		return []string{yearExpr + " AS year"},
			[]string{yearExpr},
			[]string{"year ASC"}

	case "month":
		return []string{yearExpr + " AS year", monthExpr + " AS month"},
			[]string{monthExpr, yearExpr},
			[]string{"year ASC", "month ASC"}

	default: // "day" and every unrecognised period
		return []string{yearExpr + " AS year", monthExpr + " AS month", dayExpr + " AS day"},
			[]string{dayExpr, monthExpr, yearExpr},
			[]string{"year ASC", "month ASC", "day ASC"}
	}
}
func generateMysqlTradingVolumeQuerySQL ( options TradingVolumeQueryOptions ) string {
timeRangeColumn := "traded_at"
sel , groupBys , orderBys := generateMysqlTimeRangeClauses ( timeRangeColumn , options . GroupByPeriod )
2021-01-28 10:51:35 +00:00
switch options . SegmentBy {
case "symbol" :
sel = append ( sel , "symbol" )
groupBys = append ( [ ] string { "symbol" } , groupBys ... )
orderBys = append ( orderBys , "symbol" )
case "exchange" :
2021-01-26 09:21:18 +00:00
sel = append ( sel , "exchange" )
groupBys = append ( [ ] string { "exchange" } , groupBys ... )
orderBys = append ( orderBys , "exchange" )
}
sel = append ( sel , "SUM(quantity * price) AS quote_volume" )
2021-02-26 09:22:08 +00:00
where := [ ] string { timeRangeColumn + " > :start_time" }
2021-01-26 09:21:18 +00:00
sql := ` SELECT ` + strings . Join ( sel , ", " ) + ` FROM trades ` +
` WHERE ` + strings . Join ( where , " AND " ) +
` GROUP BY ` + strings . Join ( groupBys , ", " ) +
` ORDER BY ` + strings . Join ( orderBys , ", " )
2021-02-03 08:51:02 +00:00
return sql
2021-01-26 09:21:18 +00:00
}
2020-08-03 05:17:17 +00:00
// QueryLast queries the last trade from the database
2021-12-05 07:59:42 +00:00
func ( s * TradeService ) QueryLast ( ex types . ExchangeName , symbol string , isMargin , isFutures , isIsolated bool , limit int ) ( [ ] types . Trade , error ) {
log . Debugf ( "querying last trade exchange = %s AND symbol = %s AND is_margin = %v AND is_futures = %v AND is_isolated = %v" , ex , symbol , isMargin , isFutures , isIsolated )
2021-01-19 18:09:12 +00:00
2021-12-05 07:59:42 +00:00
sql := "SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_futures = :is_futures AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT :limit"
2021-02-18 10:20:18 +00:00
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2021-01-19 18:09:12 +00:00
"symbol" : symbol ,
"exchange" : ex ,
"is_margin" : isMargin ,
2021-12-05 07:59:42 +00:00
"is_futures" : isFutures ,
2021-01-19 18:09:12 +00:00
"is_isolated" : isIsolated ,
2021-02-18 10:20:18 +00:00
"limit" : limit ,
2020-08-03 05:17:17 +00:00
} )
if err != nil {
2020-08-03 07:25:06 +00:00
return nil , errors . Wrap ( err , "query last trade error" )
}
2020-08-03 05:17:17 +00:00
defer rows . Close ( )
2021-02-18 10:20:18 +00:00
return s . scanRows ( rows )
2020-08-03 05:17:17 +00:00
}
2020-11-05 03:00:51 +00:00
func ( s * TradeService ) QueryForTradingFeeCurrency ( ex types . ExchangeName , symbol string , feeCurrency string ) ( [ ] types . Trade , error ) {
2021-02-25 05:47:59 +00:00
sql := "SELECT * FROM trades WHERE exchange = :exchange AND (symbol = :symbol OR fee_currency = :fee_currency) ORDER BY traded_at ASC"
rows , err := s . DB . NamedQuery ( sql , map [ string ] interface { } {
2020-11-05 03:00:51 +00:00
"exchange" : ex ,
2020-10-09 05:21:42 +00:00
"symbol" : symbol ,
2020-08-04 01:47:54 +00:00
"fee_currency" : feeCurrency ,
2020-08-03 12:06:33 +00:00
} )
if err != nil {
return nil , err
}
defer rows . Close ( )
return s . scanRows ( rows )
}
2021-01-29 10:48:00 +00:00
func ( s * TradeService ) Query ( options QueryTradesOptions ) ( [ ] types . Trade , error ) {
2021-02-03 07:44:02 +00:00
sql := queryTradesSQL ( options )
log . Info ( sql )
args := map [ string ] interface { } {
"exchange" : options . Exchange ,
"symbol" : options . Symbol ,
}
rows , err := s . DB . NamedQuery ( sql , args )
if err != nil {
return nil , err
}
defer rows . Close ( )
return s . scanRows ( rows )
}
2021-02-16 07:34:01 +00:00
func ( s * TradeService ) Load ( ctx context . Context , id int64 ) ( * types . Trade , error ) {
var trade types . Trade
rows , err := s . DB . NamedQuery ( "SELECT * FROM trades WHERE id = :id" , map [ string ] interface { } {
"id" : id ,
} )
if err != nil {
return nil , err
}
defer rows . Close ( )
if rows . Next ( ) {
err = rows . StructScan ( & trade )
return & trade , err
}
2021-02-16 08:39:56 +00:00
return nil , errors . Wrapf ( ErrTradeNotFound , "trade id:%d not found" , id )
2021-02-16 07:34:01 +00:00
}
2021-03-16 06:07:47 +00:00
func ( s * TradeService ) Mark ( ctx context . Context , id int64 , strategyID string ) error {
2021-02-16 07:34:01 +00:00
result , err := s . DB . NamedExecContext ( ctx , "UPDATE `trades` SET `strategy` = :strategy WHERE `id` = :id" , map [ string ] interface { } {
"id" : id ,
2021-02-15 12:53:19 +00:00
"strategy" : strategyID ,
} )
if err != nil {
return err
}
cnt , err := result . RowsAffected ( )
if err != nil {
return err
}
if cnt == 0 {
return fmt . Errorf ( "trade id:%d not found" , id )
}
return nil
}
2021-02-15 12:51:25 +00:00
func ( s * TradeService ) UpdatePnL ( ctx context . Context , id int64 , pnl float64 ) error {
2021-02-16 07:34:01 +00:00
result , err := s . DB . NamedExecContext ( ctx , "UPDATE `trades` SET `pnl` = :pnl WHERE `id` = :id" , map [ string ] interface { } {
2021-02-15 12:51:25 +00:00
"id" : id ,
"pnl" : pnl ,
} )
if err != nil {
return err
}
cnt , err := result . RowsAffected ( )
if err != nil {
return err
}
if cnt == 0 {
return fmt . Errorf ( "trade id:%d not found" , id )
}
return nil
}
2021-02-03 07:44:02 +00:00
func queryTradesSQL ( options QueryTradesOptions ) string {
2021-01-29 10:48:00 +00:00
ordering := "ASC"
switch v := strings . ToUpper ( options . Ordering ) ; v {
case "DESC" , "ASC" :
ordering = v
}
var where [ ] string
if len ( options . Exchange ) > 0 {
where = append ( where , ` exchange = :exchange ` )
}
if len ( options . Symbol ) > 0 {
where = append ( where , ` symbol = :symbol ` )
}
if options . LastGID > 0 {
switch ordering {
case "ASC" :
where = append ( where , "gid > :gid" )
case "DESC" :
where = append ( where , "gid < :gid" )
}
}
sql := ` SELECT * FROM trades `
if len ( where ) > 0 {
sql += ` WHERE ` + strings . Join ( where , " AND " )
}
sql += ` ORDER BY gid ` + ordering
2021-02-16 08:39:56 +00:00
if options . Limit > 0 {
sql += ` LIMIT ` + strconv . Itoa ( options . Limit )
}
2021-02-03 07:44:02 +00:00
return sql
2020-08-03 12:06:33 +00:00
}
2020-10-09 05:21:42 +00:00
func ( s * TradeService ) scanRows ( rows * sqlx . Rows ) ( trades [ ] types . Trade , err error ) {
2020-08-03 05:17:17 +00:00
for rows . Next ( ) {
var trade types . Trade
if err := rows . StructScan ( & trade ) ; err != nil {
2020-11-05 03:00:51 +00:00
return trades , err
2020-08-03 05:17:17 +00:00
}
2020-08-03 07:25:06 +00:00
2020-08-03 05:17:17 +00:00
trades = append ( trades , trade )
}
2020-08-03 07:25:06 +00:00
return trades , rows . Err ( )
2020-08-03 05:17:17 +00:00
}
func ( s * TradeService ) Insert ( trade types . Trade ) error {
2020-08-03 07:25:06 +00:00
_ , err := s . DB . NamedExec ( `
2021-12-05 07:59:42 +00:00
INSERT INTO trades ( id , exchange , order_id , symbol , price , quantity , quote_quantity , side , is_buyer , is_maker , fee , fee_currency , traded_at , is_margin , is_futures , is_isolated )
VALUES ( : id , : exchange , : order_id , : symbol , : price , : quantity , : quote_quantity , : side , : is_buyer , : is_maker , : fee , : fee_currency , : traded_at , : is_margin , : is_futures , : is_isolated ) ` ,
2020-08-03 05:17:17 +00:00
trade )
return err
}
2021-05-30 07:25:00 +00:00
// DeleteAll removes every row from the trades table.
func (s *TradeService) DeleteAll() error {
	if _, execErr := s.DB.Exec(` DELETE FROM trades `); execErr != nil {
		return execErr
	}
	return nil
}