diff --git a/.travis.yml b/.travis.yml index 999bbd6b7..c36fe7efb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,9 @@ language: go go: - 1.14 - 1.15 +services: +- redis-server before_script: - - go mod download +- go mod download script: - - go test -v ./pkg/... +- go test -v ./pkg/... diff --git a/pkg/bbgo/redis_persistence.go b/pkg/bbgo/redis_persistence.go index 5be8f559c..dbbf83922 100644 --- a/pkg/bbgo/redis_persistence.go +++ b/pkg/bbgo/redis_persistence.go @@ -20,6 +20,7 @@ type PersistenceService interface { type Store interface { Load(val interface{}) error Save(val interface{}) error + Reset() error } type MemoryService struct { @@ -54,10 +55,17 @@ func (store *MemoryStore) Load(val interface{}) error { v := reflect.ValueOf(val) if data, ok := store.memory.Slots[store.Key]; ok { v.Elem().Set(reflect.ValueOf(data).Elem()) + } else { + return os.ErrNotExist } return nil } +func (store *MemoryStore) Reset() error { + delete(store.memory.Slots, store.Key) + return nil +} + type JsonPersistenceService struct { Directory string } @@ -74,6 +82,19 @@ type JsonStore struct { Directory string } +func (store JsonStore) Reset() error { + if _, err := os.Stat(store.Directory); os.IsNotExist(err) { + return nil + } + + p := filepath.Join(store.Directory, store.ID) + ".json" + if _, err := os.Stat(p); os.IsNotExist(err) { + return nil + } + + return os.Remove(p) +} + func (store JsonStore) Load(val interface{}) error { if _, err := os.Stat(store.Directory); os.IsNotExist(err) { if err2 := os.Mkdir(store.Directory, 0777); err2 != nil { @@ -82,13 +103,18 @@ func (store JsonStore) Load(val interface{}) error { } p := filepath.Join(store.Directory, store.ID) + ".json" + + if _, err := os.Stat(p); os.IsNotExist(err) { + return os.ErrNotExist + } + data, err := ioutil.ReadFile(p) if err != nil { return err } if len(data) == 0 { - return nil + return os.ErrNotExist } return json.Unmarshal(data, val) @@ -149,21 +175,17 @@ func (store *RedisStore) Load(val interface{}) error { data, err := cmd.Result() if err != nil { if err == redis.Nil { - return nil + return os.ErrNotExist } return err } if len(data) == 0 { - return nil + return os.ErrNotExist } - if err := json.Unmarshal([]byte(data), val); err != nil { - return err - } - - return nil + return json.Unmarshal([]byte(data), val) } func (store *RedisStore) Save(val interface{}) error { @@ -176,3 +198,8 @@ func (store *RedisStore) Save(val interface{}) error { _, err = cmd.Result() return err } + +func (store *RedisStore) Reset() error { + _, err := store.redis.Del(context.Background(), store.ID).Result() + return err +} diff --git a/pkg/bbgo/redis_persistence_test.go b/pkg/bbgo/redis_persistence_test.go new file mode 100644 index 000000000..238687a38 --- /dev/null +++ b/pkg/bbgo/redis_persistence_test.go @@ -0,0 +1,69 @@ +package bbgo + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/fixedpoint" +) + +func TestRedisPersistentService(t *testing.T) { + redisService := NewRedisPersistenceService(&RedisPersistenceConfig{ + Host: "127.0.0.1", + Port: "6379", + DB: 0, + }) + assert.NotNil(t, redisService) + + store := redisService.NewStore("bbgo", "test") + assert.NotNil(t, store) + + err := store.Reset() + assert.NoError(t, err) + + var fp fixedpoint.Value + err = store.Load(fp) + assert.Error(t, err) + assert.EqualError(t, os.ErrNotExist, err.Error()) + + fp = fixedpoint.NewFromFloat(3.1415) + err = store.Save(&fp) + assert.NoError(t, err, "should store value without error") + + var fp2 fixedpoint.Value + err = store.Load(&fp2) + assert.NoError(t, err, "should load value without error") + assert.Equal(t, fp, fp2) + + err = store.Reset() + assert.NoError(t, err) +} + +func TestMemoryService(t *testing.T) { + t.Run("load_empty", func(t *testing.T) { + service := NewMemoryService() + store := service.NewStore("test") + + j := 0 + err := store.Load(&j) + assert.Error(t, err) + }) + + t.Run("save_and_load", func(t *testing.T) { + service := NewMemoryService() + store := service.NewStore("test") + + i := 3 + err := store.Save(&i) + + assert.NoError(t, err) + + var j = 0 + err = store.Load(&j) + assert.NoError(t, err) + assert.Equal(t, i, j) + }) +} + diff --git a/pkg/fixedpoint/convert.go b/pkg/fixedpoint/convert.go index 95d6d0e56..1485fda00 100644 --- a/pkg/fixedpoint/convert.go +++ b/pkg/fixedpoint/convert.go @@ -76,6 +76,12 @@ func (v *Value) UnmarshalYAML(unmarshal func(a interface{}) error) (err error) { return err } +func (v Value) MarshalJSON() ([]byte, error) { + f := float64(v) / DefaultPow + o := fmt.Sprintf("%f", f) + return []byte(o), nil +} + func (v *Value) UnmarshalJSON(data []byte) error { var a interface{} var err = json.Unmarshal(data, &a) diff --git a/pkg/indicator/ewma.go b/pkg/indicator/ewma.go index 9c8ddab15..0690fdbff 100644 --- a/pkg/indicator/ewma.go +++ b/pkg/indicator/ewma.go @@ -64,13 +64,13 @@ func (inc *EWMA) calculateAndUpdate(allKLines []types.KLine) { } v1 := math.Floor(inc.Values[len(inc.Values)-1]*100.0) / 100.0 - v2 := math.Floor(CalculateKLineEWMA(allKLines, priceF, inc.Window)*100.0) / 100.0 + v2 := math.Floor(CalculateKLinesEMA(allKLines, priceF, inc.Window)*100.0) / 100.0 if v1 != v2 { log.Warnf("ACCUMULATED %s EMA (%d) %f != EMA %f", inc.Interval, inc.Window, v1, v2) } } -func CalculateKLineEWMA(allKLines []types.KLine, priceF KLinePriceMapper, window int) float64 { +func CalculateKLinesEMA(allKLines []types.KLine, priceF KLinePriceMapper, window int) float64 { var multiplier = 2.0 / (float64(window) + 1) return ewma(MapKLinePrice(allKLines, priceF), multiplier) } diff --git a/pkg/indicator/ewma_test.go b/pkg/indicator/ewma_test.go index 858148b48..d752b5f0c 100644 --- a/pkg/indicator/ewma_test.go +++ b/pkg/indicator/ewma_test.go @@ -1025,7 +1025,7 @@ func buildKLines(prices []float64) (klines []types.KLine) { func Test_calculateEWMA(t *testing.T) { type args struct { allKLines []types.KLine - priceF KLinePriceMapper + priceF KLinePriceMapper window int } tests := []struct { @@ -1034,21 +1034,39 @@ func Test_calculateEWMA(t *testing.T) { want float64 }{ { - name: "ethusdt ewma 7", + name: "ETHUSDT EMA 7", args: args{ allKLines: buildKLines(ethusdt5m), - priceF: KLineClosePriceMapper, + priceF: KLineClosePriceMapper, window: 7, }, - want: 571.72, // with open price, binance disktop returns 571.45, trading view returns 570.8957, for close price, binance mobile returns 571.72 + want: 571.72, // with open price, binance desktop returns 571.45, trading view returns 570.8957, for close price, binance mobile returns 571.72 + }, + { + name: "ETHUSDT EMA 25", + args: args{ + allKLines: buildKLines(ethusdt5m), + priceF: KLineClosePriceMapper, + window: 25, + }, + want: 571.30, + }, + { + name: "ETHUSDT EMA 99", + args: args{ + allKLines: buildKLines(ethusdt5m), + priceF: KLineClosePriceMapper, + window: 99, + }, + want: 577.62, // binance mobile uses 577.58 }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := CalculateKLineEWMA(tt.args.allKLines, tt.args.priceF, tt.args.window) - got = math.Trunc(got * 100.0) / 100.0 + got := CalculateKLinesEMA(tt.args.allKLines, tt.args.priceF, tt.args.window) + got = math.Trunc(got*100.0) / 100.0 if got != tt.want { - t.Errorf("CalculateKLineEWMA() = %v, want %v", got, tt.want) + t.Errorf("CalculateKLinesEMA() = %v, want %v", got, tt.want) } }) } diff --git a/pkg/strategy/mirrormaker/main.go b/pkg/strategy/mirrormaker/main.go new file mode 100644 index 000000000..57651dacd --- /dev/null +++ b/pkg/strategy/mirrormaker/main.go @@ -0,0 +1,288 @@ +package mirrormaker + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" + + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" +) + +var defaultMargin = fixedpoint.NewFromFloat(0.01) + +var defaultQuantity = fixedpoint.NewFromFloat(0.001) + +var log = logrus.WithField("strategy", "mirrormaker") + +func init() { + bbgo.RegisterStrategy("mirrormaker", &Strategy{}) +} + +type Strategy struct { + Symbol string `json:"symbol"` + SourceExchange string `json:"sourceExchange"` + MakerExchange string `json:"makerExchange"` + + Margin fixedpoint.Value `json:"margin"` + BidMargin fixedpoint.Value `json:"bidMargin"` + AskMargin fixedpoint.Value `json:"askMargin"` + Quantity fixedpoint.Value `json:"quantity"` + QuantityMultiplier fixedpoint.Value `json:"quantityMultiplier"` + + NumLayers int `json:"numLayers"` + Pips int `json:"pips"` + + makerSession *bbgo.ExchangeSession + sourceSession *bbgo.ExchangeSession + + sourceMarket types.Market + makerMarket types.Market + + book *types.StreamOrderBook + activeMakerOrders *bbgo.LocalActiveOrderBook + + orderStore *bbgo.OrderStore + + Position fixedpoint.Value + lastPrice float64 + + stopC chan struct{} + + *bbgo.Graceful +} + +func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) { + sourceSession, ok := sessions[s.SourceExchange] + if !ok { + panic(fmt.Errorf("source exchange %s is not defined", s.SourceExchange)) + } + + log.Infof("subscribing %s from %s", s.Symbol, s.SourceExchange) + sourceSession.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{}) +} + +func (s *Strategy) updateQuote(ctx context.Context) { + if err := s.makerSession.Exchange.CancelOrders(ctx, s.activeMakerOrders.Orders()...); err != nil { + log.WithError(err).Errorf("can not cancel orders") + return + } + + // avoid unlock issue + time.Sleep(100 * time.Millisecond) + + sourceBook := s.book.Get() + if len(sourceBook.Bids) == 0 || len(sourceBook.Asks) == 0 { + return + } + + bestBidPrice := sourceBook.Bids[0].Price + bestAskPrice := sourceBook.Asks[0].Price + log.Infof("best bid price %f, best ask price: %f", bestBidPrice.Float64(), bestAskPrice.Float64()) + + bidQuantity := s.Quantity + bidPrice := bestBidPrice.MulFloat64(1.0 - s.BidMargin.Float64()) + + askQuantity := s.Quantity + askPrice := bestAskPrice.MulFloat64(1.0 + s.AskMargin.Float64()) + + log.Infof("quote bid price: %f ask price: %f", bidPrice.Float64(), askPrice.Float64()) + + var submitOrders []types.SubmitOrder + + balances := s.makerSession.Account.Balances() + makerQuota := &bbgo.QuotaTransaction{} + if b, ok := balances[s.makerMarket.BaseCurrency]; ok { + makerQuota.BaseAsset.Add(b.Available) + } + if b, ok := balances[s.makerMarket.QuoteCurrency]; ok { + makerQuota.QuoteAsset.Add(b.Available) + } + + hedgeBalances := s.sourceSession.Account.Balances() + hedgeQuota := &bbgo.QuotaTransaction{} + if b, ok := hedgeBalances[s.sourceMarket.BaseCurrency]; ok { + hedgeQuota.BaseAsset.Add(b.Available) + } + if b, ok := hedgeBalances[s.sourceMarket.QuoteCurrency]; ok { + hedgeQuota.QuoteAsset.Add(b.Available) + } + + log.Infof("maker quota: %+v", makerQuota) + log.Infof("hedge quota: %+v", hedgeQuota) + + for i := 0; i < s.NumLayers; i++ { + // bid orders + if makerQuota.QuoteAsset.Lock(bidQuantity.Mul(bidPrice)) && hedgeQuota.BaseAsset.Lock(bidQuantity) { + // if we bought, then we need to sell the base from the hedge session + submitOrders = append(submitOrders, types.SubmitOrder{ + Symbol: s.Symbol, + Type: types.OrderTypeLimit, + Side: types.SideTypeBuy, + Price: bidPrice.Float64(), + Quantity: bidQuantity.Float64(), + TimeInForce: "GTC", + }) + + makerQuota.Commit() + hedgeQuota.Commit() + } else { + makerQuota.Rollback() + hedgeQuota.Rollback() + } + + // ask orders + if makerQuota.BaseAsset.Lock(askQuantity) && hedgeQuota.QuoteAsset.Lock(askQuantity.Mul(askPrice)) { + // if we bought, then we need to sell the base from the hedge session + submitOrders = append(submitOrders, types.SubmitOrder{ + Symbol: s.Symbol, + Type: types.OrderTypeLimit, + Side: types.SideTypeSell, + Price: askPrice.Float64(), + Quantity: askQuantity.Float64(), + TimeInForce: "GTC", + }) + makerQuota.Commit() + hedgeQuota.Commit() + } else { + makerQuota.Rollback() + hedgeQuota.Rollback() + } + + bidPrice -= fixedpoint.NewFromFloat(s.makerMarket.TickSize * float64(s.Pips)) + askPrice += fixedpoint.NewFromFloat(s.makerMarket.TickSize * float64(s.Pips)) + + askQuantity = askQuantity.Mul(s.QuantityMultiplier) + bidQuantity = bidQuantity.Mul(s.QuantityMultiplier) + } + + if len(submitOrders) == 0 { + return + } + + makerOrderExecutor := &bbgo.ExchangeOrderExecutor{Session: s.makerSession} + makerOrders, err := makerOrderExecutor.SubmitOrders(ctx, submitOrders...) + if err != nil { + log.WithError(err).Errorf("order submit error") + return + } + + s.activeMakerOrders.Add(makerOrders...) + s.orderStore.Add(makerOrders...) +} + +func (s *Strategy) handleTradeUpdate(trade types.Trade) { + log.Infof("received trade %+v", trade) + if s.orderStore.Exists(trade.OrderID) { + log.Infof("identified trade %d with an existing order: %d", trade.ID, trade.OrderID) + + q := fixedpoint.NewFromFloat(trade.Quantity) + if trade.Side == types.SideTypeSell { + q = -q + } + + s.Position.AtomicAdd(q) + + pos := s.Position.AtomicLoad() + log.Warnf("position changed: %f", pos.Float64()) + + s.lastPrice = trade.Price + } +} + +func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error { + sourceSession, ok := sessions[s.SourceExchange] + if !ok { + return fmt.Errorf("source exchange session %s is not defined", s.SourceExchange) + } + + s.sourceSession = sourceSession + + makerSession, ok := sessions[s.MakerExchange] + if !ok { + return fmt.Errorf("maker exchange session %s is not defined", s.MakerExchange) + } + + s.makerSession = makerSession + + s.sourceMarket, ok = s.sourceSession.Market(s.Symbol) + if !ok { + return fmt.Errorf("source session market %s is not defined", s.Symbol) + } + + s.makerMarket, ok = s.makerSession.Market(s.Symbol) + if !ok { + return fmt.Errorf("maker session market %s is not defined", s.Symbol) + } + + if s.NumLayers == 0 { + s.NumLayers = 1 + } + + if s.BidMargin == 0 { + if s.Margin != 0 { + s.BidMargin = s.Margin + } else { + s.BidMargin = defaultMargin + } + } + + if s.AskMargin == 0 { + if s.Margin != 0 { + s.AskMargin = s.Margin + } else { + s.AskMargin = defaultMargin + } + } + + if s.Quantity == 0 { + s.Quantity = defaultQuantity + } + + s.book = types.NewStreamBook(s.Symbol) + s.book.BindStream(s.sourceSession.Stream) + + s.makerSession.Stream.OnTradeUpdate(s.handleTradeUpdate) + + s.activeMakerOrders = bbgo.NewLocalActiveOrderBook() + s.activeMakerOrders.BindStream(s.makerSession.Stream) + + s.orderStore = bbgo.NewOrderStore() + s.orderStore.BindStream(s.makerSession.Stream) + + s.stopC = make(chan struct{}) + + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + for { + select { + + case <-s.stopC: + return + + case <-ctx.Done(): + return + + case <-ticker.C: + s.updateQuote(ctx) + } + } + }() + + s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + close(s.stopC) + + defer wg.Done() + + if err := s.makerSession.Exchange.CancelOrders(ctx, s.activeMakerOrders.Orders()...); err != nil { + log.WithError(err).Errorf("can not cancel orders") + } + }) + + return nil +}