split files for batch processor and margin settings

This commit is contained in:
c9s 2021-01-21 00:58:02 +08:00
parent ad4f339b27
commit a73729fb17
3 changed files with 186 additions and 177 deletions

158
pkg/types/batch.go Normal file
View File

@ -0,0 +1,158 @@
package types
import (
"context"
"time"
"github.com/sirupsen/logrus"
)
type ExchangeBatchProcessor struct {
Exchange
}
func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan Order, errC chan error) {
c = make(chan Order, 500)
errC = make(chan error, 1)
go func() {
defer close(c)
defer close(errC)
orderIDs := make(map[uint64]struct{}, 500)
if lastOrderID > 0 {
orderIDs[lastOrderID] = struct{}{}
}
for startTime.Before(endTime) {
time.Sleep(5 * time.Second)
logrus.Infof("batch querying %s closed orders %s <=> %s", symbol, startTime, endTime)
orders, err := e.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID)
if err != nil {
errC <- err
return
}
if len(orders) == 0 || (len(orders) == 1 && orders[0].OrderID == lastOrderID) {
return
}
for _, o := range orders {
if _, ok := orderIDs[o.OrderID]; ok {
logrus.Infof("skipping duplicated order id: %d", o.OrderID)
continue
}
c <- o
startTime = o.CreationTime
lastOrderID = o.OrderID
orderIDs[o.OrderID] = struct{}{}
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol string, interval Interval, startTime, endTime time.Time) (c chan KLine, errC chan error) {
c = make(chan KLine, 1000)
errC = make(chan error, 1)
go func() {
defer close(c)
defer close(errC)
for startTime.Before(endTime) {
time.Sleep(5 * time.Second)
kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
StartTime: &startTime,
Limit: 1000,
})
if err != nil {
errC <- err
return
}
if len(kLines) == 0 {
return
}
for _, kline := range kLines {
// ignore any kline before the given start time
if kline.StartTime.Before(startTime) {
continue
}
if kline.EndTime.After(endTime) {
return
}
c <- kline
startTime = kline.EndTime
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) (c chan Trade, errC chan error) {
c = make(chan Trade, 500)
errC = make(chan error, 1)
// last 7 days
var startTime = time.Now().Add(-7 * 24 * time.Hour)
if options.StartTime != nil {
startTime = *options.StartTime
}
var lastTradeID = options.LastTradeID
go func() {
defer close(c)
defer close(errC)
for {
time.Sleep(5 * time.Second)
logrus.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit)
trades, err := e.QueryTrades(ctx, symbol, &TradeQueryOptions{
StartTime: &startTime,
Limit: options.Limit,
LastTradeID: lastTradeID,
})
if err != nil {
errC <- err
return
}
if len(trades) == 0 {
break
}
if len(trades) == 1 && trades[0].ID == lastTradeID {
break
}
logrus.Infof("returned %d trades", len(trades))
startTime = trades[len(trades)-1].Time
for _, t := range trades {
// ignore the first trade if last TradeID is given
if t.ID == lastTradeID {
continue
}
c <- t
lastTradeID = t.ID
}
}
}()
return c, errC
}

View File

@ -6,7 +6,6 @@ import (
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
const DateFormat = "2006-01-02"
@ -63,13 +62,6 @@ type Exchange interface {
CancelOrders(ctx context.Context, orders ...Order) error
}
type MarginExchange interface {
UseMargin()
UseIsolatedMargin(symbol string)
GetMarginSettings() MarginSettings
// QueryMarginAccount(ctx context.Context) (*binance.MarginAccount, error)
}
type TradeQueryOptions struct {
StartTime *time.Time
EndTime *time.Time
@ -77,172 +69,3 @@ type TradeQueryOptions struct {
LastTradeID int64
}
type ExchangeBatchProcessor struct {
Exchange
}
func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan Order, errC chan error) {
c = make(chan Order, 500)
errC = make(chan error, 1)
go func() {
defer close(c)
defer close(errC)
orderIDs := make(map[uint64]struct{}, 500)
if lastOrderID > 0 {
orderIDs[lastOrderID] = struct{}{}
}
for startTime.Before(endTime) {
time.Sleep(5 * time.Second)
log.Infof("batch querying %s closed orders %s <=> %s", symbol, startTime, endTime)
orders, err := e.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID)
if err != nil {
errC <- err
return
}
if len(orders) == 0 || (len(orders) == 1 && orders[0].OrderID == lastOrderID) {
return
}
for _, o := range orders {
if _, ok := orderIDs[o.OrderID]; ok {
log.Infof("skipping duplicated order id: %d", o.OrderID)
continue
}
c <- o
startTime = o.CreationTime
lastOrderID = o.OrderID
orderIDs[o.OrderID] = struct{}{}
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol string, interval Interval, startTime, endTime time.Time) (c chan KLine, errC chan error) {
c = make(chan KLine, 1000)
errC = make(chan error, 1)
go func() {
defer close(c)
defer close(errC)
for startTime.Before(endTime) {
time.Sleep(5 * time.Second)
kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
StartTime: &startTime,
Limit: 1000,
})
if err != nil {
errC <- err
return
}
if len(kLines) == 0 {
return
}
for _, kline := range kLines {
// ignore any kline before the given start time
if kline.StartTime.Before(startTime) {
continue
}
if kline.EndTime.After(endTime) {
return
}
c <- kline
startTime = kline.EndTime
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) (c chan Trade, errC chan error) {
c = make(chan Trade, 500)
errC = make(chan error, 1)
// last 7 days
var startTime = time.Now().Add(-7 * 24 * time.Hour)
if options.StartTime != nil {
startTime = *options.StartTime
}
var lastTradeID = options.LastTradeID
go func() {
defer close(c)
defer close(errC)
for {
time.Sleep(5 * time.Second)
log.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit)
trades, err := e.QueryTrades(ctx, symbol, &TradeQueryOptions{
StartTime: &startTime,
Limit: options.Limit,
LastTradeID: lastTradeID,
})
if err != nil {
errC <- err
return
}
if len(trades) == 0 {
break
}
if len(trades) == 1 && trades[0].ID == lastTradeID {
break
}
log.Infof("returned %d trades", len(trades))
startTime = trades[len(trades)-1].Time
for _, t := range trades {
// ignore the first trade if last TradeID is given
if t.ID == lastTradeID {
continue
}
c <- t
lastTradeID = t.ID
}
}
}()
return c, errC
}
type MarginSettings struct {
IsMargin bool
IsIsolatedMargin bool
IsolatedMarginSymbol string
}
func (e MarginSettings) GetMarginSettings() MarginSettings {
return e
}
func (e *MarginSettings) UseMargin() {
e.IsMargin = true
}
func (e *MarginSettings) UseIsolatedMargin(symbol string) {
e.IsMargin = true
e.IsIsolatedMargin = true
e.IsolatedMarginSymbol = symbol
}

28
pkg/types/margin.go Normal file
View File

@ -0,0 +1,28 @@
package types
type MarginExchange interface {
UseMargin()
UseIsolatedMargin(symbol string)
GetMarginSettings() MarginSettings
// QueryMarginAccount(ctx context.Context) (*binance.MarginAccount, error)
}
type MarginSettings struct {
IsMargin bool
IsIsolatedMargin bool
IsolatedMarginSymbol string
}
func (e MarginSettings) GetMarginSettings() MarginSettings {
return e
}
func (e *MarginSettings) UseMargin() {
e.IsMargin = true
}
func (e *MarginSettings) UseIsolatedMargin(symbol string) {
e.IsMargin = true
e.IsIsolatedMargin = true
e.IsolatedMarginSymbol = symbol
}