exchange/binance/features/binance.go
2024-06-26 00:59:56 +08:00

548 lines
14 KiB
Go

package features
import (
"context"
"fmt"
"git.qtrade.icu/coin-quant/exchange"
bcommon "git.qtrade.icu/coin-quant/exchange/binance/common"
. "git.qtrade.icu/coin-quant/trademodel"
gobinance "github.com/adshao/go-binance/v2"
bfutures "github.com/adshao/go-binance/v2/futures"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"sync"
"time"
)
var (
background = context.Background()
newLock sync.Mutex
)
var _ exchange.Exchange = &BinanceTrade{}
type BinanceTrade struct {
name string
api *bfutures.Client
tradeCb exchange.WatchFn
positionCb exchange.WatchFn
balanceCb exchange.WatchFn
closeCh chan bool
cancelService *bfutures.CancelAllOpenOrdersService
cancelOneService *bfutures.CancelOrderService
timeService *bfutures.ServerTimeService
klineLimit int
timeout time.Duration
wsUserListenKey string
baseCurrency string
symbols map[string]Symbol
}
func NewBinanceTrader(cfg bcommon.BinanceConfig, cltName, clientProxy string) (b *BinanceTrade, err error) {
b = new(BinanceTrade)
b.name = "binance"
if cltName == "" {
cltName = "binance"
}
b.klineLimit = 1500
b.baseCurrency = "USDT"
if cfg.Currency != "" {
b.baseCurrency = cfg.Currency
}
b.timeout = time.Second * 5
b.closeCh = make(chan bool)
newLock.Lock()
defer func() {
bfutures.UseTestnet = false
newLock.Unlock()
}()
if cfg.IsTest {
bfutures.UseTestnet = true
log.Warnf("binance trade connecting to testnet")
}
bfutures.WebsocketKeepalive = true
b.api = gobinance.NewFuturesClient(cfg.ApiKey, cfg.SecretKey)
if clientProxy != "" {
var proxyURL *url.URL
proxyURL, err = url.Parse(clientProxy)
if err != nil {
return
}
b.api.HTTPClient = &http.Client{Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)}}
websocket.DefaultDialer.Proxy = http.ProxyURL(proxyURL)
websocket.DefaultDialer.HandshakeTimeout = time.Second * 60
}
b.cancelService = b.api.NewCancelAllOpenOrdersService()
b.cancelOneService = b.api.NewCancelOrderService()
b.timeService = b.api.NewServerTimeService()
_, err = b.Symbols()
if err != nil {
return nil, err
}
// err = b.Start()
return
}
// fetchBalance different with spot
func (b *BinanceTrade) fetchBalanceAndPosition() (err error) {
ctx, cancel := context.WithTimeout(background, b.timeout)
defer cancel()
account, err := b.api.NewGetAccountService().Do(ctx)
if err != nil {
return
}
var balance Balance
balance.Balance = parseFloat(account.TotalWalletBalance)
balance.Available = parseFloat(account.TotalCrossWalletBalance)
// balance.Frozen =
if b.balanceCb != nil {
b.balanceCb(&balance)
}
if b.positionCb != nil {
var amount, profit, initMargin float64
for _, v := range account.Positions {
amount = parseFloat(v.PositionAmt)
if amount == 0 {
continue
}
var position Position
position.Symbol = v.Symbol
position.Hold = amount
position.Price = parseFloat(v.EntryPrice)
profit = parseFloat(v.UnrealizedProfit)
initMargin = parseFloat(v.PositionInitialMargin)
position.ProfitRatio = profit / initMargin
if position.Hold > 0 {
position.Type = Long
} else {
position.Type = Short
}
b.positionCb(&position)
}
}
return
}
func (b *BinanceTrade) Info() (info exchange.ExchangeInfo) {
info = exchange.ExchangeInfo{
Name: "binance_futures",
Value: "binance_futures",
Desc: "binance futures api",
KLineLimit: exchange.FetchLimit{
Limit: b.klineLimit,
},
}
return
}
func (b *BinanceTrade) Symbols() (symbols []Symbol, err error) {
ctx, cancel := context.WithTimeout(background, b.timeout)
defer cancel()
resp, err := b.api.NewExchangeInfoService().Do(ctx)
if err != nil {
return
}
symbols = make([]Symbol, len(resp.Symbols))
for i, v := range resp.Symbols {
value := Symbol{
Name: v.Symbol,
Exchange: "binance",
Symbol: v.Symbol,
Resolutions: "1m,5m,15m,30m,1h,4h,1d,1w",
Precision: v.PricePrecision,
AmountPrecision: v.QuantityPrecision,
PriceStep: 0,
AmountStep: 0,
}
for _, f := range v.Filters {
switch f["filterType"] {
case "PRICE_FILTER":
value.PriceStep = parseFloat(f["tickSize"].(string))
case "LOT_SIZE":
value.AmountStep = parseFloat(f["stepSize"].(string))
default:
}
}
symbols[i] = value
}
if len(symbols) > 0 {
symbolMap := make(map[string]Symbol)
for _, v := range symbols {
symbolMap[v.Symbol] = v
}
b.symbols = symbolMap
}
return
}
func (b *BinanceTrade) Start() (err error) {
err = b.fetchBalanceAndPosition()
if err != nil {
return
}
// watch position and order changed
err = b.startUserWS()
return
}
func (b *BinanceTrade) Stop() (err error) {
close(b.closeCh)
return
}
// KlineChan get klines
func (b *BinanceTrade) GetKline(symbol, bSize string, start, end time.Time) (data []*Candle, err error) {
var temp *Candle
ctx, cancel := context.WithTimeout(background, b.timeout)
defer cancel()
defer func() {
if err != nil && strings.Contains(err.Error(), "Too many requests") {
err = fmt.Errorf("%w, retry: %s", exchange.ErrRetry, err.Error())
}
}()
// get server time
nTime, err := b.timeService.Do(ctx)
if err != nil {
return
}
nStart := start.Unix() * 1000
nEnd := end.Unix() * 1000
klines, err := b.api.NewKlinesService().Interval(bSize).Symbol(symbol).StartTime(nStart).EndTime(nEnd).Limit(b.klineLimit).Do(ctx)
if err != nil {
return
}
sort.Slice(klines, func(i, j int) bool {
return klines[i].OpenTime < klines[j].OpenTime
})
if len(klines) == 0 {
log.Warnf("GetKline once, param: [%s]-[%s] no data", start, end)
return
}
log.Infof("GetKline once, param: [%s]-[%s], total: %d, first: %s, last: %s", start, end, len(klines), time.UnixMilli(klines[0].OpenTime), time.UnixMilli(klines[len(klines)-1].OpenTime))
data = []*Candle{}
for k, v := range klines {
temp = transCandle(v)
if k == len(klines)-1 {
// check if candle is unfinished
if v.CloseTime > nTime {
log.Infof("skip unfinished candle: %##v\n", *v)
break
}
}
data = append(data, temp)
}
return
}
func (b *BinanceTrade) handleError(typ string, cb func() error) func(error) {
return func(err error) {
log.Errorf("binance %s error:%s, call callback", typ, err.Error())
if cb != nil {
cb()
}
}
}
func (b *BinanceTrade) handleAggTradeEvent(fn exchange.WatchFn) func(evt *bfutures.WsAggTradeEvent) {
return func(evt *bfutures.WsAggTradeEvent) {
var err error
var trade Trade
trade.ID = fmt.Sprintf("%d", evt.AggregateTradeID)
trade.Amount, err = strconv.ParseFloat(evt.Quantity, 64)
if err != nil {
log.Errorf("AggTradeEvent parse amount failed: %s", evt.Quantity)
}
trade.Price, err = strconv.ParseFloat(evt.Price, 64)
if err != nil {
log.Errorf("AggTradeEvent parse amount failed: %s", evt.Quantity)
}
trade.Time = time.Unix(evt.Time/1000, (evt.Time%1000)*int64(time.Millisecond))
if fn != nil {
fn(&trade)
}
}
}
func (b *BinanceTrade) handleDepth(fn exchange.WatchFn) func(evt *bfutures.WsDepthEvent) {
return func(evt *bfutures.WsDepthEvent) {
var depth Depth
var err error
var price, amount float64
depth.UpdateTime = time.Unix(evt.TransactionTime/1000, (evt.TransactionTime%1000)*int64(time.Millisecond))
for _, v := range evt.Asks {
// depth.Sells
price, err = strconv.ParseFloat(v.Price, 64)
if err != nil {
log.Errorf("handleDepth parse price failed: %s", v.Price)
}
amount, err = strconv.ParseFloat(v.Quantity, 64)
if err != nil {
log.Errorf("handleDepth parse amount failed: %s", v.Quantity)
}
depth.Sells = append(depth.Sells, DepthInfo{Price: price, Amount: amount})
}
for _, v := range evt.Bids {
// depth.Sells
price, err = strconv.ParseFloat(v.Price, 64)
if err != nil {
log.Errorf("handleDepth parse price failed: %s", v.Price)
}
amount, err = strconv.ParseFloat(v.Quantity, 64)
if err != nil {
log.Errorf("handleDepth parse amount failed: %s", v.Quantity)
}
depth.Buys = append(depth.Buys, DepthInfo{Price: price, Amount: amount})
}
if fn != nil {
fn(&depth)
}
}
}
func (b *BinanceTrade) retry(param exchange.WatchParam, fn exchange.WatchFn) func() error {
return func() error {
// retry when error cause
select {
case <-b.closeCh:
return nil
default:
}
return b.Watch(param, fn)
}
}
func (b *BinanceTrade) Watch(param exchange.WatchParam, fn exchange.WatchFn) (err error) {
symbol := param.Param["symbol"]
var stopC chan struct{}
switch param.Type {
case exchange.WatchTypeCandle:
binSize := param.Param["bin"]
if binSize == "" {
binSize = "1m"
}
var doneC chan struct{}
finishC := make(chan struct{})
doneC, stopC, err = bfutures.WsKlineServe(symbol, binSize, processWsCandle(finishC, fn), b.handleError("watchKline", b.retry(param, fn)))
if err != nil {
log.Error("exchange emitCandle error:", err.Error())
}
go func() {
<-doneC
close(finishC)
}()
case exchange.WatchTypeDepth:
_, stopC, err = bfutures.WsPartialDepthServeWithRate(symbol, 10, 100*time.Millisecond, b.handleDepth(fn), b.handleError("depth", b.retry(param, fn)))
case exchange.WatchTypeTradeMarket:
_, stopC, err = bfutures.WsAggTradeServe(symbol, b.handleAggTradeEvent(fn), b.handleError("aggTrade", b.retry(param, fn)))
case exchange.WatchTypeTrade:
b.tradeCb = fn
case exchange.WatchTypePosition:
b.positionCb = fn
err = b.fetchBalanceAndPosition()
case exchange.WatchTypeBalance:
b.balanceCb = fn
err = b.fetchBalanceAndPosition()
default:
err = fmt.Errorf("unknown wathc param: %s", param.Type)
}
if err != nil {
return
}
if stopC != nil {
go func() {
<-b.closeCh
close(stopC)
}()
}
return
}
func (b *BinanceTrade) CancelOrder(old *Order) (order *Order, err error) {
orderID, err := strconv.ParseInt(old.OrderID, 10, 64)
if err != nil {
return
}
resp, err := b.cancelOneService.Symbol(old.Symbol).OrderID(orderID).Do(context.Background())
if err != nil {
return
}
price, err := strconv.ParseFloat(resp.Price, 64)
if err != nil {
panic(fmt.Sprintf("CancelOrder parse price %s error: %s", resp.Price, err.Error()))
}
amount, err := strconv.ParseFloat(resp.OrigQuantity, 64)
if err != nil {
panic(fmt.Sprintf("CancelOrder parse damount %s error: %s", resp.OrigQuantity, err.Error()))
}
order = &Order{
OrderID: strconv.FormatInt(resp.OrderID, 10),
Symbol: resp.Symbol,
Currency: resp.Symbol,
Amount: amount,
Price: price,
Status: strings.ToUpper(string(resp.Status)),
Side: strings.ToLower(string(resp.Side)),
Time: time.Unix(resp.UpdateTime/1000, 0),
}
return
}
func (b *BinanceTrade) ProcessOrder(act TradeAction) (ret *Order, err error) {
ctx, cancel := context.WithTimeout(background, b.timeout)
defer cancel()
orderType := bfutures.OrderTypeLimit
if act.Action.IsStop() {
orderType = bfutures.OrderTypeStopMarket
}
var side bfutures.SideType
if act.Action.IsLong() {
side = bfutures.SideTypeBuy
} else {
side = bfutures.SideTypeSell
}
symbol, ok := b.symbols[act.Symbol]
if ok {
price := symbol.FixPrice(act.Price)
if price != act.Price {
log.Infof("binance change order price form %f to %f", act.Price, price)
act.Price = price
}
}
sent := b.api.NewCreateOrderService().Symbol(act.Symbol)
if act.Action.IsStop() {
sent = sent.StopPrice(fmt.Sprintf("%f", act.Price))
} else {
sent = sent.Price(fmt.Sprintf("%f", act.Price))
}
if !act.Action.IsOpen() {
sent = sent.ReduceOnly(true)
}
resp, err := sent.
Quantity(fmt.Sprintf("%f", act.Amount)).
TimeInForce(bfutures.TimeInForceTypeGTC).
Type(orderType).
Side(side).
Do(ctx)
if err != nil {
return
}
ret = transCreateOrder(resp)
return
}
// TODO: cancel stop order
func (b *BinanceTrade) CancelAllOrders() (orders []*Order, err error) {
ctx, cancel := context.WithTimeout(background, b.timeout)
defer cancel()
ret, err := b.api.NewListOrdersService().Do(ctx)
if err != nil {
err = fmt.Errorf("CancelOrder failed with list: %w", err)
return
}
symbolMap := make(map[string]bool)
var st string
var ok bool
for _, v := range ret {
st = string(v.Status)
if st == OrderStatusFilled || st == OrderStatusCanceled {
continue
}
od := transOrder(v)
orders = append(orders, od)
_, ok = symbolMap[od.Symbol]
if ok {
continue
}
symbolMap[od.Symbol] = true
err = b.cancelService.Symbol(od.Symbol).Do(ctx)
if err != nil {
return nil, err
}
}
return
}
func transOrder(fo *bfutures.Order) (o *Order) {
price, err := strconv.ParseFloat(fo.Price, 64)
if err != nil {
panic(fmt.Sprintf("parse price %s error: %s", fo.Price, err.Error()))
}
amount, err := strconv.ParseFloat(fo.OrigQuantity, 64)
if err != nil {
panic(fmt.Sprintf("parse damount %s error: %s", fo.OrigQuantity, err.Error()))
}
o = &Order{
OrderID: strconv.FormatInt(fo.OrderID, 10),
Symbol: fo.Symbol,
Currency: fo.Symbol,
Amount: amount,
Price: price,
Status: strings.ToUpper(string(fo.Status)),
Side: strings.ToLower(string(fo.Side)),
Time: time.Unix(fo.Time/1000, 0),
}
return
}
func transCreateOrder(fo *bfutures.CreateOrderResponse) (o *Order) {
price, err := strconv.ParseFloat(fo.Price, 64)
if err != nil {
panic(fmt.Sprintf("parse price %s error: %s", fo.Price, err.Error()))
}
amount, err := strconv.ParseFloat(fo.OrigQuantity, 64)
if err != nil {
panic(fmt.Sprintf("parse damount %s error: %s", fo.OrigQuantity, err.Error()))
}
o = &Order{
OrderID: strconv.FormatInt(fo.OrderID, 10),
Symbol: fo.Symbol,
Currency: fo.Symbol,
Amount: amount,
Price: price,
Status: strings.ToUpper(string(fo.Status)),
Side: strings.ToLower(string(fo.Side)),
Time: time.Unix(fo.UpdateTime/1000, 0),
}
return
}
func transCandle(candle *bfutures.Kline) (ret *Candle) {
ret = &Candle{
ID: 0,
Start: candle.OpenTime / 1000,
Open: parseFloat(candle.Open),
High: parseFloat(candle.High),
Low: parseFloat(candle.Low),
Close: parseFloat(candle.Close),
Turnover: parseFloat(candle.QuoteAssetVolume),
Volume: parseFloat(candle.Volume),
Trades: candle.TradeNum,
}
return
}
func parseFloat(str string) float64 {
f, err := strconv.ParseFloat(str, 64)
if err != nil {
panic("binance parseFloat error:" + err.Error())
}
return f
}