Merge pull request #249 from jnlin/ftx/websocket-kline

Implement kline stream and subaccount feature for FTX exchange
This commit is contained in:
Yo-An Lin 2021-05-26 00:31:35 +08:00 committed by GitHub
commit 31871143a0
8 changed files with 124 additions and 27 deletions

View File

@ -100,7 +100,7 @@ var rootCmd = &cobra.Command{
})
stream.OnBalanceSnapshot(func(balances types.BalanceMap) {
log.Infof("balances: %+v",balances)
log.Infof("balances: %+v", balances)
})
streambook := types.NewStreamBook(symbol)

View File

@ -20,6 +20,10 @@ func toGlobalSymbol(original string) string {
}
func toLocalSymbol(original string) string {
if symbolMap[original] == "" {
return original
}
return symbolMap[original]
}

View File

@ -87,7 +87,7 @@ func (e *Exchange) PlatformFeeCurrency() string {
}
func (e *Exchange) NewStream() types.Stream {
return NewStream(e.key, e.secret)
return NewStream(e.key, e.secret, e.subAccount, e)
}
func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
@ -248,7 +248,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
tradeIDs := make(map[int64]struct{})
var lastTradeID int64
lastTradeID := options.LastTradeID
var trades []types.Trade
symbol = strings.ToUpper(symbol)
@ -267,15 +267,18 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
})
for _, r := range resp.Result {
// always update since to avoid infinite loop
since = r.Time.Time
if _, ok := tradeIDs[r.TradeId]; ok {
continue
}
if r.TradeId <= lastTradeID || r.Time.Before(since) || r.Time.After(until) || r.Market != symbol {
if r.TradeId <= lastTradeID || r.Time.Before(since) || r.Time.After(until) || r.Market != toLocalSymbol(symbol) {
continue
}
tradeIDs[r.TradeId] = struct{}{}
lastTradeID = r.TradeId
since = r.Time.Time
t, err := toGlobalTrade(r)
if err != nil {

View File

@ -40,6 +40,9 @@ type restRequest struct {
// payload
p map[string]interface{}
// object id
id string
}
func newRestRequest(c *http.Client, baseURL *url.URL) *restRequest {
@ -80,13 +83,23 @@ func (r *restRequest) ReferenceURL(refURL string) *restRequest {
}
func (r *restRequest) buildURL() (*url.URL, error) {
refURL, err := url.Parse(r.refURL)
u := r.refURL
if len(r.id) > 0 {
u = u + "/" + r.id
}
refURL, err := url.Parse(u)
if err != nil {
return nil, err
}
return r.baseURL.ResolveReference(refURL), nil
}
func (r *restRequest) ID(id string) *restRequest {
r.id = id
return r
}
func (r *restRequest) Payloads(payloads map[string]interface{}) *restRequest {
for k, v := range payloads {
r.p[k] = v

View File

@ -68,7 +68,7 @@ func (r *orderRequest) CancelOrderByOrderID(ctx context.Context, orderID uint64)
resp, err := r.
Method("DELETE").
ReferenceURL("api/orders").
Payloads(map[string]interface{}{"order_id": orderID}).
ID(strconv.FormatUint(orderID, 10)).
DoAuthenticatedRequest(ctx)
if err != nil {
return cancelOrderResponse{}, err
@ -85,7 +85,7 @@ func (r *orderRequest) CancelOrderByClientID(ctx context.Context, clientID strin
resp, err := r.
Method("DELETE").
ReferenceURL("api/orders/by_client_id").
Payloads(map[string]interface{}{"client_order_id": clientID}).
ID(clientID).
DoAuthenticatedRequest(ctx)
if err != nil {
return cancelOrderResponse{}, err

View File

@ -17,35 +17,47 @@ const endpoint = "wss://ftx.com/ws/"
type Stream struct {
*types.StandardStream
ws *service.WebsocketClientBase
ws *service.WebsocketClientBase
exchange *Exchange
// publicOnly can only be configured before connecting
publicOnly int32
key string
secret string
key string
secret string
subAccount string
// subscriptions are only accessed in single goroutine environment, so I don't use mutex to protect them
subscriptions []websocketRequest
subscriptions []websocketRequest
klineSubscriptions []klineSubscription
}
func NewStream(key, secret string) *Stream {
type klineSubscription struct {
symbol string
interval types.Interval
}
func NewStream(key, secret string, subAccount string, e *Exchange) *Stream {
s := &Stream{
exchange: e,
key: key,
secret: secret,
subAccount: subAccount,
StandardStream: &types.StandardStream{},
ws: service.NewWebsocketClientBase(endpoint, 3*time.Second),
}
s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage)
s.ws.OnConnected(func(conn *websocket.Conn) {
subs := []websocketRequest{newLoginRequest(s.key, s.secret, time.Now())}
subs := []websocketRequest{newLoginRequest(s.key, s.secret, time.Now(), s.subAccount)}
subs = append(subs, s.subscriptions...)
for _, sub := range subs {
if err := conn.WriteJSON(sub); err != nil {
s.ws.EmitError(fmt.Errorf("failed to send subscription: %+v", sub))
}
}
s.EmitConnect()
})
return s
@ -60,6 +72,8 @@ func (s *Stream) Connect(ctx context.Context) error {
if err := s.ws.Connect(ctx); err != nil {
return err
}
s.EmitStart()
go s.pollKLines(ctx)
go func() {
// https://docs.ftx.com/?javascript#request-process
@ -102,15 +116,77 @@ func (s *Stream) SetPublicOnly() {
atomic.StoreInt32(&s.publicOnly, 1)
}
func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.SubscribeOptions) {
if channel != types.BookChannel {
panic("only support book channel now")
func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.SubscribeOptions) {
if channel == types.BookChannel {
s.addSubscription(websocketRequest{
Operation: subscribe,
Channel: orderBookChannel,
Market: toLocalSymbol(TrimUpperString(symbol)),
})
} else if channel == types.KLineChannel {
// FTX does not support kline channel, do polling
interval := types.Interval(option.Interval)
ks := klineSubscription{symbol: symbol, interval: interval}
s.klineSubscriptions = append(s.klineSubscriptions, ks)
} else {
panic("only support book/kline channel now")
}
s.addSubscription(websocketRequest{
Operation: subscribe,
Channel: orderBookChannel,
Market: toLocalSymbol(TrimUpperString(symbol)),
}
func (s *Stream) pollKLines(ctx context.Context) {
// get current kline candle
for _, sub := range s.klineSubscriptions {
klines := getLastKLine(s.exchange, ctx, sub.symbol, sub.interval)
if len(klines) >= 0 {
// handle mutiple klines, get the latest one
s.EmitKLineClosed(klines[len(klines)-1])
}
}
// the highest resolution of kline is 1min
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
logger.WithError(err).Errorf("pollKLines goroutine is terminated")
}
return
case <-ticker.C:
now := time.Now().Truncate(time.Minute)
for _, sub := range s.klineSubscriptions {
subTime := now.Truncate(sub.interval.Duration())
if now != subTime {
// not in the checking time slot, check next subscription
continue
}
klines := getLastKLine(s.exchange, ctx, sub.symbol, sub.interval)
if len(klines) >= 0 {
// handle mutiple klines, get the latest one
s.EmitKLineClosed(klines[len(klines)-1])
}
}
}
}
}
func getLastKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine {
// set since to more 30s ago to avoid getting no kline candle
since := time.Now().Add(time.Duration(-1*(interval.Minutes()*60+30)) * time.Second)
klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
StartTime: &since,
})
if err != nil {
logger.WithError(err).Errorf("failed to get kline data")
return klines
}
return klines
}
func (s *Stream) Close() error {

View File

@ -58,17 +58,18 @@ type loginArgs struct {
Key string `json:"key"`
Signature string `json:"sign"`
Time int64 `json:"time"`
SubAccount string `json:"subaccount"`
SubAccount string `json:"subaccount,omitempty"`
}
func newLoginRequest(key, secret string, t time.Time) websocketRequest {
func newLoginRequest(key, secret string, t time.Time, subaccount string) websocketRequest {
millis := t.UnixNano() / int64(time.Millisecond)
return websocketRequest{
Operation: login,
Login: loginArgs{
Key: key,
Signature: sign(secret, loginBody(millis)),
Time: millis,
Key: key,
Signature: sign(secret, loginBody(millis)),
Time: millis,
SubAccount: subaccount,
},
}
}

View File

@ -180,7 +180,7 @@ func Test_insertAt(t *testing.T) {
func Test_newLoginRequest(t *testing.T) {
// From API doc: https://docs.ftx.com/?javascript#authentication-2
r := newLoginRequest("", "Y2QTHI23f23f23jfjas23f23To0RfUwX3H42fvN-", time.Unix(0, 1557246346499*int64(time.Millisecond)))
r := newLoginRequest("", "Y2QTHI23f23f23jfjas23f23To0RfUwX3H42fvN-", time.Unix(0, 1557246346499*int64(time.Millisecond)), "")
expectedSignature := "d10b5a67a1a941ae9463a60b285ae845cdeac1b11edc7da9977bef0228b96de9"
assert.Equal(t, expectedSignature, r.Login.Signature)
jsonStr, err := json.Marshal(r)