Merge pull request #1131 from c9s/c9s/strategy/funding

strategy: xfunding: improve sync goroutine, add mutex lock, fix binance websocket message parsing ...
This commit is contained in:
Yo-An Lin 2023-03-25 03:05:46 +08:00 committed by GitHub
commit e0a9cd3c6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1119 additions and 454 deletions

2
go.mod
View File

@ -7,7 +7,7 @@ go 1.17
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Masterminds/squirrel v1.5.3
github.com/adshao/go-binance/v2 v2.3.10
github.com/adshao/go-binance/v2 v2.4.1
github.com/c-bata/goptuna v0.8.1
github.com/c9s/requestgen v1.3.0
github.com/c9s/rockhopper v1.2.2-0.20220617053729-ffdc87df194b

2
go.sum
View File

@ -51,6 +51,8 @@ github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdc
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/adshao/go-binance/v2 v2.3.10 h1:iWtHD/sQ8GK6r+cSMMdOynpGI/4Q6P5LZtiEHdWOjag=
github.com/adshao/go-binance/v2 v2.3.10/go.mod h1:Z3MCnWI0gHC4Rea8TWiF3aN1t4nV9z3CaU/TeHcKsLM=
github.com/adshao/go-binance/v2 v2.4.1 h1:fOZ2tCbN7sgDZvvsawUMjhsOoe40X87JVE4DklIyyyc=
github.com/adshao/go-binance/v2 v2.4.1/go.mod h1:6Qoh+CYcj8U43h4HgT6mqJnsGj4mWZKA/nsj8LN8ZTU=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

View File

@ -0,0 +1,33 @@
package binanceapi
import (
"net/url"
"github.com/c9s/requestgen"
)
type FuturesRestClient struct {
RestClient
}
const FuturesRestBaseURL = "https://fapi.binance.com"
func NewFuturesRestClient(baseURL string) *FuturesRestClient {
if len(baseURL) == 0 {
baseURL = FuturesRestBaseURL
}
u, err := url.Parse(baseURL)
if err != nil {
panic(err)
}
return &FuturesRestClient{
RestClient: RestClient{
BaseAPIClient: requestgen.BaseAPIClient{
BaseURL: u,
HttpClient: DefaultHttpClient,
},
},
}
}

View File

@ -0,0 +1,58 @@
package binanceapi
import (
"time"
"github.com/c9s/requestgen"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
// FuturesIncomeType can be one of the following value:
// TRANSFER, WELCOME_BONUS, REALIZED_PNL, FUNDING_FEE, COMMISSION, INSURANCE_CLEAR, REFERRAL_KICKBACK, COMMISSION_REBATE,
// API_REBATE, CONTEST_REWARD, CROSS_COLLATERAL_TRANSFER, OPTIONS_PREMIUM_FEE,
// OPTIONS_SETTLE_PROFIT, INTERNAL_TRANSFER, AUTO_EXCHANGE,
// DELIVERED_SETTELMENT, COIN_SWAP_DEPOSIT, COIN_SWAP_WITHDRAW, POSITION_LIMIT_INCREASE_FEE
type FuturesIncomeType string
const (
FuturesIncomeTransfer FuturesIncomeType = "TRANSFER"
FuturesIncomeWelcomeBonus FuturesIncomeType = "WELCOME_BONUS"
FuturesIncomeFundingFee FuturesIncomeType = "FUNDING_FEE"
FuturesIncomeRealizedPnL FuturesIncomeType = "REALIZED_PNL"
FuturesIncomeCommission FuturesIncomeType = "COMMISSION"
FuturesIncomeReferralKickback FuturesIncomeType = "REFERRAL_KICKBACK"
FuturesIncomeCommissionRebate FuturesIncomeType = "COMMISSION_REBATE"
FuturesIncomeApiRebate FuturesIncomeType = "API_REBATE"
FuturesIncomeContestReward FuturesIncomeType = "CONTEST_REWARD"
)
type FuturesIncome struct {
Symbol string `json:"symbol"`
IncomeType FuturesIncomeType `json:"incomeType"`
Income fixedpoint.Value `json:"income"`
Asset string `json:"asset"`
Info string `json:"info"`
Time types.MillisecondTimestamp `json:"time"`
TranId string `json:"tranId"`
TradeId string `json:"tradeId"`
}
//go:generate requestgen -method GET -url "/fapi/v1/income" -type FuturesGetIncomeHistoryRequest -responseType []FuturesIncome
type FuturesGetIncomeHistoryRequest struct {
client requestgen.AuthenticatedAPIClient
symbol string `param:"symbol"`
incomeType FuturesIncomeType `param:"incomeType"`
startTime *time.Time `param:"startTime,milliseconds"`
endTime *time.Time `param:"endTime,milliseconds"`
limit *uint64 `param:"limit"`
}
func (c *FuturesRestClient) NewFuturesGetIncomeHistoryRequest() *FuturesGetIncomeHistoryRequest {
return &FuturesGetIncomeHistoryRequest{client: c}
}

View File

@ -0,0 +1,212 @@
// Code generated by "requestgen -method GET -url /fapi/v1/income -type FuturesGetIncomeHistoryRequest -responseType []FuturesIncome"; DO NOT EDIT.
package binanceapi
import (
"context"
"encoding/json"
"fmt"
"net/url"
"reflect"
"regexp"
"strconv"
"time"
)
func (f *FuturesGetIncomeHistoryRequest) Symbol(symbol string) *FuturesGetIncomeHistoryRequest {
f.symbol = symbol
return f
}
func (f *FuturesGetIncomeHistoryRequest) IncomeType(incomeType FuturesIncomeType) *FuturesGetIncomeHistoryRequest {
f.incomeType = incomeType
return f
}
func (f *FuturesGetIncomeHistoryRequest) StartTime(startTime time.Time) *FuturesGetIncomeHistoryRequest {
f.startTime = &startTime
return f
}
func (f *FuturesGetIncomeHistoryRequest) EndTime(endTime time.Time) *FuturesGetIncomeHistoryRequest {
f.endTime = &endTime
return f
}
func (f *FuturesGetIncomeHistoryRequest) Limit(limit uint64) *FuturesGetIncomeHistoryRequest {
f.limit = &limit
return f
}
// GetQueryParameters builds and checks the query parameters and returns url.Values
func (f *FuturesGetIncomeHistoryRequest) GetQueryParameters() (url.Values, error) {
var params = map[string]interface{}{}
query := url.Values{}
for _k, _v := range params {
query.Add(_k, fmt.Sprintf("%v", _v))
}
return query, nil
}
// GetParameters builds and checks the parameters and return the result in a map object
func (f *FuturesGetIncomeHistoryRequest) GetParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
// check symbol field -> json key symbol
symbol := f.symbol
// assign parameter of symbol
params["symbol"] = symbol
// check incomeType field -> json key incomeType
incomeType := f.incomeType
// TEMPLATE check-valid-values
switch incomeType {
case FuturesIncomeTransfer, FuturesIncomeWelcomeBonus, FuturesIncomeFundingFee, FuturesIncomeRealizedPnL, FuturesIncomeCommission, FuturesIncomeReferralKickback, FuturesIncomeCommissionRebate, FuturesIncomeApiRebate, FuturesIncomeContestReward:
params["incomeType"] = incomeType
default:
return nil, fmt.Errorf("incomeType value %v is invalid", incomeType)
}
// END TEMPLATE check-valid-values
// assign parameter of incomeType
params["incomeType"] = incomeType
// check startTime field -> json key startTime
if f.startTime != nil {
startTime := *f.startTime
// assign parameter of startTime
// convert time.Time to milliseconds time stamp
params["startTime"] = strconv.FormatInt(startTime.UnixNano()/int64(time.Millisecond), 10)
} else {
}
// check endTime field -> json key endTime
if f.endTime != nil {
endTime := *f.endTime
// assign parameter of endTime
// convert time.Time to milliseconds time stamp
params["endTime"] = strconv.FormatInt(endTime.UnixNano()/int64(time.Millisecond), 10)
} else {
}
// check limit field -> json key limit
if f.limit != nil {
limit := *f.limit
// assign parameter of limit
params["limit"] = limit
} else {
}
return params, nil
}
// GetParametersQuery converts the parameters from GetParameters into the url.Values format
func (f *FuturesGetIncomeHistoryRequest) GetParametersQuery() (url.Values, error) {
query := url.Values{}
params, err := f.GetParameters()
if err != nil {
return query, err
}
for _k, _v := range params {
if f.isVarSlice(_v) {
f.iterateSlice(_v, func(it interface{}) {
query.Add(_k+"[]", fmt.Sprintf("%v", it))
})
} else {
query.Add(_k, fmt.Sprintf("%v", _v))
}
}
return query, nil
}
// GetParametersJSON converts the parameters from GetParameters into the JSON format
func (f *FuturesGetIncomeHistoryRequest) GetParametersJSON() ([]byte, error) {
params, err := f.GetParameters()
if err != nil {
return nil, err
}
return json.Marshal(params)
}
// GetSlugParameters builds and checks the slug parameters and return the result in a map object
func (f *FuturesGetIncomeHistoryRequest) GetSlugParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
return params, nil
}
func (f *FuturesGetIncomeHistoryRequest) applySlugsToUrl(url string, slugs map[string]string) string {
for _k, _v := range slugs {
needleRE := regexp.MustCompile(":" + _k + "\\b")
url = needleRE.ReplaceAllString(url, _v)
}
return url
}
func (f *FuturesGetIncomeHistoryRequest) iterateSlice(slice interface{}, _f func(it interface{})) {
sliceValue := reflect.ValueOf(slice)
for _i := 0; _i < sliceValue.Len(); _i++ {
it := sliceValue.Index(_i).Interface()
_f(it)
}
}
func (f *FuturesGetIncomeHistoryRequest) isVarSlice(_v interface{}) bool {
rt := reflect.TypeOf(_v)
switch rt.Kind() {
case reflect.Slice:
return true
}
return false
}
func (f *FuturesGetIncomeHistoryRequest) GetSlugsMap() (map[string]string, error) {
slugs := map[string]string{}
params, err := f.GetSlugParameters()
if err != nil {
return slugs, nil
}
for _k, _v := range params {
slugs[_k] = fmt.Sprintf("%v", _v)
}
return slugs, nil
}
func (f *FuturesGetIncomeHistoryRequest) Do(ctx context.Context) ([]FuturesIncome, error) {
// empty params for GET operation
var params interface{}
query, err := f.GetParametersQuery()
if err != nil {
return nil, err
}
apiURL := "/fapi/v1/income"
req, err := f.client.NewAuthenticatedRequest(ctx, "GET", apiURL, query, params)
if err != nil {
return nil, err
}
response, err := f.client.SendRequest(req)
if err != nil {
return nil, err
}
var apiResponse []FuturesIncome
if err := response.DecodeJSON(&apiResponse); err != nil {
return nil, err
}
return apiResponse, nil
}

