mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 14:55:16 +00:00
Merge pull request #70 from c9s/persistent-tests
fix: add tests for persistence and fix fixedpoint marshalling issue
This commit is contained in:
commit
aecbe180e6
|
@ -3,7 +3,9 @@ language: go
|
|||
go:
|
||||
- 1.14
|
||||
- 1.15
|
||||
services:
|
||||
- redis-server
|
||||
before_script:
|
||||
- go mod download
|
||||
- go mod download
|
||||
script:
|
||||
- go test -v ./pkg/...
|
||||
- go test -v ./pkg/...
|
||||
|
|
|
@ -20,6 +20,7 @@ type PersistenceService interface {
|
|||
type Store interface {
|
||||
Load(val interface{}) error
|
||||
Save(val interface{}) error
|
||||
Reset() error
|
||||
}
|
||||
|
||||
type MemoryService struct {
|
||||
|
@ -54,10 +55,17 @@ func (store *MemoryStore) Load(val interface{}) error {
|
|||
v := reflect.ValueOf(val)
|
||||
if data, ok := store.memory.Slots[store.Key]; ok {
|
||||
v.Elem().Set(reflect.ValueOf(data).Elem())
|
||||
} else {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *MemoryStore) Reset() error {
|
||||
delete(store.memory.Slots, store.Key)
|
||||
return nil
|
||||
}
|
||||
|
||||
type JsonPersistenceService struct {
|
||||
Directory string
|
||||
}
|
||||
|
@ -74,6 +82,19 @@ type JsonStore struct {
|
|||
Directory string
|
||||
}
|
||||
|
||||
func (store JsonStore) Reset() error {
|
||||
if _, err := os.Stat(store.Directory); os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
p := filepath.Join(store.Directory, store.ID) + ".json"
|
||||
if _, err := os.Stat(p); os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return os.Remove(p)
|
||||
}
|
||||
|
||||
func (store JsonStore) Load(val interface{}) error {
|
||||
if _, err := os.Stat(store.Directory); os.IsNotExist(err) {
|
||||
if err2 := os.Mkdir(store.Directory, 0777); err2 != nil {
|
||||
|
@ -82,13 +103,18 @@ func (store JsonStore) Load(val interface{}) error {
|
|||
}
|
||||
|
||||
p := filepath.Join(store.Directory, store.ID) + ".json"
|
||||
|
||||
if _, err := os.Stat(p); os.IsNotExist(err) {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
data, err := ioutil.ReadFile(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
return json.Unmarshal(data, val)
|
||||
|
@ -149,21 +175,17 @@ func (store *RedisStore) Load(val interface{}) error {
|
|||
data, err := cmd.Result()
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
return nil
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
if err := json.Unmarshal([]byte(data), val); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return json.Unmarshal([]byte(data), val)
|
||||
}
|
||||
|
||||
func (store *RedisStore) Save(val interface{}) error {
|
||||
|
@ -176,3 +198,8 @@ func (store *RedisStore) Save(val interface{}) error {
|
|||
_, err = cmd.Result()
|
||||
return err
|
||||
}
|
||||
|
||||
func (store *RedisStore) Reset() error {
|
||||
_, err := store.redis.Del(context.Background(), store.ID).Result()
|
||||
return err
|
||||
}
|
||||
|
|
69
pkg/bbgo/redis_persistence_test.go
Normal file
69
pkg/bbgo/redis_persistence_test.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package bbgo
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
)
|
||||
|
||||
func TestRedisPersistentService(t *testing.T) {
|
||||
redisService := NewRedisPersistenceService(&RedisPersistenceConfig{
|
||||
Host: "127.0.0.1",
|
||||
Port: "6379",
|
||||
DB: 0,
|
||||
})
|
||||
assert.NotNil(t, redisService)
|
||||
|
||||
store := redisService.NewStore("bbgo", "test")
|
||||
assert.NotNil(t, store)
|
||||
|
||||
err := store.Reset()
|
||||
assert.NoError(t, err)
|
||||
|
||||
var fp fixedpoint.Value
|
||||
err = store.Load(fp)
|
||||
assert.Error(t, err)
|
||||
assert.EqualError(t, os.ErrNotExist, err.Error())
|
||||
|
||||
fp = fixedpoint.NewFromFloat(3.1415)
|
||||
err = store.Save(&fp)
|
||||
assert.NoError(t, err, "should store value without error")
|
||||
|
||||
var fp2 fixedpoint.Value
|
||||
err = store.Load(&fp2)
|
||||
assert.NoError(t, err, "should load value without error")
|
||||
assert.Equal(t, fp, fp2)
|
||||
|
||||
err = store.Reset()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestMemoryService(t *testing.T) {
|
||||
t.Run("load_empty", func(t *testing.T) {
|
||||
service := NewMemoryService()
|
||||
store := service.NewStore("test")
|
||||
|
||||
j := 0
|
||||
err := store.Load(&j)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("save_and_load", func(t *testing.T) {
|
||||
service := NewMemoryService()
|
||||
store := service.NewStore("test")
|
||||
|
||||
i := 3
|
||||
err := store.Save(&i)
|
||||
|
||||
assert.NoError(t, err)
|
||||
|
||||
var j = 0
|
||||
err = store.Load(&j)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, i, j)
|
||||
})
|
||||
}
|
||||
|
|
@ -76,6 +76,12 @@ func (v *Value) UnmarshalYAML(unmarshal func(a interface{}) error) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
func (v Value) MarshalJSON() ([]byte, error) {
|
||||
f := float64(v) / DefaultPow
|
||||
o := fmt.Sprintf("%f", f)
|
||||
return []byte(o), nil
|
||||
}
|
||||
|
||||
func (v *Value) UnmarshalJSON(data []byte) error {
|
||||
var a interface{}
|
||||
var err = json.Unmarshal(data, &a)
|
||||
|
|
|
@ -64,13 +64,13 @@ func (inc *EWMA) calculateAndUpdate(allKLines []types.KLine) {
|
|||
}
|
||||
|
||||
v1 := math.Floor(inc.Values[len(inc.Values)-1]*100.0) / 100.0
|
||||
v2 := math.Floor(CalculateKLineEWMA(allKLines, priceF, inc.Window)*100.0) / 100.0
|
||||
v2 := math.Floor(CalculateKLinesEMA(allKLines, priceF, inc.Window)*100.0) / 100.0
|
||||
if v1 != v2 {
|
||||
log.Warnf("ACCUMULATED %s EMA (%d) %f != EMA %f", inc.Interval, inc.Window, v1, v2)
|
||||
}
|
||||
}
|
||||
|
||||
func CalculateKLineEWMA(allKLines []types.KLine, priceF KLinePriceMapper, window int) float64 {
|
||||
func CalculateKLinesEMA(allKLines []types.KLine, priceF KLinePriceMapper, window int) float64 {
|
||||
var multiplier = 2.0 / (float64(window) + 1)
|
||||
return ewma(MapKLinePrice(allKLines, priceF), multiplier)
|
||||
}
|
||||
|
|
|
@ -1034,21 +1034,39 @@ func Test_calculateEWMA(t *testing.T) {
|
|||
want float64
|
||||
}{
|
||||
{
|
||||
name: "ethusdt ewma 7",
|
||||
name: "ETHUSDT EMA 7",
|
||||
args: args{
|
||||
allKLines: buildKLines(ethusdt5m),
|
||||
priceF: KLineClosePriceMapper,
|
||||
window: 7,
|
||||
},
|
||||
want: 571.72, // with open price, binance disktop returns 571.45, trading view returns 570.8957, for close price, binance mobile returns 571.72
|
||||
want: 571.72, // with open price, binance desktop returns 571.45, trading view returns 570.8957, for close price, binance mobile returns 571.72
|
||||
},
|
||||
{
|
||||
name: "ETHUSDT EMA 25",
|
||||
args: args{
|
||||
allKLines: buildKLines(ethusdt5m),
|
||||
priceF: KLineClosePriceMapper,
|
||||
window: 25,
|
||||
},
|
||||
want: 571.30,
|
||||
},
|
||||
{
|
||||
name: "ETHUSDT EMA 99",
|
||||
args: args{
|
||||
allKLines: buildKLines(ethusdt5m),
|
||||
priceF: KLineClosePriceMapper,
|
||||
window: 99,
|
||||
},
|
||||
want: 577.62, // binance mobile uses 577.58
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := CalculateKLineEWMA(tt.args.allKLines, tt.args.priceF, tt.args.window)
|
||||
got = math.Trunc(got * 100.0) / 100.0
|
||||
got := CalculateKLinesEMA(tt.args.allKLines, tt.args.priceF, tt.args.window)
|
||||
got = math.Trunc(got*100.0) / 100.0
|
||||
if got != tt.want {
|
||||
t.Errorf("CalculateKLineEWMA() = %v, want %v", got, tt.want)
|
||||
t.Errorf("CalculateKLinesEMA() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
288
pkg/strategy/mirrormaker/main.go
Normal file
288
pkg/strategy/mirrormaker/main.go
Normal file
|
@ -0,0 +1,288 @@
|
|||
package mirrormaker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/bbgo"
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
)
|
||||
|
||||
var defaultMargin = fixedpoint.NewFromFloat(0.01)
|
||||
|
||||
var defaultQuantity = fixedpoint.NewFromFloat(0.001)
|
||||
|
||||
var log = logrus.WithField("strategy", "mirrormaker")
|
||||
|
||||
func init() {
|
||||
bbgo.RegisterStrategy("mirrormaker", &Strategy{})
|
||||
}
|
||||
|
||||
type Strategy struct {
|
||||
Symbol string `json:"symbol"`
|
||||
SourceExchange string `json:"sourceExchange"`
|
||||
MakerExchange string `json:"makerExchange"`
|
||||
|
||||
Margin fixedpoint.Value `json:"margin"`
|
||||
BidMargin fixedpoint.Value `json:"bidMargin"`
|
||||
AskMargin fixedpoint.Value `json:"askMargin"`
|
||||
Quantity fixedpoint.Value `json:"quantity"`
|
||||
QuantityMultiplier fixedpoint.Value `json:"quantityMultiplier"`
|
||||
|
||||
NumLayers int `json:"numLayers"`
|
||||
Pips int `json:"pips"`
|
||||
|
||||
makerSession *bbgo.ExchangeSession
|
||||
sourceSession *bbgo.ExchangeSession
|
||||
|
||||
sourceMarket types.Market
|
||||
makerMarket types.Market
|
||||
|
||||
book *types.StreamOrderBook
|
||||
activeMakerOrders *bbgo.LocalActiveOrderBook
|
||||
|
||||
orderStore *bbgo.OrderStore
|
||||
|
||||
Position fixedpoint.Value
|
||||
lastPrice float64
|
||||
|
||||
stopC chan struct{}
|
||||
|
||||
*bbgo.Graceful
|
||||
}
|
||||
|
||||
func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
|
||||
sourceSession, ok := sessions[s.SourceExchange]
|
||||
if !ok {
|
||||
panic(fmt.Errorf("source exchange %s is not defined", s.SourceExchange))
|
||||
}
|
||||
|
||||
log.Infof("subscribing %s from %s", s.Symbol, s.SourceExchange)
|
||||
sourceSession.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{})
|
||||
}
|
||||
|
||||
func (s *Strategy) updateQuote(ctx context.Context) {
|
||||
if err := s.makerSession.Exchange.CancelOrders(ctx, s.activeMakerOrders.Orders()...); err != nil {
|
||||
log.WithError(err).Errorf("can not cancel orders")
|
||||
return
|
||||
}
|
||||
|
||||
// avoid unlock issue
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
sourceBook := s.book.Get()
|
||||
if len(sourceBook.Bids) == 0 || len(sourceBook.Asks) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
bestBidPrice := sourceBook.Bids[0].Price
|
||||
bestAskPrice := sourceBook.Asks[0].Price
|
||||
log.Infof("best bid price %f, best ask price: %f", bestBidPrice.Float64(), bestAskPrice.Float64())
|
||||
|
||||
bidQuantity := s.Quantity
|
||||
bidPrice := bestBidPrice.MulFloat64(1.0 - s.BidMargin.Float64())
|
||||
|
||||
askQuantity := s.Quantity
|
||||
askPrice := bestAskPrice.MulFloat64(1.0 + s.AskMargin.Float64())
|
||||
|
||||
log.Infof("quote bid price: %f ask price: %f", bidPrice.Float64(), askPrice.Float64())
|
||||
|
||||
var submitOrders []types.SubmitOrder
|
||||
|
||||
balances := s.makerSession.Account.Balances()
|
||||
makerQuota := &bbgo.QuotaTransaction{}
|
||||
if b, ok := balances[s.makerMarket.BaseCurrency]; ok {
|
||||
makerQuota.BaseAsset.Add(b.Available)
|
||||
}
|
||||
if b, ok := balances[s.makerMarket.QuoteCurrency]; ok {
|
||||
makerQuota.QuoteAsset.Add(b.Available)
|
||||
}
|
||||
|
||||
hedgeBalances := s.sourceSession.Account.Balances()
|
||||
hedgeQuota := &bbgo.QuotaTransaction{}
|
||||
if b, ok := hedgeBalances[s.sourceMarket.BaseCurrency]; ok {
|
||||
hedgeQuota.BaseAsset.Add(b.Available)
|
||||
}
|
||||
if b, ok := hedgeBalances[s.sourceMarket.QuoteCurrency]; ok {
|
||||
hedgeQuota.QuoteAsset.Add(b.Available)
|
||||
}
|
||||
|
||||
log.Infof("maker quota: %+v", makerQuota)
|
||||
log.Infof("hedge quota: %+v", hedgeQuota)
|
||||
|
||||
for i := 0; i < s.NumLayers; i++ {
|
||||
// bid orders
|
||||
if makerQuota.QuoteAsset.Lock(bidQuantity.Mul(bidPrice)) && hedgeQuota.BaseAsset.Lock(bidQuantity) {
|
||||
// if we bought, then we need to sell the base from the hedge session
|
||||
submitOrders = append(submitOrders, types.SubmitOrder{
|
||||
Symbol: s.Symbol,
|
||||
Type: types.OrderTypeLimit,
|
||||
Side: types.SideTypeBuy,
|
||||
Price: bidPrice.Float64(),
|
||||
Quantity: bidQuantity.Float64(),
|
||||
TimeInForce: "GTC",
|
||||
})
|
||||
|
||||
makerQuota.Commit()
|
||||
hedgeQuota.Commit()
|
||||
} else {
|
||||
makerQuota.Rollback()
|
||||
hedgeQuota.Rollback()
|
||||
}
|
||||
|
||||
// ask orders
|
||||
if makerQuota.BaseAsset.Lock(askQuantity) && hedgeQuota.QuoteAsset.Lock(askQuantity.Mul(askPrice)) {
|
||||
// if we bought, then we need to sell the base from the hedge session
|
||||
submitOrders = append(submitOrders, types.SubmitOrder{
|
||||
Symbol: s.Symbol,
|
||||
Type: types.OrderTypeLimit,
|
||||
Side: types.SideTypeSell,
|
||||
Price: askPrice.Float64(),
|
||||
Quantity: askQuantity.Float64(),
|
||||
TimeInForce: "GTC",
|
||||
})
|
||||
makerQuota.Commit()
|
||||
hedgeQuota.Commit()
|
||||
} else {
|
||||
makerQuota.Rollback()
|
||||
hedgeQuota.Rollback()
|
||||
}
|
||||
|
||||
bidPrice -= fixedpoint.NewFromFloat(s.makerMarket.TickSize * float64(s.Pips))
|
||||
askPrice += fixedpoint.NewFromFloat(s.makerMarket.TickSize * float64(s.Pips))
|
||||
|
||||
askQuantity = askQuantity.Mul(s.QuantityMultiplier)
|
||||
bidQuantity = bidQuantity.Mul(s.QuantityMultiplier)
|
||||
}
|
||||
|
||||
if len(submitOrders) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
makerOrderExecutor := &bbgo.ExchangeOrderExecutor{Session: s.makerSession}
|
||||
makerOrders, err := makerOrderExecutor.SubmitOrders(ctx, submitOrders...)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("order submit error")
|
||||
return
|
||||
}
|
||||
|
||||
s.activeMakerOrders.Add(makerOrders...)
|
||||
s.orderStore.Add(makerOrders...)
|
||||
}
|
||||
|
||||
func (s *Strategy) handleTradeUpdate(trade types.Trade) {
|
||||
log.Infof("received trade %+v", trade)
|
||||
if s.orderStore.Exists(trade.OrderID) {
|
||||
log.Infof("identified trade %d with an existing order: %d", trade.ID, trade.OrderID)
|
||||
|
||||
q := fixedpoint.NewFromFloat(trade.Quantity)
|
||||
if trade.Side == types.SideTypeSell {
|
||||
q = -q
|
||||
}
|
||||
|
||||
s.Position.AtomicAdd(q)
|
||||
|
||||
pos := s.Position.AtomicLoad()
|
||||
log.Warnf("position changed: %f", pos.Float64())
|
||||
|
||||
s.lastPrice = trade.Price
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
|
||||
sourceSession, ok := sessions[s.SourceExchange]
|
||||
if !ok {
|
||||
return fmt.Errorf("source exchange session %s is not defined", s.SourceExchange)
|
||||
}
|
||||
|
||||
s.sourceSession = sourceSession
|
||||
|
||||
makerSession, ok := sessions[s.MakerExchange]
|
||||
if !ok {
|
||||
return fmt.Errorf("maker exchange session %s is not defined", s.MakerExchange)
|
||||
}
|
||||
|
||||
s.makerSession = makerSession
|
||||
|
||||
s.sourceMarket, ok = s.sourceSession.Market(s.Symbol)
|
||||
if !ok {
|
||||
return fmt.Errorf("source session market %s is not defined", s.Symbol)
|
||||
}
|
||||
|
||||
s.makerMarket, ok = s.makerSession.Market(s.Symbol)
|
||||
if !ok {
|
||||
return fmt.Errorf("maker session market %s is not defined", s.Symbol)
|
||||
}
|
||||
|
||||
if s.NumLayers == 0 {
|
||||
s.NumLayers = 1
|
||||
}
|
||||
|
||||
if s.BidMargin == 0 {
|
||||
if s.Margin != 0 {
|
||||
s.BidMargin = s.Margin
|
||||
} else {
|
||||
s.BidMargin = defaultMargin
|
||||
}
|
||||
}
|
||||
|
||||
if s.AskMargin == 0 {
|
||||
if s.Margin != 0 {
|
||||
s.AskMargin = s.Margin
|
||||
} else {
|
||||
s.AskMargin = defaultMargin
|
||||
}
|
||||
}
|
||||
|
||||
if s.Quantity == 0 {
|
||||
s.Quantity = defaultQuantity
|
||||
}
|
||||
|
||||
s.book = types.NewStreamBook(s.Symbol)
|
||||
s.book.BindStream(s.sourceSession.Stream)
|
||||
|
||||
s.makerSession.Stream.OnTradeUpdate(s.handleTradeUpdate)
|
||||
|
||||
s.activeMakerOrders = bbgo.NewLocalActiveOrderBook()
|
||||
s.activeMakerOrders.BindStream(s.makerSession.Stream)
|
||||
|
||||
s.orderStore = bbgo.NewOrderStore()
|
||||
s.orderStore.BindStream(s.makerSession.Stream)
|
||||
|
||||
s.stopC = make(chan struct{})
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
|
||||
case <-s.stopC:
|
||||
return
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
s.updateQuote(ctx)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
|
||||
close(s.stopC)
|
||||
|
||||
defer wg.Done()
|
||||
|
||||
if err := s.makerSession.Exchange.CancelOrders(ctx, s.activeMakerOrders.Orders()...); err != nil {
|
||||
log.WithError(err).Errorf("can not cancel orders")
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user