Merge pull request #1344 from bailantaotao/edwin/bitget/public-stream-book

FEATURE: [bitget] support book stream on bitget
This commit is contained in:
bailantaotao 2023-10-20 14:22:42 +08:00 committed by GitHub
commit f8c47f72bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 632 additions and 0 deletions

View File

@ -0,0 +1,165 @@
package bitget
import (
"context"
"encoding/json"
"fmt"
"github.com/c9s/bbgo/pkg/exchange/bitget/bitgetapi"
"github.com/c9s/bbgo/pkg/types"
)
//go:generate callbackgen -type Stream
type Stream struct {
types.StandardStream
bookEventCallbacks []func(o BookEvent)
}
func NewStream() *Stream {
stream := &Stream{
StandardStream: types.NewStandardStream(),
}
stream.SetEndpointCreator(stream.createEndpoint)
stream.SetParser(parseWebSocketEvent)
stream.SetDispatcher(stream.dispatchEvent)
stream.OnConnect(stream.handlerConnect)
stream.OnBookEvent(stream.handleBookEvent)
return stream
}
func (s *Stream) syncSubscriptions(opType WsEventType) error {
if opType != WsEventUnsubscribe && opType != WsEventSubscribe {
return fmt.Errorf("unexpected subscription type: %v", opType)
}
logger := log.WithField("opType", opType)
args := []WsArg{}
for _, subscription := range s.Subscriptions {
arg, err := convertSubscription(subscription)
if err != nil {
logger.WithError(err).Errorf("convert error, subscription: %+v", subscription)
return err
}
args = append(args, arg)
}
logger.Infof("%s channels: %+v", opType, args)
if err := s.Conn.WriteJSON(WsOp{
Op: opType,
Args: args,
}); err != nil {
logger.WithError(err).Error("failed to send request")
return err
}
return nil
}
func (s *Stream) Unsubscribe() {
// errors are handled in the syncSubscriptions, so they are skipped here.
_ = s.syncSubscriptions(WsEventUnsubscribe)
s.Resubscribe(func(old []types.Subscription) (new []types.Subscription, err error) {
// clear the subscriptions
return []types.Subscription{}, nil
})
}
func (s *Stream) createEndpoint(_ context.Context) (string, error) {
var url string
if s.PublicOnly {
url = bitgetapi.PublicWebSocketURL
} else {
url = bitgetapi.PrivateWebSocketURL
}
return url, nil
}
func (s *Stream) dispatchEvent(event interface{}) {
switch e := event.(type) {
case *WsEvent:
if err := e.IsValid(); err != nil {
log.Errorf("invalid event: %v", err)
}
case *BookEvent:
s.EmitBookEvent(*e)
}
}
func (s *Stream) handlerConnect() {
if s.PublicOnly {
// errors are handled in the syncSubscriptions, so they are skipped here.
_ = s.syncSubscriptions(WsEventSubscribe)
} else {
log.Error("*** PRIVATE API NOT IMPLEMENTED ***")
}
}
func (s *Stream) handleBookEvent(o BookEvent) {
for _, book := range o.ToGlobalOrderBooks() {
switch o.Type {
case ActionTypeSnapshot:
s.EmitBookSnapshot(book)
case ActionTypeUpdate:
s.EmitBookUpdate(book)
}
}
}
func convertSubscription(sub types.Subscription) (WsArg, error) {
arg := WsArg{
// support spot only
InstType: instSp,
Channel: "",
InstId: sub.Symbol,
}
switch sub.Channel {
case types.BookChannel:
arg.Channel = ChannelOrderBook5
switch sub.Options.Depth {
case types.DepthLevel15:
arg.Channel = ChannelOrderBook15
case types.DepthLevel200:
log.Warn("*** The subscription events for the order book may return fewer than 200 bids/asks at a depth of 200. ***")
arg.Channel = ChannelOrderBook
}
return arg, nil
}
return arg, fmt.Errorf("unsupported stream channel: %s", sub.Channel)
}
func parseWebSocketEvent(in []byte) (interface{}, error) {
var event WsEvent
err := json.Unmarshal(in, &event)
if err != nil {
return nil, err
}
if event.IsOp() {
return &event, nil
}
switch event.Arg.Channel {
case ChannelOrderBook, ChannelOrderBook5, ChannelOrderBook15:
var book BookEvent
err = json.Unmarshal(event.Data, &book.Events)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into BookEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err)
}
book.Type = event.Action
book.InstId = event.Arg.InstId
return &book, nil
}
return nil, fmt.Errorf("unhandled websocket event: %+v", string(in))
}

