grpc: separate market data message and user data message

This commit is contained in:
c9s 2022-04-13 14:14:25 +08:00
parent ec6a94e485
commit 8e81716d2a
5 changed files with 768 additions and 645 deletions

View File

@ -49,8 +49,8 @@ func transPriceVolume(srcPvs types.PriceVolumeSlice) (pvs []*pb.PriceVolume) {
return pvs return pvs
} }
func transBook(session *bbgo.ExchangeSession, book types.SliceOrderBook, event pb.Event) *pb.SubscribeResponse { func transBook(session *bbgo.ExchangeSession, book types.SliceOrderBook, event pb.Event) *pb.MarketData {
return &pb.SubscribeResponse{ return &pb.MarketData{
Session: session.Name, Session: session.Name,
Exchange: session.ExchangeName.String(), Exchange: session.ExchangeName.String(),
Symbol: book.Symbol, Symbol: book.Symbol,
@ -65,8 +65,8 @@ func transBook(session *bbgo.ExchangeSession, book types.SliceOrderBook, event p
} }
} }
func transMarketTrade(session *bbgo.ExchangeSession, marketTrade types.Trade) *pb.SubscribeResponse { func transMarketTrade(session *bbgo.ExchangeSession, marketTrade types.Trade) *pb.MarketData {
return &pb.SubscribeResponse{ return &pb.MarketData{
Session: session.Name, Session: session.Name,
Exchange: session.ExchangeName.String(), Exchange: session.ExchangeName.String(),
Symbol: marketTrade.Symbol, Symbol: marketTrade.Symbol,
@ -100,27 +100,31 @@ func transSide(side types.SideType) pb.Side {
return pb.Side_SELL return pb.Side_SELL
} }
func transKLine(session *bbgo.ExchangeSession, kline types.KLine) *pb.SubscribeResponse { func transKLine(session *bbgo.ExchangeSession, kline types.KLine) *pb.KLine {
return &pb.SubscribeResponse{ return &pb.KLine{
Session: session.Name, Session: session.Name,
Exchange: kline.Exchange.String(), Exchange: kline.Exchange.String(),
Symbol: kline.Symbol, Symbol: kline.Symbol,
Channel: pb.Channel_KLINE, Open: kline.Open.String(),
Event: pb.Event_UPDATE, High: kline.High.String(),
Kline: &pb.KLine{ Low: kline.Low.String(),
Session: session.Name, Close: kline.Close.String(),
Exchange: kline.Exchange.String(), Volume: kline.Volume.String(),
Symbol: kline.Symbol, QuoteVolume: kline.QuoteVolume.String(),
Open: kline.Open.String(), StartTime: kline.StartTime.UnixMilli(),
High: kline.High.String(), EndTime: kline.StartTime.UnixMilli(),
Low: kline.Low.String(), Closed: kline.Closed,
Close: kline.Close.String(), }
Volume: kline.Volume.String(), }
QuoteVolume: kline.QuoteVolume.String(),
StartTime: kline.StartTime.UnixMilli(), func transKLineResponse(session *bbgo.ExchangeSession, kline types.KLine) *pb.MarketData {
EndTime: kline.StartTime.UnixMilli(), return &pb.MarketData{
Closed: kline.Closed, Session: session.Name,
}, Exchange: kline.Exchange.String(),
Symbol: kline.Symbol,
Channel: pb.Channel_KLINE,
Event: pb.Event_UPDATE,
Kline: transKLine(session, kline),
SubscribedAt: 0, SubscribedAt: 0,
} }
} }

View File

@ -22,6 +22,11 @@ type Server struct {
Trader *bbgo.Trader Trader *bbgo.Trader
pb.UnimplementedMarketDataServiceServer pb.UnimplementedMarketDataServiceServer
pb.UnimplementedUserDataServiceServer
}
func (s *Server) SubscribeUserData(empty *pb.Empty, server pb.UserDataService_SubscribeServer) error {
return nil
} }
func (s *Server) Subscribe(request *pb.SubscribeRequest, server pb.MarketDataService_SubscribeServer) error { func (s *Server) Subscribe(request *pb.SubscribeRequest, server pb.MarketDataService_SubscribeServer) error {
@ -73,7 +78,7 @@ func (s *Server) Subscribe(request *pb.SubscribeRequest, server pb.MarketDataSer
} }
}) })
stream.OnKLineClosed(func(kline types.KLine) { stream.OnKLineClosed(func(kline types.KLine) {
err := server.Send(transKLine(session, kline)) err := server.Send(transKLineResponse(session, kline))
if err != nil { if err != nil {
log.WithError(err).Error("grpc stream send error") log.WithError(err).Error("grpc stream send error")
} }
@ -132,19 +137,7 @@ func (s *Server) QueryKLines(ctx context.Context, request *pb.QueryKLinesRequest
} }
for _, kline := range klines { for _, kline := range klines {
response.Klines = append(response.Klines, &pb.KLine{ response.Klines = append(response.Klines, transKLine(session, kline))
Exchange: kline.Exchange.String(),
Symbol: kline.Symbol,
Open: kline.Open.String(),
High: kline.High.String(),
Low: kline.Low.String(),
Close: kline.Close.String(),
Volume: kline.Volume.String(),
QuoteVolume: kline.QuoteVolume.String(),
Closed: kline.Closed,
StartTime: kline.StartTime.UnixMilli(),
EndTime: kline.EndTime.UnixMilli(),
})
} }
return response, nil return response, nil

File diff suppressed because it is too large Load Diff

View File

@ -5,13 +5,12 @@ package bbgo;
option go_package = "../pb"; option go_package = "../pb";
service MarketDataService { service MarketDataService {
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {} rpc Subscribe(SubscribeRequest) returns (stream MarketData) {}
rpc QueryKLines(QueryKLinesRequest) returns (QueryKLinesResponse) {} rpc QueryKLines(QueryKLinesRequest) returns (QueryKLinesResponse) {}
} }
service UserDataService { service UserDataService {
// should support streaming rpc Subscribe(UserDataRequest) returns (stream UserData) {}
rpc SubscribeUserData(Empty) returns (stream SubscribeResponse) {}
} }
service TradingService { service TradingService {
@ -30,12 +29,6 @@ enum Event {
SNAPSHOT = 3; SNAPSHOT = 3;
UPDATE = 4; UPDATE = 4;
AUTHENTICATED = 5; AUTHENTICATED = 5;
ORDER_SNAPSHOT = 6;
ORDER_UPDATE = 7;
TRADE_SNAPSHOT = 8;
TRADE_UPDATE = 9;
ACCOUNT_SNAPSHOT = 10;
ACCOUNT_UPDATE = 11;
ERROR = 99; ERROR = 99;
} }
@ -43,8 +36,8 @@ enum Channel {
BOOK = 0; BOOK = 0;
TRADE = 1; TRADE = 1;
TICKER = 2; TICKER = 2;
USER = 3; KLINE = 3;
KLINE = 4; BALANCE = 4;
} }
enum Side { enum Side {
@ -68,6 +61,20 @@ message Error {
string error_message = 2; string error_message = 2;
} }
message UserDataRequest {
string session = 1;
}
message UserData {
string session = 1;
string exchange = 2;
Channel channel = 3; // trade, order, balance
Event event = 4; // snapshot, update ...
repeated Balance balances = 5;
repeated Trade trades = 6;
repeated Order orders = 7;
}
message SubscribeRequest { message SubscribeRequest {
repeated Subscription subscriptions = 1; repeated Subscription subscriptions = 1;
} }
@ -80,18 +87,16 @@ message Subscription {
string interval = 5; // interval is for kline channel string interval = 5; // interval is for kline channel
} }
message SubscribeResponse { message MarketData {
string session = 1; string session = 1;
string exchange = 2; string exchange = 2;
string symbol = 3; string symbol = 3;
Channel channel = 4; // book, trade, ticker, user Channel channel = 4; // book, trade, ticker, user
Event event = 5; // snapshot, update, order_snapshot, ... Event event = 5; // snapshot or update
Depth depth = 6; // depth: used by book Depth depth = 6; // depth: used by book
KLine kline = 7; KLine kline = 7;
repeated Trade trades = 8; Ticker ticker = 9; // market ticker
Ticker ticker = 9; repeated Trade trades = 8; // market trades
repeated Order orders = 10;
repeated Balance balances = 11;
int64 subscribed_at = 12; int64 subscribed_at = 12;
Error error = 13; Error error = 13;
} }

View File

@ -46,7 +46,7 @@ func (c *marketDataServiceClient) Subscribe(ctx context.Context, in *SubscribeRe
} }
type MarketDataService_SubscribeClient interface { type MarketDataService_SubscribeClient interface {
Recv() (*SubscribeResponse, error) Recv() (*MarketData, error)
grpc.ClientStream grpc.ClientStream
} }
@ -54,8 +54,8 @@ type marketDataServiceSubscribeClient struct {
grpc.ClientStream grpc.ClientStream
} }
func (x *marketDataServiceSubscribeClient) Recv() (*SubscribeResponse, error) { func (x *marketDataServiceSubscribeClient) Recv() (*MarketData, error) {
m := new(SubscribeResponse) m := new(MarketData)
if err := x.ClientStream.RecvMsg(m); err != nil { if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err return nil, err
} }
@ -112,7 +112,7 @@ func _MarketDataService_Subscribe_Handler(srv interface{}, stream grpc.ServerStr
} }
type MarketDataService_SubscribeServer interface { type MarketDataService_SubscribeServer interface {
Send(*SubscribeResponse) error Send(*MarketData) error
grpc.ServerStream grpc.ServerStream
} }
@ -120,7 +120,7 @@ type marketDataServiceSubscribeServer struct {
grpc.ServerStream grpc.ServerStream
} }
func (x *marketDataServiceSubscribeServer) Send(m *SubscribeResponse) error { func (x *marketDataServiceSubscribeServer) Send(m *MarketData) error {
return x.ServerStream.SendMsg(m) return x.ServerStream.SendMsg(m)
} }
@ -168,8 +168,7 @@ var MarketDataService_ServiceDesc = grpc.ServiceDesc{
// //
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type UserDataServiceClient interface { type UserDataServiceClient interface {
// should support streaming Subscribe(ctx context.Context, in *UserDataRequest, opts ...grpc.CallOption) (UserDataService_SubscribeClient, error)
SubscribeUserData(ctx context.Context, in *Empty, opts ...grpc.CallOption) (UserDataService_SubscribeUserDataClient, error)
} }
type userDataServiceClient struct { type userDataServiceClient struct {
@ -180,12 +179,12 @@ func NewUserDataServiceClient(cc grpc.ClientConnInterface) UserDataServiceClient
return &userDataServiceClient{cc} return &userDataServiceClient{cc}
} }
func (c *userDataServiceClient) SubscribeUserData(ctx context.Context, in *Empty, opts ...grpc.CallOption) (UserDataService_SubscribeUserDataClient, error) { func (c *userDataServiceClient) Subscribe(ctx context.Context, in *UserDataRequest, opts ...grpc.CallOption) (UserDataService_SubscribeClient, error) {
stream, err := c.cc.NewStream(ctx, &UserDataService_ServiceDesc.Streams[0], "/bbgo.UserDataService/SubscribeUserData", opts...) stream, err := c.cc.NewStream(ctx, &UserDataService_ServiceDesc.Streams[0], "/bbgo.UserDataService/Subscribe", opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
x := &userDataServiceSubscribeUserDataClient{stream} x := &userDataServiceSubscribeClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil { if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err return nil, err
} }
@ -195,17 +194,17 @@ func (c *userDataServiceClient) SubscribeUserData(ctx context.Context, in *Empty
return x, nil return x, nil
} }
type UserDataService_SubscribeUserDataClient interface { type UserDataService_SubscribeClient interface {
Recv() (*SubscribeResponse, error) Recv() (*UserData, error)
grpc.ClientStream grpc.ClientStream
} }
type userDataServiceSubscribeUserDataClient struct { type userDataServiceSubscribeClient struct {
grpc.ClientStream grpc.ClientStream
} }
func (x *userDataServiceSubscribeUserDataClient) Recv() (*SubscribeResponse, error) { func (x *userDataServiceSubscribeClient) Recv() (*UserData, error) {
m := new(SubscribeResponse) m := new(UserData)
if err := x.ClientStream.RecvMsg(m); err != nil { if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err return nil, err
} }
@ -216,8 +215,7 @@ func (x *userDataServiceSubscribeUserDataClient) Recv() (*SubscribeResponse, err
// All implementations must embed UnimplementedUserDataServiceServer // All implementations must embed UnimplementedUserDataServiceServer
// for forward compatibility // for forward compatibility
type UserDataServiceServer interface { type UserDataServiceServer interface {
// should support streaming Subscribe(*UserDataRequest, UserDataService_SubscribeServer) error
SubscribeUserData(*Empty, UserDataService_SubscribeUserDataServer) error
mustEmbedUnimplementedUserDataServiceServer() mustEmbedUnimplementedUserDataServiceServer()
} }
@ -225,8 +223,8 @@ type UserDataServiceServer interface {
type UnimplementedUserDataServiceServer struct { type UnimplementedUserDataServiceServer struct {
} }
func (UnimplementedUserDataServiceServer) SubscribeUserData(*Empty, UserDataService_SubscribeUserDataServer) error { func (UnimplementedUserDataServiceServer) Subscribe(*UserDataRequest, UserDataService_SubscribeServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeUserData not implemented") return status.Errorf(codes.Unimplemented, "method Subscribe not implemented")
} }
func (UnimplementedUserDataServiceServer) mustEmbedUnimplementedUserDataServiceServer() {} func (UnimplementedUserDataServiceServer) mustEmbedUnimplementedUserDataServiceServer() {}
@ -241,24 +239,24 @@ func RegisterUserDataServiceServer(s grpc.ServiceRegistrar, srv UserDataServiceS
s.RegisterService(&UserDataService_ServiceDesc, srv) s.RegisterService(&UserDataService_ServiceDesc, srv)
} }
func _UserDataService_SubscribeUserData_Handler(srv interface{}, stream grpc.ServerStream) error { func _UserDataService_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Empty) m := new(UserDataRequest)
if err := stream.RecvMsg(m); err != nil { if err := stream.RecvMsg(m); err != nil {
return err return err
} }
return srv.(UserDataServiceServer).SubscribeUserData(m, &userDataServiceSubscribeUserDataServer{stream}) return srv.(UserDataServiceServer).Subscribe(m, &userDataServiceSubscribeServer{stream})
} }
type UserDataService_SubscribeUserDataServer interface { type UserDataService_SubscribeServer interface {
Send(*SubscribeResponse) error Send(*UserData) error
grpc.ServerStream grpc.ServerStream
} }
type userDataServiceSubscribeUserDataServer struct { type userDataServiceSubscribeServer struct {
grpc.ServerStream grpc.ServerStream
} }
func (x *userDataServiceSubscribeUserDataServer) Send(m *SubscribeResponse) error { func (x *userDataServiceSubscribeServer) Send(m *UserData) error {
return x.ServerStream.SendMsg(m) return x.ServerStream.SendMsg(m)
} }
@ -271,8 +269,8 @@ var UserDataService_ServiceDesc = grpc.ServiceDesc{
Methods: []grpc.MethodDesc{}, Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {
StreamName: "SubscribeUserData", StreamName: "Subscribe",
Handler: _UserDataService_SubscribeUserData_Handler, Handler: _UserDataService_Subscribe_Handler,
ServerStreams: true, ServerStreams: true,
}, },
}, },