138 lines
3.3 KiB
Go
138 lines
3.3 KiB
Go
package features
|
|
|
|
import (
|
|
"context"
|
|
. "git.qtrade.icu/coin-quant/trademodel"
|
|
bfutures "github.com/adshao/go-binance/v2/futures"
|
|
log "github.com/sirupsen/logrus"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
func (b *BinanceTrade) updateUserListenKey() {
|
|
var err error
|
|
ticker := time.NewTicker(time.Minute * 30)
|
|
Out:
|
|
for {
|
|
select {
|
|
case <-b.closeCh:
|
|
break Out
|
|
case <-ticker.C:
|
|
for i := 0; i < 10; i++ {
|
|
ctx, cancel := context.WithTimeout(background, b.timeout)
|
|
err = b.api.NewKeepaliveUserStreamService().ListenKey(b.wsUserListenKey).Do(ctx)
|
|
cancel()
|
|
if err != nil {
|
|
log.Error("update listen key failed:", err.Error())
|
|
time.Sleep(time.Minute)
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
if err != nil {
|
|
log.Error("update listen key failed 10 times,just exist::", err.Error())
|
|
break Out
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *BinanceTrade) startUserWS() (err error) {
|
|
ctx, cancel := context.WithTimeout(background, b.timeout)
|
|
defer cancel()
|
|
|
|
listenKey, err := b.api.NewStartUserStreamService().Do(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
b.wsUserListenKey = listenKey
|
|
|
|
doneC, stopC, err := bfutures.WsUserDataServe(listenKey, b.handleUserData, b.handleUserDataError)
|
|
if err != nil {
|
|
return
|
|
}
|
|
go func() {
|
|
select {
|
|
case <-doneC:
|
|
case <-b.closeCh:
|
|
}
|
|
close(stopC)
|
|
}()
|
|
go b.updateUserListenKey()
|
|
|
|
return
|
|
}
|
|
|
|
func (b *BinanceTrade) handleUserData(event *bfutures.WsUserDataEvent) {
|
|
switch event.Event {
|
|
case bfutures.UserDataEventTypeAccountUpdate:
|
|
var profit, total float64
|
|
var balance Balance
|
|
for _, v := range event.AccountUpdate.Balances {
|
|
if v.Asset == b.baseCurrency {
|
|
balance.Balance = parseFloat(v.Balance)
|
|
balance.Available = parseFloat(v.CrossWalletBalance)
|
|
if b.balanceCb != nil {
|
|
b.balanceCb(&balance)
|
|
}
|
|
total = balance.Balance
|
|
}
|
|
}
|
|
var pos Position
|
|
for _, v := range event.AccountUpdate.Positions {
|
|
pos.Symbol = v.Symbol
|
|
profit = parseFloat(v.UnrealizedPnL)
|
|
if v.IsolatedWallet != "" {
|
|
total = parseFloat(v.IsolatedWallet)
|
|
}
|
|
if total > 0 {
|
|
pos.ProfitRatio = profit / total
|
|
}
|
|
pos.Price = parseFloat(v.EntryPrice)
|
|
pos.Hold = parseFloat(v.Amount)
|
|
if pos.Hold > 0 {
|
|
pos.Type = Long
|
|
} else if pos.Hold < 0 {
|
|
pos.Type = Short
|
|
}
|
|
if b.positionCb != nil {
|
|
b.positionCb(&pos)
|
|
}
|
|
}
|
|
case bfutures.UserDataEventTypeOrderTradeUpdate:
|
|
var order Order
|
|
order.OrderID = strconv.FormatInt(event.OrderTradeUpdate.ID, 10)
|
|
order.Symbol = event.OrderTradeUpdate.Symbol
|
|
order.Currency = event.OrderTradeUpdate.Symbol
|
|
|
|
order.Amount = parseFloat(event.OrderTradeUpdate.OriginalQty)
|
|
order.Filled = parseFloat(event.OrderTradeUpdate.AccumulatedFilledQty)
|
|
if order.Filled == order.Amount {
|
|
order.Price = parseFloat(event.OrderTradeUpdate.AveragePrice)
|
|
} else {
|
|
order.Price = parseFloat(event.OrderTradeUpdate.OriginalPrice)
|
|
}
|
|
order.Status = string(event.OrderTradeUpdate.Status)
|
|
order.Side = string(event.OrderTradeUpdate.Side)
|
|
order.Time = time.UnixMilli(event.OrderTradeUpdate.TradeTime)
|
|
if b.tradeCb != nil {
|
|
b.tradeCb(&order)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *BinanceTrade) handleUserDataError(err error) {
|
|
log.Errorf("binance userdata error: %s,reconnect", err.Error())
|
|
doneC, stopC, err := bfutures.WsUserDataServe(b.wsUserListenKey, b.handleUserData, b.handleUserDataError)
|
|
if err != nil {
|
|
return
|
|
}
|
|
go func() {
|
|
select {
|
|
case <-doneC:
|
|
case <-b.closeCh:
|
|
}
|
|
close(stopC)
|
|
}()
|
|
}
|