okex: add kline command for testing kline data

This commit is contained in:
c9s 2021-05-27 18:35:34 +08:00
parent 76048633cc
commit 2844b7c3a7
6 changed files with 130 additions and 22 deletions

78
pkg/cmd/kline.go Normal file
View File

@ -0,0 +1,78 @@
package cmd
import (
"context"
"fmt"
"syscall"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/types"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
// go run ./cmd/bbgo kline --exchange=ftx --symbol=BTCUSDT
var klineCmd = &cobra.Command{
Use: "kline",
Short: "connect to the kline market data streaming service of an exchange",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
exName, err := cmd.Flags().GetString("exchange")
if err != nil {
return fmt.Errorf("can not get exchange from flags: %w", err)
}
exchangeName, err := types.ValidExchangeName(exName)
if err != nil {
return err
}
ex, err := cmdutil.NewExchange(exchangeName)
if err != nil {
return err
}
symbol, err := cmd.Flags().GetString("symbol")
if err != nil {
return fmt.Errorf("can not get the symbol from flags: %w", err)
}
if symbol == "" {
return fmt.Errorf("--symbol option is required")
}
interval, err := cmd.Flags().GetString("interval")
if err != nil {
return err
}
s := ex.NewStream()
s.SetPublicOnly()
s.Subscribe(types.KLineChannel, symbol, types.SubscribeOptions{Interval: interval})
s.OnKLineClosed(func(kline types.KLine) {
log.Infof("kline closed: %s", kline.String())
})
s.OnKLine(func(kline types.KLine) {
log.Infof("kline: %s", kline.String())
})
log.Infof("connecting...")
if err := s.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to %s", exchangeName)
}
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return nil
},
}
func init() {
// since the public data does not require trading authentication, we use --exchange option here.
klineCmd.Flags().String("exchange", "", "the exchange name")
klineCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...")
klineCmd.Flags().String("interval", "1m", "interval of the kline (candle), .e.g, 1m, 3m, 15m")
RootCmd.AddCommand(klineCmd)
}

View File

@ -15,7 +15,7 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
// go run ./cmd/bbgo orderbook --exchange=ftx --symbol=BTC/USDT
// go run ./cmd/bbgo orderbook --exchange=ftx --symbol=BTCUSDT
var orderbookCmd = &cobra.Command{
Use: "orderbook",
Short: "connect to the order book market data streaming service of an exchange",

View File

@ -423,7 +423,7 @@ func (s *Stream) read(ctx context.Context) {
default:
s.ConnLock.Lock()
if err := s.Conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
}

View File

@ -110,7 +110,7 @@ func parseBookEntry(v *fastjson.Value) (*BookEntry, error) {
}, nil
}
func parseBookData(instrumentId string, v *fastjson.Value) (interface{}, error) {
func parseBookData(instrumentId string, v *fastjson.Value) (*BookData, error) {
data := v.GetArray("data")
if len(data) == 0 {
return nil, errors.New("empty data payload")
@ -178,17 +178,40 @@ type Candle struct {
MillisecondTimestamp int64
Time time.Time
StartTime time.Time
}
func parseCandle(channel, instrumentID string, v *fastjson.Value) (interface{}, error) {
arr, err := v.Array()
func (c *Candle) KLine() types.KLine {
interval := types.Interval(c.Interval)
endTime := c.StartTime.Add(interval.Duration() - 1*time.Millisecond)
return types.KLine{
Interval: interval,
Open: c.Open.Float64(),
High: c.High.Float64(),
Low: c.Low.Float64(),
Close: c.Close.Float64(),
StartTime: c.StartTime,
EndTime: endTime,
}
}
func parseCandle(channel, instrumentID string, v *fastjson.Value) (*Candle, error) {
data, err := v.Get("data").Array()
if err != nil {
return nil, err
}
if len(data) == 0 {
return nil, errors.New("candle data is empty")
}
arr, err := data[0].Array()
if err != nil {
return nil, err
}
if len(arr) < 7 {
return nil, fmt.Errorf("unexpected data array length: %d", len(arr))
return nil, fmt.Errorf("unexpected candle data length: %d", len(arr))
}
interval := strings.ToLower(strings.TrimPrefix(channel, "candle"))
@ -241,7 +264,7 @@ func parseCandle(channel, instrumentID string, v *fastjson.Value) (interface{},
Volume: vol,
VolumeInCurrency: volCurrency,
MillisecondTimestamp: timestamp,
Time: candleTime,
StartTime: candleTime,
}, nil
}
@ -256,7 +279,6 @@ func parseData(v *fastjson.Value) (interface{}, error) {
default:
if strings.HasPrefix(channel, "candle") {
return parseCandle(channel, instrumentId, v)
}
}

View File

@ -29,8 +29,8 @@ type Stream struct {
publicOnly bool
// public callbacks
cancelDataCallbacks []func()
bookDataCallbacks []func(data BookData)
candleDataCallbacks []func(candle Candle)
bookDataCallbacks []func(book BookData)
eventCallbacks []func(event WebSocketEvent)
}
@ -42,6 +42,11 @@ func NewStream(client *okexapi.RestClient) *Stream {
},
}
stream.OnCandleData(func(candle Candle) {
kline := candle.KLine()
stream.EmitKLine(kline)
})
stream.OnBookData(func(data BookData) {
book := data.Book()
switch data.Action {
@ -182,7 +187,7 @@ func (s *Stream) read(ctx context.Context) {
default:
s.connLock.Lock()
if err := s.Conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
}
@ -235,6 +240,9 @@ func (s *Stream) read(ctx context.Context) {
case *BookData:
s.EmitBookData(*et)
case *Candle:
s.EmitCandleData(*et)
}
}
}

View File

@ -4,23 +4,23 @@ package okex
import ()
func (s *Stream) OnCancelData(cb func()) {
s.cancelDataCallbacks = append(s.cancelDataCallbacks, cb)
func (s *Stream) OnCandleData(cb func(candle Candle)) {
s.candleDataCallbacks = append(s.candleDataCallbacks, cb)
}
func (s *Stream) EmitCancelData() {
for _, cb := range s.cancelDataCallbacks {
cb()
func (s *Stream) EmitCandleData(candle Candle) {
for _, cb := range s.candleDataCallbacks {
cb(candle)
}
}
func (s *Stream) OnBookData(cb func(data BookData)) {
func (s *Stream) OnBookData(cb func(book BookData)) {
s.bookDataCallbacks = append(s.bookDataCallbacks, cb)
}
func (s *Stream) EmitBookData(data BookData) {
func (s *Stream) EmitBookData(book BookData) {
for _, cb := range s.bookDataCallbacks {
cb(data)
cb(book)
}
}
@ -35,9 +35,9 @@ func (s *Stream) EmitEvent(event WebSocketEvent) {
}
type StreamEventHub interface {
OnCancelData(cb func())
OnCandleData(cb func(candle Candle))
OnBookData(cb func(data BookData))
OnBookData(cb func(book BookData))
OnEvent(cb func(event WebSocketEvent))
}