Merge pull request #584 from c9s/add-nav-columns

feature: record nav values into db
This commit is contained in:
Yo-An Lin 2022-05-04 16:25:04 +08:00 committed by GitHub
commit 8cf9218dce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 478 additions and 124 deletions

View File

@ -15,8 +15,13 @@ jobs:
matrix: matrix:
redis-version: redis-version:
- 6.2 - 6.2
env:
MYSQL_DATABASE: bbgo
MYSQL_USER: "root"
MYSQL_PASSWORD: "root"
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/cache@v2 - uses: actions/cache@v2
@ -28,7 +33,12 @@ jobs:
restore-keys: | restore-keys: |
${{ runner.os }}-go- ${{ runner.os }}-go-
- name: Setup redis - name: Set up MySQL
run: |
sudo /etc/init.d/mysql start
mysql -e 'CREATE DATABASE ${{ env.MYSQL_DATABASE }};' -u${{ env.MYSQL_USER }} -p${{ env.MYSQL_PASSWORD }}
- name: Set up redis
uses: shogo82148/actions-setup-redis@v1 uses: shogo82148/actions-setup-redis@v1
with: with:
redis-version: ${{ matrix.redis-version }} redis-version: ${{ matrix.redis-version }}
@ -39,6 +49,17 @@ jobs:
with: with:
go-version: 1.17 go-version: 1.17
- name: Install Migration Tool
run: go install github.com/c9s/rockhopper/cmd/rockhopper@v1.2.1
- name: Test Migration SQL Files For MySQL
run: |
rockhopper --config rockhopper_mysql.yaml up
- name: Test Migration SQL Files For SQLite
run: |
rockhopper --config rockhopper_sqlite.yaml up
- name: Build - name: Build
run: go build -v ./cmd/bbgo run: go build -v ./cmd/bbgo

47
config/xnav.yaml Normal file
View File

@ -0,0 +1,47 @@
---
notifications:
slack:
defaultChannel: "dev-bbgo"
errorChannel: "bbgo-error"
# if you want to route channel by symbol
symbolChannels:
"^BTC": "btc"
"^ETH": "eth"
# if you want to route channel by exchange session
sessionChannels:
max: "bbgo-max"
binance: "bbgo-binance"
# routing rules
routing:
trade: "$symbol"
order: "$slient"
submitOrder: "$slient"
pnL: "bbgo-pnl"
sessions:
max:
exchange: max
envVarPrefix: max
binance:
exchange: binance
envVarPrefix: binance
persistence:
json:
directory: var/data
redis:
host: 127.0.0.1
port: 6379
db: 0
crossExchangeStrategies:
- xnav:
interval: 1h
reportOnStart: true
ignoreDusts: true

View File

@ -2,21 +2,21 @@
-- +begin -- +begin
CREATE TABLE nav_history_details CREATE TABLE nav_history_details
( (
gid bigint unsigned auto_increment PRIMARY KEY, gid bigint unsigned auto_increment PRIMARY KEY,
exchange VARCHAR(30) NOT NULL, exchange VARCHAR(30) NOT NULL,
subaccount VARCHAR(30) NOT NULL, subaccount VARCHAR(30) NOT NULL,
time DATETIME(3) NOT NULL, time DATETIME(3) NOT NULL,
currency VARCHAR(12) NOT NULL, currency VARCHAR(12) NOT NULL,
balance_in_usd DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL, balance_in_usd DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
balance_in_btc DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL, balance_in_btc DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
balance DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL, balance DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
available DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL, available DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
locked DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL locked DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL
); );
-- +end -- +end
-- +begin -- +begin
CREATE INDEX idx_nav_history_details CREATE INDEX idx_nav_history_details
on nav_history_details(time, currency, exchange); on nav_history_details (time, currency, exchange);
-- +end -- +end
-- +down -- +down

View File

@ -0,0 +1,27 @@
-- +up
-- +begin
ALTER TABLE `nav_history_details`
ADD COLUMN `session` VARCHAR(30) NOT NULL,
ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN `isolated_symbol` VARCHAR(30) NOT NULL DEFAULT '',
ADD COLUMN `net_asset` DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
ADD COLUMN `borrowed` DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,
ADD COLUMN `price_in_usd` DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL
;
-- +end
-- +down
-- +begin
ALTER TABLE `nav_history_details`
DROP COLUMN `session`,
DROP COLUMN `net_asset`,
DROP COLUMN `borrowed`,
DROP COLUMN `price_in_usd`,
DROP COLUMN `is_margin`,
DROP COLUMN `is_isolated`,
DROP COLUMN `isolated_symbol`
;
-- +end

