grpc: fix connect and add balance snapshot

This commit is contained in:
c9s 2022-04-15 14:28:35 +08:00
parent dd6588b1e9
commit 84d4f312fa

View File

@ -30,7 +30,7 @@ func (s *UserDataService) Subscribe(request *pb.UserDataRequest, server pb.UserD
if len(sessionName) == 0 {
return fmt.Errorf("session name can not be empty")
}
session, ok := s.Environ.Session(sessionName)
if !ok {
return fmt.Errorf("session %s not found", sessionName)
@ -71,13 +71,31 @@ func (s *UserDataService) Subscribe(request *pb.UserDataRequest, server pb.UserD
userDataStream.OnBalanceUpdate(balanceHandler)
userDataStream.OnBalanceSnapshot(balanceHandler)
ctx := server.Context()
balances, err := session.Exchange.QueryAccountBalances(ctx)
if err != nil {
return err
}
err = server.Send(&pb.UserData{
Channel: pb.Channel_BALANCE,
Event: pb.Event_SNAPSHOT,
Balances: transBalances(session, balances),
})
if err != nil {
log.WithError(err).Errorf("grpc: can not send user data")
}
go userDataStream.Connect(ctx)
defer func() {
if err := userDataStream.Close(); err != nil {
log.WithError(err).Errorf("user data stream close error")
}
}()
<-server.Context().Done()
<-ctx.Done()
return nil
}