diff --git a/pkg/twap/v2/done.go b/pkg/twap/v2/done.go new file mode 100644 index 000000000..9d498b939 --- /dev/null +++ b/pkg/twap/v2/done.go @@ -0,0 +1,39 @@ +package twap + +import "sync" + +type DoneSignal struct { + doneC chan struct{} + mu sync.Mutex +} + +func NewDoneSignal() *DoneSignal { + return &DoneSignal{ + doneC: make(chan struct{}), + } +} + +func (e *DoneSignal) Emit() { + e.mu.Lock() + if e.doneC == nil { + e.doneC = make(chan struct{}) + } + + close(e.doneC) + e.mu.Unlock() +} + +// Chan returns a channel that emits a signal when the execution is done. +func (e *DoneSignal) Chan() (c <-chan struct{}) { + // if the channel is not allocated, it means it's not started yet, we need to return a closed channel + e.mu.Lock() + if e.doneC == nil { + e.doneC = make(chan struct{}) + c = e.doneC + } else { + c = e.doneC + } + e.mu.Unlock() + + return c +} diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index 51e56b8d6..1b41889bb 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -18,42 +18,6 @@ import ( var defaultUpdateInterval = time.Minute -type DoneSignal struct { - doneC chan struct{} - mu sync.Mutex -} - -func NewDoneSignal() *DoneSignal { - return &DoneSignal{ - doneC: make(chan struct{}), - } -} - -func (e *DoneSignal) Emit() { - e.mu.Lock() - if e.doneC == nil { - e.doneC = make(chan struct{}) - } - - close(e.doneC) - e.mu.Unlock() -} - -// Chan returns a channel that emits a signal when the execution is done. -func (e *DoneSignal) Chan() (c <-chan struct{}) { - // if the channel is not allocated, it means it's not started yet, we need to return a closed channel - e.mu.Lock() - if e.doneC == nil { - e.doneC = make(chan struct{}) - c = e.doneC - } else { - c = e.doneC - } - e.mu.Unlock() - - return c -} - // FixedQuantityExecutor is a TWAP executor that places orders on the exchange using the exchange's stream API. // It uses a fixed target quantity to place orders. type FixedQuantityExecutor struct {