View File

@ -0,0 +1,37 @@
package binanceapi
import (
"github.com/c9s/requestgen"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type FuturesPositionRisk struct {
EntryPrice string `json:"entryPrice"`
MarginType string `json:"marginType"`
IsAutoAddMargin string `json:"isAutoAddMargin"`
IsolatedMargin string `json:"isolatedMargin"`
Leverage fixedpoint.Value `json:"leverage"`
LiquidationPrice fixedpoint.Value `json:"liquidationPrice"`
MarkPrice fixedpoint.Value `json:"markPrice"`
MaxNotionalValue fixedpoint.Value `json:"maxNotionalValue"`
PositionAmount fixedpoint.Value `json:"positionAmt"`
Notional fixedpoint.Value `json:"notional"`
IsolatedWallet string `json:"isolatedWallet"`
Symbol string `json:"symbol"`
UnRealizedProfit fixedpoint.Value `json:"unRealizedProfit"`
PositionSide string `json:"positionSide"`
UpdateTime types.MillisecondTimestamp `json:"updateTime"`
}
//go:generate requestgen -method GET -url "/fapi/v2/positionRisk" -type FuturesGetPositionRisksRequest -responseType []FuturesPositionRisk
type FuturesGetPositionRisksRequest struct {
client requestgen.AuthenticatedAPIClient
symbol string `param:"symbol"`
}
func (c *FuturesRestClient) NewGetPositionRisksRequest() *FuturesGetPositionRisksRequest {
return &FuturesGetPositionRisksRequest{client: c}
}

View File

@ -0,0 +1,148 @@
// Code generated by "requestgen -method GET -url /fapi/v2/positionRisk -type FuturesGetPositionRisksRequest -responseType []FuturesPositionRisk"; DO NOT EDIT.
package binanceapi
import (
"context"
"encoding/json"
"fmt"
"net/url"
"reflect"
"regexp"
)
func (f *FuturesGetPositionRisksRequest) Symbol(symbol string) *FuturesGetPositionRisksRequest {
f.symbol = symbol
return f
}
// GetQueryParameters builds and checks the query parameters and returns url.Values
func (f *FuturesGetPositionRisksRequest) GetQueryParameters() (url.Values, error) {
var params = map[string]interface{}{}
query := url.Values{}
for _k, _v := range params {
query.Add(_k, fmt.Sprintf("%v", _v))
}
return query, nil
}
// GetParameters builds and checks the parameters and return the result in a map object
func (f *FuturesGetPositionRisksRequest) GetParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
// check symbol field -> json key symbol
symbol := f.symbol
// assign parameter of symbol
params["symbol"] = symbol
return params, nil
}
// GetParametersQuery converts the parameters from GetParameters into the url.Values format
func (f *FuturesGetPositionRisksRequest) GetParametersQuery() (url.Values, error) {
query := url.Values{}
params, err := f.GetParameters()
if err != nil {
return query, err
}
for _k, _v := range params {
if f.isVarSlice(_v) {
f.iterateSlice(_v, func(it interface{}) {
query.Add(_k+"[]", fmt.Sprintf("%v", it))
})
} else {
query.Add(_k, fmt.Sprintf("%v", _v))
}
}
return query, nil
}
// GetParametersJSON converts the parameters from GetParameters into the JSON format
func (f *FuturesGetPositionRisksRequest) GetParametersJSON() ([]byte, error) {
params, err := f.GetParameters()
if err != nil {
return nil, err
}
return json.Marshal(params)
}
// GetSlugParameters builds and checks the slug parameters and return the result in a map object
func (f *FuturesGetPositionRisksRequest) GetSlugParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
return params, nil
}
func (f *FuturesGetPositionRisksRequest) applySlugsToUrl(url string, slugs map[string]string) string {
for _k, _v := range slugs {
needleRE := regexp.MustCompile(":" + _k + "\\b")
url = needleRE.ReplaceAllString(url, _v)
}
return url
}
func (f *FuturesGetPositionRisksRequest) iterateSlice(slice interface{}, _f func(it interface{})) {
sliceValue := reflect.ValueOf(slice)
for _i := 0; _i < sliceValue.Len(); _i++ {
it := sliceValue.Index(_i).Interface()
_f(it)
}
}
func (f *FuturesGetPositionRisksRequest) isVarSlice(_v interface{}) bool {
rt := reflect.TypeOf(_v)
switch rt.Kind() {
case reflect.Slice:
return true
}
return false
}
func (f *FuturesGetPositionRisksRequest) GetSlugsMap() (map[string]string, error) {
slugs := map[string]string{}
params, err := f.GetSlugParameters()
if err != nil {
return slugs, nil
}
for _k, _v := range params {
slugs[_k] = fmt.Sprintf("%v", _v)
}
return slugs, nil
}
func (f *FuturesGetPositionRisksRequest) Do(ctx context.Context) ([]FuturesPositionRisk, error) {
// empty params for GET operation
var params interface{}
query, err := f.GetParametersQuery()
if err != nil {
return nil, err
}
apiURL := "/fapi/v2/positionRisk"
req, err := f.client.NewAuthenticatedRequest(ctx, "GET", apiURL, query, params)
if err != nil {
return nil, err
}
response, err := f.client.SendRequest(req)
if err != nil {
return nil, err
}
var apiResponse []FuturesPositionRisk
if err := response.DecodeJSON(&apiResponse); err != nil {
return nil, err
}
return apiResponse, nil
}

