python: fix subscribe user data

This commit is contained in:
なるみ 2022-04-20 15:43:15 +08:00
parent 8a724aa619
commit 78a96e7ccc

View File

@ -18,12 +18,12 @@ from .data import UserDataEvent
class Stream(object):
subscriptions: List[Subscription]
def __init__(self, host: str, port: int, user_data: bool = False):
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.user_data = user_data
self.subscriptions = []
self.sessions = []
self.event_handlers = []
def subscribe(self, exchange: str, channel: str, symbol: str, depth: str = None, interval: str = None):
@ -37,6 +37,9 @@ class Stream(object):
self.subscriptions.append(subscription)
def subscribe_user_data(self, session: str):
self.sessions.append(session)
def add_event_handler(self, event_handler: Callable) -> None:
self.event_handlers.append(event_handler)
@ -48,7 +51,7 @@ class Stream(object):
def address(self):
return f'{self.host}:{self.port}'
async def _subscribe(self):
async def _subscribe_market_data(self):
async with grpc.aio.insecure_channel(self.address) as channel:
stub = bbgo_pb2_grpc.MarketDataServiceStub(channel)
@ -57,19 +60,19 @@ class Stream(object):
event = MarketDataEvent.from_pb(response)
self.fire_event_handlers(event)
async def _subscribe_user_data(self):
async def _subscribe_user_data(self, session: str):
async with grpc.aio.insecure_channel(self.address) as channel:
stub = bbgo_pb2_grpc.UserDataServiceStub(channel)
request = bbgo_pb2.Empty()
async for response in stub.SubscribeUserData(request):
request = bbgo_pb2.UserDataRequest(session=session)
async for response in stub.Subscribe(request):
event = UserDataEvent.from_pb(response)
self.fire_event_handlers(event)
def start(self):
coroutines = [self._subscribe()]
if self.user_data:
coroutines.append(self._subscribe_user_data())
coroutines = [self._subscribe_market_data()]
for session in self.sessions:
coroutines.append(self._subscribe_user_data(session))
group = asyncio.gather(*coroutines)
loop = asyncio.get_event_loop()