pkg/exchange: add order event

This commit is contained in:
Edwin 2023-08-08 11:40:50 +08:00
parent 5349e5afbe
commit f664ef2262
4 changed files with 55 additions and 0 deletions

View File

@ -34,6 +34,7 @@ type Stream struct {
bookEventCallbacks []func(e BookEvent) bookEventCallbacks []func(e BookEvent)
walletEventCallbacks []func(e []*WalletEvent) walletEventCallbacks []func(e []*WalletEvent)
orderEventCallbacks []func(e []*OrderEvent)
} }
func NewStream(key, secret string) *Stream { func NewStream(key, secret string) *Stream {
@ -52,6 +53,7 @@ func NewStream(key, secret string) *Stream {
stream.OnConnect(stream.handlerConnect) stream.OnConnect(stream.handlerConnect)
stream.OnBookEvent(stream.handleBookEvent) stream.OnBookEvent(stream.handleBookEvent)
stream.OnWalletEvent(stream.handleWalletEvent) stream.OnWalletEvent(stream.handleWalletEvent)
stream.OnOrderEvent(stream.handleOrderEvent)
return stream return stream
} }
@ -77,6 +79,9 @@ func (s *Stream) dispatchEvent(event interface{}) {
case []*WalletEvent: case []*WalletEvent:
s.EmitWalletEvent(e) s.EmitWalletEvent(e)
case []*OrderEvent:
s.EmitOrderEvent(e)
} }
} }
@ -107,6 +112,11 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
case TopicTypeWallet: case TopicTypeWallet:
var wallets []*WalletEvent var wallets []*WalletEvent
return wallets, json.Unmarshal(e.WebSocketTopicEvent.Data, &wallets) return wallets, json.Unmarshal(e.WebSocketTopicEvent.Data, &wallets)
case TopicTypeOrder:
var orders []*OrderEvent
return orders, json.Unmarshal(e.WebSocketTopicEvent.Data, &orders)
} }
} }
@ -194,6 +204,7 @@ func (s *Stream) handlerConnect() {
Op: WsOpTypeSubscribe, Op: WsOpTypeSubscribe,
Args: []string{ Args: []string{
string(TopicTypeWallet), string(TopicTypeWallet),
string(TopicTypeOrder),
}, },
}); err != nil { }); err != nil {
log.WithError(err).Error("failed to send subscription request") log.WithError(err).Error("failed to send subscription request")
@ -246,3 +257,18 @@ func (s *Stream) handleWalletEvent(events []*WalletEvent) {
s.StandardStream.EmitBalanceSnapshot(bm) s.StandardStream.EmitBalanceSnapshot(bm)
} }
func (s *Stream) handleOrderEvent(events []*OrderEvent) {
for _, event := range events {
if event.Category != bybitapi.CategorySpot {
return
}
gOrder, err := toGlobalOrder(event.Order)
if err != nil {
log.WithError(err).Error("failed to convert to global order")
continue
}
s.StandardStream.EmitOrderUpdate(*gOrder)
}
}

View File

@ -23,3 +23,13 @@ func (s *Stream) EmitWalletEvent(e []*WalletEvent) {
cb(e) cb(e)
} }
} }
func (s *Stream) OnOrderEvent(cb func(e []*OrderEvent)) {
s.orderEventCallbacks = append(s.orderEventCallbacks, cb)
}
func (s *Stream) EmitOrderEvent(e []*OrderEvent) {
for _, cb := range s.orderEventCallbacks {
cb(e)
}
}

View File

@ -65,6 +65,17 @@ func TestStream(t *testing.T) {
c := make(chan struct{}) c := make(chan struct{})
<-c <-c
}) })
t.Run("order test", func(t *testing.T) {
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnOrderUpdate(func(order types.Order) {
t.Log("got update", order)
})
c := make(chan struct{})
<-c
})
} }
func TestStream_parseWebSocketEvent(t *testing.T) { func TestStream_parseWebSocketEvent(t *testing.T) {

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -80,6 +81,7 @@ type TopicType string
const ( const (
TopicTypeOrderBook TopicType = "orderbook" TopicTypeOrderBook TopicType = "orderbook"
TopicTypeWallet TopicType = "wallet" TopicTypeWallet TopicType = "wallet"
TopicTypeOrder TopicType = "order"
) )
type DataType string type DataType string
@ -201,3 +203,9 @@ type WalletEvent struct {
MarginCollateral bool `json:"marginCollateral"` MarginCollateral bool `json:"marginCollateral"`
} `json:"coin"` } `json:"coin"`
} }
type OrderEvent struct {
bybitapi.Order
Category bybitapi.Category `json:"category"`
}