mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
166 lines
4.2 KiB
Go
166 lines
4.2 KiB
Go
package bbgo
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
|
"github.com/c9s/bbgo/pkg/types"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type SerialMarketDataStore struct {
|
|
*MarketDataStore
|
|
UseAggTrade bool
|
|
KLines map[types.Interval]*types.KLine
|
|
MinInterval types.Interval
|
|
Subscription []types.Interval
|
|
o, h, l, c, v, qv, price fixedpoint.Value
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// @param symbol: symbol to trace on
|
|
// @param minInterval: unit interval, related to your signal timeframe
|
|
// @param useAggTrade: if not assigned, default to false. if assigned to true, will use AggTrade signal to generate klines
|
|
func NewSerialMarketDataStore(symbol string, minInterval types.Interval, useAggTrade ...bool) *SerialMarketDataStore {
|
|
return &SerialMarketDataStore{
|
|
MarketDataStore: NewMarketDataStore(symbol),
|
|
KLines: make(map[types.Interval]*types.KLine),
|
|
UseAggTrade: len(useAggTrade) > 0 && useAggTrade[0],
|
|
Subscription: []types.Interval{},
|
|
MinInterval: minInterval,
|
|
}
|
|
}
|
|
|
|
func (store *SerialMarketDataStore) Subscribe(interval types.Interval) {
|
|
// dedup
|
|
for _, i := range store.Subscription {
|
|
if i == interval {
|
|
return
|
|
}
|
|
}
|
|
store.Subscription = append(store.Subscription, interval)
|
|
}
|
|
|
|
func (store *SerialMarketDataStore) BindStream(ctx context.Context, stream types.Stream) {
|
|
if store.UseAggTrade {
|
|
if IsBackTesting {
|
|
log.Errorf("Right now in backtesting, aggTrade event is not yet supported. Use OnKLineClosed instead.")
|
|
stream.OnKLineClosed(store.handleKLineClosed)
|
|
return
|
|
}
|
|
go store.tickerProcessor(ctx)
|
|
stream.OnAggTrade(store.handleMarketTrade)
|
|
} else {
|
|
stream.OnKLineClosed(store.handleKLineClosed)
|
|
}
|
|
}
|
|
|
|
func (store *SerialMarketDataStore) handleKLineClosed(kline types.KLine) {
|
|
store.AddKLine(kline)
|
|
}
|
|
|
|
func (store *SerialMarketDataStore) handleMarketTrade(trade types.Trade) {
|
|
store.mu.Lock()
|
|
store.price = trade.Price
|
|
store.c = store.price
|
|
if store.price.Compare(store.h) > 0 {
|
|
store.h = store.price
|
|
}
|
|
if !store.l.IsZero() {
|
|
if store.price.Compare(store.l) < 0 {
|
|
store.l = store.price
|
|
}
|
|
} else {
|
|
store.l = store.price
|
|
}
|
|
if store.o.IsZero() {
|
|
store.o = store.price
|
|
}
|
|
store.v.Add(trade.Quantity)
|
|
store.qv.Add(trade.QuoteQuantity)
|
|
store.mu.Unlock()
|
|
}
|
|
|
|
func (store *SerialMarketDataStore) tickerProcessor(ctx context.Context) {
|
|
duration := store.MinInterval.Duration()
|
|
intervalCloseTicker := time.NewTicker(duration)
|
|
defer intervalCloseTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case time := <-intervalCloseTicker.C:
|
|
kline := types.KLine{
|
|
Symbol: store.Symbol,
|
|
StartTime: types.Time(time.Add(-1 * duration)),
|
|
EndTime: types.Time(time),
|
|
Interval: store.MinInterval,
|
|
Closed: true,
|
|
}
|
|
store.mu.Lock()
|
|
if store.c.IsZero() {
|
|
kline.Open = store.price
|
|
kline.Close = store.price
|
|
kline.High = store.price
|
|
kline.Low = store.price
|
|
kline.Volume = fixedpoint.Zero
|
|
kline.QuoteVolume = fixedpoint.Zero
|
|
} else {
|
|
kline.Open = store.o
|
|
kline.Close = store.c
|
|
kline.High = store.h
|
|
kline.Low = store.l
|
|
kline.Volume = store.v
|
|
kline.QuoteVolume = store.qv
|
|
store.o = fixedpoint.Zero
|
|
store.c = fixedpoint.Zero
|
|
store.h = fixedpoint.Zero
|
|
store.l = fixedpoint.Zero
|
|
store.v = fixedpoint.Zero
|
|
store.qv = fixedpoint.Zero
|
|
}
|
|
store.mu.Unlock()
|
|
store.AddKLine(kline, true)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (store *SerialMarketDataStore) AddKLine(kline types.KLine, async ...bool) {
|
|
if kline.Symbol != store.Symbol {
|
|
return
|
|
}
|
|
// only consumes MinInterval
|
|
if kline.Interval != store.MinInterval {
|
|
return
|
|
}
|
|
// endtime
|
|
duration := store.MinInterval.Duration()
|
|
timestamp := kline.StartTime.Time().Round(duration).Add(duration)
|
|
for _, val := range store.Subscription {
|
|
k, ok := store.KLines[val]
|
|
if !ok {
|
|
k = &types.KLine{}
|
|
k.Set(&kline)
|
|
k.Interval = val
|
|
k.Closed = false
|
|
store.KLines[val] = k
|
|
} else {
|
|
k.Merge(&kline)
|
|
k.Closed = false
|
|
}
|
|
if timestamp.Round(val.Duration()) == timestamp {
|
|
k.Closed = true
|
|
if len(async) > 0 && async[0] {
|
|
go store.MarketDataStore.AddKLine(*k)
|
|
} else {
|
|
store.MarketDataStore.AddKLine(*k)
|
|
}
|
|
delete(store.KLines, val)
|
|
}
|
|
}
|
|
}
|