grpc: fix user data stream subscribe

This commit is contained in:
c9s 2022-04-15 00:13:19 +08:00
parent 84c34f1d50
commit 8b8cffbd06

View File

@ -25,54 +25,59 @@ type UserDataService struct {
} }
func (s *UserDataService) Subscribe(request *pb.UserDataRequest, server pb.UserDataService_SubscribeServer) error { func (s *UserDataService) Subscribe(request *pb.UserDataRequest, server pb.UserDataService_SubscribeServer) error {
streamPool := map[string]types.Stream{} sessionName := request.Session
for sessionName, session := range s.Environ.Sessions() {
if request.Session == sessionName { if len(sessionName) == 0 {
userDataStream := session.Exchange.NewStream() return fmt.Errorf("session name can not be empty")
userDataStream.OnOrderUpdate(func(order types.Order) { }
err := server.Send(&pb.UserData{
Channel: pb.Channel_ORDER, session, ok := s.Environ.Session(sessionName)
Event: pb.Event_UPDATE, if !ok {
Orders: []*pb.Order{transOrder(session, order)}, return fmt.Errorf("session %s not found", sessionName)
})
if err != nil {
log.WithError(err).Errorf("grpc: can not send user data")
}
})
userDataStream.OnTradeUpdate(func(trade types.Trade) {
err := server.Send(&pb.UserData{
Channel: pb.Channel_TRADE,
Event: pb.Event_UPDATE,
Trades: []*pb.Trade{transTrade(session, trade)},
})
if err != nil {
log.WithError(err).Errorf("grpc: can not send user data")
}
})
userDataStream.OnBalanceUpdate(func(balances types.BalanceMap) {
err := server.Send(&pb.UserData{
Channel: pb.Channel_BALANCE,
Event: pb.Event_UPDATE,
Balances: transBalances(session, balances),
})
if err != nil {
log.WithError(err).Errorf("grpc: can not send user data")
}
})
streamPool[ sessionName ] = userDataStream
}
} }
<-server.Context().Done() userDataStream := session.Exchange.NewStream()
userDataStream.OnOrderUpdate(func(order types.Order) {
err := server.Send(&pb.UserData{
Channel: pb.Channel_ORDER,
Event: pb.Event_UPDATE,
Orders: []*pb.Order{transOrder(session, order)},
})
if err != nil {
log.WithError(err).Errorf("grpc: can not send user data")
}
})
userDataStream.OnTradeUpdate(func(trade types.Trade) {
err := server.Send(&pb.UserData{
Channel: pb.Channel_TRADE,
Event: pb.Event_UPDATE,
Trades: []*pb.Trade{transTrade(session, trade)},
})
if err != nil {
log.WithError(err).Errorf("grpc: can not send user data")
}
})
balanceHandler := func(balances types.BalanceMap) {
err := server.Send(&pb.UserData{
Channel: pb.Channel_BALANCE,
Event: pb.Event_UPDATE,
Balances: transBalances(session, balances),
})
if err != nil {
log.WithError(err).Errorf("grpc: can not send user data")
}
}
userDataStream.OnBalanceUpdate(balanceHandler)
userDataStream.OnBalanceSnapshot(balanceHandler)
defer func() { defer func() {
for _, stream := range streamPool { if err := userDataStream.Close(); err != nil {
if err := stream.Close(); err != nil { log.WithError(err).Errorf("user data stream close error")
log.WithError(err).Errorf("user data stream close error")
}
} }
}() }()
<-server.Context().Done()
return nil return nil
} }