548 lines
14 KiB
Go
548 lines
14 KiB
Go
|
package features
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"git.qtrade.icu/coin-quant/exchange"
|
||
|
bcommon "git.qtrade.icu/coin-quant/exchange/binance/common"
|
||
|
. "git.qtrade.icu/coin-quant/trademodel"
|
||
|
gobinance "github.com/adshao/go-binance/v2"
|
||
|
bfutures "github.com/adshao/go-binance/v2/futures"
|
||
|
"github.com/gorilla/websocket"
|
||
|
log "github.com/sirupsen/logrus"
|
||
|
"net/http"
|
||
|
"net/url"
|
||
|
"sort"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
background = context.Background()
|
||
|
newLock sync.Mutex
|
||
|
)
|
||
|
|
||
|
var _ exchange.Exchange = &BinanceTrade{}
|
||
|
|
||
|
type BinanceTrade struct {
|
||
|
name string
|
||
|
api *bfutures.Client
|
||
|
|
||
|
tradeCb exchange.WatchFn
|
||
|
positionCb exchange.WatchFn
|
||
|
balanceCb exchange.WatchFn
|
||
|
closeCh chan bool
|
||
|
|
||
|
cancelService *bfutures.CancelAllOpenOrdersService
|
||
|
cancelOneService *bfutures.CancelOrderService
|
||
|
timeService *bfutures.ServerTimeService
|
||
|
|
||
|
klineLimit int
|
||
|
timeout time.Duration
|
||
|
|
||
|
wsUserListenKey string
|
||
|
|
||
|
baseCurrency string
|
||
|
symbols map[string]Symbol
|
||
|
}
|
||
|
|
||
|
func NewBinanceTrader(cfg bcommon.BinanceConfig, cltName, clientProxy string) (b *BinanceTrade, err error) {
|
||
|
b = new(BinanceTrade)
|
||
|
b.name = "binance"
|
||
|
if cltName == "" {
|
||
|
cltName = "binance"
|
||
|
}
|
||
|
b.klineLimit = 1500
|
||
|
b.baseCurrency = "USDT"
|
||
|
if cfg.Currency != "" {
|
||
|
b.baseCurrency = cfg.Currency
|
||
|
}
|
||
|
|
||
|
b.timeout = time.Second * 5
|
||
|
b.closeCh = make(chan bool)
|
||
|
|
||
|
newLock.Lock()
|
||
|
defer func() {
|
||
|
bfutures.UseTestnet = false
|
||
|
newLock.Unlock()
|
||
|
}()
|
||
|
if cfg.IsTest {
|
||
|
bfutures.UseTestnet = true
|
||
|
log.Warnf("binance trade connecting to testnet")
|
||
|
}
|
||
|
bfutures.WebsocketKeepalive = true
|
||
|
b.api = gobinance.NewFuturesClient(cfg.ApiKey, cfg.SecretKey)
|
||
|
if clientProxy != "" {
|
||
|
var proxyURL *url.URL
|
||
|
proxyURL, err = url.Parse(clientProxy)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
b.api.HTTPClient = &http.Client{Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)}}
|
||
|
|
||
|
websocket.DefaultDialer.Proxy = http.ProxyURL(proxyURL)
|
||
|
websocket.DefaultDialer.HandshakeTimeout = time.Second * 60
|
||
|
}
|
||
|
b.cancelService = b.api.NewCancelAllOpenOrdersService()
|
||
|
b.cancelOneService = b.api.NewCancelOrderService()
|
||
|
b.timeService = b.api.NewServerTimeService()
|
||
|
_, err = b.Symbols()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
// err = b.Start()
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// fetchBalance different with spot
|
||
|
func (b *BinanceTrade) fetchBalanceAndPosition() (err error) {
|
||
|
ctx, cancel := context.WithTimeout(background, b.timeout)
|
||
|
defer cancel()
|
||
|
account, err := b.api.NewGetAccountService().Do(ctx)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
var balance Balance
|
||
|
balance.Balance = parseFloat(account.TotalWalletBalance)
|
||
|
balance.Available = parseFloat(account.TotalCrossWalletBalance)
|
||
|
// balance.Frozen =
|
||
|
if b.balanceCb != nil {
|
||
|
b.balanceCb(&balance)
|
||
|
}
|
||
|
if b.positionCb != nil {
|
||
|
var amount, profit, initMargin float64
|
||
|
for _, v := range account.Positions {
|
||
|
amount = parseFloat(v.PositionAmt)
|
||
|
if amount == 0 {
|
||
|
continue
|
||
|
}
|
||
|
var position Position
|
||
|
position.Symbol = v.Symbol
|
||
|
position.Hold = amount
|
||
|
position.Price = parseFloat(v.EntryPrice)
|
||
|
profit = parseFloat(v.UnrealizedProfit)
|
||
|
initMargin = parseFloat(v.PositionInitialMargin)
|
||
|
position.ProfitRatio = profit / initMargin
|
||
|
if position.Hold > 0 {
|
||
|
position.Type = Long
|
||
|
} else {
|
||
|
position.Type = Short
|
||
|
}
|
||
|
b.positionCb(&position)
|
||
|
|
||
|
}
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) Info() (info exchange.ExchangeInfo) {
|
||
|
info = exchange.ExchangeInfo{
|
||
|
Name: "binance_futures",
|
||
|
Value: "binance_futures",
|
||
|
Desc: "binance futures api",
|
||
|
KLineLimit: exchange.FetchLimit{
|
||
|
Limit: b.klineLimit,
|
||
|
},
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) Symbols() (symbols []Symbol, err error) {
|
||
|
ctx, cancel := context.WithTimeout(background, b.timeout)
|
||
|
defer cancel()
|
||
|
resp, err := b.api.NewExchangeInfoService().Do(ctx)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
symbols = make([]Symbol, len(resp.Symbols))
|
||
|
for i, v := range resp.Symbols {
|
||
|
value := Symbol{
|
||
|
Name: v.Symbol,
|
||
|
Exchange: "binance",
|
||
|
Symbol: v.Symbol,
|
||
|
Resolutions: "1m,5m,15m,30m,1h,4h,1d,1w",
|
||
|
Precision: v.PricePrecision,
|
||
|
AmountPrecision: v.QuantityPrecision,
|
||
|
PriceStep: 0,
|
||
|
AmountStep: 0,
|
||
|
}
|
||
|
for _, f := range v.Filters {
|
||
|
switch f["filterType"] {
|
||
|
case "PRICE_FILTER":
|
||
|
value.PriceStep = parseFloat(f["tickSize"].(string))
|
||
|
case "LOT_SIZE":
|
||
|
value.AmountStep = parseFloat(f["stepSize"].(string))
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
symbols[i] = value
|
||
|
}
|
||
|
if len(symbols) > 0 {
|
||
|
symbolMap := make(map[string]Symbol)
|
||
|
for _, v := range symbols {
|
||
|
symbolMap[v.Symbol] = v
|
||
|
}
|
||
|
b.symbols = symbolMap
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) Start() (err error) {
|
||
|
err = b.fetchBalanceAndPosition()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
// watch position and order changed
|
||
|
err = b.startUserWS()
|
||
|
return
|
||
|
}
|
||
|
func (b *BinanceTrade) Stop() (err error) {
|
||
|
close(b.closeCh)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// KlineChan get klines
|
||
|
func (b *BinanceTrade) GetKline(symbol, bSize string, start, end time.Time) (data []*Candle, err error) {
|
||
|
var temp *Candle
|
||
|
ctx, cancel := context.WithTimeout(background, b.timeout)
|
||
|
defer cancel()
|
||
|
defer func() {
|
||
|
if err != nil && strings.Contains(err.Error(), "Too many requests") {
|
||
|
err = fmt.Errorf("%w, retry: %s", exchange.ErrRetry, err.Error())
|
||
|
}
|
||
|
}()
|
||
|
// get server time
|
||
|
nTime, err := b.timeService.Do(ctx)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
nStart := start.Unix() * 1000
|
||
|
nEnd := end.Unix() * 1000
|
||
|
|
||
|
klines, err := b.api.NewKlinesService().Interval(bSize).Symbol(symbol).StartTime(nStart).EndTime(nEnd).Limit(b.klineLimit).Do(ctx)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
sort.Slice(klines, func(i, j int) bool {
|
||
|
return klines[i].OpenTime < klines[j].OpenTime
|
||
|
})
|
||
|
if len(klines) == 0 {
|
||
|
log.Warnf("GetKline once, param: [%s]-[%s] no data", start, end)
|
||
|
return
|
||
|
}
|
||
|
log.Infof("GetKline once, param: [%s]-[%s], total: %d, first: %s, last: %s", start, end, len(klines), time.UnixMilli(klines[0].OpenTime), time.UnixMilli(klines[len(klines)-1].OpenTime))
|
||
|
data = []*Candle{}
|
||
|
for k, v := range klines {
|
||
|
temp = transCandle(v)
|
||
|
if k == len(klines)-1 {
|
||
|
// check if candle is unfinished
|
||
|
if v.CloseTime > nTime {
|
||
|
log.Infof("skip unfinished candle: %##v\n", *v)
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
data = append(data, temp)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) handleError(typ string, cb func() error) func(error) {
|
||
|
return func(err error) {
|
||
|
log.Errorf("binance %s error:%s, call callback", typ, err.Error())
|
||
|
if cb != nil {
|
||
|
cb()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) handleAggTradeEvent(fn exchange.WatchFn) func(evt *bfutures.WsAggTradeEvent) {
|
||
|
return func(evt *bfutures.WsAggTradeEvent) {
|
||
|
var err error
|
||
|
var trade Trade
|
||
|
trade.ID = fmt.Sprintf("%d", evt.AggregateTradeID)
|
||
|
trade.Amount, err = strconv.ParseFloat(evt.Quantity, 64)
|
||
|
if err != nil {
|
||
|
log.Errorf("AggTradeEvent parse amount failed: %s", evt.Quantity)
|
||
|
}
|
||
|
trade.Price, err = strconv.ParseFloat(evt.Price, 64)
|
||
|
if err != nil {
|
||
|
log.Errorf("AggTradeEvent parse amount failed: %s", evt.Quantity)
|
||
|
}
|
||
|
trade.Time = time.Unix(evt.Time/1000, (evt.Time%1000)*int64(time.Millisecond))
|
||
|
if fn != nil {
|
||
|
fn(&trade)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) handleDepth(fn exchange.WatchFn) func(evt *bfutures.WsDepthEvent) {
|
||
|
return func(evt *bfutures.WsDepthEvent) {
|
||
|
var depth Depth
|
||
|
var err error
|
||
|
var price, amount float64
|
||
|
depth.UpdateTime = time.Unix(evt.TransactionTime/1000, (evt.TransactionTime%1000)*int64(time.Millisecond))
|
||
|
for _, v := range evt.Asks {
|
||
|
// depth.Sells
|
||
|
price, err = strconv.ParseFloat(v.Price, 64)
|
||
|
if err != nil {
|
||
|
log.Errorf("handleDepth parse price failed: %s", v.Price)
|
||
|
}
|
||
|
amount, err = strconv.ParseFloat(v.Quantity, 64)
|
||
|
if err != nil {
|
||
|
log.Errorf("handleDepth parse amount failed: %s", v.Quantity)
|
||
|
}
|
||
|
depth.Sells = append(depth.Sells, DepthInfo{Price: price, Amount: amount})
|
||
|
}
|
||
|
for _, v := range evt.Bids {
|
||
|
// depth.Sells
|
||
|
price, err = strconv.ParseFloat(v.Price, 64)
|
||
|
if err != nil {
|
||
|
log.Errorf("handleDepth parse price failed: %s", v.Price)
|
||
|
}
|
||
|
amount, err = strconv.ParseFloat(v.Quantity, 64)
|
||
|
if err != nil {
|
||
|
log.Errorf("handleDepth parse amount failed: %s", v.Quantity)
|
||
|
}
|
||
|
depth.Buys = append(depth.Buys, DepthInfo{Price: price, Amount: amount})
|
||
|
}
|
||
|
if fn != nil {
|
||
|
fn(&depth)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) retry(param exchange.WatchParam, fn exchange.WatchFn) func() error {
|
||
|
return func() error {
|
||
|
// retry when error cause
|
||
|
select {
|
||
|
case <-b.closeCh:
|
||
|
return nil
|
||
|
default:
|
||
|
}
|
||
|
return b.Watch(param, fn)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) Watch(param exchange.WatchParam, fn exchange.WatchFn) (err error) {
|
||
|
symbol := param.Param["symbol"]
|
||
|
var stopC chan struct{}
|
||
|
switch param.Type {
|
||
|
case exchange.WatchTypeCandle:
|
||
|
binSize := param.Param["bin"]
|
||
|
if binSize == "" {
|
||
|
binSize = "1m"
|
||
|
}
|
||
|
var doneC chan struct{}
|
||
|
finishC := make(chan struct{})
|
||
|
doneC, stopC, err = bfutures.WsKlineServe(symbol, binSize, processWsCandle(finishC, fn), b.handleError("watchKline", b.retry(param, fn)))
|
||
|
if err != nil {
|
||
|
log.Error("exchange emitCandle error:", err.Error())
|
||
|
}
|
||
|
go func() {
|
||
|
<-doneC
|
||
|
close(finishC)
|
||
|
}()
|
||
|
case exchange.WatchTypeDepth:
|
||
|
_, stopC, err = bfutures.WsPartialDepthServeWithRate(symbol, 10, 100*time.Millisecond, b.handleDepth(fn), b.handleError("depth", b.retry(param, fn)))
|
||
|
case exchange.WatchTypeTradeMarket:
|
||
|
_, stopC, err = bfutures.WsAggTradeServe(symbol, b.handleAggTradeEvent(fn), b.handleError("aggTrade", b.retry(param, fn)))
|
||
|
case exchange.WatchTypeTrade:
|
||
|
b.tradeCb = fn
|
||
|
case exchange.WatchTypePosition:
|
||
|
b.positionCb = fn
|
||
|
err = b.fetchBalanceAndPosition()
|
||
|
case exchange.WatchTypeBalance:
|
||
|
b.balanceCb = fn
|
||
|
err = b.fetchBalanceAndPosition()
|
||
|
default:
|
||
|
err = fmt.Errorf("unknown wathc param: %s", param.Type)
|
||
|
}
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
if stopC != nil {
|
||
|
go func() {
|
||
|
<-b.closeCh
|
||
|
close(stopC)
|
||
|
}()
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) CancelOrder(old *Order) (order *Order, err error) {
|
||
|
orderID, err := strconv.ParseInt(old.OrderID, 10, 64)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
resp, err := b.cancelOneService.Symbol(old.Symbol).OrderID(orderID).Do(context.Background())
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
price, err := strconv.ParseFloat(resp.Price, 64)
|
||
|
if err != nil {
|
||
|
panic(fmt.Sprintf("CancelOrder parse price %s error: %s", resp.Price, err.Error()))
|
||
|
}
|
||
|
amount, err := strconv.ParseFloat(resp.OrigQuantity, 64)
|
||
|
if err != nil {
|
||
|
panic(fmt.Sprintf("CancelOrder parse damount %s error: %s", resp.OrigQuantity, err.Error()))
|
||
|
}
|
||
|
order = &Order{
|
||
|
OrderID: strconv.FormatInt(resp.OrderID, 10),
|
||
|
Symbol: resp.Symbol,
|
||
|
Currency: resp.Symbol,
|
||
|
Amount: amount,
|
||
|
Price: price,
|
||
|
Status: strings.ToUpper(string(resp.Status)),
|
||
|
Side: strings.ToLower(string(resp.Side)),
|
||
|
Time: time.Unix(resp.UpdateTime/1000, 0),
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (b *BinanceTrade) ProcessOrder(act TradeAction) (ret *Order, err error) {
|
||
|
ctx, cancel := context.WithTimeout(background, b.timeout)
|
||
|
defer cancel()
|
||
|
orderType := bfutures.OrderTypeLimit
|
||
|
if act.Action.IsStop() {
|
||
|
orderType = bfutures.OrderTypeStopMarket
|
||
|
}
|
||
|
var side bfutures.SideType
|
||
|
if act.Action.IsLong() {
|
||
|
side = bfutures.SideTypeBuy
|
||
|
} else {
|
||
|
side = bfutures.SideTypeSell
|
||
|
}
|
||
|
symbol, ok := b.symbols[act.Symbol]
|
||
|
if ok {
|
||
|
price := symbol.FixPrice(act.Price)
|
||
|
if price != act.Price {
|
||
|
log.Infof("binance change order price form %f to %f", act.Price, price)
|
||
|
act.Price = price
|
||
|
}
|
||
|
}
|
||
|
|
||
|
sent := b.api.NewCreateOrderService().Symbol(act.Symbol)
|
||
|
if act.Action.IsStop() {
|
||
|
sent = sent.StopPrice(fmt.Sprintf("%f", act.Price))
|
||
|
} else {
|
||
|
sent = sent.Price(fmt.Sprintf("%f", act.Price))
|
||
|
}
|
||
|
if !act.Action.IsOpen() {
|
||
|
sent = sent.ReduceOnly(true)
|
||
|
}
|
||
|
resp, err := sent.
|
||
|
Quantity(fmt.Sprintf("%f", act.Amount)).
|
||
|
TimeInForce(bfutures.TimeInForceTypeGTC).
|
||
|
Type(orderType).
|
||
|
Side(side).
|
||
|
Do(ctx)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
ret = transCreateOrder(resp)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// TODO: cancel stop order
|
||
|
func (b *BinanceTrade) CancelAllOrders() (orders []*Order, err error) {
|
||
|
ctx, cancel := context.WithTimeout(background, b.timeout)
|
||
|
defer cancel()
|
||
|
ret, err := b.api.NewListOrdersService().Do(ctx)
|
||
|
if err != nil {
|
||
|
err = fmt.Errorf("CancelOrder failed with list: %w", err)
|
||
|
return
|
||
|
}
|
||
|
symbolMap := make(map[string]bool)
|
||
|
var st string
|
||
|
var ok bool
|
||
|
for _, v := range ret {
|
||
|
st = string(v.Status)
|
||
|
if st == OrderStatusFilled || st == OrderStatusCanceled {
|
||
|
continue
|
||
|
}
|
||
|
od := transOrder(v)
|
||
|
orders = append(orders, od)
|
||
|
_, ok = symbolMap[od.Symbol]
|
||
|
if ok {
|
||
|
continue
|
||
|
}
|
||
|
symbolMap[od.Symbol] = true
|
||
|
err = b.cancelService.Symbol(od.Symbol).Do(ctx)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func transOrder(fo *bfutures.Order) (o *Order) {
|
||
|
price, err := strconv.ParseFloat(fo.Price, 64)
|
||
|
if err != nil {
|
||
|
panic(fmt.Sprintf("parse price %s error: %s", fo.Price, err.Error()))
|
||
|
}
|
||
|
amount, err := strconv.ParseFloat(fo.OrigQuantity, 64)
|
||
|
if err != nil {
|
||
|
panic(fmt.Sprintf("parse damount %s error: %s", fo.OrigQuantity, err.Error()))
|
||
|
}
|
||
|
o = &Order{
|
||
|
OrderID: strconv.FormatInt(fo.OrderID, 10),
|
||
|
Symbol: fo.Symbol,
|
||
|
Currency: fo.Symbol,
|
||
|
Amount: amount,
|
||
|
Price: price,
|
||
|
Status: strings.ToUpper(string(fo.Status)),
|
||
|
Side: strings.ToLower(string(fo.Side)),
|
||
|
Time: time.Unix(fo.Time/1000, 0),
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func transCreateOrder(fo *bfutures.CreateOrderResponse) (o *Order) {
|
||
|
price, err := strconv.ParseFloat(fo.Price, 64)
|
||
|
if err != nil {
|
||
|
panic(fmt.Sprintf("parse price %s error: %s", fo.Price, err.Error()))
|
||
|
}
|
||
|
amount, err := strconv.ParseFloat(fo.OrigQuantity, 64)
|
||
|
if err != nil {
|
||
|
panic(fmt.Sprintf("parse damount %s error: %s", fo.OrigQuantity, err.Error()))
|
||
|
}
|
||
|
o = &Order{
|
||
|
OrderID: strconv.FormatInt(fo.OrderID, 10),
|
||
|
Symbol: fo.Symbol,
|
||
|
Currency: fo.Symbol,
|
||
|
Amount: amount,
|
||
|
Price: price,
|
||
|
Status: strings.ToUpper(string(fo.Status)),
|
||
|
Side: strings.ToLower(string(fo.Side)),
|
||
|
Time: time.Unix(fo.UpdateTime/1000, 0),
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func transCandle(candle *bfutures.Kline) (ret *Candle) {
|
||
|
ret = &Candle{
|
||
|
ID: 0,
|
||
|
Start: candle.OpenTime / 1000,
|
||
|
Open: parseFloat(candle.Open),
|
||
|
High: parseFloat(candle.High),
|
||
|
Low: parseFloat(candle.Low),
|
||
|
Close: parseFloat(candle.Close),
|
||
|
Turnover: parseFloat(candle.QuoteAssetVolume),
|
||
|
Volume: parseFloat(candle.Volume),
|
||
|
Trades: candle.TradeNum,
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func parseFloat(str string) float64 {
|
||
|
f, err := strconv.ParseFloat(str, 64)
|
||
|
if err != nil {
|
||
|
panic("binance parseFloat error:" + err.Error())
|
||
|
}
|
||
|
return f
|
||
|
}
|