freqtrade_origin/freqtrade/rpc/api_server/ws_schemas.py

70 lines
1.7 KiB
Python
Raw Normal View History

2022-09-07 21:08:01 +00:00
from datetime import datetime
from typing import Any, Dict, List, Optional, TypedDict
2022-09-07 21:08:01 +00:00
from pandas import DataFrame
2022-09-08 19:58:28 +00:00
from pydantic import BaseModel
2022-09-07 21:08:01 +00:00
from freqtrade.constants import PairWithTimeframe
from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType
class BaseArbitraryModel(BaseModel):
class Config:
arbitrary_types_allowed = True
class WSRequestSchema(BaseArbitraryModel):
type: RPCRequestType
data: Optional[Any] = None
class WSMessageSchemaType(TypedDict):
# Type for typing to avoid doing pydantic typechecks.
type: RPCMessageType
data: Optional[Dict[str, Any]]
2022-09-07 21:08:01 +00:00
class WSMessageSchema(BaseArbitraryModel):
type: RPCMessageType
data: Optional[Any] = None
class Config:
extra = 'allow'
# ------------------------------ REQUEST SCHEMAS ----------------------------
class WSSubscribeRequest(WSRequestSchema):
type: RPCRequestType = RPCRequestType.SUBSCRIBE
data: List[RPCMessageType]
class WSWhitelistRequest(WSRequestSchema):
type: RPCRequestType = RPCRequestType.WHITELIST
data: None = None
class WSAnalyzedDFRequest(WSRequestSchema):
type: RPCRequestType = RPCRequestType.ANALYZED_DF
data: Dict[str, Any] = {"limit": 1500, "pair": None}
2022-09-07 21:08:01 +00:00
# ------------------------------ MESSAGE SCHEMAS ----------------------------
class WSWhitelistMessage(WSMessageSchema):
type: RPCMessageType = RPCMessageType.WHITELIST
data: List[str]
class WSAnalyzedDFMessage(WSMessageSchema):
class AnalyzedDFData(BaseArbitraryModel):
key: PairWithTimeframe
df: DataFrame
la: datetime
type: RPCMessageType = RPCMessageType.ANALYZED_DF
data: AnalyzedDFData
# --------------------------------------------------------------------------