View File

@ -0,0 +1,15 @@
// Code generated by "callbackgen -type Stream"; DO NOT EDIT.
package bitget
import ()
func (s *Stream) OnBookEvent(cb func(o BookEvent)) {
s.bookEventCallbacks = append(s.bookEventCallbacks, cb)
}
func (s *Stream) EmitBookEvent(o BookEvent) {
for _, cb := range s.bookEventCallbacks {
cb(o)
}
}

View File

@ -0,0 +1,313 @@
package bitget
import (
"context"
"fmt"
"os"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
func getTestClientOrSkip(t *testing.T) *Stream {
if b, _ := strconv.ParseBool(os.Getenv("CI")); b {
t.Skip("skip test for CI")
}
return NewStream()
}
func TestStream(t *testing.T) {
t.Skip()
s := getTestClientOrSkip(t)
symbols := []string{
"BTCUSDT",
"ETHUSDT",
"DOTUSDT",
"ADAUSDT",
"AAVEUSDT",
"APTUSDT",
"ATOMUSDT",
"AXSUSDT",
"BNBUSDT",
"SOLUSDT",
"DOGEUSDT",
}
t.Run("book test", func(t *testing.T) {
s.Subscribe(types.BookChannel, "BTCUSDT", types.SubscribeOptions{
Depth: types.DepthLevel5,
})
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnBookSnapshot(func(book types.SliceOrderBook) {
t.Log("got snapshot", len(book.Bids), len(book.Asks), book.Symbol, book.Time, book)
})
s.OnBookUpdate(func(book types.SliceOrderBook) {
t.Log("got update", len(book.Bids), len(book.Asks), book.Symbol, book.Time, book)
})
c := make(chan struct{})
<-c
})
t.Run("book test on unsubscribe and reconnect", func(t *testing.T) {
for _, symbol := range symbols {
s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
Depth: types.DepthLevel200,
})
}
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnBookSnapshot(func(book types.SliceOrderBook) {
t.Log("got snapshot", book)
})
s.OnBookUpdate(func(book types.SliceOrderBook) {
t.Log("got update", book)
})
<-time.After(2 * time.Second)
s.Unsubscribe()
for _, symbol := range symbols {
s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
Depth: types.DepthLevel200,
})
}
<-time.After(2 * time.Second)
s.Reconnect()
c := make(chan struct{})
<-c
})
}
func TestStream_parseWebSocketEvent(t *testing.T) {
t.Run("op subscribe event", func(t *testing.T) {
input := `{
"event":"subscribe",
"arg":{
"instType":"sp",
"channel":"books5",
"instId":"BTCUSDT"
}
}`
res, err := parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
opEvent, ok := res.(*WsEvent)
assert.True(t, ok)
assert.Equal(t, WsEvent{
Event: WsEventSubscribe,
Arg: WsArg{
InstType: instSp,
Channel: ChannelOrderBook5,
InstId: "BTCUSDT",
},
}, *opEvent)
assert.NoError(t, opEvent.IsValid())
})
t.Run("op unsubscribe event", func(t *testing.T) {
input := `{
"event":"unsubscribe",
"arg":{
"instType":"sp",
"channel":"books5",
"instId":"BTCUSDT"
}
}`
res, err := parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
opEvent, ok := res.(*WsEvent)
assert.True(t, ok)
assert.Equal(t, WsEvent{
Event: WsEventUnsubscribe,
Arg: WsArg{
InstType: instSp,
Channel: ChannelOrderBook5,
InstId: "BTCUSDT",
},
}, *opEvent)
})
t.Run("op error event", func(t *testing.T) {
input := `{
"event":"error",
"arg":{
"instType":"sp",
"channel":"books5",
"instId":"BTCUSDT-"
},
"code":30001,
"msg":"instType:sp,channel:books5,instId:BTCUSDT- doesn't exist",
"op":"subscribe"
}`
res, err := parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
opEvent, ok := res.(*WsEvent)
assert.True(t, ok)
assert.Equal(t, WsEvent{
Event: WsEventError,
Code: 30001,
Msg: "instType:sp,channel:books5,instId:BTCUSDT- doesn't exist",
Op: "subscribe",
Arg: WsArg{
InstType: instSp,
Channel: ChannelOrderBook5,
InstId: "BTCUSDT-",
},
}, *opEvent)
})
t.Run("Orderbook event", func(t *testing.T) {
input := `{
"action":"%s",
"arg":{
"instType":"sp",
"channel":"books5",
"instId":"BTCUSDT"
},
"data":[
{
"asks":[
[
"28350.78",
"0.2082"
],
[
"28350.80",
"0.2081"
]
],
"bids":[
[
"28350.70",
"0.5585"
],
[
"28350.67",
"6.8175"
]
],
"checksum":0,
"ts":"1697593934630"
}
],
"ts":1697593934630
}`
eventFn := func(in string, actionType ActionType) {
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
book, ok := res.(*BookEvent)
assert.True(t, ok)
assert.Equal(t, BookEvent{
Events: []struct {
Asks types.PriceVolumeSlice `json:"asks"`
// Order book on buy side, descending order
Bids types.PriceVolumeSlice `json:"bids"`
Ts types.MillisecondTimestamp `json:"ts"`
Checksum int `json:"checksum"`
}{
{
Asks: []types.PriceVolume{
{
Price: fixedpoint.NewFromFloat(28350.78),
Volume: fixedpoint.NewFromFloat(0.2082),
},
{
Price: fixedpoint.NewFromFloat(28350.80),
Volume: fixedpoint.NewFromFloat(0.2081),
},
},
Bids: []types.PriceVolume{
{
Price: fixedpoint.NewFromFloat(28350.70),
Volume: fixedpoint.NewFromFloat(0.5585),
},
{
Price: fixedpoint.NewFromFloat(28350.67),
Volume: fixedpoint.NewFromFloat(6.8175),
},
},
Ts: types.NewMillisecondTimestampFromInt(1697593934630),
Checksum: 0,
},
},
Type: actionType,
InstId: "BTCUSDT",
}, *book)
}
t.Run("snapshot type", func(t *testing.T) {
snapshotInput := fmt.Sprintf(input, ActionTypeSnapshot)
eventFn(snapshotInput, ActionTypeSnapshot)
})
t.Run("update type", func(t *testing.T) {
snapshotInput := fmt.Sprintf(input, ActionTypeUpdate)
eventFn(snapshotInput, ActionTypeUpdate)
})
})
}
func Test_convertSubscription(t *testing.T) {
t.Run("BookChannel.ChannelOrderBook5", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.DepthLevel5,
},
})
assert.NoError(t, err)
assert.Equal(t, WsArg{
InstType: instSp,
Channel: ChannelOrderBook5,
InstId: "BTCUSDT",
}, res)
})
t.Run("BookChannel.DepthLevel15", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.DepthLevel15,
},
})
assert.NoError(t, err)
assert.Equal(t, WsArg{
InstType: instSp,
Channel: ChannelOrderBook15,
InstId: "BTCUSDT",
}, res)
})
t.Run("BookChannel.DepthLevel200", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.DepthLevel200,
},
})
assert.NoError(t, err)
assert.Equal(t, WsArg{
InstType: instSp,
Channel: ChannelOrderBook,
InstId: "BTCUSDT",
}, res)
})
}

