bbgo_origin/examples/kucoin/websocket.go

156 lines
3.0 KiB
Go
Raw Permalink Normal View History

package main
import (
"context"
"errors"
"os"
"os/signal"
"time"
"github.com/c9s/bbgo/pkg/exchange/kucoin"
"github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
func init() {
rootCmd.AddCommand(websocketCmd)
}
var websocketCmd = &cobra.Command{
Use: "websocket",
// SilenceUsage is an option to silence usage when an error occurs.
SilenceUsage: true,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return nil
}
var ctx = context.Background()
var t = args[0]
var err error
var bullet *kucoinapi.Bullet
switch t {
case "public":
bullet, err = client.BulletService.NewGetPublicBulletRequest().Do(ctx)
if err != nil {
return err
}
logrus.Infof("public bullet: %+v", bullet)
case "private":
bullet, err = client.BulletService.NewGetPrivateBulletRequest().Do(ctx)
if err != nil {
return err
}
logrus.Infof("private bullet: %+v", bullet)
default:
return errors.New("valid bullet type: public, private")
}
2021-12-22 16:41:10 +00:00
u, err := bullet.URL()
if err != nil {
return err
}
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
logrus.Infof("connecting %s", u.String())
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
2021-12-22 16:30:17 +00:00
return err
}
2021-12-22 16:30:17 +00:00
defer c.Close()
2021-12-23 13:09:40 +00:00
id := time.Now().UnixNano() / int64(time.Millisecond)
wsCmds := []kucoin.WebSocketCommand{
/*
{
Id: id+1,
Type: "subscribe",
Topic: "/market/ticker:ETH-USDT",
PrivateChannel: false,
Response: true,
},
*/
{
Id: id + 2,
Type: "subscribe",
Topic: "/market/candles:ETH-USDT_1min",
PrivateChannel: false,
Response: true,
},
2021-12-22 16:30:17 +00:00
}
for _, wsCmd := range wsCmds {
err = c.WriteJSON(wsCmd)
if err != nil {
return err
}
2021-12-22 16:30:17 +00:00
}
done := make(chan struct{})
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
logrus.Infoln("read:", err)
return
}
logrus.Infof("recv: %s", message)
}
}()
pingTicker := time.NewTicker(bullet.PingInterval())
defer pingTicker.Stop()
for {
select {
case <-done:
return nil
case <-pingTicker.C:
if err := c.WriteJSON(kucoin.WebSocketCommand{
2021-12-23 13:09:40 +00:00
Id: time.Now().UnixNano() / int64(time.Millisecond),
Type: "ping",
}); err != nil {
logrus.WithError(err).Error("websocket ping error", err)
}
case <-interrupt:
logrus.Infof("interrupt")
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
logrus.Error("write close:", err)
return nil
}
select {
case <-done:
case <-time.After(time.Second):
}
return nil
}
}
return nil
},
}