bbgo_origin/python/bbgo/data/event.py
2022-04-22 21:29:23 +08:00

85 lines
2.2 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import List
import bbgo_pb2
from ..enums import ChannelType
from ..enums import EventType
from ..utils import parse_time
from .balance import Balance
from .depth import Depth
from .error import ErrorMessage
from .kline import KLine
from .order import Order
from .ticker import Ticker
from .trade import Trade
@dataclass
class Event:
session: str
exchange: str
channel_type: ChannelType
event_type: EventType
@dataclass
class UserDataEvent(Event):
balances: List[Balance] = None
trades: List[Trade] = None
orders: List[Order] = None
@classmethod
def from_pb(cls, obj: bbgo_pb2.UserData) -> UserDataEvent:
return cls(
session=obj.session,
exchange=obj.exchange,
channel_type=ChannelType(obj.channel),
event_type=EventType(obj.event),
balances=[Balance.from_pb(balance) for balance in obj.balances],
trades=[Trade.from_pb(trade) for trade in obj.trades],
orders=[Order.from_pb(order) for order in obj.orders],
)
@dataclass
class MarketDataEvent(Event):
symbol: str
subscribed_at: datetime
error: ErrorMessage
depth: Depth = None
kline: KLine = None
ticker: Ticker = None
trades: List[Trade] = None
@classmethod
def from_pb(cls, obj: bbgo_pb2.MarketData) -> MarketDataEvent:
channel_type = ChannelType(obj.channel)
event = cls(
session=obj.session,
exchange=obj.exchange,
symbol=obj.symbol,
channel_type=channel_type,
event_type=EventType(obj.event),
subscribed_at=parse_time(obj.subscribed_at),
error=ErrorMessage.from_pb(obj.error),
)
if channel_type == ChannelType.BOOK:
event.depth = Depth.from_pb(obj.depth)
if channel_type == ChannelType.KLINE:
event.kline = KLine.from_pb(obj.kline)
if channel_type == ChannelType.TICKER:
event.ticker = Ticker.from_pb(obj.ticker)
if channel_type == ChannelType.TRADE:
event.trades = [Trade.from_pb(trade) for trade in obj.trades]
return event