Merge pull request #141 from c9s/ftx/on-orderbook-snapshot

This commit is contained in:
Yo-An Lin 2021-03-03 09:05:08 +08:00 committed by GitHub
commit 009dafd176
6 changed files with 982 additions and 8 deletions

View File

@ -5,13 +5,14 @@ import (
"fmt"
"syscall"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/types"
)
// go run ./cmd/bbgo orderbook --session=ftx --symbol=btc/usdt
// go run ./cmd/bbgo orderbook --session=ftx --symbol=BTC/USDT
var orderbookCmd = &cobra.Command{
Use: "orderbook",
RunE: func(cmd *cobra.Command, args []string) error {
@ -29,13 +30,19 @@ var orderbookCmd = &cobra.Command{
if err != nil {
return fmt.Errorf("can't get the symbol from flags: %w", err)
}
if symbol == "" {
return fmt.Errorf("symbol is not found")
}
s := ex.NewStream()
s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
s.OnBookSnapshot(func(book types.OrderBook) {
log.Infof("orderbook snapshot: %+v", book)
})
if err := s.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to %s", session)
}
// TODO: register callbacks to print orderbook and updates
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return nil

View File

@ -0,0 +1,814 @@
{
"channel": "orderbook",
"market": "BTC/USDT",
"type": "partial",
"data": {
"time": 1614520368.9313016,
"checksum": 2150525410,
"bids": [
[
44555.0,
3.3968
],
[
44554.0,
0.0561
],
[
44548.0,
0.1683
],
[
44542.0,
0.1762
],
[
44540.0,
0.0433
],
[
44539.0,
4.1616
],
[
44534.0,
0.0234
],
[
44533.0,
33.1201
],
[
44532.0,
8.2272
],
[
44531.0,
0.3364
],
[
44530.0,
0.0011
],
[
44527.0,
0.0074
],
[
44526.0,
0.0117
],
[
44525.0,
0.4514
],
[
44520.0,
0.001
],
[
44518.0,
0.1054
],
[
44517.0,
0.0077
],
[
44512.0,
0.8512
],
[
44511.0,
31.8569
],
[
44510.0,
0.001
],
[
44507.0,
0.0234
],
[
44506.0,
0.382
],
[
44505.0,
0.0468
],
[
44501.0,
0.0082
],
[
44500.0,
0.501
],
[
44498.0,
0.001
],
[
44496.0,
0.0269
],
[
44490.0,
0.001
],
[
44480.0,
0.001
],
[
44479.0,
0.0306
],
[
44478.0,
0.01
],
[
44477.0,
0.302
],
[
44470.0,
0.001
],
[
44469.0,
0.0001
],
[
44460.0,
0.001
],
[
44454.0,
0.001
],
[
44450.0,
0.0019
],
[
44448.0,
0.0005
],
[
44440.0,
0.001
],
[
44439.0,
28.9321
],
[
44430.0,
0.001
],
[
44420.0,
0.001
],
[
44416.0,
0.0001
],
[
44411.0,
0.0984
],
[
44410.0,
0.001
],
[
44409.0,
0.001
],
[
44408.0,
0.0004
],
[
44407.0,
0.0002
],
[
44400.0,
0.001
],
[
44397.0,
0.0002
],
[
44391.0,
0.0004
],
[
44390.0,
0.001
],
[
44389.0,
43.3904
],
[
44380.0,
0.001
],
[
44376.0,
0.0001
],
[
44375.0,
0.0001
],
[
44372.0,
0.0002
],
[
44370.0,
0.0012
],
[
44365.0,
0.001
],
[
44363.0,
0.0004
],
[
44360.0,
0.001
],
[
44354.0,
54.0385
],
[
44350.0,
0.0028
],
[
44346.0,
0.0001
],
[
44340.0,
0.0013
],
[
44338.0,
0.0002
],
[
44336.0,
39.6518
],
[
44333.0,
0.0001
],
[
44330.0,
0.001
],
[
44329.0,
0.5014
],
[
44326.0,
0.0002
],
[
44322.0,
0.001
],
[
44321.0,
0.001
],
[
44320.0,
0.001
],
[
44314.0,
0.0007
],
[
44310.0,
0.001
],
[
44306.0,
0.0001
],
[
44300.0,
33.2836
],
[
44292.0,
0.0035
],
[
44291.0,
0.0004
],
[
44290.0,
0.001
],
[
44287.0,
39.717
],
[
44285.0,
0.0439
],
[
44281.0,
1.0294
],
[
44280.0,
0.001
],
[
44277.0,
0.001
],
[
44275.0,
0.0165
],
[
44270.0,
0.001
],
[
44268.0,
48.31
],
[
44260.0,
0.0011
],
[
44254.0,
0.0003
],
[
44250.0,
0.0031
],
[
44246.0,
0.0002
],
[
44244.0,
0.0001
],
[
44241.0,
0.0009
],
[
44240.0,
0.001
],
[
44233.0,
0.001
],
[
44230.0,
0.001
],
[
44224.0,
0.0001
],
[
44222.0,
0.0002
]
],
"asks": [
[
44574.0,
0.4591
],
[
44579.0,
0.15
],
[
44582.0,
2.9122
],
[
44583.0,
0.1683
],
[
44584.0,
0.5
],
[
44588.0,
0.0433
],
[
44590.0,
8.6379
],
[
44593.0,
0.405
],
[
44595.0,
0.5988
],
[
44596.0,
0.06
],
[
44605.0,
0.6927
],
[
44606.0,
0.3365
],
[
44616.0,
0.1752
],
[
44617.0,
0.0215
],
[
44620.0,
0.008
],
[
44629.0,
0.0078
],
[
44630.0,
0.101
],
[
44631.0,
0.246
],
[
44632.0,
0.01
],
[
44635.0,
0.2997
],
[
44636.0,
26.777
],
[
44639.0,
0.662
],
[
44642.0,
0.0078
],
[
44650.0,
0.0009
],
[
44651.0,
0.0001
],
[
44652.0,
0.0079
],
[
44653.0,
0.0003
],
[
44654.0,
0.354
],
[
44661.0,
0.0306
],
[
44666.0,
0.0002
],
[
44667.0,
0.0009
],
[
44668.0,
0.0234
],
[
44672.0,
25.923
],
[
44673.0,
0.1
],
[
44674.0,
0.001
],
[
44675.0,
0.0467
],
[
44678.0,
0.1286
],
[
44680.0,
0.0467
],
[
44684.0,
0.0117
],
[
44687.0,
0.0351
],
[
44689.0,
0.1052
],
[
44693.0,
0.0132
],
[
44699.0,
0.0984
],
[
44700.0,
0.671
],
[
44709.0,
0.0007
],
[
44713.0,
45.9031
],
[
44714.0,
0.0001
],
[
44719.0,
0.001
],
[
44727.0,
0.0004
],
[
44728.0,
0.0002
],
[
44735.0,
0.0003
],
[
44744.0,
64.7511
],
[
44750.0,
0.0018
],
[
44763.0,
0.001
],
[
44775.0,
0.0006
],
[
44781.0,
0.0001
],
[
44782.0,
34.2206
],
[
44784.0,
0.0001
],
[
44790.0,
0.0002
],
[
44796.0,
0.001
],
[
44799.0,
0.0002
],
[
44800.0,
0.0011
],
[
44806.0,
0.0165
],
[
44807.0,
0.001
],
[
44813.0,
0.0001
],
[
44814.0,
0.0003
],
[
44816.0,
0.0002
],
[
44820.0,
38.3495
],
[
44822.0,
0.0026
],
[
44836.0,
0.0001
],
[
44846.0,
50.1127
],
[
44850.0,
0.0018
],
[
44851.0,
0.001
],
[
44859.0,
0.0003
],
[
44867.0,
66.5987
],
[
44876.0,
1.0294
],
[
44885.0,
0.0005
],
[
44888.0,
0.0002
],
[
44889.0,
0.0003
],
[
44895.0,
0.001
],
[
44897.0,
0.0443
],
[
44900.0,
40.9965
],
[
44909.0,
0.0008
],
[
44913.0,
0.0001
],
[
44926.0,
45.4838
],
[
44928.0,
70.5138
],
[
44938.0,
0.0005
],
[
44939.0,
0.001
],
[
44949.0,
0.0004
],
[
44950.0,
0.0019
],
[
44959.0,
0.0002
],
[
44962.0,
0.0002
],
[
44979.0,
0.0002
],
[
44982.0,
68.1033
],
[
44983.0,
0.001
],
[
44999.0,
0.0003
],
[
45000.0,
0.0273
],
[
45002.0,
0.0002
],
[
45009.0,
0.0003
],
[
45010.0,
0.0003
]
],
"action": "partial"
}
}

View File

@ -8,7 +8,7 @@ import (
)
type Stream struct {
types.StandardStream
*types.StandardStream
wsService *WebsocketService
@ -19,11 +19,11 @@ type Stream struct {
func NewStream(key, secret string) *Stream {
wss := NewWebsocketService(key, secret)
s := &Stream{
StandardStream: types.StandardStream{},
StandardStream: &types.StandardStream{},
wsService: wss,
}
wss.OnMessage(messageHandler{s.StandardStream}.handleMessage)
wss.OnMessage(messageHandler{StandardStream: s.StandardStream}.handleMessage)
return s
}

View File

@ -3,11 +3,13 @@ package ftx
import (
"encoding/json"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
)
type messageHandler struct {
types.StandardStream
*types.StandardStream
}
func (h messageHandler) handleMessage(message []byte) {
@ -20,6 +22,11 @@ func (h messageHandler) handleMessage(message []byte) {
switch r.Type {
case subscribedRespType:
h.handleSubscribedMessage(r)
case partialRespType:
// snapshot of current market data
h.handleSnapshot(r)
case updateRespType:
//log.Infof("update=> %s", string(message))
default:
logger.Errorf("unsupported message type: %+v", r.Type)
}
@ -27,5 +34,15 @@ func (h messageHandler) handleMessage(message []byte) {
// {"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}
func (h messageHandler) handleSubscribedMessage(response rawResponse) {
logger.Infof("%s orderbook is subscribed", response.toSubscribedResp().Market)
r := response.toSubscribedResp()
logger.Infof("%s %s is subscribed", r.Market, r.Channel)
}
func (h messageHandler) handleSnapshot(response rawResponse) {
r, err := response.toSnapshotResp()
if err != nil {
log.WithError(err).Errorf("failed to convert the partial response to snapshot")
return
}
h.EmitBookSnapshot(r.toGlobalOrderBook())
}

View File

@ -1,6 +1,15 @@
package ftx
import "encoding/json"
import (
"encoding/json"
"fmt"
"math"
"strings"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type operation string
@ -56,7 +65,74 @@ func (r rawResponse) toSubscribedResp() subscribedResponse {
}
}
func (r rawResponse) toSnapshotResp() (snapshotResponse, error) {
o := snapshotResponse{
mandatoryFields: r.mandatoryFields,
}
if err := json.Unmarshal(r.Data["action"], &o.Action); err != nil {
return snapshotResponse{}, fmt.Errorf("failed to unmarshal data.action field: %w", err)
}
var t float64
if err := json.Unmarshal(r.Data["time"], &t); err != nil {
return snapshotResponse{}, fmt.Errorf("failed to unmarshal data.time field: %w", err)
}
sec, dec := math.Modf(t)
o.Time = time.Unix(int64(sec), int64(dec*1e9))
if err := json.Unmarshal(r.Data["checksum"], &o.Checksum); err != nil {
return snapshotResponse{}, fmt.Errorf("failed to unmarshal data.checksum field: %w", err)
}
if err := json.Unmarshal(r.Data["bids"], &o.Bids); err != nil {
return snapshotResponse{}, fmt.Errorf("failed to unmarshal data.bids field: %w", err)
}
if err := json.Unmarshal(r.Data["asks"], &o.Asks); err != nil {
return snapshotResponse{}, fmt.Errorf("failed to unmarshal data.asks field: %w", err)
}
return o, nil
}
// {"type": "subscribed", "channel": "orderbook", "market": "BTC/USDT"}
type subscribedResponse struct {
mandatoryFields
}
type snapshotResponse struct {
mandatoryFields
Action string
Time time.Time
Checksum int64
// Best 100 orders
Bids [][]float64
// Best 100 orders
Asks [][]float64
}
func (r snapshotResponse) toGlobalOrderBook() types.OrderBook {
return types.OrderBook{
// ex. BTC/USDT
Symbol: strings.ToUpper(r.Market),
Bids: toPriceVolumeSlice(r.Bids),
Asks: toPriceVolumeSlice(r.Asks),
}
}
func toPriceVolumeSlice(orders [][]float64) types.PriceVolumeSlice {
var pv types.PriceVolumeSlice
for _, o := range orders {
pv = append(pv, types.PriceVolume{
Price: fixedpoint.NewFromFloat(o[0]),
Volume: fixedpoint.NewFromFloat(o[1]),
})
}
return pv
}

View File

@ -2,9 +2,13 @@ package ftx
import (
"encoding/json"
"io/ioutil"
"testing"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
func Test_rawResponse_toSubscribedResp(t *testing.T) {
@ -16,3 +20,59 @@ func Test_rawResponse_toSubscribedResp(t *testing.T) {
assert.Equal(t, orderbook, r.Channel)
assert.Equal(t, "BTC/USDT", r.Market)
}
func Test_rawResponse_toSnapshotResp(t *testing.T) {
f, err := ioutil.ReadFile("./orderbook_snapshot.json")
assert.NoError(t, err)
var m rawResponse
assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toSnapshotResp()
assert.NoError(t, err)
assert.Equal(t, partialRespType, r.Type)
assert.Equal(t, orderbook, r.Channel)
assert.Equal(t, "BTC/USDT", r.Market)
assert.Equal(t, int64(1614520368), r.Time.Unix())
assert.Equal(t, int64(2150525410), r.Checksum)
assert.Len(t, r.Bids, 100)
assert.Equal(t, []float64{44555.0, 3.3968}, r.Bids[0])
assert.Equal(t, []float64{44554.0, 0.0561}, r.Bids[1])
assert.Len(t, r.Asks, 100)
assert.Equal(t, []float64{44574.0, 0.4591}, r.Asks[0])
assert.Equal(t, []float64{44579.0, 0.15}, r.Asks[1])
}
func Test_snapshotResponse_toGlobalOrderBook(t *testing.T) {
f, err := ioutil.ReadFile("./orderbook_snapshot.json")
assert.NoError(t, err)
var m rawResponse
assert.NoError(t, json.Unmarshal(f, &m))
r, err := m.toSnapshotResp()
assert.NoError(t, err)
b := r.toGlobalOrderBook()
assert.NoError(t, err)
assert.Equal(t, "BTC/USDT", b.Symbol)
isValid, err := b.IsValid()
assert.True(t, isValid)
assert.NoError(t, err)
assert.Len(t, b.Bids, 100)
assert.Equal(t, types.PriceVolume{
Price: fixedpoint.MustNewFromString("44555.0"),
Volume: fixedpoint.MustNewFromString("3.3968"),
}, b.Bids[0])
assert.Equal(t, types.PriceVolume{
Price: fixedpoint.MustNewFromString("44222.0"),
Volume: fixedpoint.MustNewFromString("0.0002"),
}, b.Bids[99])
assert.Len(t, b.Asks, 100)
assert.Equal(t, types.PriceVolume{
Price: fixedpoint.MustNewFromString("44574.0"),
Volume: fixedpoint.MustNewFromString("0.4591"),
}, b.Asks[0])
assert.Equal(t, types.PriceVolume{
Price: fixedpoint.MustNewFromString("45010.0"),
Volume: fixedpoint.MustNewFromString("0.0003"),
}, b.Asks[99])
}