From 2e063e7eb2fcc35bf7ad22b5a6b024b61d8f57af Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 13 Apr 2022 13:06:26 +0800 Subject: [PATCH] grpc: refactor subscription convert --- pkg/grpc/server.go | 59 +++++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go index 878596398..a03b74a29 100644 --- a/pkg/grpc/server.go +++ b/pkg/grpc/server.go @@ -25,6 +25,36 @@ type Server struct { pb.UnimplementedMarketDataServiceServer } +func toSubscriptions(sub *pb.Subscription) (types.Subscription, error) { + switch sub.Channel { + case pb.Channel_TRADE: + return types.Subscription{ + Symbol: sub.Symbol, + Channel: types.MarketTradeChannel, + }, nil + + case pb.Channel_BOOK: + return types.Subscription{ + Symbol: sub.Symbol, + Channel: types.BookChannel, + Options: types.SubscribeOptions{ + Depth: types.Depth(sub.Depth), + }, + }, nil + + case pb.Channel_KLINE: + return types.Subscription{ + Symbol: sub.Symbol, + Channel: types.KLineChannel, + Options: types.SubscribeOptions{ + Interval: sub.Interval, + }, + }, nil + } + + return types.Subscription{}, fmt.Errorf("unsupported subscription channel: %s", sub.Channel) +} + func (s *Server) Subscribe(request *pb.SubscribeRequest, server pb.MarketDataService_SubscribeServer) error { exchangeSubscriptions := map[string][]types.Subscription{} for _, sub := range request.Subscriptions { @@ -33,31 +63,12 @@ func (s *Server) Subscribe(request *pb.SubscribeRequest, server pb.MarketDataSer return fmt.Errorf("exchange %s not found", sub.Exchange) } - switch sub.Channel { - case pb.Channel_TRADE: - exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], types.Subscription{ - Symbol: sub.Symbol, - Channel: types.MarketTradeChannel, - }) - - case pb.Channel_BOOK: - exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], types.Subscription{ - Symbol: sub.Symbol, - Channel: types.BookChannel, - Options: types.SubscribeOptions{ - Depth: types.Depth(sub.Depth), - }, - }) - - case pb.Channel_KLINE: - exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], types.Subscription{ - Symbol: sub.Symbol, - Channel: types.KLineChannel, - Options: types.SubscribeOptions{ - Interval: sub.Interval, - }, - }) + ss, err := toSubscriptions(sub) + if err != nil { + return err } + + exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], ss) } streamPool := map[string]types.Stream{}