Merge pull request #1309 from bailantaotao/edwin/add-auth-event

FEATURE: add auth event
This commit is contained in:
c9s 2023-09-14 21:47:18 +08:00 committed by GitHub
commit 4a8407bde3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 33 additions and 0 deletions

View File

@ -150,6 +150,9 @@ func (s *Stream) handleDisconnect() {
func (s *Stream) handleConnect() {
if !s.PublicOnly {
// Emit Auth before establishing the connection to prevent the caller from missing the Update data after
// creating the order.
s.EmitAuth()
return
}

View File

@ -137,6 +137,9 @@ func (s *Stream) dispatchEvent(event interface{}) {
if err := e.IsValid(); err != nil {
log.Errorf("invalid event: %v", err)
}
if e.IsAuthenticated() {
s.EmitAuth()
}
case *BookEvent:
s.EmitBookEvent(*e)

View File

@ -126,6 +126,9 @@ func TestStream(t *testing.T) {
})
t.Run("wallet test", func(t *testing.T) {
s.OnAuth(func() {
t.Log("authenticated")
})
err := s.Connect(context.Background())
assert.NoError(t, err)

View File

@ -88,6 +88,10 @@ func (w *WebSocketOpEvent) IsValid() error {
}
}
func (w *WebSocketOpEvent) IsAuthenticated() bool {
return w.Op == WsOpTypeAuth && w.Success
}
type TopicType string
const (

View File

@ -178,6 +178,9 @@ func (s *Stream) handleConnect() {
return
}
} else {
// Emit Auth before establishing the connection to prevent the caller from missing the Update data after
// creating the order.
s.EmitAuth()
id := time.Now().UnixNano() / int64(time.Millisecond)
cmds := []WebSocketCommand{
{

View File

@ -53,6 +53,7 @@ func NewStream(key, secret string) *Stream {
stream.OnConnect(stream.handleConnect)
stream.OnAuthEvent(func(e max.AuthEvent) {
log.Infof("max websocket connection authenticated: %+v", e)
stream.EmitAuth()
})
stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnOrderSnapshotEvent(stream.handleOrderSnapshotEvent)

View File

@ -118,6 +118,7 @@ func (s *Stream) handleEvent(event WebSocketEvent) {
switch event.Event {
case "login":
if event.Code == "0" {
s.EmitAuth()
var subs = []WebsocketSubscription{
{Channel: "account"},
{Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)},

View File

@ -34,6 +34,16 @@ func (s *StandardStream) EmitDisconnect() {
}
}
func (s *StandardStream) OnAuth(cb func()) {
s.authCallbacks = append(s.authCallbacks, cb)
}
func (s *StandardStream) EmitAuth() {
for _, cb := range s.authCallbacks {
cb()
}
}
func (s *StandardStream) OnTradeUpdate(cb func(trade Trade)) {
s.tradeUpdateCallbacks = append(s.tradeUpdateCallbacks, cb)
}
@ -171,6 +181,8 @@ type StandardStreamEventHub interface {
OnDisconnect(cb func())
OnAuth(cb func())
OnTradeUpdate(cb func(trade Trade))
OnOrderUpdate(cb func(order Order))

View File

@ -98,6 +98,8 @@ type StandardStream struct {
disconnectCallbacks []func()
authCallbacks []func()
// private trade update callbacks
tradeUpdateCallbacks []func(trade Trade)
@ -138,6 +140,7 @@ type StandardStreamEmitter interface {
EmitStart()
EmitConnect()
EmitDisconnect()
EmitAuth()
EmitTradeUpdate(Trade)
EmitOrderUpdate(Order)
EmitBalanceSnapshot(BalanceMap)