mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 14:55:16 +00:00
FEATURE: add market info in-mem cache
This commit is contained in:
parent
3952f33de8
commit
3f7e617004
101
pkg/cache/cache.go
vendored
101
pkg/cache/cache.go
vendored
|
@ -8,17 +8,74 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
"github.com/c9s/bbgo/pkg/util"
|
||||||
|
"github.com/c9s/bbgo/pkg/util/backoff"
|
||||||
)
|
)
|
||||||
|
|
||||||
type DataFetcher func() (interface{}, error)
|
const memCacheExpiry = 5 * time.Minute
|
||||||
|
const fileCacheExpiry = 24 * time.Hour
|
||||||
|
|
||||||
const cacheExpiry = 24 * time.Hour
|
var globalMarketMemCache *marketMemCache = newMarketMemCache()
|
||||||
|
|
||||||
|
type marketMemCache struct {
|
||||||
|
sync.Mutex
|
||||||
|
markets map[string]marketMapWithTime
|
||||||
|
}
|
||||||
|
|
||||||
|
type marketMapWithTime struct {
|
||||||
|
updatedAt time.Time
|
||||||
|
markets types.MarketMap
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMarketMemCache() *marketMemCache {
|
||||||
|
cache := &marketMemCache{
|
||||||
|
markets: make(map[string]marketMapWithTime),
|
||||||
|
}
|
||||||
|
return cache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *marketMemCache) IsOutdated(exName string) bool {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
data, ok := c.markets[exName]
|
||||||
|
return !ok || time.Since(data.updatedAt) > memCacheExpiry
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *marketMemCache) Set(exName string, markets types.MarketMap) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
c.markets[exName] = marketMapWithTime{
|
||||||
|
updatedAt: time.Now(),
|
||||||
|
markets: markets,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *marketMemCache) Get(exName string) (types.MarketMap, bool) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
markets, ok := c.markets[exName]
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
copied := types.MarketMap{}
|
||||||
|
for key, val := range markets.markets {
|
||||||
|
copied[key] = val
|
||||||
|
}
|
||||||
|
return copied, true
|
||||||
|
}
|
||||||
|
|
||||||
|
type DataFetcher func() (interface{}, error)
|
||||||
|
|
||||||
// WithCache let you use the cache with the given cache key, variable reference and your data fetcher,
|
// WithCache let you use the cache with the given cache key, variable reference and your data fetcher,
|
||||||
// The key must be an unique ID.
|
// The key must be an unique ID.
|
||||||
|
@ -29,7 +86,7 @@ func WithCache(key string, obj interface{}, fetcher DataFetcher) error {
|
||||||
cacheFile := path.Join(cacheDir, key+".json")
|
cacheFile := path.Join(cacheDir, key+".json")
|
||||||
|
|
||||||
stat, err := os.Stat(cacheFile)
|
stat, err := os.Stat(cacheFile)
|
||||||
if os.IsNotExist(err) || (stat != nil && time.Since(stat.ModTime()) > cacheExpiry) {
|
if os.IsNotExist(err) || (stat != nil && time.Since(stat.ModTime()) > fileCacheExpiry) {
|
||||||
log.Debugf("cache %s not found or cache expired, executing fetcher callback to get the data", cacheFile)
|
log.Debugf("cache %s not found or cache expired, executing fetcher callback to get the data", cacheFile)
|
||||||
|
|
||||||
data, err := fetcher()
|
data, err := fetcher()
|
||||||
|
@ -70,6 +127,42 @@ func WithCache(key string, obj interface{}, fetcher DataFetcher) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadExchangeMarketsWithCache(ctx context.Context, ex types.Exchange) (markets types.MarketMap, err error) {
|
func LoadExchangeMarketsWithCache(ctx context.Context, ex types.Exchange) (markets types.MarketMap, err error) {
|
||||||
|
inMem, ok := util.GetEnvVarBool("USE_MARKETS_CACHE_IN_MEMORY")
|
||||||
|
if ok && inMem {
|
||||||
|
return loadMarketsFromMem(ctx, ex)
|
||||||
|
}
|
||||||
|
|
||||||
|
// fallback to use files as cache
|
||||||
|
return loadMarketsFromFile(ctx, ex)
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadMarketsFromMem is useful for one process to run multiple bbgos in different go routines.
|
||||||
|
func loadMarketsFromMem(ctx context.Context, ex types.Exchange) (markets types.MarketMap, _ error) {
|
||||||
|
exName := ex.Name().String()
|
||||||
|
if globalMarketMemCache.IsOutdated(exName) {
|
||||||
|
op := func() error {
|
||||||
|
rst, err2 := ex.QueryMarkets(ctx)
|
||||||
|
if err2 != nil {
|
||||||
|
return err2
|
||||||
|
}
|
||||||
|
|
||||||
|
markets = rst
|
||||||
|
globalMarketMemCache.Set(exName, rst)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := backoff.RetryGeneral(ctx, op); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return markets, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
rst, _ := globalMarketMemCache.Get(exName)
|
||||||
|
return rst, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadMarketsFromFile(ctx context.Context, ex types.Exchange) (markets types.MarketMap, err error) {
|
||||||
key := fmt.Sprintf("%s-markets", ex.Name())
|
key := fmt.Sprintf("%s-markets", ex.Name())
|
||||||
if futureExchange, implemented := ex.(types.FuturesExchange); implemented {
|
if futureExchange, implemented := ex.(types.FuturesExchange); implemented {
|
||||||
settings := futureExchange.GetFuturesSettings()
|
settings := futureExchange.GetFuturesSettings()
|
||||||
|
@ -82,4 +175,4 @@ func LoadExchangeMarketsWithCache(ctx context.Context, ex types.Exchange) (marke
|
||||||
return ex.QueryMarkets(ctx)
|
return ex.QueryMarkets(ctx)
|
||||||
})
|
})
|
||||||
return markets, err
|
return markets, err
|
||||||
}
|
}
|
103
pkg/cache/cache_test.go
vendored
Normal file
103
pkg/cache/cache_test.go
vendored
Normal file
|
@ -0,0 +1,103 @@
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/mock/gomock"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
"github.com/c9s/bbgo/pkg/types/mocks"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_newMarketMemCache(t *testing.T) {
|
||||||
|
cache := newMarketMemCache()
|
||||||
|
assert.NotNil(t, cache)
|
||||||
|
assert.NotNil(t, cache.markets)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_marketMemCache_GetSet(t *testing.T) {
|
||||||
|
cache := newMarketMemCache()
|
||||||
|
cache.Set("max", types.MarketMap{
|
||||||
|
"btctwd": types.Market{
|
||||||
|
Symbol: "btctwd",
|
||||||
|
LocalSymbol: "btctwd",
|
||||||
|
},
|
||||||
|
"ethtwd": types.Market{
|
||||||
|
Symbol: "ethtwd",
|
||||||
|
LocalSymbol: "ethtwd",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
markets, ok := cache.Get("max")
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
btctwd, ok := markets["btctwd"]
|
||||||
|
assert.True(t, ok)
|
||||||
|
ethtwd, ok := markets["ethtwd"]
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, types.Market{
|
||||||
|
Symbol: "btctwd",
|
||||||
|
LocalSymbol: "btctwd",
|
||||||
|
}, btctwd)
|
||||||
|
assert.Equal(t, types.Market{
|
||||||
|
Symbol: "ethtwd",
|
||||||
|
LocalSymbol: "ethtwd",
|
||||||
|
}, ethtwd)
|
||||||
|
|
||||||
|
_, ok = cache.Get("binance")
|
||||||
|
assert.False(t, ok)
|
||||||
|
|
||||||
|
expired := cache.IsOutdated("max")
|
||||||
|
assert.False(t, expired)
|
||||||
|
|
||||||
|
detailed := cache.markets["max"]
|
||||||
|
detailed.updatedAt = time.Now().Add(-2 * memCacheExpiry)
|
||||||
|
cache.markets["max"] = detailed
|
||||||
|
expired = cache.IsOutdated("max")
|
||||||
|
assert.True(t, expired)
|
||||||
|
|
||||||
|
expired = cache.IsOutdated("binance")
|
||||||
|
assert.True(t, expired)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_loadMarketsFromMem(t *testing.T) {
|
||||||
|
mockCtrl := gomock.NewController(t)
|
||||||
|
defer mockCtrl.Finish()
|
||||||
|
|
||||||
|
mockEx := mocks.NewMockExchange(mockCtrl)
|
||||||
|
mockEx.EXPECT().Name().Return(types.ExchangeName("max")).AnyTimes()
|
||||||
|
mockEx.EXPECT().QueryMarkets(gomock.Any()).Return(nil, errors.New("faked")).Times(1)
|
||||||
|
mockEx.EXPECT().QueryMarkets(gomock.Any()).Return(types.MarketMap{
|
||||||
|
"btctwd": types.Market{
|
||||||
|
Symbol: "btctwd",
|
||||||
|
LocalSymbol: "btctwd",
|
||||||
|
},
|
||||||
|
"ethtwd": types.Market{
|
||||||
|
Symbol: "ethtwd",
|
||||||
|
LocalSymbol: "ethtwd",
|
||||||
|
},
|
||||||
|
}, nil).Times(1)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
markets, err := loadMarketsFromMem(context.Background(), mockEx)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
btctwd, ok := markets["btctwd"]
|
||||||
|
assert.True(t, ok)
|
||||||
|
ethtwd, ok := markets["ethtwd"]
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, types.Market{
|
||||||
|
Symbol: "btctwd",
|
||||||
|
LocalSymbol: "btctwd",
|
||||||
|
}, btctwd)
|
||||||
|
assert.Equal(t, types.Market{
|
||||||
|
Symbol: "ethtwd",
|
||||||
|
LocalSymbol: "ethtwd",
|
||||||
|
}, ethtwd)
|
||||||
|
}
|
||||||
|
|
||||||
|
globalMarketMemCache = newMarketMemCache() // reset the global cache
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user