diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index 6f47bf19e..79c8a022d 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -40,6 +40,13 @@ func (slice PriceVolumeSlice) Copy() PriceVolumeSlice { return append(slice[:0:0], slice...) } +func (slice PriceVolumeSlice) First() (PriceVolume, bool) { + if len(slice) > 0 { + return slice[0], true + } + return PriceVolume{}, false +} + func (slice PriceVolumeSlice) IndexByVolumeDepth(requiredVolume fixedpoint.Value) int { var tv int64 = 0 for x, el := range slice { diff --git a/pkg/types/standardstream_callbacks.go b/pkg/types/standardstream_callbacks.go index 967ae52e3..360e34651 100644 --- a/pkg/types/standardstream_callbacks.go +++ b/pkg/types/standardstream_callbacks.go @@ -2,18 +2,26 @@ package types -import () - -func (stream *StandardStream) OnTrade(cb func(trade Trade)) { - stream.tradeCallbacks = append(stream.tradeCallbacks, cb) +func (stream *StandardStream) OnTradeUpdate(cb func(trade Trade)) { + stream.tradeUpdateCallbacks = append(stream.tradeUpdateCallbacks, cb) } -func (stream *StandardStream) EmitTrade(trade Trade) { - for _, cb := range stream.tradeCallbacks { +func (stream *StandardStream) EmitTradeUpdate(trade Trade) { + for _, cb := range stream.tradeUpdateCallbacks { cb(trade) } } +func (stream *StandardStream) OnOrderUpdate(cb func(order Order)) { + stream.orderUpdateCallbacks = append(stream.orderUpdateCallbacks, cb) +} + +func (stream *StandardStream) EmitOrderUpdate(order Order) { + for _, cb := range stream.orderUpdateCallbacks { + cb(order) + } +} + func (stream *StandardStream) OnBalanceSnapshot(cb func(balances map[string]Balance)) { stream.balanceSnapshotCallbacks = append(stream.balanceSnapshotCallbacks, cb) } @@ -75,7 +83,9 @@ func (stream *StandardStream) EmitBookSnapshot(book OrderBook) { } type StandardStreamEventHub interface { - OnTrade(cb func(trade Trade)) + OnTradeUpdate(cb func(trade Trade)) + + OnOrderUpdate(cb func(order Order)) OnBalanceSnapshot(cb func(balances map[string]Balance)) diff --git a/pkg/types/stream.go b/pkg/types/stream.go index b55ffd009..3f3e59f96 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -22,8 +22,11 @@ var KLineChannel = Channel("kline") type StandardStream struct { Subscriptions []Subscription - // private trade callbacks - tradeCallbacks []func(trade Trade) + // private trade update callbacks + tradeUpdateCallbacks []func(trade Trade) + + // private order update callbacks + orderUpdateCallbacks []func(order Order) // balance snapshot callbacks balanceSnapshotCallbacks []func(balances map[string]Balance)