mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
Merge pull request #135 from c9s/feature/base-websocket-client
Feature: base websocket client
This commit is contained in:
commit
78e647ca2e
|
@ -7,6 +7,8 @@ import (
|
|||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
)
|
||||
|
@ -16,8 +18,11 @@ const (
|
|||
defaultHTTPTimeout = 15 * time.Second
|
||||
)
|
||||
|
||||
var logger = logrus.WithField("exchange", "ftx")
|
||||
|
||||
type Exchange struct {
|
||||
rest *restRequest
|
||||
rest *restRequest
|
||||
key, secret string
|
||||
}
|
||||
|
||||
func NewExchange(key, secret string, subAccount string) *Exchange {
|
||||
|
@ -30,7 +35,9 @@ func NewExchange(key, secret string, subAccount string) *Exchange {
|
|||
rest.SubAccount(subAccount)
|
||||
}
|
||||
return &Exchange{
|
||||
rest: rest,
|
||||
rest: rest,
|
||||
key: key,
|
||||
secret: secret,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,7 +50,7 @@ func (e *Exchange) PlatformFeeCurrency() string {
|
|||
}
|
||||
|
||||
func (e *Exchange) NewStream() types.Stream {
|
||||
panic("implement me")
|
||||
return NewStream(e.key, e.secret)
|
||||
}
|
||||
|
||||
func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
|
||||
|
|
74
pkg/exchange/ftx/stream.go
Normal file
74
pkg/exchange/ftx/stream.go
Normal file
|
@ -0,0 +1,74 @@
|
|||
package ftx
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
)
|
||||
|
||||
type Stream struct {
|
||||
types.StandardStream
|
||||
|
||||
wsService *WebsocketService
|
||||
|
||||
// publicOnly must be accessed atomically
|
||||
publicOnly int32
|
||||
}
|
||||
|
||||
func NewStream(key, secret string) *Stream {
|
||||
wss := NewWebsocketService(key, secret)
|
||||
s := &Stream{
|
||||
StandardStream: types.StandardStream{},
|
||||
wsService: wss,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Stream) Connect(ctx context.Context) error {
|
||||
return s.wsService.Connect(ctx)
|
||||
}
|
||||
|
||||
func (s *Stream) SetPublicOnly() {
|
||||
atomic.StoreInt32(&s.publicOnly, 1)
|
||||
}
|
||||
|
||||
func (s *Stream) Close() error {
|
||||
return s.wsService.Close()
|
||||
}
|
||||
|
||||
func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Stream) OnTradeUpdate(cb func(trade types.Trade)) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Stream) OnOrderUpdate(cb func(order types.Order)) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Stream) OnBalanceSnapshot(cb func(balances types.BalanceMap)) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Stream) OnBalanceUpdate(cb func(balances types.BalanceMap)) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Stream) OnKLineClosed(cb func(kline types.KLine)) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Stream) OnKLine(cb func(kline types.KLine)) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Stream) OnBookUpdate(cb func(book types.OrderBook)) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Stream) OnBookSnapshot(cb func(book types.OrderBook)) {
|
||||
panic("implement me")
|
||||
}
|
29
pkg/exchange/ftx/websocket.go
Normal file
29
pkg/exchange/ftx/websocket.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package ftx
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/service"
|
||||
)
|
||||
|
||||
type WebsocketService struct {
|
||||
*service.WebsocketClientBase
|
||||
|
||||
key string
|
||||
secret string
|
||||
}
|
||||
|
||||
const endpoint = "wss://ftx.com/ws/"
|
||||
|
||||
func NewWebsocketService(key string, secret string) *WebsocketService {
|
||||
s := &WebsocketService{
|
||||
WebsocketClientBase: service.NewWebsocketClientBase(endpoint, 3*time.Second),
|
||||
key: key,
|
||||
secret: secret,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (w *WebsocketService) Close() error {
|
||||
return w.Conn().Close()
|
||||
}
|
98
pkg/service/websocket.go
Normal file
98
pkg/service/websocket.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
//go:generate callbackgen -type WebsocketClientBase
|
||||
type WebsocketClientBase struct {
|
||||
baseURL string
|
||||
|
||||
// mu protects conn
|
||||
mu sync.Mutex
|
||||
conn *websocket.Conn
|
||||
reconnectC chan struct{}
|
||||
reconnectDuration time.Duration
|
||||
|
||||
connectedCallbacks []func(conn *websocket.Conn)
|
||||
disconnectedCallbacks []func(conn *websocket.Conn)
|
||||
messageCallbacks []func(message []byte)
|
||||
errorCallbacks []func(err error)
|
||||
}
|
||||
|
||||
func NewWebsocketClientBase(baseURL string, reconnectDuration time.Duration) *WebsocketClientBase {
|
||||
return &WebsocketClientBase{
|
||||
baseURL: baseURL,
|
||||
reconnectC: make(chan struct{}, 1),
|
||||
reconnectDuration: reconnectDuration,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) Listen(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.reconnectC:
|
||||
time.Sleep(s.reconnectDuration)
|
||||
if err := s.connect(ctx); err != nil {
|
||||
s.reconnect()
|
||||
}
|
||||
default:
|
||||
conn := s.Conn()
|
||||
mt, msg, err := conn.ReadMessage()
|
||||
|
||||
if err != nil {
|
||||
s.reconnect()
|
||||
continue
|
||||
}
|
||||
|
||||
if mt != websocket.TextMessage {
|
||||
continue
|
||||
}
|
||||
|
||||
s.EmitMessage(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) Connect(ctx context.Context) error {
|
||||
if err := s.connect(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
go s.Listen(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) reconnect() {
|
||||
select {
|
||||
case s.reconnectC <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) connect(ctx context.Context) error {
|
||||
dialer := websocket.DefaultDialer
|
||||
conn, _, err := dialer.DialContext(ctx, s.baseURL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.conn = conn
|
||||
s.mu.Unlock()
|
||||
|
||||
s.EmitConnected(conn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) Conn() *websocket.Conn {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.conn
|
||||
}
|
47
pkg/service/websocketclientbase_callbacks.go
Normal file
47
pkg/service/websocketclientbase_callbacks.go
Normal file
|
@ -0,0 +1,47 @@
|
|||
// Code generated by "callbackgen -type WebsocketClientBase"; DO NOT EDIT.
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func (s *WebsocketClientBase) OnConnected(cb func(conn *websocket.Conn)) {
|
||||
s.connectedCallbacks = append(s.connectedCallbacks, cb)
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) EmitConnected(conn *websocket.Conn) {
|
||||
for _, cb := range s.connectedCallbacks {
|
||||
cb(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) OnDisconnected(cb func(conn *websocket.Conn)) {
|
||||
s.disconnectedCallbacks = append(s.disconnectedCallbacks, cb)
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) EmitDisconnected(conn *websocket.Conn) {
|
||||
for _, cb := range s.disconnectedCallbacks {
|
||||
cb(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) OnMessage(cb func(message []byte)) {
|
||||
s.messageCallbacks = append(s.messageCallbacks, cb)
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) EmitMessage(message []byte) {
|
||||
for _, cb := range s.messageCallbacks {
|
||||
cb(message)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) OnError(cb func(err error)) {
|
||||
s.errorCallbacks = append(s.errorCallbacks, cb)
|
||||
}
|
||||
|
||||
func (s *WebsocketClientBase) EmitError(err error) {
|
||||
for _, cb := range s.errorCallbacks {
|
||||
cb(err)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user