account: add nav_history_details and account_service for #302

This commit is contained in:
TonyQ 2021-12-14 07:19:21 +08:00
parent 0c7bbba675
commit 4eb5a099ae
13 changed files with 276 additions and 31 deletions

1
.gitignore vendored
View File

@ -36,3 +36,4 @@
bbgo.sqlite3
node_modules
otp*png

2
go.mod
View File

@ -54,6 +54,8 @@ require (
github.com/webview/webview v0.0.0-20210216142346-e0bfdf0e5d90
github.com/x-cray/logrus-prefixed-formatter v0.5.2
github.com/zserge/lorca v0.1.9
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect
golang.org/x/net v0.0.0-20211205041911-012df41ee64c // indirect
golang.org/x/sys v0.0.0-20211204120058-94396e421777 // indirect

5
go.sum
View File

@ -395,7 +395,12 @@ go.opentelemetry.io/otel/oteltest v0.19.0/go.mod h1:tI4yxwh8U21v7JD6R3BcA/2+RBoT
go.opentelemetry.io/otel/trace v0.19.0 h1:1ucYlenXIDA1OlHVLDZKX0ObXV5RLaq06DtUKz5e5zc=
go.opentelemetry.io/otel/trace v0.19.0/go.mod h1:4IXiNextNOpPnRlI4ryK69mn5iC84bjBWZQA5DXz/qg=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

View File

@ -0,0 +1,26 @@
-- +up
-- +begin
CREATE TABLE nav_history_details
(
gid bigint unsigned auto_increment PRIMARY KEY,
exchange VARCHAR(30) NOT NULL,
subaccount VARCHAR(30) NOT NULL,
time DATETIME(3) NOT NULL,
currency VARCHAR(12) NOT NULL,
balance_in_usd DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
balance_in_btc DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
balance DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
available DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
locked DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL
);
-- +end
-- +begin
CREATE INDEX idx_nav_history_details
on nav_history_details(time, currency, exchange);
-- +end
-- +down
-- +begin
DROP TABLE nav_history_details;
-- +end

View File

@ -0,0 +1,26 @@
-- +up
-- +begin
CREATE TABLE `nav_history_details`
(
gid bigint unsigned auto_increment PRIMARY KEY,
`exchange` VARCHAR NOT NULL DEFAULT '',
`subaccount` VARCHAR NOT NULL DEFAULT '',
time DATETIME(3) NOT NULL DEFAULT (strftime('%s','now')),
currency VARCHAR NOT NULL,
balance_in_usd DECIMAL DEFAULT 0.00000000 NOT NULL,
balance_in_btc DECIMAL DEFAULT 0.00000000 NOT NULL,
balance DECIMAL DEFAULT 0.00000000 NOT NULL,
available DECIMAL DEFAULT 0.00000000 NOT NULL,
locked DECIMAL DEFAULT 0.00000000 NOT NULL
);
-- +end
-- +begin
CREATE INDEX idx_nav_history_details
on nav_history_details (time, currency, exchange);
-- +end
-- +down
-- +begin
DROP TABLE nav_history_details;
-- +end

View File

@ -70,6 +70,7 @@ type Environment struct {
BacktestService *service.BacktestService
RewardService *service.RewardService
SyncService *service.SyncService
AccountService *service.AccountService
// startTime is the time of start point (which is used in the backtest)
startTime time.Time
@ -159,6 +160,7 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver
environ.OrderService = &service.OrderService{DB: db}
environ.TradeService = &service.TradeService{DB: db}
environ.RewardService = &service.RewardService{DB: db}
environ.AccountService = &service.AccountService{DB: db}
environ.SyncService = &service.SyncService{
TradeService: environ.TradeService,

View File

@ -597,9 +597,10 @@ func (session *ExchangeSession) UpdatePrices(ctx context.Context) (err error) {
balances := session.Account.Balances()
symbols := make([]string, len(balances))
var symbols []string
for _, b := range balances {
symbols = append(symbols, b.Currency+"USDT")
symbols = append(symbols, "USDT"+b.Currency)
}
tickers, err := session.Exchange.QueryTickers(ctx, symbols...)

View File

@ -352,6 +352,13 @@ func (trader *Trader) injectCommonServices(rs reflect.Value) error {
}
}
if trader.environment.AccountService != nil {
if err := injectField(rs, "AccountService", trader.environment.AccountService, true); err != nil {
return errors.Wrap(err, "failed to inject AccountService")
}
}
if field, ok := hasField(rs, "Persistence"); ok {
if trader.environment.PersistenceServiceFacade == nil {
log.Warnf("strategy has Persistence field but persistence service is not defined")

View File

@ -35,6 +35,16 @@ type Exchange struct {
restEndpoint *url.URL
}
type MarketTicker struct {
Market types.Market
Price float64
Ask float64
Bid float64
Last float64
}
type MarketMap map[string]MarketTicker
// FTX does not have broker ID
const spotBrokerID = "BBGO"
@ -96,6 +106,18 @@ func (e *Exchange) NewStream() types.Stream {
}
func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
markets, err := e._queryMarkets(ctx)
if err != nil {
return nil, err
}
marketMap := types.MarketMap{}
for k, v := range markets {
marketMap[k] = v.Market
}
return marketMap, nil
}
func (e *Exchange) _queryMarkets(ctx context.Context) (MarketMap, error) {
resp, err := e.newRest().Markets(ctx)
if err != nil {
return nil, err
@ -104,15 +126,15 @@ func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
return nil, fmt.Errorf("ftx returns querying markets failure")
}
markets := types.MarketMap{}
markets := MarketMap{}
for _, m := range resp.Result {
symbol := toGlobalSymbol(m.Name)
symbolMap[symbol] = m.Name
market := types.Market{
mkt2 := MarketTicker{
Market: types.Market{
Symbol: symbol,
LocalSymbol: m.Name,
// The max precision is length(DefaultPow). For example, currently fixedpoint.DefaultPow
// is 1e8, so the max precision will be 8.
PricePrecision: fixedpoint.NumFractionalDigits(fixedpoint.NewFromFloat(m.PriceIncrement)),
@ -129,8 +151,13 @@ func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
MinPrice: 0,
MaxPrice: 0,
TickSize: m.PriceIncrement,
},
Price: m.Price,
Bid: m.Bid,
Ask: m.Ask,
Last: m.Last,
}
markets[symbol] = market
markets[symbol] = mkt2
}
return markets, nil
}
@ -461,11 +488,65 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) erro
}
func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticker, error) {
panic("implement me")
ticketMap, err := e.QueryTickers(ctx, symbol)
if err != nil {
return nil, err
}
if ticker, ok := ticketMap[symbol]; ok {
return &ticker, nil
}
return nil, fmt.Errorf("ticker %s not found", symbol)
}
func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) {
panic("implement me")
var tickers = make(map[string]types.Ticker)
markets, err := e._queryMarkets(ctx)
if err != nil {
return nil, err
}
m := make(map[string]struct{})
for _, s := range symbol {
m[toGlobalSymbol(s)] = struct{}{}
}
rest := e.newRest()
for k, v := range markets {
// if we provide symbol as condition then we only query the gieven symbol ,
// or we should query "ALL" symbol in the market.
if _, ok := m[toGlobalSymbol(k)]; len(symbol) != 0 && !ok {
continue
}
if err := requestLimit.Wait(ctx); err != nil {
logrus.WithError(err).Errorf("order rate limiter wait error")
}
//ctx context.Context, market string, interval types.Interval, limit int64, start, end time.Time
prices, err := rest.HistoricalPrices(ctx, v.Market.LocalSymbol, types.Interval1h, 1, time.Now().Add(time.Duration(-1)*time.Hour), time.Now())
if err != nil || !prices.Success || len(prices.Result) == 0 {
continue
}
lastCandle := prices.Result[0]
tickers[toGlobalSymbol(k)] = types.Ticker{
Time: lastCandle.StartTime.Time,
Volume: lastCandle.Volume,
Last: v.Last,
Open: lastCandle.Open,
High: lastCandle.High,
Low: lastCandle.Low,
Buy: v.Bid,
Sell: v.Ask,
}
}
return tickers, nil
}
func (e *Exchange) Transfer(ctx context.Context, coin string, size float64, destination string) (string, error) {

View File

@ -0,0 +1,54 @@
package ftx
import (
"context"
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func TestExchange_QueryTickers_AllSymbols(t *testing.T) {
key := os.Getenv("FTX_API_KEY")
secret := os.Getenv("FTX_API_SECRET")
subAccount := os.Getenv("FTX_SUBACCOUNT")
if len(key) == 0 && len(secret) == 0 {
t.Skip("api key/secret are not configured")
}
e := NewExchange(key, secret, subAccount)
got, err := e.QueryTickers(context.Background())
if assert.NoError(t, err) {
assert.True(t, len(got) > 1, "binance: attempting to get all symbol tickers, but get 1 or less")
}
}
func TestExchange_QueryTickers_SomeSymbols(t *testing.T) {
key := os.Getenv("FTX_API_KEY")
secret := os.Getenv("FTX_API_SECRET")
subAccount := os.Getenv("FTX_SUBACCOUNT")
if len(key) == 0 && len(secret) == 0 {
t.Skip("api key/secret are not configured")
}
e := NewExchange(key, secret, subAccount)
got, err := e.QueryTickers(context.Background(), "BTCUSDT", "ETHUSDT")
if assert.NoError(t, err) {
assert.Len(t, got, 2, "binance: attempting to get two symbols, but number of tickers do not match")
}
}
func TestExchange_QueryTickers_SingleSymbol(t *testing.T) {
key := os.Getenv("FTX_API_KEY")
secret := os.Getenv("FTX_API_SECRET")
subAccount := os.Getenv("FTX_SUBACCOUNT")
if len(key) == 0 && len(secret) == 0 {
t.Skip("api key/secret are not configured")
}
e := NewExchange(key, secret, subAccount)
got, err := e.QueryTickers(context.Background(), "BTCUSDT")
if assert.NoError(t, err) {
assert.Len(t, got, 1, "binance: attempting to get one symbol, but number of tickers do not match")
}
}

37
pkg/service/account.go Normal file
View File

@ -0,0 +1,37 @@
package service
import (
"github.com/c9s/bbgo/pkg/types"
"github.com/jmoiron/sqlx"
"go.uber.org/multierr"
"time"
)
type AccountService struct {
DB *sqlx.DB
}
func NewAccountService(db *sqlx.DB) *AccountService {
return &AccountService{DB: db}
}
func (s *AccountService) InsertAsset(time time.Time, name types.ExchangeName, account string, assets types.AssetMap) error {
if s.DB == nil {
//skip db insert when no db connection setting.
return nil
}
var err error
for _, v := range assets {
_, _err := s.DB.Exec(`
insert into nav_history_details ( exchange, subaccount, time, currency, balance_in_usd, balance_in_btc,
balance,available,locked)
values (?,?,?,?,?,?,?,?,?);
`, name, account, time, v.Currency, v.InUSD, v.InBTC, v.Total, 0, 0 /* v.Available, v.Lock */)
err = multierr.Append(err, _err) // successful request
}
return err
}

View File

@ -31,12 +31,15 @@ func NewDefaultTotpKey() (*otp.Key, error) {
}
if len(totpAccountName) == 0 {
//unix like os
user, ok := os.LookupEnv("USER")
if !ok {
user, ok = os.LookupEnv("USERNAME")
}
if !ok {
return nil, fmt.Errorf("can not get USER env var for totp account name")
return nil, fmt.Errorf("can not get USER or USERNAME env var for totp account name")
}
totpAccountName = user

View File

@ -41,11 +41,11 @@ func (b Balance) String() string {
}
type Asset struct {
Currency string `json:"currency"`
Total fixedpoint.Value `json:"total"`
InUSD fixedpoint.Value `json:"inUSD"`
InBTC fixedpoint.Value `json:"inBTC"`
Time time.Time `json:"time"`
Currency string `json:"currency" db:"currency"`
Total fixedpoint.Value `json:"total" db:"total"`
InUSD fixedpoint.Value `json:"inUSD" db:"inUSD"`
InBTC fixedpoint.Value `json:"inBTC" db:"inBTC"`
Time time.Time `json:"time" db:"time"`
}
type AssetMap map[string]Asset