bbgo_origin/python/bbgo/stream.py

78 lines
2.4 KiB
Python
Raw Normal View History

2022-03-07 04:06:16 +00:00
import asyncio
from typing import Callable
from typing import List
2022-04-13 10:11:21 +00:00
import grpc
import bbgo_pb2
import bbgo_pb2_grpc
from bbgo.enums import ChannelType
from bbgo.enums import DepthType
2022-04-13 10:11:21 +00:00
from .data import Event
from .data import MarketDataEvent
2022-04-13 10:11:21 +00:00
from .data import Subscription
from .data import UserDataEvent
2022-03-07 04:06:16 +00:00
class Stream(object):
    """Client for the streaming gRPC services of a bbgo server.

    Collects market-data subscriptions and event handlers, then (in
    :meth:`start`) opens the market-data stream and, when ``user_data`` is
    true, the user-data stream, passing every received message to each
    registered handler.
    """

    # Quoted forward reference: the annotation is informational and does not
    # need to be evaluated when the class object is created.
    subscriptions: List["Subscription"]

    def __init__(self, host: str, port: int, user_data: bool = False):
        """Store the server location and start with no subscriptions/handlers.

        Args:
            host: Host name or address of the gRPC server.
            port: Port of the gRPC server.
            user_data: When true, :meth:`start` also opens the user-data
                stream in addition to the market-data stream.
        """
        self.host = host
        self.port = port
        self.user_data = user_data

        self.subscriptions = []
        self.event_handlers: List[Callable] = []

    def subscribe(self, exchange: str, channel: str, symbol: str, depth: str = None, interval: str = None):
        """Queue a market-data subscription.

        The queued subscriptions are sent in a single request when the
        market-data stream is opened, so subscriptions added after that
        request has been built are not part of it.

        Args:
            exchange: Exchange name, passed through to ``Subscription``.
            channel: Channel name, converted with ``ChannelType.from_str``.
            symbol: Symbol, passed through to ``Subscription``.
            depth: Optional depth, converted with ``DepthType``; left unset
                when ``None``.
            interval: Optional interval; left unset when ``None``.
        """
        subscription = Subscription(exchange=exchange, channel=ChannelType.from_str(channel), symbol=symbol)

        if depth is not None:
            subscription.depth = DepthType(depth)

        if interval is not None:
            subscription.interval = interval

        self.subscriptions.append(subscription)

    def add_event_handler(self, event_handler: Callable) -> None:
        """Register a callable that is invoked with every received event."""
        self.event_handlers.append(event_handler)

    def fire_event_handlers(self, event: "Event") -> None:
        """Call every registered handler with ``event``, in registration order.

        Handlers are called synchronously; an exception raised by a handler
        propagates to the caller and later handlers are not invoked.
        """
        for event_handler in self.event_handlers:
            event_handler(event)

    @property
    def address(self):
        """The ``host:port`` target string used to open gRPC channels."""
        return f'{self.host}:{self.port}'

    async def _subscribe(self):
        """Open the market-data stream and dispatch each response as an event."""
        async with grpc.aio.insecure_channel(self.address) as channel:
            stub = bbgo_pb2_grpc.MarketDataServiceStub(channel)

            request = bbgo_pb2.SubscribeRequest(subscriptions=[s.to_pb() for s in self.subscriptions])
            async for response in stub.Subscribe(request):
                event = MarketDataEvent.from_pb(response)
                self.fire_event_handlers(event)

    async def _subscribe_user_data(self):
        """Open the user-data stream and dispatch each response as an event."""
        async with grpc.aio.insecure_channel(self.address) as channel:
            stub = bbgo_pb2_grpc.UserDataServiceStub(channel)

            request = bbgo_pb2.Empty()
            async for response in stub.SubscribeUserData(request):
                event = UserDataEvent.from_pb(response)
                self.fire_event_handlers(event)

    async def _run(self) -> None:
        """Run the enabled stream coroutines concurrently until all finish."""
        coroutines = [self._subscribe()]
        if self.user_data:
            coroutines.append(self._subscribe_user_data())

        # gather() is awaited inside a running loop; calling it on bare
        # coroutines without one is deprecated.
        await asyncio.gather(*coroutines)

    def start(self):
        """Block until the stream(s) end.

        Uses ``asyncio.run`` so each call gets a fresh event loop that is
        always cleaned up, even when a stream raises. This avoids closing the
        process-wide default loop (which made a second ``start()`` fail) and
        the deprecated ``asyncio.get_event_loop()`` lookup outside a running
        loop.

        Raises:
            RuntimeError: If called while an event loop is already running in
                this thread.
        """
        asyncio.run(self._run())