Merge pull request #1276 from bailantaotao/edwin/add-order-info

pkg/exchange: add order event
This commit is contained in:
bailantaotao 2023-08-08 14:58:30 +08:00 committed by GitHub
commit 42bec972c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 61 additions and 6 deletions

View File

@ -33,7 +33,8 @@ type Stream struct {
types.StandardStream
bookEventCallbacks []func(e BookEvent)
walletEventCallbacks []func(e []*WalletEvent)
walletEventCallbacks []func(e []WalletEvent)
orderEventCallbacks []func(e []OrderEvent)
}
func NewStream(key, secret string) *Stream {
@ -52,6 +53,7 @@ func NewStream(key, secret string) *Stream {
stream.OnConnect(stream.handlerConnect)
stream.OnBookEvent(stream.handleBookEvent)
stream.OnWalletEvent(stream.handleWalletEvent)
stream.OnOrderEvent(stream.handleOrderEvent)
return stream
}
@ -75,8 +77,11 @@ func (s *Stream) dispatchEvent(event interface{}) {
case *BookEvent:
s.EmitBookEvent(*e)
case []*WalletEvent:
case []WalletEvent:
s.EmitWalletEvent(e)
case []OrderEvent:
s.EmitOrderEvent(e)
}
}
@ -105,8 +110,13 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
return &book, nil
case TopicTypeWallet:
var wallets []*WalletEvent
var wallets []WalletEvent
return wallets, json.Unmarshal(e.WebSocketTopicEvent.Data, &wallets)
case TopicTypeOrder:
var orders []OrderEvent
return orders, json.Unmarshal(e.WebSocketTopicEvent.Data, &orders)
}
}
@ -194,6 +204,7 @@ func (s *Stream) handlerConnect() {
Op: WsOpTypeSubscribe,
Args: []string{
string(TopicTypeWallet),
string(TopicTypeOrder),
},
}); err != nil {
log.WithError(err).Error("failed to send subscription request")
@ -228,7 +239,7 @@ func (s *Stream) handleBookEvent(e BookEvent) {
}
}
func (s *Stream) handleWalletEvent(events []*WalletEvent) {
func (s *Stream) handleWalletEvent(events []WalletEvent) {
bm := types.BalanceMap{}
for _, event := range events {
if event.AccountType != AccountTypeSpot {
@ -246,3 +257,18 @@ func (s *Stream) handleWalletEvent(events []*WalletEvent) {
s.StandardStream.EmitBalanceSnapshot(bm)
}
func (s *Stream) handleOrderEvent(events []OrderEvent) {
for _, event := range events {
if event.Category != bybitapi.CategorySpot {
return
}
gOrder, err := toGlobalOrder(event.Order)
if err != nil {
log.WithError(err).Error("failed to convert to global order")
continue
}
s.StandardStream.EmitOrderUpdate(*gOrder)
}
}

View File

@ -14,12 +14,22 @@ func (s *Stream) EmitBookEvent(e BookEvent) {
}
}
func (s *Stream) OnWalletEvent(cb func(e []*WalletEvent)) {
func (s *Stream) OnWalletEvent(cb func(e []WalletEvent)) {
s.walletEventCallbacks = append(s.walletEventCallbacks, cb)
}
func (s *Stream) EmitWalletEvent(e []*WalletEvent) {
func (s *Stream) EmitWalletEvent(e []WalletEvent) {
for _, cb := range s.walletEventCallbacks {
cb(e)
}
}
func (s *Stream) OnOrderEvent(cb func(e []OrderEvent)) {
s.orderEventCallbacks = append(s.orderEventCallbacks, cb)
}
func (s *Stream) EmitOrderEvent(e []OrderEvent) {
for _, cb := range s.orderEventCallbacks {
cb(e)
}
}

View File

@ -65,6 +65,17 @@ func TestStream(t *testing.T) {
c := make(chan struct{})
<-c
})
t.Run("order test", func(t *testing.T) {
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnOrderUpdate(func(order types.Order) {
t.Log("got update", order)
})
c := make(chan struct{})
<-c
})
}
func TestStream_parseWebSocketEvent(t *testing.T) {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
@ -80,6 +81,7 @@ type TopicType string
const (
TopicTypeOrderBook TopicType = "orderbook"
TopicTypeWallet TopicType = "wallet"
TopicTypeOrder TopicType = "order"
)
type DataType string
@ -201,3 +203,9 @@ type WalletEvent struct {
MarginCollateral bool `json:"marginCollateral"`
} `json:"coin"`
}
type OrderEvent struct {
bybitapi.Order
Category bybitapi.Category `json:"category"`
}