mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 06:53:52 +00:00
pkg/exchange: support market trade stream on bitget
This commit is contained in:
parent
f8c47f72bf
commit
a18b1be44e
|
@ -13,7 +13,8 @@ import (
|
||||||
type Stream struct {
|
type Stream struct {
|
||||||
types.StandardStream
|
types.StandardStream
|
||||||
|
|
||||||
bookEventCallbacks []func(o BookEvent)
|
bookEventCallbacks []func(o BookEvent)
|
||||||
|
marketTradeEventCallbacks []func(o MarketTradeEvent)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStream() *Stream {
|
func NewStream() *Stream {
|
||||||
|
@ -27,6 +28,7 @@ func NewStream() *Stream {
|
||||||
stream.OnConnect(stream.handlerConnect)
|
stream.OnConnect(stream.handlerConnect)
|
||||||
|
|
||||||
stream.OnBookEvent(stream.handleBookEvent)
|
stream.OnBookEvent(stream.handleBookEvent)
|
||||||
|
stream.OnMarketTradeEvent(stream.handleMaretTradeEvent)
|
||||||
return stream
|
return stream
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,6 +89,9 @@ func (s *Stream) dispatchEvent(event interface{}) {
|
||||||
|
|
||||||
case *BookEvent:
|
case *BookEvent:
|
||||||
s.EmitBookEvent(*e)
|
s.EmitBookEvent(*e)
|
||||||
|
|
||||||
|
case *MarketTradeEvent:
|
||||||
|
s.EmitMarketTradeEvent(*e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,7 +106,7 @@ func (s *Stream) handlerConnect() {
|
||||||
|
|
||||||
func (s *Stream) handleBookEvent(o BookEvent) {
|
func (s *Stream) handleBookEvent(o BookEvent) {
|
||||||
for _, book := range o.ToGlobalOrderBooks() {
|
for _, book := range o.ToGlobalOrderBooks() {
|
||||||
switch o.Type {
|
switch o.actionType {
|
||||||
case ActionTypeSnapshot:
|
case ActionTypeSnapshot:
|
||||||
s.EmitBookSnapshot(book)
|
s.EmitBookSnapshot(book)
|
||||||
|
|
||||||
|
@ -131,6 +136,10 @@ func convertSubscription(sub types.Subscription) (WsArg, error) {
|
||||||
arg.Channel = ChannelOrderBook
|
arg.Channel = ChannelOrderBook
|
||||||
}
|
}
|
||||||
return arg, nil
|
return arg, nil
|
||||||
|
|
||||||
|
case types.MarketTradeChannel:
|
||||||
|
arg.Channel = ChannelTrade
|
||||||
|
return arg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return arg, fmt.Errorf("unsupported stream channel: %s", sub.Channel)
|
return arg, fmt.Errorf("unsupported stream channel: %s", sub.Channel)
|
||||||
|
@ -156,10 +165,37 @@ func parseWebSocketEvent(in []byte) (interface{}, error) {
|
||||||
return nil, fmt.Errorf("failed to unmarshal data into BookEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err)
|
return nil, fmt.Errorf("failed to unmarshal data into BookEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
book.Type = event.Action
|
book.actionType = event.Action
|
||||||
book.InstId = event.Arg.InstId
|
book.instId = event.Arg.InstId
|
||||||
return &book, nil
|
return &book, nil
|
||||||
|
|
||||||
|
case ChannelTrade:
|
||||||
|
var trade MarketTradeEvent
|
||||||
|
err = json.Unmarshal(event.Data, &trade.Events)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to unmarshal data into MarketTradeEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
trade.actionType = event.Action
|
||||||
|
trade.instId = event.Arg.InstId
|
||||||
|
return &trade, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("unhandled websocket event: %+v", string(in))
|
return nil, fmt.Errorf("unhandled websocket event: %+v", string(in))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Stream) handleMaretTradeEvent(m MarketTradeEvent) {
|
||||||
|
if m.actionType == ActionTypeSnapshot {
|
||||||
|
// we don't support snapshot event
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, trade := range m.Events {
|
||||||
|
globalTrade, err := trade.ToGlobal(m.instId)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("failed to convert to market trade")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.EmitMarketTrade(globalTrade)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -13,3 +13,13 @@ func (s *Stream) EmitBookEvent(o BookEvent) {
|
||||||
cb(o)
|
cb(o)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Stream) OnMarketTradeEvent(cb func(o MarketTradeEvent)) {
|
||||||
|
s.marketTradeEventCallbacks = append(s.marketTradeEventCallbacks, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stream) EmitMarketTradeEvent(o MarketTradeEvent) {
|
||||||
|
for _, cb := range s.marketTradeEventCallbacks {
|
||||||
|
cb(o)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -92,6 +92,20 @@ func TestStream(t *testing.T) {
|
||||||
c := make(chan struct{})
|
c := make(chan struct{})
|
||||||
<-c
|
<-c
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("trade test", func(t *testing.T) {
|
||||||
|
s.Subscribe(types.MarketTradeChannel, "BTCUSDT", types.SubscribeOptions{})
|
||||||
|
s.SetPublicOnly()
|
||||||
|
err := s.Connect(context.Background())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s.OnMarketTrade(func(trade types.Trade) {
|
||||||
|
t.Log("got update", trade)
|
||||||
|
})
|
||||||
|
c := make(chan struct{})
|
||||||
|
<-c
|
||||||
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStream_parseWebSocketEvent(t *testing.T) {
|
func TestStream_parseWebSocketEvent(t *testing.T) {
|
||||||
|
@ -247,8 +261,8 @@ func TestStream_parseWebSocketEvent(t *testing.T) {
|
||||||
Checksum: 0,
|
Checksum: 0,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Type: actionType,
|
actionType: actionType,
|
||||||
InstId: "BTCUSDT",
|
instId: "BTCUSDT",
|
||||||
}, *book)
|
}, *book)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,6 +278,181 @@ func TestStream_parseWebSocketEvent(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_parseWebSocketEvent_MarketTrade(t *testing.T) {
|
||||||
|
t.Run("MarketTrade event", func(t *testing.T) {
|
||||||
|
input := `{
|
||||||
|
"action":"%s",
|
||||||
|
"arg":{
|
||||||
|
"instType":"sp",
|
||||||
|
"channel":"trade",
|
||||||
|
"instId":"BTCUSDT"
|
||||||
|
},
|
||||||
|
"data":[
|
||||||
|
[
|
||||||
|
"1697697791663",
|
||||||
|
"28303.43",
|
||||||
|
"0.0452",
|
||||||
|
"sell"
|
||||||
|
],
|
||||||
|
[
|
||||||
|
"1697697794663",
|
||||||
|
"28345.67",
|
||||||
|
"0.1234",
|
||||||
|
"sell"
|
||||||
|
]
|
||||||
|
],
|
||||||
|
"ts":1697697791670
|
||||||
|
}`
|
||||||
|
|
||||||
|
eventFn := func(in string, actionType ActionType) {
|
||||||
|
res, err := parseWebSocketEvent([]byte(in))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
book, ok := res.(*MarketTradeEvent)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, MarketTradeEvent{
|
||||||
|
Events: MarketTradeSlice{
|
||||||
|
{
|
||||||
|
Ts: types.NewMillisecondTimestampFromInt(1697697791663),
|
||||||
|
Price: fixedpoint.NewFromFloat(28303.43),
|
||||||
|
Size: fixedpoint.NewFromFloat(0.0452),
|
||||||
|
Side: "sell",
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
Ts: types.NewMillisecondTimestampFromInt(1697697794663),
|
||||||
|
Price: fixedpoint.NewFromFloat(28345.67),
|
||||||
|
Size: fixedpoint.NewFromFloat(0.1234),
|
||||||
|
Side: "sell",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
actionType: actionType,
|
||||||
|
instId: "BTCUSDT",
|
||||||
|
}, *book)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("snapshot type", func(t *testing.T) {
|
||||||
|
snapshotInput := fmt.Sprintf(input, ActionTypeSnapshot)
|
||||||
|
eventFn(snapshotInput, ActionTypeSnapshot)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("update type", func(t *testing.T) {
|
||||||
|
snapshotInput := fmt.Sprintf(input, ActionTypeUpdate)
|
||||||
|
eventFn(snapshotInput, ActionTypeUpdate)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Unexpected length of market trade", func(t *testing.T) {
|
||||||
|
input := `{
|
||||||
|
"action":"%s",
|
||||||
|
"arg":{
|
||||||
|
"instType":"sp",
|
||||||
|
"channel":"trade",
|
||||||
|
"instId":"BTCUSDT"
|
||||||
|
},
|
||||||
|
"data":[
|
||||||
|
[
|
||||||
|
"1697697791663",
|
||||||
|
"28303.43",
|
||||||
|
"28303.43",
|
||||||
|
"0.0452",
|
||||||
|
"sell"
|
||||||
|
]
|
||||||
|
],
|
||||||
|
"ts":1697697791670
|
||||||
|
}`
|
||||||
|
_, err := parseWebSocketEvent([]byte(input))
|
||||||
|
assert.ErrorContains(t, err, "unexpected trades length")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Unexpected timestamp", func(t *testing.T) {
|
||||||
|
input := `{
|
||||||
|
"action":"%s",
|
||||||
|
"arg":{
|
||||||
|
"instType":"sp",
|
||||||
|
"channel":"trade",
|
||||||
|
"instId":"BTCUSDT"
|
||||||
|
},
|
||||||
|
"data":[
|
||||||
|
[
|
||||||
|
"TIMESTAMP",
|
||||||
|
"28303.43",
|
||||||
|
"0.0452",
|
||||||
|
"sell"
|
||||||
|
]
|
||||||
|
],
|
||||||
|
"ts":1697697791670
|
||||||
|
}`
|
||||||
|
_, err := parseWebSocketEvent([]byte(input))
|
||||||
|
assert.ErrorContains(t, err, "timestamp")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Unexpected price", func(t *testing.T) {
|
||||||
|
input := `{
|
||||||
|
"action":"%s",
|
||||||
|
"arg":{
|
||||||
|
"instType":"sp",
|
||||||
|
"channel":"trade",
|
||||||
|
"instId":"BTCUSDT"
|
||||||
|
},
|
||||||
|
"data":[
|
||||||
|
[
|
||||||
|
"1697697791663",
|
||||||
|
"1p",
|
||||||
|
"0.0452",
|
||||||
|
"sell"
|
||||||
|
]
|
||||||
|
],
|
||||||
|
"ts":1697697791670
|
||||||
|
}`
|
||||||
|
_, err := parseWebSocketEvent([]byte(input))
|
||||||
|
assert.ErrorContains(t, err, "price")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Unexpected size", func(t *testing.T) {
|
||||||
|
input := `{
|
||||||
|
"action":"%s",
|
||||||
|
"arg":{
|
||||||
|
"instType":"sp",
|
||||||
|
"channel":"trade",
|
||||||
|
"instId":"BTCUSDT"
|
||||||
|
},
|
||||||
|
"data":[
|
||||||
|
[
|
||||||
|
"1697697791663",
|
||||||
|
"28303.43",
|
||||||
|
"2v",
|
||||||
|
"sell"
|
||||||
|
]
|
||||||
|
],
|
||||||
|
"ts":1697697791670
|
||||||
|
}`
|
||||||
|
_, err := parseWebSocketEvent([]byte(input))
|
||||||
|
assert.ErrorContains(t, err, "size")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Unexpected side", func(t *testing.T) {
|
||||||
|
input := `{
|
||||||
|
"action":"%s",
|
||||||
|
"arg":{
|
||||||
|
"instType":"sp",
|
||||||
|
"channel":"trade",
|
||||||
|
"instId":"BTCUSDT"
|
||||||
|
},
|
||||||
|
"data":[
|
||||||
|
[
|
||||||
|
"1697697791663",
|
||||||
|
"28303.43",
|
||||||
|
"0.0452",
|
||||||
|
12345
|
||||||
|
]
|
||||||
|
],
|
||||||
|
"ts":1697697791670
|
||||||
|
}`
|
||||||
|
_, err := parseWebSocketEvent([]byte(input))
|
||||||
|
assert.ErrorContains(t, err, "side")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func Test_convertSubscription(t *testing.T) {
|
func Test_convertSubscription(t *testing.T) {
|
||||||
t.Run("BookChannel.ChannelOrderBook5", func(t *testing.T) {
|
t.Run("BookChannel.ChannelOrderBook5", func(t *testing.T) {
|
||||||
res, err := convertSubscription(types.Subscription{
|
res, err := convertSubscription(types.Subscription{
|
||||||
|
@ -310,4 +499,17 @@ func Test_convertSubscription(t *testing.T) {
|
||||||
InstId: "BTCUSDT",
|
InstId: "BTCUSDT",
|
||||||
}, res)
|
}, res)
|
||||||
})
|
})
|
||||||
|
t.Run("TradeChannel", func(t *testing.T) {
|
||||||
|
res, err := convertSubscription(types.Subscription{
|
||||||
|
Symbol: "BTCUSDT",
|
||||||
|
Channel: types.MarketTradeChannel,
|
||||||
|
Options: types.SubscribeOptions{},
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, WsArg{
|
||||||
|
InstType: instSp,
|
||||||
|
Channel: ChannelTrade,
|
||||||
|
InstId: "BTCUSDT",
|
||||||
|
}, res)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,10 @@ package bitget
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -23,6 +25,7 @@ const (
|
||||||
ChannelOrderBook5 ChannelType = "books5"
|
ChannelOrderBook5 ChannelType = "books5"
|
||||||
// ChannelOrderBook15 top 15 order book of "books" that begins from bid1/ask1
|
// ChannelOrderBook15 top 15 order book of "books" that begins from bid1/ask1
|
||||||
ChannelOrderBook15 ChannelType = "books15"
|
ChannelOrderBook15 ChannelType = "books15"
|
||||||
|
ChannelTrade ChannelType = "trade"
|
||||||
)
|
)
|
||||||
|
|
||||||
type WsArg struct {
|
type WsArg struct {
|
||||||
|
@ -119,15 +122,15 @@ type BookEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// internal use
|
// internal use
|
||||||
Type ActionType
|
actionType ActionType
|
||||||
InstId string
|
instId string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *BookEvent) ToGlobalOrderBooks() []types.SliceOrderBook {
|
func (e *BookEvent) ToGlobalOrderBooks() []types.SliceOrderBook {
|
||||||
books := make([]types.SliceOrderBook, len(e.Events))
|
books := make([]types.SliceOrderBook, len(e.Events))
|
||||||
for i, event := range e.Events {
|
for i, event := range e.Events {
|
||||||
books[i] = types.SliceOrderBook{
|
books[i] = types.SliceOrderBook{
|
||||||
Symbol: e.InstId,
|
Symbol: e.instId,
|
||||||
Bids: event.Bids,
|
Bids: event.Bids,
|
||||||
Asks: event.Asks,
|
Asks: event.Asks,
|
||||||
Time: event.Ts.Time(),
|
Time: event.Ts.Time(),
|
||||||
|
@ -136,3 +139,124 @@ func (e *BookEvent) ToGlobalOrderBooks() []types.SliceOrderBook {
|
||||||
|
|
||||||
return books
|
return books
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SideType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
SideBuy SideType = "buy"
|
||||||
|
SideSell SideType = "sell"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s SideType) ToGlobal() (types.SideType, error) {
|
||||||
|
switch s {
|
||||||
|
case SideBuy:
|
||||||
|
return types.SideTypeBuy, nil
|
||||||
|
case SideSell:
|
||||||
|
return types.SideTypeSell, nil
|
||||||
|
default:
|
||||||
|
return "", fmt.Errorf("unexpceted side type: %s", s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type MarketTrade struct {
|
||||||
|
Ts types.MillisecondTimestamp
|
||||||
|
Price fixedpoint.Value
|
||||||
|
Size fixedpoint.Value
|
||||||
|
Side SideType
|
||||||
|
}
|
||||||
|
|
||||||
|
type MarketTradeSlice []MarketTrade
|
||||||
|
|
||||||
|
func (m *MarketTradeSlice) UnmarshalJSON(b []byte) error {
|
||||||
|
if m == nil {
|
||||||
|
return errors.New("nil pointer of market trade slice")
|
||||||
|
}
|
||||||
|
s, err := parseMarketTradeSliceJSON(b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
*m = s
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseMarketTradeSliceJSON tries to parse a 2 dimensional string array into a MarketTradeSlice
|
||||||
|
//
|
||||||
|
// [
|
||||||
|
//
|
||||||
|
// [
|
||||||
|
// "1697694819663",
|
||||||
|
// "28312.97",
|
||||||
|
// "0.1653",
|
||||||
|
// "sell"
|
||||||
|
// ],
|
||||||
|
// [
|
||||||
|
// "1697694818663",
|
||||||
|
// "28313",
|
||||||
|
// "0.1598",
|
||||||
|
// "buy"
|
||||||
|
// ]
|
||||||
|
//
|
||||||
|
// ]
|
||||||
|
func parseMarketTradeSliceJSON(in []byte) (slice MarketTradeSlice, err error) {
|
||||||
|
var rawTrades [][]json.RawMessage
|
||||||
|
|
||||||
|
err = json.Unmarshal(in, &rawTrades)
|
||||||
|
if err != nil {
|
||||||
|
return slice, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, raw := range rawTrades {
|
||||||
|
if len(raw) != 4 {
|
||||||
|
return nil, fmt.Errorf("unexpected trades length: %d, data: %q", len(raw), raw)
|
||||||
|
}
|
||||||
|
var trade MarketTrade
|
||||||
|
if err = json.Unmarshal(raw[0], &trade.Ts); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to unmarshal into timestamp: %q", raw[0])
|
||||||
|
}
|
||||||
|
if err = json.Unmarshal(raw[1], &trade.Price); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to unmarshal into price: %q", raw[1])
|
||||||
|
}
|
||||||
|
if err = json.Unmarshal(raw[2], &trade.Size); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to unmarshal into size: %q", raw[2])
|
||||||
|
}
|
||||||
|
if err = json.Unmarshal(raw[3], &trade.Side); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to unmarshal into side: %q", raw[3])
|
||||||
|
}
|
||||||
|
|
||||||
|
slice = append(slice, trade)
|
||||||
|
}
|
||||||
|
|
||||||
|
return slice, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MarketTrade) ToGlobal(symbol string) (types.Trade, error) {
|
||||||
|
side, err := m.Side.ToGlobal()
|
||||||
|
if err != nil {
|
||||||
|
return types.Trade{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return types.Trade{
|
||||||
|
ID: 0, // not supported
|
||||||
|
OrderID: 0, // not supported
|
||||||
|
Exchange: types.ExchangeBitget,
|
||||||
|
Price: m.Price,
|
||||||
|
Quantity: m.Size,
|
||||||
|
QuoteQuantity: m.Price.Mul(m.Size),
|
||||||
|
Symbol: symbol,
|
||||||
|
Side: side,
|
||||||
|
IsBuyer: side == types.SideTypeBuy,
|
||||||
|
IsMaker: false, // not supported
|
||||||
|
Time: types.Time(m.Ts.Time()),
|
||||||
|
Fee: fixedpoint.Zero, // not supported
|
||||||
|
FeeCurrency: "", // not supported
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type MarketTradeEvent struct {
|
||||||
|
Events MarketTradeSlice
|
||||||
|
|
||||||
|
// internal use
|
||||||
|
actionType ActionType
|
||||||
|
instId string
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user