add Continuous Contract Kline/Candlestick Streams

This commit is contained in:
Austin 2021-11-16 14:26:27 +08:00
parent 68e22921dd
commit c5d1a70a61
3 changed files with 56 additions and 4 deletions

View File

@ -297,7 +297,10 @@ func ParseEvent(message string) (interface{}, error) {
var event MarkPriceUpdateEvent
err := json.Unmarshal([]byte(message), &event)
return &event, err
case "continuousKline":
var event ContinuousKLineEvent
err := json.Unmarshal([]byte(message), &event)
return &event, err
default:
id := val.GetInt("id")
if id > 0 {
@ -521,6 +524,40 @@ type MarkPriceUpdateEvent struct {
}
*/
type ContinuousKLineEvent struct {
EventBase
Symbol string `json:"ps"`
ct string `json:"ct"`
KLine KLine `json:"k,omitempty"`
}
/*
{
"e":"continuous_kline", // Event type
"E":1607443058651, // Event time
"ps":"BTCUSDT", // Pair
"ct":"PERPETUAL" // Contract type
"k":{
"t":1607443020000, // Kline start time
"T":1607443079999, // Kline close time
"i":"1m", // Interval
"f":116467658886, // First trade ID
"L":116468012423, // Last trade ID
"o":"18787.00", // Open price
"c":"18804.04", // Close price
"h":"18804.04", // High price
"l":"18786.54", // Low price
"v":"197.664", // volume
"n": 543, // Number of trades
"x":false, // Is this kline closed?
"q":"3715253.19494", // Quote asset volume
"V":"184.769", // Taker buy volume
"Q":"3472925.84746", //Taker buy quote asset volume
"B":"0" // Ignore
}
}
*/
/*
kline

View File

@ -75,7 +75,8 @@ type Stream struct {
kLineEventCallbacks []func(e *KLineEvent)
kLineClosedEventCallbacks []func(e *KLineEvent)
MarkPriceUpdateEventCallbacks []func(e *MarkPriceUpdateEvent)
markPriceUpdateEventCallbacks []func(e *MarkPriceUpdateEvent)
continuousKLineEventCallbacks []func(e *ContinuousKLineEvent)
balanceUpdateEventCallbacks []func(event *BalanceUpdateEvent)
outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent)
@ -543,6 +544,8 @@ func (s *Stream) read(ctx context.Context) {
case *MarkPriceUpdateEvent:
s.EmitMarkPriceUpdateEvent(e)
case *ContinuousKLineEvent:
s.EmitContinuousKLineEvent(e)
}
}
}

View File

@ -35,11 +35,21 @@ func (s *Stream) EmitKLineClosedEvent(e *KLineEvent) {
}
func (s *Stream) OnMarkPriceUpdateEvent(cb func(e *MarkPriceUpdateEvent)) {
s.MarkPriceUpdateEventCallbacks = append(s.MarkPriceUpdateEventCallbacks, cb)
s.markPriceUpdateEventCallbacks = append(s.markPriceUpdateEventCallbacks, cb)
}
func (s *Stream) EmitMarkPriceUpdateEvent(e *MarkPriceUpdateEvent) {
for _, cb := range s.MarkPriceUpdateEventCallbacks {
for _, cb := range s.markPriceUpdateEventCallbacks {
cb(e)
}
}
func (s *Stream) OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent)) {
s.continuousKLineEventCallbacks = append(s.continuousKLineEventCallbacks, cb)
}
func (s *Stream) EmitContinuousKLineEvent(e *ContinuousKLineEvent) {
for _, cb := range s.continuousKLineEventCallbacks {
cb(e)
}
}
@ -93,6 +103,8 @@ type StreamEventHub interface {
OnMarkPriceUpdateEvent(cb func(e *MarkPriceUpdateEvent))
OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent))
OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent))
OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent))