2020-08-03 05:17:17 +00:00
package service
import (
2021-02-15 12:51:25 +00:00
"context"
"fmt"
2021-01-29 10:48:00 +00:00
"strconv"
2021-01-26 09:21:18 +00:00
"strings"
"time"
2020-08-03 05:17:17 +00:00
"github.com/jmoiron/sqlx"
2020-08-03 07:25:06 +00:00
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2020-09-05 08:22:46 +00:00
2020-10-11 08:46:15 +00:00
"github.com/c9s/bbgo/pkg/types"
2020-08-03 05:17:17 +00:00
)
2021-02-16 07:34:01 +00:00
var ErrTradeNotFound = errors . New ( "trade not found" )
2021-02-15 12:51:25 +00:00
type QueryTradesOptions struct {
Exchange types . ExchangeName
Symbol string
LastGID int64
// ASC or DESC
Ordering string
Limit int
}
2021-01-26 09:21:18 +00:00
type TradingVolume struct {
Year int ` db:"year" json:"year" `
Month int ` db:"month" json:"month,omitempty" `
Day int ` db:"day" json:"day,omitempty" `
Time time . Time ` json:"time,omitempty" `
Exchange string ` db:"exchange" json:"exchange,omitempty" `
Symbol string ` db:"symbol" json:"symbol,omitempty" `
QuoteVolume float64 ` db:"quote_volume" json:"quoteVolume" `
}
type TradingVolumeQueryOptions struct {
2021-01-29 10:48:00 +00:00
GroupByPeriod string
SegmentBy string
2021-01-26 09:21:18 +00:00
}
2020-08-03 05:17:17 +00:00
type TradeService struct {
2020-08-03 07:25:06 +00:00
DB * sqlx . DB
2020-08-03 05:17:17 +00:00
}
2020-08-03 07:25:06 +00:00
func NewTradeService ( db * sqlx . DB ) * TradeService {
2020-08-03 05:17:17 +00:00
return & TradeService { db }
}
2021-01-26 09:21:18 +00:00
func ( s * TradeService ) QueryTradingVolume ( startTime time . Time , options TradingVolumeQueryOptions ) ( [ ] TradingVolume , error ) {
args := map [ string ] interface { } {
// "symbol": symbol,
// "exchange": ex,
// "is_margin": isMargin,
// "is_isolated": isIsolated,
"start_time" : startTime ,
}
2021-02-06 08:05:21 +00:00
sql := ""
driverName := s . DB . DriverName ( )
if driverName == "mysql" {
sql = generateMysqlTradingVolumeQuerySQL ( options )
} else {
sql = generateSqliteTradingVolumeSQL ( options )
}
log . Info ( sql )
2021-02-03 08:51:02 +00:00
rows , err := s . DB . NamedQuery ( sql , args )
if err != nil {
return nil , errors . Wrap ( err , "query last trade error" )
}
if rows . Err ( ) != nil {
return nil , rows . Err ( )
}
defer rows . Close ( )
var records [ ] TradingVolume
for rows . Next ( ) {
var record TradingVolume
err = rows . StructScan ( & record )
if err != nil {
return records , err
}
record . Time = time . Date ( record . Year , time . Month ( record . Month ) , record . Day , 0 , 0 , 0 , 0 , time . UTC )
records = append ( records , record )
}
return records , rows . Err ( )
}
2021-02-06 08:05:21 +00:00
func generateSqliteTradingVolumeSQL ( options TradingVolumeQueryOptions ) string {
2021-02-03 08:51:02 +00:00
var sel [ ] string
var groupBys [ ] string
var orderBys [ ] string
where := [ ] string { "traded_at > :start_time" }
2021-02-06 08:05:21 +00:00
2021-01-26 09:21:18 +00:00
switch options . GroupByPeriod {
2021-02-06 08:05:21 +00:00
case "month" :
sel = append ( sel , "strftime('%Y',traded_at) AS year" , "strftime('%m',traded_at) AS month" )
groupBys = append ( [ ] string { "month" , "year" } , groupBys ... )
orderBys = append ( orderBys , "year ASC" , "month ASC" )
case "year" :
sel = append ( sel , "strftime('%Y',traded_at) AS year" )
groupBys = append ( [ ] string { "year" } , groupBys ... )
orderBys = append ( orderBys , "year ASC" )
case "day" :
fallthrough
default :
sel = append ( sel , "strftime('%Y',traded_at) AS year" , "strftime('%m',traded_at) AS month" , "strftime('%d',traded_at) AS day" )
groupBys = append ( [ ] string { "day" , "month" , "year" } , groupBys ... )
orderBys = append ( orderBys , "year ASC" , "month ASC" , "day ASC" )
}
switch options . SegmentBy {
case "symbol" :
sel = append ( sel , "symbol" )
groupBys = append ( [ ] string { "symbol" } , groupBys ... )
orderBys = append ( orderBys , "symbol" )
case "exchange" :
sel = append ( sel , "exchange" )
groupBys = append ( [ ] string { "exchange" } , groupBys ... )
orderBys = append ( orderBys , "exchange" )
}
sel = append ( sel , "SUM(quantity * price) AS quote_volume" )
sql := ` SELECT ` + strings . Join ( sel , ", " ) + ` FROM trades ` +
` WHERE ` + strings . Join ( where , " AND " ) +
` GROUP BY ` + strings . Join ( groupBys , ", " ) +
` ORDER BY ` + strings . Join ( orderBys , ", " )
return sql
}
func generateMysqlTradingVolumeQuerySQL ( options TradingVolumeQueryOptions ) string {
var sel [ ] string
var groupBys [ ] string
var orderBys [ ] string
where := [ ] string { "traded_at > :start_time" }
2021-01-26 09:21:18 +00:00
2021-02-06 08:05:21 +00:00
switch options . GroupByPeriod {
2021-01-26 09:21:18 +00:00
case "month" :
2021-02-06 08:05:21 +00:00
2021-01-26 09:21:18 +00:00
sel = append ( sel , "YEAR(traded_at) AS year" , "MONTH(traded_at) AS month" )
groupBys = append ( [ ] string { "MONTH(traded_at)" , "YEAR(traded_at)" } , groupBys ... )
orderBys = append ( orderBys , "year ASC" , "month ASC" )
case "year" :
sel = append ( sel , "YEAR(traded_at) AS year" )
groupBys = append ( [ ] string { "YEAR(traded_at)" } , groupBys ... )
orderBys = append ( orderBys , "year ASC" )
case "day" :
fallthrough
default :
sel = append ( sel , "YEAR(traded_at) AS year" , "MONTH(traded_at) AS month" , "DAY(traded_at) AS day" )
groupBys = append ( [ ] string { "DAY(traded_at)" , "MONTH(traded_at)" , "YEAR(traded_at)" } , groupBys ... )
orderBys = append ( orderBys , "year ASC" , "month ASC" , "day ASC" )
}
2021-01-28 10:51:35 +00:00
switch options . SegmentBy {
case "symbol" :
sel = append ( sel , "symbol" )
groupBys = append ( [ ] string { "symbol" } , groupBys ... )
orderBys = append ( orderBys , "symbol" )
case "exchange" :
2021-01-26 09:21:18 +00:00
sel = append ( sel , "exchange" )
groupBys = append ( [ ] string { "exchange" } , groupBys ... )
orderBys = append ( orderBys , "exchange" )
}
sel = append ( sel , "SUM(quantity * price) AS quote_volume" )
sql := ` SELECT ` + strings . Join ( sel , ", " ) + ` FROM trades ` +
` WHERE ` + strings . Join ( where , " AND " ) +
` GROUP BY ` + strings . Join ( groupBys , ", " ) +
` ORDER BY ` + strings . Join ( orderBys , ", " )
2021-02-03 08:51:02 +00:00
return sql
2021-01-26 09:21:18 +00:00
}
2020-08-03 05:17:17 +00:00
// QueryLast queries the last trade from the database
2021-01-19 18:09:12 +00:00
func ( s * TradeService ) QueryLast ( ex types . ExchangeName , symbol string , isMargin bool , isIsolated bool ) ( * types . Trade , error ) {
log . Infof ( "querying last trade exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v" , ex , symbol , isMargin , isIsolated )
rows , err := s . DB . NamedQuery ( ` SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT 1 ` , map [ string ] interface { } {
"symbol" : symbol ,
"exchange" : ex ,
"is_margin" : isMargin ,
"is_isolated" : isIsolated ,
2020-08-03 05:17:17 +00:00
} )
if err != nil {
2020-08-03 07:25:06 +00:00
return nil , errors . Wrap ( err , "query last trade error" )
}
if rows . Err ( ) != nil {
return nil , rows . Err ( )
2020-08-03 05:17:17 +00:00
}
defer rows . Close ( )
2020-08-03 07:25:06 +00:00
if rows . Next ( ) {
var trade types . Trade
err = rows . StructScan ( & trade )
return & trade , err
}
2020-08-03 05:17:17 +00:00
2020-08-03 07:25:06 +00:00
return nil , rows . Err ( )
2020-08-03 05:17:17 +00:00
}
2020-11-05 03:00:51 +00:00
func ( s * TradeService ) QueryForTradingFeeCurrency ( ex types . ExchangeName , symbol string , feeCurrency string ) ( [ ] types . Trade , error ) {
rows , err := s . DB . NamedQuery ( ` SELECT * FROM trades WHERE exchange = :exchange AND (symbol = :symbol OR fee_currency = :fee_currency) ORDER BY traded_at ASC ` , map [ string ] interface { } {
"exchange" : ex ,
2020-10-09 05:21:42 +00:00
"symbol" : symbol ,
2020-08-04 01:47:54 +00:00
"fee_currency" : feeCurrency ,
2020-08-03 12:06:33 +00:00
} )
if err != nil {
return nil , err
}
defer rows . Close ( )
return s . scanRows ( rows )
}
2021-01-29 10:48:00 +00:00
func ( s * TradeService ) Query ( options QueryTradesOptions ) ( [ ] types . Trade , error ) {
2021-02-03 07:44:02 +00:00
sql := queryTradesSQL ( options )
log . Info ( sql )
args := map [ string ] interface { } {
"exchange" : options . Exchange ,
"symbol" : options . Symbol ,
}
rows , err := s . DB . NamedQuery ( sql , args )
if err != nil {
return nil , err
}
defer rows . Close ( )
return s . scanRows ( rows )
}
2021-02-16 07:34:01 +00:00
func ( s * TradeService ) Load ( ctx context . Context , id int64 ) ( * types . Trade , error ) {
var trade types . Trade
rows , err := s . DB . NamedQuery ( "SELECT * FROM trades WHERE id = :id" , map [ string ] interface { } {
"id" : id ,
} )
if err != nil {
return nil , err
}
defer rows . Close ( )
if rows . Next ( ) {
err = rows . StructScan ( & trade )
return & trade , err
}
2021-02-16 08:39:56 +00:00
return nil , errors . Wrapf ( ErrTradeNotFound , "trade id:%d not found" , id )
2021-02-16 07:34:01 +00:00
}
2021-02-15 12:53:19 +00:00
func ( s * TradeService ) MarkStrategyID ( ctx context . Context , id int64 , strategyID string ) error {
2021-02-16 07:34:01 +00:00
result , err := s . DB . NamedExecContext ( ctx , "UPDATE `trades` SET `strategy` = :strategy WHERE `id` = :id" , map [ string ] interface { } {
"id" : id ,
2021-02-15 12:53:19 +00:00
"strategy" : strategyID ,
} )
if err != nil {
return err
}
cnt , err := result . RowsAffected ( )
if err != nil {
return err
}
if cnt == 0 {
return fmt . Errorf ( "trade id:%d not found" , id )
}
return nil
}
2021-02-15 12:51:25 +00:00
func ( s * TradeService ) UpdatePnL ( ctx context . Context , id int64 , pnl float64 ) error {
2021-02-16 07:34:01 +00:00
result , err := s . DB . NamedExecContext ( ctx , "UPDATE `trades` SET `pnl` = :pnl WHERE `id` = :id" , map [ string ] interface { } {
2021-02-15 12:51:25 +00:00
"id" : id ,
"pnl" : pnl ,
} )
if err != nil {
return err
}
cnt , err := result . RowsAffected ( )
if err != nil {
return err
}
if cnt == 0 {
return fmt . Errorf ( "trade id:%d not found" , id )
}
return nil
}
2021-02-03 07:44:02 +00:00
func queryTradesSQL ( options QueryTradesOptions ) string {
2021-01-29 10:48:00 +00:00
ordering := "ASC"
switch v := strings . ToUpper ( options . Ordering ) ; v {
case "DESC" , "ASC" :
ordering = v
}
var where [ ] string
if len ( options . Exchange ) > 0 {
where = append ( where , ` exchange = :exchange ` )
}
if len ( options . Symbol ) > 0 {
where = append ( where , ` symbol = :symbol ` )
}
if options . LastGID > 0 {
switch ordering {
case "ASC" :
where = append ( where , "gid > :gid" )
case "DESC" :
where = append ( where , "gid < :gid" )
}
}
sql := ` SELECT * FROM trades `
if len ( where ) > 0 {
sql += ` WHERE ` + strings . Join ( where , " AND " )
}
sql += ` ORDER BY gid ` + ordering
2021-02-16 08:39:56 +00:00
if options . Limit > 0 {
sql += ` LIMIT ` + strconv . Itoa ( options . Limit )
}
2021-02-03 07:44:02 +00:00
return sql
2020-08-03 12:06:33 +00:00
}
2020-10-09 05:21:42 +00:00
func ( s * TradeService ) scanRows ( rows * sqlx . Rows ) ( trades [ ] types . Trade , err error ) {
2020-08-03 05:17:17 +00:00
for rows . Next ( ) {
var trade types . Trade
if err := rows . StructScan ( & trade ) ; err != nil {
2020-11-05 03:00:51 +00:00
return trades , err
2020-08-03 05:17:17 +00:00
}
2020-08-03 07:25:06 +00:00
2020-08-03 05:17:17 +00:00
trades = append ( trades , trade )
}
2020-08-03 07:25:06 +00:00
return trades , rows . Err ( )
2020-08-03 05:17:17 +00:00
}
func ( s * TradeService ) Insert ( trade types . Trade ) error {
2020-08-03 07:25:06 +00:00
_ , err := s . DB . NamedExec ( `
2021-02-06 03:44:49 +00:00
INSERT INTO trades ( id , exchange , order_id , symbol , price , quantity , quote_quantity , side , is_buyer , is_maker , fee , fee_currency , traded_at , is_margin , is_isolated )
2021-01-19 15:33:06 +00:00
VALUES ( : id , : exchange , : order_id , : symbol , : price , : quantity , : quote_quantity , : side , : is_buyer , : is_maker , : fee , : fee_currency , : traded_at , : is_margin , : is_isolated ) ` ,
2020-08-03 05:17:17 +00:00
trade )
return err
}