mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
70 lines
1.5 KiB
Go
70 lines
1.5 KiB
Go
|
package bbgo
|
||
|
|
||
|
import (
|
||
|
"time"
|
||
|
|
||
|
"github.com/c9s/bbgo/pkg/types"
|
||
|
)
|
||
|
|
||
|
type SerialMarketDataStore struct {
|
||
|
*MarketDataStore
|
||
|
KLines map[types.Interval]*types.KLine
|
||
|
Subscription []types.Interval
|
||
|
}
|
||
|
|
||
|
func NewSerialMarketDataStore(symbol string) *SerialMarketDataStore {
|
||
|
return &SerialMarketDataStore{
|
||
|
MarketDataStore: NewMarketDataStore(symbol),
|
||
|
KLines: make(map[types.Interval]*types.KLine),
|
||
|
Subscription: []types.Interval{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (store *SerialMarketDataStore) Subscribe(interval types.Interval) {
|
||
|
// dedup
|
||
|
for _, i := range store.Subscription {
|
||
|
if i == interval {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
store.Subscription = append(store.Subscription, interval)
|
||
|
}
|
||
|
|
||
|
func (store *SerialMarketDataStore) BindStream(stream types.Stream) {
|
||
|
stream.OnKLineClosed(store.handleKLineClosed)
|
||
|
}
|
||
|
|
||
|
func (store *SerialMarketDataStore) handleKLineClosed(kline types.KLine) {
|
||
|
store.AddKLine(kline)
|
||
|
}
|
||
|
|
||
|
func (store *SerialMarketDataStore) AddKLine(kline types.KLine) {
|
||
|
if kline.Symbol != store.Symbol {
|
||
|
return
|
||
|
}
|
||
|
// only consumes kline1m
|
||
|
if kline.Interval != types.Interval1m {
|
||
|
return
|
||
|
}
|
||
|
// endtime in minutes
|
||
|
timestamp := kline.StartTime.Time().Add(time.Minute)
|
||
|
for _, val := range store.Subscription {
|
||
|
k, ok := store.KLines[val]
|
||
|
if !ok {
|
||
|
k = &types.KLine{}
|
||
|
k.Set(&kline)
|
||
|
k.Interval = val
|
||
|
k.Closed = false
|
||
|
store.KLines[val] = k
|
||
|
} else {
|
||
|
k.Merge(&kline)
|
||
|
k.Closed = false
|
||
|
}
|
||
|
if timestamp.Truncate(val.Duration()) == timestamp {
|
||
|
k.Closed = true
|
||
|
store.MarketDataStore.AddKLine(*k)
|
||
|
delete(store.KLines, val)
|
||
|
}
|
||
|
}
|
||
|
}
|