View File

@ -0,0 +1,12 @@
-- +up
ALTER TABLE `nav_history_details` ADD COLUMN `session` VARCHAR(50) NOT NULL;
ALTER TABLE `nav_history_details` ADD COLUMN `borrowed` DECIMAL UNSIGNED DEFAULT 0.00000000 NOT NULL;
ALTER TABLE `nav_history_details` ADD COLUMN `net_asset` DECIMAL UNSIGNED DEFAULT 0.00000000 NOT NULL;
ALTER TABLE `nav_history_details` ADD COLUMN `price_in_usd` DECIMAL UNSIGNED DEFAULT 0.00000000 NOT NULL;
ALTER TABLE `nav_history_details` ADD COLUMN `is_margin` BOOL DEFAULT FALSE NOT NULL;
ALTER TABLE `nav_history_details` ADD COLUMN `is_isolated` BOOL DEFAULT FALSE NOT NULL;
ALTER TABLE `nav_history_details` ADD COLUMN `isolated_symbol` VARCHAR(30) DEFAULT '' NOT NULL;
-- +down
-- we can not rollback alter table change in sqlite
SELECT 1;

View File

@ -649,6 +649,29 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err
return nil return nil
} }
func (environ *Environment) RecordAsset(t time.Time, session *ExchangeSession, assets types.AssetMap) {
// skip for back-test
if environ.BacktestService != nil {
return
}
if environ.DatabaseService == nil || environ.AccountService == nil {
return
}
if err := environ.AccountService.InsertAsset(
t,
session.Name,
session.ExchangeName,
session.SubAccount,
session.Margin,
session.IsolatedMargin,
session.IsolatedMarginSymbol,
assets); err != nil {
log.WithError(err).Errorf("can not insert asset record")
}
}
func (environ *Environment) RecordPosition(position *types.Position, trade types.Trade, profit *types.Profit) { func (environ *Environment) RecordPosition(position *types.Position, trade types.Trade, profit *types.Profit) {
// skip for back-test // skip for back-test
if environ.BacktestService != nil { if environ.BacktestService != nil {
@ -679,17 +702,6 @@ func (environ *Environment) RecordPosition(position *types.Position, trade types
log.WithError(err).Errorf("can not insert position record") log.WithError(err).Errorf("can not insert position record")
} }
} }
// if:
// 1) we are not using sync
// 2) and not sync-ing trades from the user data stream
if environ.TradeService != nil && (environ.syncConfig == nil ||
(environ.syncConfig.UserDataStream == nil) ||
(environ.syncConfig.UserDataStream != nil && !environ.syncConfig.UserDataStream.Trades)) {
if err := environ.TradeService.Insert(trade); err != nil {
log.WithError(err).Errorf("can not insert trade record: %+v", trade)
}
}
} }
func (environ *Environment) RecordProfit(profit types.Profit) { func (environ *Environment) RecordProfit(profit types.Profit) {

View File

@ -7,6 +7,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/slack-go/slack"
"github.com/c9s/bbgo/pkg/cache" "github.com/c9s/bbgo/pkg/cache"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -646,21 +648,18 @@ func (session *ExchangeSession) FormatOrder(order types.SubmitOrder) (types.Subm
return order, nil return order, nil
} }
func (session *ExchangeSession) UpdatePrices(ctx context.Context) (err error) { func (session *ExchangeSession) UpdatePrices(ctx context.Context, currencies []string, fiat string) (err error) {
if session.lastPriceUpdatedAt.After(time.Now().Add(-time.Hour)) { if session.lastPriceUpdatedAt.After(time.Now().Add(-time.Hour)) {
return nil return nil
} }
balances := session.GetAccount().Balances()
var symbols []string var symbols []string
for _, b := range balances { for _, c := range currencies {
symbols = append(symbols, b.Currency+"USDT") symbols = append(symbols, c+fiat) // BTC/USDT
symbols = append(symbols, "USDT"+b.Currency) symbols = append(symbols, fiat+c) // USDT/TWD
} }
tickers, err := session.Exchange.QueryTickers(ctx, symbols...) tickers, err := session.Exchange.QueryTickers(ctx, symbols...)
if err != nil || len(tickers) == 0 { if err != nil || len(tickers) == 0 {
return err return err
} }
@ -668,12 +667,7 @@ func (session *ExchangeSession) UpdatePrices(ctx context.Context) (err error) {
var lastTime time.Time var lastTime time.Time
for k, v := range tickers { for k, v := range tickers {
// for {Crypto}/USDT markets // for {Crypto}/USDT markets
if strings.HasSuffix(k, "USDT") { session.lastPrices[k] = v.Last
session.lastPrices[k] = v.Last
} else if strings.HasPrefix(k, "USDT") {
session.lastPrices[k] = fixedpoint.One.Div(v.Last)
}
if v.Time.After(lastTime) { if v.Time.After(lastTime) {
lastTime = v.Time lastTime = v.Time
} }
@ -929,3 +923,16 @@ func (session *ExchangeSession) bindConnectionStatusNotification(stream types.St
session.Notifiability.Notify("session %s %s stream connected", session.Name, streamName) session.Notifiability.Notify("session %s %s stream connected", session.Name, streamName)
}) })
} }
func (session *ExchangeSession) SlackAttachment() slack.Attachment {
var fields []slack.AttachmentField
var footerIcon = types.ExchangeFooterIcon(session.ExchangeName)
return slack.Attachment{
// Pretext: "",
// Text: text,
Title: session.Name,
Fields: fields,
FooterIcon: footerIcon,
Footer: util.Render("update time {{ . }}", time.Now().Format(time.RFC822)),
}
}

View File

@ -242,7 +242,7 @@ func (s *Slack) listen(ctx context.Context) {
innerEvent := eventsAPIEvent.InnerEvent innerEvent := eventsAPIEvent.InnerEvent
switch ev := innerEvent.Data.(type) { switch ev := innerEvent.Data.(type) {
case *slackevents.MessageEvent: case *slackevents.MessageEvent:
log.Infof("message event: text=%+v", ev.Text) log.Debugf("message event: text=%+v", ev.Text)
if len(ev.BotID) > 0 { if len(ev.BotID) > 0 {
log.Debug("skip bot message") log.Debug("skip bot message")

View File

@ -14,12 +14,12 @@ func init() {
func upAddNavHistoryDetails(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { func upAddNavHistoryDetails(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied. // This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE nav_history_details\n(\n gid bigint unsigned auto_increment PRIMARY KEY,\n exchange VARCHAR(30) NOT NULL,\n subaccount VARCHAR(30) NOT NULL,\n time DATETIME(3) NOT NULL,\n currency VARCHAR(12) NOT NULL,\n balance_in_usd DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n balance_in_btc DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n balance DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n available DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n locked DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL\n);") _, err = tx.ExecContext(ctx, "CREATE TABLE nav_history_details\n(\n gid bigint unsigned auto_increment PRIMARY KEY,\n exchange VARCHAR(30) NOT NULL,\n subaccount VARCHAR(30) NOT NULL,\n time DATETIME(3) NOT NULL,\n currency VARCHAR(12) NOT NULL,\n balance_in_usd DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n balance_in_btc DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n balance DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n available DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n locked DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL\n);")
if err != nil { if err != nil {
return err return err
} }
_, err = tx.ExecContext(ctx, "CREATE INDEX idx_nav_history_details\n on nav_history_details(time, currency, exchange);") _, err = tx.ExecContext(ctx, "CREATE INDEX idx_nav_history_details\n on nav_history_details (time, currency, exchange);")
if err != nil { if err != nil {
return err return err
} }

View File

@ -0,0 +1,34 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upAddMarginInfoToNav, downAddMarginInfoToNav)
}
func upAddMarginInfoToNav(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `nav_history_details`\n ADD COLUMN `session` VARCHAR(30) NOT NULL,\n ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE,\n ADD COLUMN `isolated_symbol` VARCHAR(30) NOT NULL DEFAULT '',\n ADD COLUMN `net_asset` DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n ADD COLUMN `borrowed` DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL,\n ADD COLUMN `price_in_usd` DECIMAL(32, 8) UNSIGNED DEFAULT 0.00000000 NOT NULL\n;")
if err != nil {
return err
}
return err
}
func downAddMarginInfoToNav(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `nav_history_details`\n DROP COLUMN `session`,\n DROP COLUMN `net_asset`,\n DROP COLUMN `borrowed`,\n DROP COLUMN `price_in_usd`,\n DROP COLUMN `is_margin`,\n DROP COLUMN `is_isolated`,\n DROP COLUMN `isolated_symbol`\n;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,64 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upAddMarginInfoToNav, downAddMarginInfoToNav)
}
func upAddMarginInfoToNav(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `nav_history_details` ADD COLUMN `session` VARCHAR(50) NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `nav_history_details` ADD COLUMN `borrowed` DECIMAL UNSIGNED DEFAULT 0.00000000 NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `nav_history_details` ADD COLUMN `net_asset` DECIMAL UNSIGNED DEFAULT 0.00000000 NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `nav_history_details` ADD COLUMN `price_in_usd` DECIMAL UNSIGNED DEFAULT 0.00000000 NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `nav_history_details` ADD COLUMN `is_margin` BOOL DEFAULT FALSE NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `nav_history_details` ADD COLUMN `is_isolated` BOOL DEFAULT FALSE NOT NULL;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `nav_history_details` ADD COLUMN `isolated_symbol` VARCHAR(30) DEFAULT '' NOT NULL;")
if err != nil {
return err
}
return err
}
func downAddMarginInfoToNav(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "SELECT 1;")
if err != nil {
return err
}
return err
}

View File

@ -442,7 +442,7 @@ func genFakeAssets() types.AssetMap {
"DOTUSDT": fixedpoint.NewFromFloat(20.0), "DOTUSDT": fixedpoint.NewFromFloat(20.0),
"SANDUSDT": fixedpoint.NewFromFloat(0.13), "SANDUSDT": fixedpoint.NewFromFloat(0.13),
"MAXUSDT": fixedpoint.NewFromFloat(0.122), "MAXUSDT": fixedpoint.NewFromFloat(0.122),
}) }, time.Now())
for currency, asset := range assets { for currency, asset := range assets {
totalAssets[currency] = asset totalAssets[currency] = asset
} }
@ -460,13 +460,13 @@ func (s *Server) listAssets(c *gin.Context) {
for _, session := range s.Environ.Sessions() { for _, session := range s.Environ.Sessions() {
balances := session.GetAccount().Balances() balances := session.GetAccount().Balances()
if err := session.UpdatePrices(c); err != nil { if err := session.UpdatePrices(c, balances.Currencies(), "USDT"); err != nil {
logrus.WithError(err).Error("price update failed") logrus.WithError(err).Error("price update failed")
c.Status(http.StatusInternalServerError) c.Status(http.StatusInternalServerError)
return return
} }
assets := balances.Assets(session.LastPrices()) assets := balances.Assets(session.LastPrices(), time.Now())
for currency, asset := range assets { for currency, asset := range assets {
totalAssets[currency] = asset totalAssets[currency] = asset

View File

@ -15,20 +15,48 @@ func NewAccountService(db *sqlx.DB) *AccountService {
return &AccountService{DB: db} return &AccountService{DB: db}
} }
func (s *AccountService) InsertAsset(time time.Time, name types.ExchangeName, account string, assets types.AssetMap) error { // TODO: should pass bbgo.ExchangeSession to this function, but that might cause cyclic import
func (s *AccountService) InsertAsset(time time.Time, session string, name types.ExchangeName, account string, isMargin bool, isIsolatedMargin bool, isolatedMarginSymbol string, assets types.AssetMap) error {
if s.DB == nil { if s.DB == nil {
//skip db insert when no db connection setting. // skip db insert when no db connection setting.
return nil return nil
} }
var err error var err error
for _, v := range assets { for _, v := range assets {
_, _err := s.DB.Exec(` _, _err := s.DB.Exec(`
insert into nav_history_details ( exchange, subaccount, time, currency, balance_in_usd, balance_in_btc, INSERT INTO nav_history_details (
balance,available,locked) session,
values (?,?,?,?,?,?,?,?,?); exchange,
`, name, account, time, v.Currency, v.InUSD, v.InBTC, v.Total, v.Available, v.Locked) subaccount,
time,
currency,
balance_in_usd,
balance_in_btc,
balance,
available,
locked,
borrowed,
net_asset,
price_in_usd,
is_margin, is_isolated, isolated_symbol)
values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);`,
session,
name,
account,
time,
v.Currency,
v.InUSD,
v.InBTC,
v.Total,
v.Available,
v.Locked,
v.Borrowed,
v.NetAsset,
v.PriceInUSD,
isMargin,
isIsolatedMargin,
isolatedMarginSymbol)
err = multierr.Append(err, _err) // successful request err = multierr.Append(err, _err) // successful request

View File

@ -0,0 +1,41 @@
package service
import (
"testing"
"time"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
func TestAccountService(t *testing.T) {
db, err := prepareDB(t)
if err != nil {
t.Fatal(err)
}
defer db.Close()
xdb := sqlx.NewDb(db.DB, "sqlite3")
service := &AccountService{DB: xdb}
t1 := time.Now()
err = service.InsertAsset(t1, "binance", types.ExchangeBinance, "main", false, false, "", types.AssetMap{
"BTC": types.Asset{
Currency: "BTC",
Total: fixedpoint.MustNewFromString("1.0"),
InUSD: fixedpoint.MustNewFromString("10.0"),
InBTC: fixedpoint.MustNewFromString("0.0001"),
Time: t1,
Locked: fixedpoint.MustNewFromString("0"),
Available: fixedpoint.MustNewFromString("1.0"),
Borrowed: fixedpoint.MustNewFromString("0"),
NetAsset: fixedpoint.MustNewFromString("1"),
PriceInUSD: fixedpoint.MustNewFromString("44870"),
},
})
assert.NoError(t, err)
}

View File

@ -8,7 +8,7 @@ import (
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/slack-go/slack" "github.com/slack-go/slack"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
@ -21,6 +21,8 @@ const ID = "xnav"
const stateKey = "state-v1" const stateKey = "state-v1"
var log = logrus.WithField("strategy", ID)
func init() { func init() {
bbgo.RegisterStrategy(ID, &Strategy{}) bbgo.RegisterStrategy(ID, &Strategy{})
} }
@ -59,8 +61,9 @@ type Strategy struct {
Notifiability *bbgo.Notifiability Notifiability *bbgo.Notifiability
*bbgo.Graceful *bbgo.Graceful
*bbgo.Persistence *bbgo.Persistence
*bbgo.Environment
Interval types.Duration `json:"interval"` Interval types.Interval `json:"interval"`
ReportOnStart bool `json:"reportOnStart"` ReportOnStart bool `json:"reportOnStart"`
IgnoreDusts bool `json:"ignoreDusts"` IgnoreDusts bool `json:"ignoreDusts"`
state *State state *State
@ -77,44 +80,41 @@ func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {}
func (s *Strategy) recordNetAssetValue(ctx context.Context, sessions map[string]*bbgo.ExchangeSession) { func (s *Strategy) recordNetAssetValue(ctx context.Context, sessions map[string]*bbgo.ExchangeSession) {
totalAssets := types.AssetMap{} totalAssets := types.AssetMap{}
totalBalances := types.BalanceMap{} totalBalances := types.BalanceMap{}
totalBorrowed := map[string]fixedpoint.Value{} allPrices := map[string]fixedpoint.Value{}
lastPrices := map[string]fixedpoint.Value{} sessionBalances := map[string]types.BalanceMap{}
for _, session := range sessions { priceTime := time.Now()
if err := session.UpdateAccount(ctx) ; err != nil {
// iterate the sessions and record them
for sessionName, session := range sessions {
// update the account balances and the margin information
if err := session.UpdateAccount(ctx); err != nil {
log.WithError(err).Errorf("can not update account") log.WithError(err).Errorf("can not update account")
return return
} }
account := session.GetAccount() account := session.GetAccount()
balances := account.Balances() balances := account.Balances()
if err := session.UpdatePrices(ctx); err != nil { if err := session.UpdatePrices(ctx, balances.Currencies(), "USDT"); err != nil {
log.WithError(err).Error("price update failed") log.WithError(err).Error("price update failed")
return return
} }
for _, b := range balances { sessionBalances[sessionName] = balances
if tb, ok := totalBalances[b.Currency]; ok { totalBalances = totalBalances.Add(balances)
tb.Available = tb.Available.Add(b.Available)
tb.Locked = tb.Locked.Add(b.Locked)
totalBalances[b.Currency] = tb
if b.Borrowed.Sign() > 0 {
totalBorrowed[b.Currency] = totalBorrowed[b.Currency].Add(b.Borrowed)
}
} else {
totalBalances[b.Currency] = b
totalBorrowed[b.Currency] = b.Borrowed
}
}
prices := session.LastPrices() prices := session.LastPrices()
assets := balances.Assets(prices, priceTime)
// merge prices
for m, p := range prices { for m, p := range prices {
lastPrices[m] = p allPrices[m] = p
} }
s.Environment.RecordAsset(priceTime, session, assets)
} }
assets := totalBalances.Assets(lastPrices) allAssets := totalBalances.Assets(allPrices, priceTime)
for currency, asset := range assets { for currency, asset := range allAssets {
// calculated if it's dust only when InUSD (usd value) is defined. // calculated if it's dust only when InUSD (usd value) is defined.
if s.IgnoreDusts && !asset.InUSD.IsZero() && asset.InUSD.Compare(Ten) < 0 { if s.IgnoreDusts && !asset.InUSD.IsZero() && asset.InUSD.Compare(Ten) < 0 {
continue continue
@ -123,6 +123,8 @@ func (s *Strategy) recordNetAssetValue(ctx context.Context, sessions map[string]
totalAssets[currency] = asset totalAssets[currency] = asset
} }
s.Environment.RecordAsset(priceTime, &bbgo.ExchangeSession{Name: "ALL"}, totalAssets)
s.Notifiability.Notify(totalAssets) s.Notifiability.Notify(totalAssets)
if s.state != nil { if s.state != nil {
@ -171,8 +173,8 @@ func (s *Strategy) LoadState() error {
} }
func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error { func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
if s.Interval == 0 { if s.Interval == "" {
return errors.New("interval can not be zero") return errors.New("interval can not be empty")
} }
if err := s.LoadState(); err != nil { if err := s.LoadState(); err != nil {
@ -189,6 +191,15 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
s.recordNetAssetValue(ctx, sessions) s.recordNetAssetValue(ctx, sessions)
} }
if s.Environment.BacktestService != nil {
log.Warnf("xnav does not support backtesting")
}
// TODO: if interval is supported, we can use kline as the ticker
if _, ok := types.SupportedIntervals[s.Interval] ; ok {
}
go func() { go func() {
ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000)) ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000))
defer ticker.Stop() defer ticker.Stop()

View File

@ -54,13 +54,16 @@ func (b Balance) String() (o string) {
} }
type Asset struct { type Asset struct {
Currency string `json:"currency" db:"currency"` Currency string `json:"currency" db:"currency"`
Total fixedpoint.Value `json:"total" db:"total"` Total fixedpoint.Value `json:"total" db:"total"`
InUSD fixedpoint.Value `json:"inUSD" db:"inUSD"` InUSD fixedpoint.Value `json:"inUSD" db:"in_usd"`
InBTC fixedpoint.Value `json:"inBTC" db:"inBTC"` InBTC fixedpoint.Value `json:"inBTC" db:"in_btc"`
Time time.Time `json:"time" db:"time"` Time time.Time `json:"time" db:"time"`
Locked fixedpoint.Value `json:"lock" db:"lock" ` Locked fixedpoint.Value `json:"lock" db:"lock" `
Available fixedpoint.Value `json:"available" db:"available"` Available fixedpoint.Value `json:"available" db:"available"`
Borrowed fixedpoint.Value `json:"borrowed" db:"borrowed"`
NetAsset fixedpoint.Value `json:"netAsset" db:"net_asset"`
PriceInUSD fixedpoint.Value `json:"priceInUSD" db:"price_in_usd"`
} }
type AssetMap map[string]Asset type AssetMap map[string]Asset
@ -161,6 +164,27 @@ type MarginAssetMap map[string]MarginUserAsset
type FuturesAssetMap map[string]FuturesUserAsset type FuturesAssetMap map[string]FuturesUserAsset
type FuturesPositionMap map[string]FuturesPosition type FuturesPositionMap map[string]FuturesPosition
func (m BalanceMap) Currencies() (currencies []string) {
for _, b := range m {
currencies = append(currencies, b.Currency)
}
return currencies
}
func (m BalanceMap) Add(bm BalanceMap) BalanceMap {
var total = BalanceMap{}
for _, b := range bm {
tb := total[b.Currency]
tb.Available = tb.Available.Add(b.Available)
tb.Locked = tb.Locked.Add(b.Locked)
tb.Borrowed = tb.Borrowed.Add(b.Borrowed)
tb.NetAsset = tb.NetAsset.Add(b.NetAsset)
tb.Interest = tb.Interest.Add(b.Interest)
total[b.Currency] = tb
}
return total
}
func (m BalanceMap) String() string { func (m BalanceMap) String() string {
var ss []string var ss []string
for _, b := range m { for _, b := range m {
@ -178,39 +202,47 @@ func (m BalanceMap) Copy() (d BalanceMap) {
return d return d
} }
func (m BalanceMap) Assets(prices map[string]fixedpoint.Value) AssetMap { // Assets converts balances into assets with the given prices
func (m BalanceMap) Assets(prices map[string]fixedpoint.Value, priceTime time.Time) AssetMap {
assets := make(AssetMap) assets := make(AssetMap)
btcusdt, hasBtcPrice := prices["BTCUSDT"]
now := time.Now()
for currency, b := range m { for currency, b := range m {
if b.Locked.IsZero() && b.Available.IsZero() { if b.Locked.IsZero() && b.Available.IsZero() && b.Borrowed.IsZero() {
continue continue
} }
asset := Asset{ asset := Asset{
Currency: currency, Currency: currency,
Total: b.Available.Add(b.Locked), Total: b.Available.Add(b.Locked),
Time: now, Time: priceTime,
Locked: b.Locked, Locked: b.Locked,
Available: b.Available, Available: b.Available,
Borrowed: b.Borrowed,
NetAsset: b.NetAsset,
} }
btcusdt, hasBtcPrice := prices["BTCUSDT"]
usdMarkets := []string{currency + "USDT", currency + "USDC", currency + "USD", "USDT" + currency} usdMarkets := []string{currency + "USDT", currency + "USDC", currency + "USD", "USDT" + currency}
for _, market := range usdMarkets { for _, market := range usdMarkets {
if val, ok := prices[market]; ok { usdPrice, ok := prices[market]
if !ok {
continue
}
if strings.HasPrefix(market, "USD") { // this includes USDT, USD, USDC and so on
asset.InUSD = asset.Total.Div(val) if strings.HasPrefix(market, "USD") {
} else { if !asset.Total.IsZero() {
asset.InUSD = asset.Total.Mul(val) asset.InUSD = asset.Total.Div(usdPrice)
} }
asset.PriceInUSD = usdPrice
} else {
if !asset.Total.IsZero() {
asset.InUSD = asset.Total.Mul(usdPrice)
}
asset.PriceInUSD = fixedpoint.One.Div(usdPrice)
}
if hasBtcPrice { if hasBtcPrice && !asset.InUSD.IsZero() {
asset.InBTC = asset.InUSD.Div(btcusdt) asset.InBTC = asset.InUSD.Div(btcusdt)
}
} }
} }
@ -280,6 +312,8 @@ type Account struct {
balances BalanceMap balances BalanceMap
} }
type FuturesAccountInfo struct { type FuturesAccountInfo struct {
// Futures fields // Futures fields
Assets FuturesAssetMap `json:"assets"` Assets FuturesAssetMap `json:"assets"`

View File

@ -8,6 +8,19 @@ import (
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
) )
func TestBalanceMap_Add(t *testing.T) {
var bm = BalanceMap{}
var bm2 = bm.Add(BalanceMap{
"BTC": Balance{
Currency: "BTC",
Available: fixedpoint.MustNewFromString("10.0"),
Locked: fixedpoint.MustNewFromString("0"),
NetAsset: fixedpoint.MustNewFromString("10.0"),
},
})
assert.Len(t, bm2, 1)
}
func TestAccountLockAndUnlock(t *testing.T) { func TestAccountLockAndUnlock(t *testing.T) {
a := NewAccount() a := NewAccount()
a.AddBalance("USDT", fixedpoint.NewFromInt(1000)) a.AddBalance("USDT", fixedpoint.NewFromInt(1000))

View File

@ -0,0 +1,20 @@
package types
func ExchangeFooterIcon(exName ExchangeName) string {
footerIcon := ""
switch exName {
case ExchangeBinance:
footerIcon = "https://bin.bnbstatic.com/static/images/common/favicon.ico"
case ExchangeMax:
footerIcon = "https://max.maicoin.com/favicon-16x16.png"
case ExchangeFTX:
footerIcon = "https://ftx.com/favicon.ico?v=2"
case ExchangeOKEx:
footerIcon = "https://static.okex.com/cdn/assets/imgs/MjAxODg/D91A7323087D31A588E0D2A379DD7747.png"
case ExchangeKucoin:
footerIcon = "https://assets.staticimg.com/cms/media/7AV75b9jzr9S8H3eNuOuoqj8PwdUjaDQGKGczGqTS.png"
}
return footerIcon
}

View File

@ -300,7 +300,7 @@ func (o Order) SlackAttachment() slack.Attachment {
Short: true, Short: true,
}) })
footerIcon := exchangeFooterIcon(o.Exchange) footerIcon := ExchangeFooterIcon(o.Exchange)
return slack.Attachment{ return slack.Attachment{
Color: SideToColorName(o.Side), Color: SideToColorName(o.Side),

View File

@ -151,25 +151,6 @@ func (trade Trade) PlainText() string {
var slackTradeTextTemplate = ":handshake: Trade {{ .Symbol }} {{ .Side }} {{ .Quantity }} @ {{ .Price }}" var slackTradeTextTemplate = ":handshake: Trade {{ .Symbol }} {{ .Side }} {{ .Quantity }} @ {{ .Price }}"
func exchangeFooterIcon(exName ExchangeName) string {
footerIcon := ""
switch exName {
case ExchangeBinance:
footerIcon = "https://bin.bnbstatic.com/static/images/common/favicon.ico"
case ExchangeMax:
footerIcon = "https://max.maicoin.com/favicon-16x16.png"
case ExchangeFTX:
footerIcon = "https://ftx.com/favicon.ico?v=2"
case ExchangeOKEx:
footerIcon = "https://static.okex.com/cdn/assets/imgs/MjAxODg/D91A7323087D31A588E0D2A379DD7747.png"
case ExchangeKucoin:
footerIcon = "https://assets.staticimg.com/cms/media/7AV75b9jzr9S8H3eNuOuoqj8PwdUjaDQGKGczGqTS.png"
}
return footerIcon
}
func (trade Trade) SlackAttachment() slack.Attachment { func (trade Trade) SlackAttachment() slack.Attachment {
var color = "#DC143C" var color = "#DC143C"
@ -179,7 +160,7 @@ func (trade Trade) SlackAttachment() slack.Attachment {
liquidity := trade.Liquidity() liquidity := trade.Liquidity()
text := util.Render(slackTradeTextTemplate, trade) text := util.Render(slackTradeTextTemplate, trade)
footerIcon := exchangeFooterIcon(trade.Exchange) footerIcon := ExchangeFooterIcon(trade.Exchange)
return slack.Attachment{ return slack.Attachment{
Text: text, Text: text,

View File

@ -0,0 +1,2 @@
package util

View File

@ -8,7 +8,7 @@ dialect: mysql
# dsn: "root:123123@unix(/opt/local/var/run/mysql57/mysqld.sock)/bbgo_dev?parseTime=true" # dsn: "root:123123@unix(/opt/local/var/run/mysql57/mysqld.sock)/bbgo_dev?parseTime=true"
# tcp connection to mysql with password # tcp connection to mysql with password
# dsn: "root:123123@tcp(localhost:3306)/bbgo_dev?parseTime=true" dsn: "root:root@tcp(localhost:3306)/bbgo?parseTime=true"
# tcp connection to mysql without password # tcp connection to mysql without password
# dsn: "root@tcp(localhost:3306)/bbgo_dev?parseTime=true" # dsn: "root@tcp(localhost:3306)/bbgo_dev?parseTime=true"