grpc: refactor subscription convert

This commit is contained in:
c9s 2022-04-13 13:06:26 +08:00
parent 606a7b3220
commit 2e063e7eb2

View File

@ -25,6 +25,36 @@ type Server struct {
pb.UnimplementedMarketDataServiceServer
}
func toSubscriptions(sub *pb.Subscription) (types.Subscription, error) {
switch sub.Channel {
case pb.Channel_TRADE:
return types.Subscription{
Symbol: sub.Symbol,
Channel: types.MarketTradeChannel,
}, nil
case pb.Channel_BOOK:
return types.Subscription{
Symbol: sub.Symbol,
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.Depth(sub.Depth),
},
}, nil
case pb.Channel_KLINE:
return types.Subscription{
Symbol: sub.Symbol,
Channel: types.KLineChannel,
Options: types.SubscribeOptions{
Interval: sub.Interval,
},
}, nil
}
return types.Subscription{}, fmt.Errorf("unsupported subscription channel: %s", sub.Channel)
}
func (s *Server) Subscribe(request *pb.SubscribeRequest, server pb.MarketDataService_SubscribeServer) error {
exchangeSubscriptions := map[string][]types.Subscription{}
for _, sub := range request.Subscriptions {
@ -33,31 +63,12 @@ func (s *Server) Subscribe(request *pb.SubscribeRequest, server pb.MarketDataSer
return fmt.Errorf("exchange %s not found", sub.Exchange)
}
switch sub.Channel {
case pb.Channel_TRADE:
exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], types.Subscription{
Symbol: sub.Symbol,
Channel: types.MarketTradeChannel,
})
case pb.Channel_BOOK:
exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], types.Subscription{
Symbol: sub.Symbol,
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.Depth(sub.Depth),
},
})
case pb.Channel_KLINE:
exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], types.Subscription{
Symbol: sub.Symbol,
Channel: types.KLineChannel,
Options: types.SubscribeOptions{
Interval: sub.Interval,
},
})
ss, err := toSubscriptions(sub)
if err != nil {
return err
}
exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], ss)
}
streamPool := map[string]types.Stream{}