View File

@ -367,31 +367,6 @@ func (e *Exchange) QueryMarginBorrowHistory(ctx context.Context, asset string) e
return nil
}
func (e *Exchange) TransferFuturesAccountAsset(ctx context.Context, asset string, amount fixedpoint.Value, io types.TransferDirection) error {
req := e.client2.NewFuturesTransferRequest()
req.Asset(asset)
req.Amount(amount.String())
if io == types.TransferIn {
req.TransferType(binanceapi.FuturesTransferSpotToUsdtFutures)
} else if io == types.TransferOut {
req.TransferType(binanceapi.FuturesTransferUsdtFuturesToSpot)
} else {
return fmt.Errorf("unexpected transfer direction: %d given", io)
}
resp, err := req.Do(ctx)
switch io {
case types.TransferIn:
log.Infof("internal transfer (spot) => (futures) %s %s, transaction = %+v, err = %+v", amount.String(), asset, resp, err)
case types.TransferOut:
log.Infof("internal transfer (futures) => (spot) %s %s, transaction = %+v, err = %+v", amount.String(), asset, resp, err)
}
return err
}
// transferCrossMarginAccountAsset transfer asset to the cross margin account or to the main account
func (e *Exchange) transferCrossMarginAccountAsset(ctx context.Context, asset string, amount fixedpoint.Value, io types.TransferDirection) error {
req := e.client.NewMarginTransferService()
@ -676,42 +651,6 @@ func (e *Exchange) QuerySpotAccount(ctx context.Context) (*types.Account, error)
return a, nil
}
// QueryFuturesAccount gets the futures account balances from Binance
// Balance.Available = Wallet Balance(in Binance UI) - Used Margin
// Balance.Locked = Used Margin
func (e *Exchange) QueryFuturesAccount(ctx context.Context) (*types.Account, error) {
account, err := e.futuresClient.NewGetAccountService().Do(ctx)
if err != nil {
return nil, err
}
accountBalances, err := e.futuresClient.NewGetBalanceService().Do(ctx)
if err != nil {
return nil, err
}
var balances = map[string]types.Balance{}
for _, b := range accountBalances {
balanceAvailable := fixedpoint.Must(fixedpoint.NewFromString(b.AvailableBalance))
balanceTotal := fixedpoint.Must(fixedpoint.NewFromString(b.Balance))
unrealizedPnl := fixedpoint.Must(fixedpoint.NewFromString(b.CrossUnPnl))
balances[b.Asset] = types.Balance{
Currency: b.Asset,
Available: balanceAvailable,
Locked: balanceTotal.Sub(balanceAvailable.Sub(unrealizedPnl)),
}
}
a := &types.Account{
AccountType: types.AccountTypeFutures,
FuturesInfo: toGlobalFuturesAccountInfo(account), // In binance GO api, Account define account info which mantain []*AccountAsset and []*AccountPosition.
CanDeposit: account.CanDeposit, // if can transfer in asset
CanTrade: account.CanTrade, // if can trade
CanWithdraw: account.CanWithdraw, // if can transfer out asset
}
a.UpdateBalances(balances)
return a, nil
}
func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {
var account *types.Account
var err error
@ -848,22 +787,7 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
}
if e.IsFutures {
req := e.futuresClient.NewListOrdersService().Symbol(symbol)
if lastOrderID > 0 {
req.OrderID(int64(lastOrderID))
} else {
req.StartTime(since.UnixNano() / int64(time.Millisecond))
if until.Sub(since) < 24*time.Hour {
req.EndTime(until.UnixNano() / int64(time.Millisecond))
}
}
binanceOrders, err := req.Do(ctx)
if err != nil {
return orders, err
}
return toGlobalFuturesOrders(binanceOrders, false)
return e.queryFuturesClosedOrders(ctx, symbol, since, until, lastOrderID)
}
// If orderId is set, it will get orders >= that orderId. Otherwise most recent orders are returned.
@ -898,28 +822,7 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) (err
}
if e.IsFutures {
for _, o := range orders {
var req = e.futuresClient.NewCancelOrderService()
// Mandatory
req.Symbol(o.Symbol)
if o.OrderID > 0 {
req.OrderID(int64(o.OrderID))
} else {
err = multierr.Append(err, types.NewOrderError(
fmt.Errorf("can not cancel %s order, order does not contain orderID or clientOrderID", o.Symbol),
o))
continue
}
_, err2 := req.Do(ctx)
if err2 != nil {
err = multierr.Append(err, types.NewOrderError(err2, o))
}
}
return err
return e.cancelFuturesOrders(ctx, orders...)
}
for _, o := range orders {
@ -1064,91 +967,6 @@ func (e *Exchange) submitMarginOrder(ctx context.Context, order types.SubmitOrde
return createdOrder, err
}
func (e *Exchange) submitFuturesOrder(ctx context.Context, order types.SubmitOrder) (*types.Order, error) {
orderType, err := toLocalFuturesOrderType(order.Type)
if err != nil {
return nil, err
}
req := e.futuresClient.NewCreateOrderService().
Symbol(order.Symbol).
Type(orderType).
Side(futures.SideType(order.Side)).
ReduceOnly(order.ReduceOnly)
clientOrderID := newFuturesClientOrderID(order.ClientOrderID)
if len(clientOrderID) > 0 {
req.NewClientOrderID(clientOrderID)
}
// use response result format
req.NewOrderResponseType(futures.NewOrderRespTypeRESULT)
if order.Market.Symbol != "" {
req.Quantity(order.Market.FormatQuantity(order.Quantity))
} else {
// TODO report error
req.Quantity(order.Quantity.FormatString(8))
}
// set price field for limit orders
switch order.Type {
case types.OrderTypeStopLimit, types.OrderTypeLimit, types.OrderTypeLimitMaker:
if order.Market.Symbol != "" {
req.Price(order.Market.FormatPrice(order.Price))
} else {
// TODO report error
req.Price(order.Price.FormatString(8))
}
}
// set stop price
switch order.Type {
case types.OrderTypeStopLimit, types.OrderTypeStopMarket:
if order.Market.Symbol != "" {
req.StopPrice(order.Market.FormatPrice(order.StopPrice))
} else {
// TODO report error
req.StopPrice(order.StopPrice.FormatString(8))
}
}
// could be IOC or FOK
if len(order.TimeInForce) > 0 {
// TODO: check the TimeInForce value
req.TimeInForce(futures.TimeInForceType(order.TimeInForce))
} else {
switch order.Type {
case types.OrderTypeLimit, types.OrderTypeLimitMaker, types.OrderTypeStopLimit:
req.TimeInForce(futures.TimeInForceTypeGTC)
}
}
response, err := req.Do(ctx)
if err != nil {
return nil, err
}
log.Infof("futures order creation response: %+v", response)
createdOrder, err := toGlobalFuturesOrder(&futures.Order{
Symbol: response.Symbol,
OrderID: response.OrderID,
ClientOrderID: response.ClientOrderID,
Price: response.Price,
OrigQuantity: response.OrigQuantity,
ExecutedQuantity: response.ExecutedQuantity,
Status: response.Status,
TimeInForce: response.TimeInForce,
Type: response.Type,
Side: response.Side,
ReduceOnly: response.ReduceOnly,
}, false)
return createdOrder, err
}
// BBGO is a broker on Binance
const spotBrokerID = "NSUYEBKM"
@ -1179,36 +997,6 @@ func newSpotClientOrderID(originalID string) (clientOrderID string) {
return clientOrderID
}
// BBGO is a futures broker on Binance
const futuresBrokerID = "gBhMvywy"
func newFuturesClientOrderID(originalID string) (clientOrderID string) {
if originalID == types.NoClientOrderID {
return ""
}
prefix := "x-" + futuresBrokerID
prefixLen := len(prefix)
if originalID != "" {
// try to keep the whole original client order ID if user specifies it.
if prefixLen+len(originalID) > 32 {
return originalID
}
clientOrderID = prefix + originalID
return clientOrderID
}
clientOrderID = uuid.New().String()
clientOrderID = prefix + clientOrderID
if len(clientOrderID) > 32 {
return clientOrderID[0:32]
}
return clientOrderID
}
func (e *Exchange) submitSpotOrder(ctx context.Context, order types.SubmitOrder) (*types.Order, error) {
orderType, err := toLocalOrderType(order.Type)
if err != nil {
@ -1375,60 +1163,6 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
return kLines, nil
}
func (e *Exchange) QueryFuturesKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
var limit = 1000
if options.Limit > 0 {
// default limit == 1000
limit = options.Limit
}
log.Infof("querying kline %s %s %v", symbol, interval, options)
req := e.futuresClient.NewKlinesService().
Symbol(symbol).
Interval(string(interval)).
Limit(limit)
if options.StartTime != nil {
req.StartTime(options.StartTime.UnixMilli())
}
if options.EndTime != nil {
req.EndTime(options.EndTime.UnixMilli())
}
resp, err := req.Do(ctx)
if err != nil {
return nil, err
}
var kLines []types.KLine
for _, k := range resp {
kLines = append(kLines, types.KLine{
Exchange: types.ExchangeBinance,
Symbol: symbol,
Interval: interval,
StartTime: types.NewTimeFromUnix(0, k.OpenTime*int64(time.Millisecond)),
EndTime: types.NewTimeFromUnix(0, k.CloseTime*int64(time.Millisecond)),
Open: fixedpoint.MustNewFromString(k.Open),
Close: fixedpoint.MustNewFromString(k.Close),
High: fixedpoint.MustNewFromString(k.High),
Low: fixedpoint.MustNewFromString(k.Low),
Volume: fixedpoint.MustNewFromString(k.Volume),
QuoteVolume: fixedpoint.MustNewFromString(k.QuoteAssetVolume),
TakerBuyBaseAssetVolume: fixedpoint.MustNewFromString(k.TakerBuyBaseAssetVolume),
TakerBuyQuoteAssetVolume: fixedpoint.MustNewFromString(k.TakerBuyQuoteAssetVolume),
LastTradeID: 0,
NumberOfTrades: uint64(k.TradeNum),
Closed: true,
})
}
kLines = types.SortKLinesAscending(kLines)
return kLines, nil
}
func (e *Exchange) queryMarginTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
var remoteTrades []*binance.TradeV3
req := e.client.NewListMarginTradesService().
@ -1478,55 +1212,6 @@ func (e *Exchange) queryMarginTrades(ctx context.Context, symbol string, options
return trades, nil
}
func (e *Exchange) queryFuturesTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
var remoteTrades []*futures.AccountTrade
req := e.futuresClient.NewListAccountTradeService().
Symbol(symbol)
if options.Limit > 0 {
req.Limit(int(options.Limit))
} else {
req.Limit(1000)
}
// BINANCE uses inclusive last trade ID
if options.LastTradeID > 0 {
req.FromID(int64(options.LastTradeID))
}
// The parameter fromId cannot be sent with startTime or endTime.
// Mentioned in binance futures docs
if options.LastTradeID <= 0 {
if options.StartTime != nil && options.EndTime != nil {
if options.EndTime.Sub(*options.StartTime) < 24*time.Hour {
req.StartTime(options.StartTime.UnixMilli())
req.EndTime(options.EndTime.UnixMilli())
} else {
req.StartTime(options.StartTime.UnixMilli())
}
} else if options.EndTime != nil {
req.EndTime(options.EndTime.UnixMilli())
}
}
remoteTrades, err = req.Do(ctx)
if err != nil {
return nil, err
}
for _, t := range remoteTrades {
localTrade, err := toGlobalFuturesTrade(*t)
if err != nil {
log.WithError(err).Errorf("can not convert binance futures trade: %+v", t)
continue
}
trades = append(trades, *localTrade)
}
trades = types.SortTradesAscending(trades)
return trades, nil
}
func (e *Exchange) querySpotTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
req := e.client2.NewGetMyTradesRequest()
req.Symbol(symbol)
@ -1589,33 +1274,51 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
// DefaultFeeRates returns the Binance VIP 0 fee schedule
// See also https://www.binance.com/en/fee/schedule
// See futures fee at: https://www.binance.com/en/fee/futureFee
func (e *Exchange) DefaultFeeRates() types.ExchangeFee {
if e.IsFutures {
return types.ExchangeFee{
MakerFeeRate: fixedpoint.NewFromFloat(0.01 * 0.075), // 0.075%
TakerFeeRate: fixedpoint.NewFromFloat(0.01 * 0.075), // 0.075%
MakerFeeRate: fixedpoint.NewFromFloat(0.01 * 0.0180), // 0.0180% -USDT with BNB
TakerFeeRate: fixedpoint.NewFromFloat(0.01 * 0.0360), // 0.0360% -USDT with BNB
}
}
return types.ExchangeFee{
MakerFeeRate: fixedpoint.NewFromFloat(0.01 * 0.075), // 0.075% with BNB
TakerFeeRate: fixedpoint.NewFromFloat(0.01 * 0.075), // 0.075% with BNB
}
}
// QueryDepth query the order book depth of a symbol
func (e *Exchange) QueryDepth(ctx context.Context, symbol string) (snapshot types.SliceOrderBook, finalUpdateID int64, err error) {
var response *binance.DepthResponse
if e.IsFutures {
func (e *Exchange) queryFuturesDepth(ctx context.Context, symbol string) (snapshot types.SliceOrderBook, finalUpdateID int64, err error) {
res, err := e.futuresClient.NewDepthService().Symbol(symbol).Do(ctx)
if err != nil {
return snapshot, finalUpdateID, err
}
response = &binance.DepthResponse{
response := &binance.DepthResponse{
LastUpdateID: res.LastUpdateID,
Bids: res.Bids,
Asks: res.Asks,
}
} else {
response, err = e.client.NewDepthService().Symbol(symbol).Do(ctx)
return convertDepth(snapshot, symbol, finalUpdateID, response)
}
// QueryDepth query the order book depth of a symbol
func (e *Exchange) QueryDepth(ctx context.Context, symbol string) (snapshot types.SliceOrderBook, finalUpdateID int64, err error) {
if e.IsFutures {
return e.queryFuturesDepth(ctx, symbol)
}
response, err := e.client.NewDepthService().Symbol(symbol).Do(ctx)
if err != nil {
return snapshot, finalUpdateID, err
}
}
return convertDepth(snapshot, symbol, finalUpdateID, response)
}
func convertDepth(snapshot types.SliceOrderBook, symbol string, finalUpdateID int64, response *binance.DepthResponse) (types.SliceOrderBook, int64, error) {
snapshot.Symbol = symbol
finalUpdateID = response.LastUpdateID
for _, entry := range response.Bids {

View File

@ -0,0 +1,351 @@
package binance
import (
"context"
"fmt"
"time"
"github.com/adshao/go-binance/v2/futures"
"github.com/google/uuid"
"go.uber.org/multierr"
"github.com/c9s/bbgo/pkg/exchange/binance/binanceapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
func (e *Exchange) queryFuturesClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) {
req := e.futuresClient.NewListOrdersService().Symbol(symbol)
if lastOrderID > 0 {
req.OrderID(int64(lastOrderID))
} else {
req.StartTime(since.UnixNano() / int64(time.Millisecond))
if until.Sub(since) < 24*time.Hour {
req.EndTime(until.UnixNano() / int64(time.Millisecond))
}
}
binanceOrders, err := req.Do(ctx)
if err != nil {
return orders, err
}
return toGlobalFuturesOrders(binanceOrders, false)
}
func (e *Exchange) TransferFuturesAccountAsset(ctx context.Context, asset string, amount fixedpoint.Value, io types.TransferDirection) error {
req := e.client2.NewFuturesTransferRequest()
req.Asset(asset)
req.Amount(amount.String())
if io == types.TransferIn {
req.TransferType(binanceapi.FuturesTransferSpotToUsdtFutures)
} else if io == types.TransferOut {
req.TransferType(binanceapi.FuturesTransferUsdtFuturesToSpot)
} else {
return fmt.Errorf("unexpected transfer direction: %d given", io)
}
resp, err := req.Do(ctx)
switch io {
case types.TransferIn:
log.Infof("internal transfer (spot) => (futures) %s %s, transaction = %+v, err = %+v", amount.String(), asset, resp, err)
case types.TransferOut:
log.Infof("internal transfer (futures) => (spot) %s %s, transaction = %+v, err = %+v", amount.String(), asset, resp, err)
}
return err
}
// QueryFuturesAccount gets the futures account balances from Binance
// Balance.Available = Wallet Balance(in Binance UI) - Used Margin
// Balance.Locked = Used Margin
func (e *Exchange) QueryFuturesAccount(ctx context.Context) (*types.Account, error) {
account, err := e.futuresClient.NewGetAccountService().Do(ctx)
if err != nil {
return nil, err
}
accountBalances, err := e.futuresClient.NewGetBalanceService().Do(ctx)
if err != nil {
return nil, err
}
var balances = map[string]types.Balance{}
for _, b := range accountBalances {
balanceAvailable := fixedpoint.Must(fixedpoint.NewFromString(b.AvailableBalance))
balanceTotal := fixedpoint.Must(fixedpoint.NewFromString(b.Balance))
unrealizedPnl := fixedpoint.Must(fixedpoint.NewFromString(b.CrossUnPnl))
balances[b.Asset] = types.Balance{
Currency: b.Asset,
Available: balanceAvailable,
Locked: balanceTotal.Sub(balanceAvailable.Sub(unrealizedPnl)),
}
}
a := &types.Account{
AccountType: types.AccountTypeFutures,
FuturesInfo: toGlobalFuturesAccountInfo(account), // In binance GO api, Account define account info which mantain []*AccountAsset and []*AccountPosition.
CanDeposit: account.CanDeposit, // if can transfer in asset
CanTrade: account.CanTrade, // if can trade
CanWithdraw: account.CanWithdraw, // if can transfer out asset
}
a.UpdateBalances(balances)
return a, nil
}
func (e *Exchange) cancelFuturesOrders(ctx context.Context, orders ...types.Order) (err error) {
for _, o := range orders {
var req = e.futuresClient.NewCancelOrderService()
// Mandatory
req.Symbol(o.Symbol)
if o.OrderID > 0 {
req.OrderID(int64(o.OrderID))
} else {
err = multierr.Append(err, types.NewOrderError(
fmt.Errorf("can not cancel %s order, order does not contain orderID or clientOrderID", o.Symbol),
o))
continue
}
_, err2 := req.Do(ctx)
if err2 != nil {
err = multierr.Append(err, types.NewOrderError(err2, o))
}
}
return err
}
func (e *Exchange) submitFuturesOrder(ctx context.Context, order types.SubmitOrder) (*types.Order, error) {
orderType, err := toLocalFuturesOrderType(order.Type)
if err != nil {
return nil, err
}
req := e.futuresClient.NewCreateOrderService().
Symbol(order.Symbol).
Type(orderType).
Side(futures.SideType(order.Side)).
ReduceOnly(order.ReduceOnly)
clientOrderID := newFuturesClientOrderID(order.ClientOrderID)
if len(clientOrderID) > 0 {
req.NewClientOrderID(clientOrderID)
}
// use response result format
req.NewOrderResponseType(futures.NewOrderRespTypeRESULT)
if order.Market.Symbol != "" {
req.Quantity(order.Market.FormatQuantity(order.Quantity))
} else {
// TODO report error
req.Quantity(order.Quantity.FormatString(8))
}
// set price field for limit orders
switch order.Type {
case types.OrderTypeStopLimit, types.OrderTypeLimit, types.OrderTypeLimitMaker:
if order.Market.Symbol != "" {
req.Price(order.Market.FormatPrice(order.Price))
} else {
// TODO report error
req.Price(order.Price.FormatString(8))
}
}
// set stop price
switch order.Type {
case types.OrderTypeStopLimit, types.OrderTypeStopMarket:
if order.Market.Symbol != "" {
req.StopPrice(order.Market.FormatPrice(order.StopPrice))
} else {
// TODO report error
req.StopPrice(order.StopPrice.FormatString(8))
}
}
// could be IOC or FOK
if len(order.TimeInForce) > 0 {
// TODO: check the TimeInForce value
req.TimeInForce(futures.TimeInForceType(order.TimeInForce))
} else {
switch order.Type {
case types.OrderTypeLimit, types.OrderTypeLimitMaker, types.OrderTypeStopLimit:
req.TimeInForce(futures.TimeInForceTypeGTC)
}
}
response, err := req.Do(ctx)
if err != nil {
return nil, err
}
log.Infof("futures order creation response: %+v", response)
createdOrder, err := toGlobalFuturesOrder(&futures.Order{
Symbol: response.Symbol,
OrderID: response.OrderID,
ClientOrderID: response.ClientOrderID,
Price: response.Price,
OrigQuantity: response.OrigQuantity,
ExecutedQuantity: response.ExecutedQuantity,
Status: response.Status,
TimeInForce: response.TimeInForce,
Type: response.Type,
Side: response.Side,
ReduceOnly: response.ReduceOnly,
}, false)
return createdOrder, err
}
func (e *Exchange) QueryFuturesKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
var limit = 1000
if options.Limit > 0 {
// default limit == 1000
limit = options.Limit
}
log.Infof("querying kline %s %s %v", symbol, interval, options)
req := e.futuresClient.NewKlinesService().
Symbol(symbol).
Interval(string(interval)).
Limit(limit)
if options.StartTime != nil {
req.StartTime(options.StartTime.UnixMilli())
}
if options.EndTime != nil {
req.EndTime(options.EndTime.UnixMilli())
}
resp, err := req.Do(ctx)
if err != nil {
return nil, err
}
var kLines []types.KLine
for _, k := range resp {
kLines = append(kLines, types.KLine{
Exchange: types.ExchangeBinance,
Symbol: symbol,
Interval: interval,
StartTime: types.NewTimeFromUnix(0, k.OpenTime*int64(time.Millisecond)),
EndTime: types.NewTimeFromUnix(0, k.CloseTime*int64(time.Millisecond)),
Open: fixedpoint.MustNewFromString(k.Open),
Close: fixedpoint.MustNewFromString(k.Close),
High: fixedpoint.MustNewFromString(k.High),
Low: fixedpoint.MustNewFromString(k.Low),
Volume: fixedpoint.MustNewFromString(k.Volume),
QuoteVolume: fixedpoint.MustNewFromString(k.QuoteAssetVolume),
TakerBuyBaseAssetVolume: fixedpoint.MustNewFromString(k.TakerBuyBaseAssetVolume),
TakerBuyQuoteAssetVolume: fixedpoint.MustNewFromString(k.TakerBuyQuoteAssetVolume),
LastTradeID: 0,
NumberOfTrades: uint64(k.TradeNum),
Closed: true,
})
}
kLines = types.SortKLinesAscending(kLines)
return kLines, nil
}
func (e *Exchange) queryFuturesTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
var remoteTrades []*futures.AccountTrade
req := e.futuresClient.NewListAccountTradeService().
Symbol(symbol)
if options.Limit > 0 {
req.Limit(int(options.Limit))
} else {
req.Limit(1000)
}
// BINANCE uses inclusive last trade ID
if options.LastTradeID > 0 {
req.FromID(int64(options.LastTradeID))
}
// The parameter fromId cannot be sent with startTime or endTime.
// Mentioned in binance futures docs
if options.LastTradeID <= 0 {
if options.StartTime != nil && options.EndTime != nil {
if options.EndTime.Sub(*options.StartTime) < 24*time.Hour {
req.StartTime(options.StartTime.UnixMilli())
req.EndTime(options.EndTime.UnixMilli())
} else {
req.StartTime(options.StartTime.UnixMilli())
}
} else if options.EndTime != nil {
req.EndTime(options.EndTime.UnixMilli())
}
}
remoteTrades, err = req.Do(ctx)
if err != nil {
return nil, err
}
for _, t := range remoteTrades {
localTrade, err := toGlobalFuturesTrade(*t)
if err != nil {
log.WithError(err).Errorf("can not convert binance futures trade: %+v", t)
continue
}
trades = append(trades, *localTrade)
}
trades = types.SortTradesAscending(trades)
return trades, nil
}
func (e *Exchange) QueryFuturesPositionRisks(ctx context.Context, symbol string) error {
req := e.futuresClient.NewGetPositionRiskService()
req.Symbol(symbol)
res, err := req.Do(ctx)
if err != nil {
return err
}
_ = res
return nil
}
// BBGO is a futures broker on Binance
const futuresBrokerID = "gBhMvywy"
func newFuturesClientOrderID(originalID string) (clientOrderID string) {
if originalID == types.NoClientOrderID {
return ""
}
prefix := "x-" + futuresBrokerID
prefixLen := len(prefix)
if originalID != "" {
// try to keep the whole original client order ID if user specifies it.
if prefixLen+len(originalID) > 32 {
return originalID
}
clientOrderID = prefix + originalID
return clientOrderID
}
clientOrderID = uuid.New().String()
clientOrderID = prefix + clientOrderID
if len(clientOrderID) > 32 {
return clientOrderID[0:32]
}
return clientOrderID
}

View File

@ -15,6 +15,11 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
type EventBase struct {
Event string `json:"e"` // event name
Time int64 `json:"E"` // event time
}
/*
executionReport
@ -314,22 +319,40 @@ func parseWebSocketEvent(message []byte) (interface{}, error) {
case "depthUpdate":
return parseDepthEvent(val)
case "markPriceUpdate":
var event MarkPriceUpdateEvent
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "listenKeyExpired":
var event ListenKeyExpired
err = json.Unmarshal([]byte(message), &event)
return &event, err
// Binance futures data --------------
case "trade":
var event MarketTradeEvent
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "aggTrade":
var event AggTradeEvent
err = json.Unmarshal([]byte(message), &event)
return &event, err
}
// futures stream
switch eventType {
// futures market data stream
// ========================================================
case "continuousKline":
var event ContinuousKLineEvent
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "markPriceUpdate":
var event MarkPriceUpdateEvent
err = json.Unmarshal([]byte(message), &event)
return &event, err
// futures user data stream
// ========================================================
case "ORDER_TRADE_UPDATE":
var event OrderTradeUpdateEvent
err = json.Unmarshal([]byte(message), &event)
@ -347,13 +370,8 @@ func parseWebSocketEvent(message []byte) (interface{}, error) {
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "trade":
var event MarketTradeEvent
err = json.Unmarshal([]byte(message), &event)
return &event, err
case "aggTrade":
var event AggTradeEvent
case "MARGIN_CALL":
var event MarginCallEvent
err = json.Unmarshal([]byte(message), &event)
return &event, err
@ -891,34 +909,94 @@ func (e *OrderTradeUpdateEvent) TradeFutures() (*types.Trade, error) {
}, nil
}
type AccountUpdate struct {
EventReasonType string `json:"m"`
Balances []*futures.Balance `json:"B,omitempty"`
Positions []*futures.AccountPosition `json:"P,omitempty"`
type FuturesStreamBalance struct {
Asset string `json:"a"`
WalletBalance fixedpoint.Value `json:"wb"`
CrossWalletBalance fixedpoint.Value `json:"cw"`
BalanceChange fixedpoint.Value `json:"bc"`
}
type FuturesStreamPosition struct {
Symbol string `json:"s"`
PositionAmount fixedpoint.Value `json:"pa"`
EntryPrice fixedpoint.Value `json:"ep"`
AccumulatedRealizedPnL fixedpoint.Value `json:"cr"` // (Pre-fee) Accumulated Realized PnL
UnrealizedPnL fixedpoint.Value `json:"up"`
MarginType string `json:"mt"`
IsolatedWallet fixedpoint.Value `json:"iw"`
PositionSide string `json:"ps"`
}
type AccountUpdateEventReasonType string
const (
AccountUpdateEventReasonDeposit AccountUpdateEventReasonType = "DEPOSIT"
AccountUpdateEventReasonWithdraw AccountUpdateEventReasonType = "WITHDRAW"
AccountUpdateEventReasonOrder AccountUpdateEventReasonType = "ORDER"
AccountUpdateEventReasonFundingFee AccountUpdateEventReasonType = "FUNDING_FEE"
AccountUpdateEventReasonMarginTransfer AccountUpdateEventReasonType = "MARGIN_TRANSFER"
AccountUpdateEventReasonMarginTypeChange AccountUpdateEventReasonType = "MARGIN_TYPE_CHANGE"
AccountUpdateEventReasonAssetTransfer AccountUpdateEventReasonType = "ASSET_TRANSFER"
AccountUpdateEventReasonAdminDeposit AccountUpdateEventReasonType = "ADMIN_DEPOSIT"
AccountUpdateEventReasonAdminWithdraw AccountUpdateEventReasonType = "ADMIN_WITHDRAW"
)
type AccountUpdate struct {
// m: DEPOSIT WITHDRAW
// ORDER FUNDING_FEE
// WITHDRAW_REJECT ADJUSTMENT
// INSURANCE_CLEAR
// ADMIN_DEPOSIT ADMIN_WITHDRAW
// MARGIN_TRANSFER MARGIN_TYPE_CHANGE
// ASSET_TRANSFER
// OPTIONS_PREMIUM_FEE OPTIONS_SETTLE_PROFIT
// AUTO_EXCHANGE
// COIN_SWAP_DEPOSIT COIN_SWAP_WITHDRAW
EventReasonType AccountUpdateEventReasonType `json:"m"`
Balances []FuturesStreamBalance `json:"B,omitempty"`
Positions []FuturesStreamPosition `json:"P,omitempty"`
}
type MarginCallEvent struct {
EventBase
CrossWalletBalance fixedpoint.Value `json:"cw"`
P []struct {
Symbol string `json:"s"`
PositionSide string `json:"ps"`
PositionAmount fixedpoint.Value `json:"pa"`
MarginType string `json:"mt"`
IsolatedWallet fixedpoint.Value `json:"iw"`
MarkPrice fixedpoint.Value `json:"mp"`
UnrealizedPnL fixedpoint.Value `json:"up"`
MaintenanceMarginRequired fixedpoint.Value `json:"mm"`
} `json:"p"` // Position(s) of Margin Call
}
// AccountUpdateEvent is only used in the futures user data stream
type AccountUpdateEvent struct {
EventBase
Transaction int64 `json:"T"`
AccountUpdate AccountUpdate `json:"a"`
}
type AccountConfig struct {
Symbol string `json:"s"`
Leverage fixedpoint.Value `json:"l"`
}
type AccountConfigUpdateEvent struct {
EventBase
Transaction int64 `json:"T"`
AccountConfig AccountConfig `json:"ac"`
}
// When the leverage of a trade pair changes,
// the payload will contain the object ac to represent the account configuration of the trade pair,
// where s represents the specific trade pair and l represents the leverage
AccountConfig struct {
Symbol string `json:"s"`
Leverage fixedpoint.Value `json:"l"`
} `json:"ac"`
type EventBase struct {
Event string `json:"e"` // event
Time int64 `json:"E"`
// When the user Multi-Assets margin mode changes the payload will contain the object ai representing the user account configuration,
// where j represents the user Multi-Assets margin mode
MarginModeConfig struct {
MultiAssetsMode bool `json:"j"`
} `json:"ai"`
}
type BookTickerEvent struct {

View File

@ -46,25 +46,28 @@ type Stream struct {
kLineEventCallbacks []func(e *KLineEvent)
kLineClosedEventCallbacks []func(e *KLineEvent)
markPriceUpdateEventCallbacks []func(e *MarkPriceUpdateEvent)
marketTradeEventCallbacks []func(e *MarketTradeEvent)
aggTradeEventCallbacks []func(e *AggTradeEvent)
continuousKLineEventCallbacks []func(e *ContinuousKLineEvent)
continuousKLineClosedEventCallbacks []func(e *ContinuousKLineEvent)
balanceUpdateEventCallbacks []func(event *BalanceUpdateEvent)
outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent)
outboundAccountPositionEventCallbacks []func(event *OutboundAccountPositionEvent)
executionReportEventCallbacks []func(event *ExecutionReportEvent)
bookTickerEventCallbacks []func(event *BookTickerEvent)
// futures market data stream
markPriceUpdateEventCallbacks []func(e *MarkPriceUpdateEvent)
continuousKLineEventCallbacks []func(e *ContinuousKLineEvent)
continuousKLineClosedEventCallbacks []func(e *ContinuousKLineEvent)
// futures user data stream event callbacks
orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent)
accountUpdateEventCallbacks []func(e *AccountUpdateEvent)
accountConfigUpdateEventCallbacks []func(e *AccountConfigUpdateEvent)
marginCallEventCallbacks []func(e *MarginCallEvent)
listenKeyExpiredCallbacks []func(e *ListenKeyExpired)
// depthBuffers is used for storing the depth info
depthBuffers map[string]*depth.Buffer
}
@ -123,10 +126,12 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
stream.OnMarketTradeEvent(stream.handleMarketTradeEvent)
stream.OnAggTradeEvent(stream.handleAggTradeEvent)
// Futures User Data Stream
// ===================================
// Event type ACCOUNT_UPDATE from user data stream updates Balance and FuturesPosition.
stream.OnAccountUpdateEvent(stream.handleAccountUpdateEvent)
stream.OnAccountConfigUpdateEvent(stream.handleAccountConfigUpdateEvent)
stream.OnOrderTradeUpdateEvent(stream.handleOrderTradeUpdateEvent)
// ===================================
stream.OnDisconnect(stream.handleDisconnect)
stream.OnConnect(stream.handleConnect)
stream.OnListenKeyExpired(func(e *ListenKeyExpired) {
@ -246,18 +251,6 @@ func (s *Stream) handleOutboundAccountPositionEvent(e *OutboundAccountPositionEv
s.EmitBalanceSnapshot(snapshot)
}
func (s *Stream) handleAccountUpdateEvent(e *AccountUpdateEvent) {
futuresPositionSnapshot := toGlobalFuturesPositions(e.AccountUpdate.Positions)
s.EmitFuturesPositionSnapshot(futuresPositionSnapshot)
balanceSnapshot := toGlobalFuturesBalance(e.AccountUpdate.Balances)
s.EmitBalanceSnapshot(balanceSnapshot)
}
// TODO: emit account config leverage updates
func (s *Stream) handleAccountConfigUpdateEvent(e *AccountConfigUpdateEvent) {
}
func (s *Stream) handleOrderTradeUpdateEvent(e *OrderTradeUpdateEvent) {
switch e.OrderTrade.CurrentExecutionType {
@ -381,6 +374,8 @@ func (s *Stream) dispatchEvent(e interface{}) {
case *ListenKeyExpired:
s.EmitListenKeyExpired(e)
case *MarginCallEvent:
}
}

View File

@ -34,16 +34,6 @@ func (s *Stream) EmitKLineClosedEvent(e *KLineEvent) {
}
}
func (s *Stream) OnMarkPriceUpdateEvent(cb func(e *MarkPriceUpdateEvent)) {
s.markPriceUpdateEventCallbacks = append(s.markPriceUpdateEventCallbacks, cb)
}
func (s *Stream) EmitMarkPriceUpdateEvent(e *MarkPriceUpdateEvent) {
for _, cb := range s.markPriceUpdateEventCallbacks {
cb(e)
}
}
func (s *Stream) OnMarketTradeEvent(cb func(e *MarketTradeEvent)) {
s.marketTradeEventCallbacks = append(s.marketTradeEventCallbacks, cb)
}
@ -64,26 +54,6 @@ func (s *Stream) EmitAggTradeEvent(e *AggTradeEvent) {
}
}
func (s *Stream) OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent)) {
s.continuousKLineEventCallbacks = append(s.continuousKLineEventCallbacks, cb)
}
func (s *Stream) EmitContinuousKLineEvent(e *ContinuousKLineEvent) {
for _, cb := range s.continuousKLineEventCallbacks {
cb(e)
}
}
func (s *Stream) OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent)) {
s.continuousKLineClosedEventCallbacks = append(s.continuousKLineClosedEventCallbacks, cb)
}
func (s *Stream) EmitContinuousKLineClosedEvent(e *ContinuousKLineEvent) {
for _, cb := range s.continuousKLineClosedEventCallbacks {
cb(e)
}
}
func (s *Stream) OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent)) {
s.balanceUpdateEventCallbacks = append(s.balanceUpdateEventCallbacks, cb)
}
@ -134,6 +104,36 @@ func (s *Stream) EmitBookTickerEvent(event *BookTickerEvent) {
}
}
func (s *Stream) OnMarkPriceUpdateEvent(cb func(e *MarkPriceUpdateEvent)) {
s.markPriceUpdateEventCallbacks = append(s.markPriceUpdateEventCallbacks, cb)
}
func (s *Stream) EmitMarkPriceUpdateEvent(e *MarkPriceUpdateEvent) {
for _, cb := range s.markPriceUpdateEventCallbacks {
cb(e)
}
}
func (s *Stream) OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent)) {
s.continuousKLineEventCallbacks = append(s.continuousKLineEventCallbacks, cb)
}
func (s *Stream) EmitContinuousKLineEvent(e *ContinuousKLineEvent) {
for _, cb := range s.continuousKLineEventCallbacks {
cb(e)
}
}
func (s *Stream) OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent)) {
s.continuousKLineClosedEventCallbacks = append(s.continuousKLineClosedEventCallbacks, cb)
}
func (s *Stream) EmitContinuousKLineClosedEvent(e *ContinuousKLineEvent) {
for _, cb := range s.continuousKLineClosedEventCallbacks {
cb(e)
}
}
func (s *Stream) OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent)) {
s.orderTradeUpdateEventCallbacks = append(s.orderTradeUpdateEventCallbacks, cb)
}
@ -164,6 +164,16 @@ func (s *Stream) EmitAccountConfigUpdateEvent(e *AccountConfigUpdateEvent) {
}
}
func (s *Stream) OnMarginCallEvent(cb func(e *MarginCallEvent)) {
s.marginCallEventCallbacks = append(s.marginCallEventCallbacks, cb)
}
func (s *Stream) EmitMarginCallEvent(e *MarginCallEvent) {
for _, cb := range s.marginCallEventCallbacks {
cb(e)
}
}
func (s *Stream) OnListenKeyExpired(cb func(e *ListenKeyExpired)) {
s.listenKeyExpiredCallbacks = append(s.listenKeyExpiredCallbacks, cb)
}
@ -181,16 +191,10 @@ type StreamEventHub interface {
OnKLineClosedEvent(cb func(e *KLineEvent))
OnMarkPriceUpdateEvent(cb func(e *MarkPriceUpdateEvent))
OnMarketTradeEvent(cb func(e *MarketTradeEvent))
OnAggTradeEvent(cb func(e *AggTradeEvent))
OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent))
OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent))
OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent))
OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent))
@ -201,11 +205,19 @@ type StreamEventHub interface {
OnBookTickerEvent(cb func(event *BookTickerEvent))
OnMarkPriceUpdateEvent(cb func(e *MarkPriceUpdateEvent))
OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent))
OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent))
OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent))
OnAccountUpdateEvent(cb func(e *AccountUpdateEvent))
OnAccountConfigUpdateEvent(cb func(e *AccountConfigUpdateEvent))
OnMarginCallEvent(cb func(e *MarginCallEvent))
OnListenKeyExpired(cb func(e *ListenKeyExpired))
}

View File

@ -333,9 +333,24 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
})
s.futuresSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(kline types.KLine) {
// s.queryAndDetectPremiumIndex(ctx, binanceFutures)
s.queryAndDetectPremiumIndex(ctx, binanceFutures)
}))
if binanceStream, ok := s.futuresSession.UserDataStream.(*binance.Stream); ok {
binanceStream.OnAccountUpdateEvent(func(e *binance.AccountUpdateEvent) {
log.Infof("onAccountUpdateEvent: %+v", e)
switch e.AccountUpdate.EventReasonType {
case binance.AccountUpdateEventReasonDeposit:
case binance.AccountUpdateEventReasonWithdraw:
case binance.AccountUpdateEventReasonFundingFee:
}
})
}
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
@ -346,17 +361,25 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
return
case <-ticker.C:
s.queryAndDetectPremiumIndex(ctx, binanceFutures)
s.sync(ctx)
s.syncSpotAccount(ctx)
}
}
}()
// TODO: use go routine and time.Ticker to trigger spot sync and futures sync
/*
s.spotSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(k types.KLine) {
}))
*/
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.syncFuturesAccount(ctx)
}
}
}()
return nil
}
@ -375,14 +398,21 @@ func (s *Strategy) queryAndDetectPremiumIndex(ctx context.Context, binanceFuture
}
}
func (s *Strategy) sync(ctx context.Context) {
func (s *Strategy) syncSpotAccount(ctx context.Context) {
switch s.getPositionState() {
case PositionOpening:
s.increaseSpotPosition(ctx)
case PositionClosing:
s.syncSpotPosition(ctx)
}
}
func (s *Strategy) syncFuturesAccount(ctx context.Context) {
switch s.getPositionState() {
case PositionOpening:
s.syncFuturesPosition(ctx)
case PositionClosing:
s.reduceFuturesPosition(ctx)
s.syncSpotPosition(ctx)
}
}
@ -622,8 +652,10 @@ func (s *Strategy) increaseSpotPosition(ctx context.Context) {
}
s.mu.Lock()
defer s.mu.Unlock()
if s.State.UsedQuoteInvestment.Compare(s.QuoteInvestment) >= 0 {
usedQuoteInvestment := s.State.UsedQuoteInvestment
s.mu.Unlock()
if usedQuoteInvestment.Compare(s.QuoteInvestment) >= 0 {
// stop increase the position
s.setPositionState(PositionReady)
@ -640,7 +672,7 @@ func (s *Strategy) increaseSpotPosition(ctx context.Context) {
return
}
leftQuota := s.QuoteInvestment.Sub(s.State.UsedQuoteInvestment)
leftQuota := s.QuoteInvestment.Sub(usedQuoteInvestment)
orderPrice := ticker.Buy
orderQuantity := fixedpoint.Min(s.IncrementalQuoteQuantity, leftQuota).Div(orderPrice)
@ -686,16 +718,21 @@ func (s *Strategy) detectPremiumIndex(premiumIndex *types.PremiumIndex) bool {
switch s.getPositionState() {
case PositionClosed:
if fundingRate.Compare(s.ShortFundingRate.High) >= 0 {
if fundingRate.Compare(s.ShortFundingRate.High) < 0 {
return false
}
log.Infof("funding rate %s is higher than the High threshold %s, start opening position...",
fundingRate.Percentage(), s.ShortFundingRate.High.Percentage())
s.startOpeningPosition(types.PositionShort, premiumIndex.Time)
return true
}
case PositionReady:
if fundingRate.Compare(s.ShortFundingRate.Low) <= 0 {
if fundingRate.Compare(s.ShortFundingRate.Low) > 0 {
return false
}
log.Infof("funding rate %s is lower than the Low threshold %s, start closing position...",
fundingRate.Percentage(), s.ShortFundingRate.Low.Percentage())
@ -708,7 +745,6 @@ func (s *Strategy) detectPremiumIndex(premiumIndex *types.PremiumIndex) bool {
s.startClosingPosition()
return true
}
}
return false
}