View File

@ -0,0 +1,138 @@
package bitget
import (
"encoding/json"
"fmt"
"github.com/c9s/bbgo/pkg/types"
)
type InstType string
const (
instSp InstType = "sp"
)
type ChannelType string
const (
// ChannelOrderBook snapshot and update might return less than 200 bids/asks as per symbol's orderbook various from
// each other; The number of bids/asks is not a fixed value and may vary in the future
ChannelOrderBook ChannelType = "books"
// ChannelOrderBook5 top 5 order book of "books" that begins from bid1/ask1
ChannelOrderBook5 ChannelType = "books5"
// ChannelOrderBook15 top 15 order book of "books" that begins from bid1/ask1
ChannelOrderBook15 ChannelType = "books15"
)
type WsArg struct {
InstType InstType `json:"instType"`
Channel ChannelType `json:"channel"`
// InstId Instrument ID. e.q. BTCUSDT, ETHUSDT
InstId string `json:"instId"`
}
type WsEventType string
const (
WsEventSubscribe WsEventType = "subscribe"
WsEventUnsubscribe WsEventType = "unsubscribe"
WsEventError WsEventType = "error"
)
type WsOp struct {
Op WsEventType `json:"op"`
Args []WsArg `json:"args"`
}
// WsEvent is the lowest level of event type. We use this struct to convert the received data, so that we will know
// whether the event belongs to `op` or `data`.
type WsEvent struct {
// for comment event
Arg WsArg `json:"arg"`
// for op event
Event WsEventType `json:"event"`
Code int `json:"code"`
Msg string `json:"msg"`
Op string `json:"op"`
// for data event
Action ActionType `json:"action"`
Data json.RawMessage `json:"data"`
}
// IsOp represents the data event will be empty
func (w *WsEvent) IsOp() bool {
return w.Action == "" && len(w.Data) == 0
}
func (w *WsEvent) IsValid() error {
switch w.Event {
case WsEventError:
return fmt.Errorf("websocket request error, op: %s, code: %d, msg: %s", w.Op, w.Code, w.Msg)
case WsEventSubscribe, WsEventUnsubscribe:
// Actually, this code is unnecessary because the events are either `Subscribe` or `Unsubscribe`, But to avoid bugs
// in the exchange, we still check.
if w.Code != 0 || len(w.Msg) != 0 {
return fmt.Errorf("unexpected websocket %s event, code: %d, msg: %s", w.Event, w.Code, w.Msg)
}
return nil
default:
return fmt.Errorf("unexpected event type: %+v", w)
}
}
type ActionType string
const (
ActionTypeSnapshot ActionType = "snapshot"
ActionTypeUpdate ActionType = "update"
)
// {
// "asks":[
// [
// "28350.78",
// "0.2082"
// ],
// ],
// "bids":[
// [
// "28350.70",
// "0.5585"
// ],
// ],
// "checksum":0,
// "ts":"1697593934630"
// }
type BookEvent struct {
Events []struct {
// Order book on sell side, ascending order
Asks types.PriceVolumeSlice `json:"asks"`
// Order book on buy side, descending order
Bids types.PriceVolumeSlice `json:"bids"`
Ts types.MillisecondTimestamp `json:"ts"`
Checksum int `json:"checksum"`
}
// internal use
Type ActionType
InstId string
}
func (e *BookEvent) ToGlobalOrderBooks() []types.SliceOrderBook {
books := make([]types.SliceOrderBook, len(e.Events))
for i, event := range e.Events {
books[i] = types.SliceOrderBook{
Symbol: e.InstId,
Bids: event.Bids,
Asks: event.Asks,
Time: event.Ts.Time(),
}
}
return books
}

View File

@ -523,6 +523,7 @@ const (
DepthLevelMedium Depth = "MEDIUM" DepthLevelMedium Depth = "MEDIUM"
DepthLevel1 Depth = "1" DepthLevel1 Depth = "1"
DepthLevel5 Depth = "5" DepthLevel5 Depth = "5"
DepthLevel15 Depth = "15"
DepthLevel20 Depth = "20" DepthLevel20 Depth = "20"
DepthLevel50 Depth = "50" DepthLevel50 Depth = "50"
DepthLevel200 Depth = "200" DepthLevel200 Depth = "200"