fix margin order/trade sync

This commit is contained in:
c9s 2021-01-20 02:09:12 +08:00
parent d3f6841a27
commit c79c7d1b11
7 changed files with 84 additions and 54 deletions

View File

@ -24,6 +24,7 @@ var log = logrus.WithFields(logrus.Fields{
func init() {
_ = types.Exchange(&Exchange{})
_ = types.MarginExchange(&Exchange{})
if ok, _ := strconv.ParseBool(os.Getenv("DEBUG_BINANCE_STREAM")); ok {
log.Level = logrus.DebugLevel
@ -31,7 +32,7 @@ func init() {
}
type Exchange struct {
MarginSettings
types.MarginSettings
Client *binance.Client
}
@ -294,9 +295,9 @@ func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {
}
func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) {
if e.useMargin {
if e.IsMargin {
req := e.Client.NewListMarginOpenOrdersService().Symbol(symbol)
req.IsIsolated(e.useMarginIsolated)
req.IsIsolated(e.IsIsolatedMargin)
binanceOrders, err := req.Do(ctx)
if err != nil {
@ -320,10 +321,11 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
}
time.Sleep(3 * time.Second)
log.Infof("querying closed orders %s from %s <=> %s ...", symbol, since, until)
if e.useMargin {
if e.IsMargin {
req := e.Client.NewListMarginOrdersService().Symbol(symbol)
req.IsIsolated(e.useMarginIsolated)
req.IsIsolated(e.IsIsolatedMargin)
if lastOrderID > 0 {
req.OrderID(int64(lastOrderID))
@ -340,7 +342,6 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
return ToGlobalOrders(binanceOrders)
}
log.Infof("querying closed orders %s from %s <=> %s ...", symbol, since, until)
req := e.Client.NewListOrdersService().
Symbol(symbol)
@ -402,8 +403,8 @@ func (e *Exchange) submitMarginOrder(ctx context.Context, order types.SubmitOrde
// use response result format
req.NewOrderRespType(binance.NewOrderRespTypeRESULT)
if e.useMarginIsolated {
req.IsIsolated(e.useMarginIsolated)
if e.IsIsolatedMargin {
req.IsIsolated(e.IsIsolatedMargin)
}
if len(order.MarginSideEffect) > 0 {
@ -540,7 +541,7 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder
for _, order := range orders {
var createdOrder *types.Order
if e.useMargin {
if e.IsMargin {
createdOrder, err = e.submitMarginOrder(ctx, order)
} else {
createdOrder, err = e.submitSpotOrder(ctx, order)
@ -616,9 +617,9 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
var remoteTrades []*binance.TradeV3
if e.useMargin {
if e.IsMargin {
req := e.Client.NewListMarginTradesService().
IsIsolated(e.useMarginIsolated).
IsIsolated(e.IsIsolatedMargin).
Symbol(symbol)
if options.Limit > 0 {
@ -665,7 +666,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
}
for _, t := range remoteTrades {
localTrade, err := ToGlobalTrade(*t, e.useMargin)
localTrade, err := ToGlobalTrade(*t, e.IsMargin)
if err != nil {
log.WithError(err).Errorf("can not convert binance trade: %+v", t)
continue

View File

@ -1,17 +0,0 @@
package binance
type MarginSettings struct {
useMargin bool
useMarginIsolated bool
useMarginIsolatedSymbol string
}
func (e *MarginSettings) UseMargin() {
e.useMargin = true
}
func (e *MarginSettings) UseIsolatedMargin(symbol string) {
e.useMargin = true
e.useMarginIsolated = true
e.useMarginIsolatedSymbol = symbol
}

View File

@ -30,7 +30,7 @@ type StreamRequest struct {
//go:generate callbackgen -type Stream -interface
type Stream struct {
MarginSettings
types.MarginSettings
types.StandardStream
@ -191,11 +191,11 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
}
func (s *Stream) fetchListenKey(ctx context.Context) (string, error) {
if s.useMargin {
if s.useMarginIsolated {
log.Infof("isolated margin %s is enabled, requesting margin user stream listen key...", s.useMarginIsolatedSymbol)
if s.IsMargin {
if s.IsIsolatedMargin {
log.Infof("isolated margin %s is enabled, requesting margin user stream listen key...", s.IsolatedMarginSymbol)
req := s.Client.NewStartIsolatedMarginUserStreamService()
req.Symbol(s.useMarginIsolatedSymbol)
req.Symbol(s.IsolatedMarginSymbol)
return req.Do(ctx)
}
@ -208,10 +208,10 @@ func (s *Stream) fetchListenKey(ctx context.Context) (string, error) {
}
func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error {
if s.useMargin {
if s.useMarginIsolated {
if s.IsMargin {
if s.IsIsolatedMargin {
req := s.Client.NewKeepaliveIsolatedMarginUserStreamService().ListenKey(listenKey)
req.Symbol(s.useMarginIsolatedSymbol)
req.Symbol(s.IsolatedMarginSymbol)
return req.Do(ctx)
}
@ -389,10 +389,10 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err
// should use background context to invalidate the user stream
log.Info("closing listen key")
if s.useMargin {
if s.useMarginIsolated {
if s.IsMargin {
if s.IsIsolatedMargin {
req := s.Client.NewCloseIsolatedMarginUserStreamService().ListenKey(listenKey)
req.Symbol(s.useMarginIsolatedSymbol)
req.Symbol(s.IsolatedMarginSymbol)
err = req.Do(ctx)
} else {
req := s.Client.NewCloseMarginUserStreamService().ListenKey(listenKey)

View File

@ -13,12 +13,14 @@ type OrderService struct {
}
// QueryLast queries the last order from the database
func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string) (*types.Order, error) {
log.Infof("querying last order exchange = %s AND symbol = %s", ex, symbol)
func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string, isMargin bool, isIsolated bool) (*types.Order, error) {
log.Infof("querying last order exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated)
rows, err := s.DB.NamedQuery(`SELECT * FROM orders WHERE exchange = :exchange AND symbol = :symbol ORDER BY gid DESC LIMIT 1`, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
rows, err := s.DB.NamedQuery(`SELECT * FROM orders WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT 1`, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
"is_margin": isMargin,
"is_isolated": isIsolated,
})
if err != nil {

View File

@ -15,7 +15,18 @@ type SyncService struct {
}
func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
lastOrder, err := s.OrderService.QueryLast(exchange.Name(), symbol)
isMargin := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
lastOrder, err := s.OrderService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated)
if err != nil {
return err
}
@ -54,7 +65,18 @@ func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, s
}
func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
lastTrade, err := s.TradeService.QueryLast(exchange.Name(), symbol)
isMargin := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
lastTrade, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated)
if err != nil {
return err
}

View File

@ -17,12 +17,14 @@ func NewTradeService(db *sqlx.DB) *TradeService {
}
// QueryLast queries the last trade from the database
func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string) (*types.Trade, error) {
log.Infof("querying last trade exchange = %s AND symbol = %s", ex, symbol)
func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string, isMargin bool, isIsolated bool) (*types.Trade, error) {
log.Infof("querying last trade exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated)
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol ORDER BY gid DESC LIMIT 1`, map[string]interface{}{
"symbol": symbol,
"exchange": ex,
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT 1`, map[string]interface{}{
"symbol": symbol,
"exchange": ex,
"is_margin": isMargin,
"is_isolated": isIsolated,
})
if err != nil {
return nil, errors.Wrap(err, "query last trade error")

View File

@ -5,7 +5,6 @@ import (
"strings"
"time"
"github.com/adshao/go-binance/v2"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
@ -67,7 +66,8 @@ type Exchange interface {
type MarginExchange interface {
UseMargin()
UseIsolatedMargin(symbol string)
QueryMarginAccount(ctx context.Context) (*binance.MarginAccount, error)
GetMarginSettings() MarginSettings
// QueryMarginAccount(ctx context.Context) (*binance.MarginAccount, error)
}
type TradeQueryOptions struct {
@ -221,3 +221,23 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
return c, errC
}
type MarginSettings struct {
IsMargin bool
IsIsolatedMargin bool
IsolatedMarginSymbol string
}
func (e MarginSettings) GetMarginSettings() MarginSettings {
return e
}
func (e *MarginSettings) UseMargin() {
e.IsMargin = true
}
func (e *MarginSettings) UseIsolatedMargin(symbol string) {
e.IsMargin = true
e.IsIsolatedMargin = true
e.IsolatedMarginSymbol = symbol
}