mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-26 00:35:15 +00:00
209 lines
4.9 KiB
Go
209 lines
4.9 KiB
Go
package bybit
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
|
|
"github.com/c9s/bbgo/pkg/types"
|
|
)
|
|
|
|
const (
|
|
// Bybit: To avoid network or program issues, we recommend that you send the ping heartbeat packet every 20 seconds
|
|
// to maintain the WebSocket connection.
|
|
pingInterval = 20 * time.Second
|
|
|
|
// spotArgsLimit can input up to 10 args for each subscription request sent to one connection.
|
|
spotArgsLimit = 10
|
|
)
|
|
|
|
var (
|
|
// wsAuthRequest specifies the duration for which a websocket request's authentication is valid.
|
|
wsAuthRequest = 10 * time.Second
|
|
)
|
|
|
|
//go:generate callbackgen -type Stream
|
|
type Stream struct {
|
|
key, secret string
|
|
types.StandardStream
|
|
|
|
bookEventCallbacks []func(e BookEvent)
|
|
}
|
|
|
|
func NewStream(key, secret string) *Stream {
|
|
stream := &Stream{
|
|
StandardStream: types.NewStandardStream(),
|
|
// pragma: allowlist nextline secret
|
|
key: key,
|
|
secret: secret,
|
|
}
|
|
|
|
stream.SetEndpointCreator(stream.createEndpoint)
|
|
stream.SetParser(stream.parseWebSocketEvent)
|
|
stream.SetDispatcher(stream.dispatchEvent)
|
|
stream.SetHeartBeat(stream.ping)
|
|
|
|
stream.OnConnect(stream.handlerConnect)
|
|
stream.OnBookEvent(stream.handleBookEvent)
|
|
return stream
|
|
}
|
|
|
|
func (s *Stream) createEndpoint(_ context.Context) (string, error) {
|
|
var url string
|
|
if s.PublicOnly {
|
|
url = bybitapi.WsSpotPublicSpotUrl
|
|
} else {
|
|
url = bybitapi.WsSpotPrivateUrl
|
|
}
|
|
return url, nil
|
|
}
|
|
|
|
func (s *Stream) dispatchEvent(event interface{}) {
|
|
switch e := event.(type) {
|
|
case *WebSocketOpEvent:
|
|
if err := e.IsValid(); err != nil {
|
|
log.Errorf("invalid event: %v", err)
|
|
}
|
|
|
|
case *BookEvent:
|
|
s.EmitBookEvent(*e)
|
|
}
|
|
}
|
|
|
|
func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
|
|
var e WsEvent
|
|
|
|
err := json.Unmarshal(in, &e)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch {
|
|
case e.IsOp():
|
|
return e.WebSocketOpEvent, nil
|
|
|
|
case e.IsTopic():
|
|
switch getTopicType(e.Topic) {
|
|
case TopicTypeOrderBook:
|
|
var book BookEvent
|
|
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &book)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, : %w", string(e.WebSocketTopicEvent.Data), err)
|
|
}
|
|
|
|
book.Type = e.WebSocketTopicEvent.Type
|
|
return &book, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("unhandled websocket event: %+v", string(in))
|
|
}
|
|
|
|
// ping implements the Bybit text message of WebSocket PingPong.
|
|
func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, cancelFunc context.CancelFunc) {
|
|
defer func() {
|
|
log.Debug("[bybit] ping worker stopped")
|
|
cancelFunc()
|
|
}()
|
|
|
|
var pingTicker = time.NewTicker(pingInterval)
|
|
defer pingTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case <-s.CloseC:
|
|
return
|
|
|
|
case <-pingTicker.C:
|
|
// it's just for maintaining the liveliness of the connection, so comment out ReqId.
|
|
err := conn.WriteJSON(struct {
|
|
//ReqId string `json:"req_id"`
|
|
Op WsOpType `json:"op"`
|
|
}{
|
|
//ReqId: uuid.NewString(),
|
|
Op: WsOpTypePing,
|
|
})
|
|
if err != nil {
|
|
log.WithError(err).Error("ping error", err)
|
|
s.Reconnect()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Stream) handlerConnect() {
|
|
if s.PublicOnly {
|
|
var topics []string
|
|
|
|
for _, subscription := range s.Subscriptions {
|
|
topic, err := convertSubscription(subscription)
|
|
if err != nil {
|
|
log.WithError(err).Errorf("subscription convert error")
|
|
continue
|
|
}
|
|
|
|
topics = append(topics, topic)
|
|
}
|
|
if len(topics) > spotArgsLimit {
|
|
log.Debugf("topics exceeds limit: %d, drop of: %v", spotArgsLimit, topics[spotArgsLimit:])
|
|
topics = topics[:spotArgsLimit]
|
|
}
|
|
log.Infof("subscribing channels: %+v", topics)
|
|
if err := s.Conn.WriteJSON(WebsocketOp{
|
|
Op: WsOpTypeSubscribe,
|
|
Args: topics,
|
|
}); err != nil {
|
|
log.WithError(err).Error("failed to send subscription request")
|
|
}
|
|
} else {
|
|
expires := strconv.FormatInt(time.Now().Add(wsAuthRequest).In(time.UTC).UnixMilli(), 10)
|
|
|
|
if err := s.Conn.WriteJSON(WebsocketOp{
|
|
Op: WsOpTypeAuth,
|
|
Args: []string{
|
|
s.key,
|
|
expires,
|
|
bybitapi.Sign(fmt.Sprintf("GET/realtime%s", expires), s.secret),
|
|
},
|
|
}); err != nil {
|
|
log.WithError(err).Error("failed to auth request")
|
|
}
|
|
}
|
|
}
|
|
|
|
func convertSubscription(s types.Subscription) (string, error) {
|
|
switch s.Channel {
|
|
case types.BookChannel:
|
|
depth := types.DepthLevel1
|
|
if len(s.Options.Depth) > 0 && s.Options.Depth == types.DepthLevel50 {
|
|
depth = types.DepthLevel50
|
|
}
|
|
return genTopic(TopicTypeOrderBook, depth, s.Symbol), nil
|
|
}
|
|
|
|
return "", fmt.Errorf("unsupported stream channel: %s", s.Channel)
|
|
}
|
|
|
|
func (s *Stream) handleBookEvent(e BookEvent) {
|
|
orderBook := e.OrderBook()
|
|
switch {
|
|
// Occasionally, you'll receive "UpdateId"=1, which is a snapshot data due to the restart of
|
|
// the service. So please overwrite your local orderbook
|
|
case e.Type == DataTypeSnapshot || e.UpdateId.Int() == 1:
|
|
s.EmitBookSnapshot(orderBook)
|
|
|
|
case e.Type == DataTypeDelta:
|
|
s.EmitBookUpdate(orderBook)
|
|
}
|
|
}
|