bbgo_origin/python/bbgo/services.py
2022-04-18 11:34:36 +08:00

180 lines
6.8 KiB
Python

from __future__ import annotations
from typing import Iterator
from typing import List
from loguru import logger
import bbgo_pb2
import bbgo_pb2_grpc
from .data import ErrorMessage
from .data import KLine
from .data import MarketDataEvent
from .data import Order
from .data import SubmitOrder
from .data import Subscription
from .data import UserDataEvent
from .enums import OrderType
from .enums import SideType
from .utils import get_insecure_channel
class UserDataService(object):
stub: bbgo_pb2_grpc.UserDataServiceStub
def __init__(self, host: str, port: int) -> None:
self.stub = bbgo_pb2_grpc.UserDataServiceStub(get_insecure_channel(host, port))
def subscribe(self, session: str) -> Iterator[UserDataEvent]:
request = bbgo_pb2.UserDataRequest(session)
response_iter = self.stub.Subscribe(request)
for response in response_iter:
yield UserDataEvent.from_pb(response)
class MarketService(object):
stub: bbgo_pb2_grpc.MarketDataServiceStub
def __init__(self, host: str, port: int) -> None:
self.stub = bbgo_pb2_grpc.MarketDataServiceStub(get_insecure_channel(host, port))
def subscribe(self, subscriptions: List[Subscription]) -> Iterator[MarketDataEvent]:
request = bbgo_pb2.SubscribeRequest(subscriptions=[s.to_pb() for s in subscriptions])
response_iter = self.stub.Subscribe(request)
for response in response_iter:
yield MarketDataEvent.from_pb(response)
def query_klines(self,
exchange: str,
symbol: str,
limit: int = 30,
interval: str = '1m',
start_time: int = None,
end_time: int = None) -> List[KLine]:
request = bbgo_pb2.QueryKLinesRequest(exchange=exchange,
symbol=symbol,
limit=limit,
interval=interval,
start_time=start_time,
end_time=end_time)
response = self.stub.QueryKLines(request)
klines = []
for kline in response.klines:
klines.append(KLine.from_pb(kline))
error = ErrorMessage.from_pb(response.error)
if error.code != 0:
logger.error(error.message)
return klines
class TradingService(object):
stub: bbgo_pb2_grpc.TradingServiceStub
def __init__(self, host: str, port: int) -> None:
self.stub = bbgo_pb2_grpc.TradingServiceStub(get_insecure_channel(host, port))
def submit_order(self,
session: str,
exchange: str,
symbol: str,
side: str,
quantity: float,
order_type: str,
price: float = None,
stop_price: float = None,
client_order_id: str = None,
group_id: int = None) -> Order:
submit_order = SubmitOrder(session=session,
exchange=exchange,
symbol=symbol,
side=SideType.from_str(side),
quantity=quantity,
order_type=OrderType.from_str(order_type),
price=price,
stop_price=stop_price,
client_order_id=client_order_id,
group_id=group_id)
request = bbgo_pb2.SubmitOrderRequest(session=session, submit_orders=[submit_order.to_pb()])
response = self.stub.SubmitOrder(request)
order = Order.from_pb(response.orders[0])
error = ErrorMessage.from_pb(response.error)
if error.code != 0:
logger.error(error.message)
return order
def cancel_order(self, session: str, order_id: int = None, client_order_id: int = None) -> Order:
request = bbgo_pb2.CancelOrderRequest(
session=session,
id=order_id or "",
client_order_id=client_order_id or "",
)
response = self.stub.CancelOrder(request)
order = Order.from_pb(response.order)
error = ErrorMessage.from_pb(response.error)
if error.code != 0:
logger.error(error.message)
return order
def query_order(self, order_id: int = None, client_order_id: int = None) -> bbgo_pb2.QueryOrderResponse:
request = bbgo_pb2.QueryOrderRequest(id=order_id, client_order_id=client_order_id)
response = self.stub.QueryOrder(request)
return response
def query_orders(self,
exchange: str,
symbol: str,
states: List[str] = None,
order_by: str = 'asc',
group_id: int = None,
pagination: bool = True,
page: int = 0,
limit: int = 100,
offset: int = 0) -> bbgo_pb2.QueryOrdersResponse:
# set default value to ['wait', 'convert']
states = states or ['wait', 'convert']
request = bbgo_pb2.QueryOrdersRequest(exchange=exchange,
symbol=symbol,
states=states,
order_by=order_by,
group_id=group_id,
pagination=pagination,
page=page,
limit=limit,
offset=offset)
reponse = self.stub.QueryOrders(request)
return reponse
def query_trades(self,
exchange: str,
symbol: str,
timestamp: int,
order_by: str = 'asc',
pagination: bool = True,
page: int = 1,
limit: int = 100,
offset: int = 0) -> bbgo_pb2.QueryTradesResponse:
request = bbgo_pb2.QueryTradesRequest(exchange=exchange,
symbol=symbol,
timestamp=timestamp,
order_by=order_by,
pagination=pagination,
page=page,
limit=limit,
offset=offset)
response = self.stub.QueryTrades(request)
return response