Merge pull request #551 from narumiruna/python/fix-subscribe-user-data

python: fix subscribe user data
This commit is contained in:
なるみ 2022-04-20 16:20:00 +08:00 committed by GitHub
commit 5765f6af95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 20 additions and 13 deletions

View File

@ -18,12 +18,12 @@ from .data import UserDataEvent
class Stream(object): class Stream(object):
subscriptions: List[Subscription] subscriptions: List[Subscription]
def __init__(self, host: str, port: int, user_data: bool = False): def __init__(self, host: str, port: int):
self.host = host self.host = host
self.port = port self.port = port
self.user_data = user_data
self.subscriptions = [] self.subscriptions = []
self.sessions = []
self.event_handlers = [] self.event_handlers = []
def subscribe(self, exchange: str, channel: str, symbol: str, depth: str = None, interval: str = None): def subscribe(self, exchange: str, channel: str, symbol: str, depth: str = None, interval: str = None):
@ -37,6 +37,9 @@ class Stream(object):
self.subscriptions.append(subscription) self.subscriptions.append(subscription)
def subscribe_user_data(self, session: str):
self.sessions.append(session)
def add_event_handler(self, event_handler: Callable) -> None: def add_event_handler(self, event_handler: Callable) -> None:
self.event_handlers.append(event_handler) self.event_handlers.append(event_handler)
@ -48,7 +51,7 @@ class Stream(object):
def address(self): def address(self):
return f'{self.host}:{self.port}' return f'{self.host}:{self.port}'
async def _subscribe(self): async def _subscribe_market_data(self):
async with grpc.aio.insecure_channel(self.address) as channel: async with grpc.aio.insecure_channel(self.address) as channel:
stub = bbgo_pb2_grpc.MarketDataServiceStub(channel) stub = bbgo_pb2_grpc.MarketDataServiceStub(channel)
@ -57,19 +60,19 @@ class Stream(object):
event = MarketDataEvent.from_pb(response) event = MarketDataEvent.from_pb(response)
self.fire_event_handlers(event) self.fire_event_handlers(event)
async def _subscribe_user_data(self): async def _subscribe_user_data(self, session: str):
async with grpc.aio.insecure_channel(self.address) as channel: async with grpc.aio.insecure_channel(self.address) as channel:
stub = bbgo_pb2_grpc.UserDataServiceStub(channel) stub = bbgo_pb2_grpc.UserDataServiceStub(channel)
request = bbgo_pb2.Empty() request = bbgo_pb2.UserDataRequest(session=session)
async for response in stub.SubscribeUserData(request): async for response in stub.Subscribe(request):
event = UserDataEvent.from_pb(response) event = UserDataEvent.from_pb(response)
self.fire_event_handlers(event) self.fire_event_handlers(event)
def start(self): def start(self):
coroutines = [self._subscribe()] coroutines = [self._subscribe_market_data()]
if self.user_data: for session in self.sessions:
coroutines.append(self._subscribe_user_data()) coroutines.append(self._subscribe_user_data(session))
group = asyncio.gather(*coroutines) group = asyncio.gather(*coroutines)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()

View File

@ -9,13 +9,16 @@ from bbgo import MarketService
def main(host, port): def main(host, port):
service = MarketService(host, port) service = MarketService(host, port)
klines, error = service.query_klines(exchange='binance', symbol='BTCUSDT', interval='1m', limit=10) klines = service.query_klines(
exchange='binance',
symbol='BTCUSDT',
interval='1m',
limit=10,
)
for kline in klines: for kline in klines:
print(kline) print(kline)
print(error)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -19,6 +19,7 @@ def main(host, port):
stream = Stream(host, port) stream = Stream(host, port)
stream.subscribe('max', 'book', 'BTCUSDT', 'full') stream.subscribe('max', 'book', 'BTCUSDT', 'full')
stream.subscribe('max', 'book', 'ETHUSDT', 'full') stream.subscribe('max', 'book', 'ETHUSDT', 'full')
stream.subscribe_user_data('max')
stream.add_event_handler(LogBook()) stream.add_event_handler(LogBook())
stream.start() stream.start()

View File

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "bbgo" name = "bbgo"
version = "0.1.3" version = "0.1.4"
description = "" description = ""
authors = ["なるみ <weaper@gmail.com>"] authors = ["なるみ <weaper@gmail.com>"]
packages = [ packages = [