bbgo_origin/pkg/exchange/max/stream.go

262 lines
5.8 KiB
Go
Raw Normal View History

2020-08-31 04:32:51 +00:00
package max
2020-10-03 10:35:28 +00:00
import (
"context"
"os"
2020-10-19 14:23:49 +00:00
"strconv"
"time"
2020-10-03 10:35:28 +00:00
2020-11-07 04:38:57 +00:00
"github.com/gorilla/websocket"
2020-10-11 08:46:15 +00:00
max "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
2020-10-11 08:46:15 +00:00
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
2020-10-03 10:35:28 +00:00
)
// logger is a log entry carrying the field exchange=max; the stream handlers
// in this file use it for debug and error output.
var logger = log.WithField("exchange", "max")
type Stream struct {
types.StandardStream
websocketService *max.WebSocketService
2020-12-21 07:43:54 +00:00
publicOnly bool
2020-10-03 10:35:28 +00:00
}
func NewStream(key, secret string) *Stream {
url := os.Getenv("MAX_API_WS_URL")
if url == "" {
url = max.WebSocketURL
}
2020-10-03 11:14:15 +00:00
wss := max.NewWebSocketService(url, key, secret)
2020-10-03 10:35:28 +00:00
stream := &Stream{
websocketService: wss,
}
2020-12-21 07:43:54 +00:00
wss.OnConnect(func(conn *websocket.Conn) {
if key == "" || secret == "" {
log.Warn("MAX API key or secret is empty, will not send authentication command")
} else {
if err := wss.Auth(); err != nil {
wss.EmitError(err)
logger.WithError(err).Error("failed to send auth request")
}
}
})
2021-03-15 02:27:01 +00:00
wss.OnDisconnect(stream.EmitDisconnect)
2020-10-03 11:14:15 +00:00
wss.OnMessage(func(message []byte) {
2020-11-05 07:04:56 +00:00
logger.Debugf("M: %s", message)
})
wss.OnKLineEvent(func(e max.KLineEvent) {
kline := e.KLine.KLine()
stream.EmitKLine(kline)
if kline.Closed {
stream.EmitKLineClosed(kline)
}
2020-10-03 11:14:15 +00:00
})
wss.OnOrderSnapshotEvent(func(e max.OrderSnapshotEvent) {
for _, o := range e.Orders {
globalOrder, err := toGlobalOrderUpdate(o)
if err != nil {
log.WithError(err).Error("websocket order snapshot convert error")
continue
}
stream.EmitOrderUpdate(*globalOrder)
}
})
wss.OnOrderUpdateEvent(func(e max.OrderUpdateEvent) {
for _, o := range e.Orders {
globalOrder, err := toGlobalOrderUpdate(o)
if err != nil {
log.WithError(err).Error("websocket order update convert error")
continue
}
stream.EmitOrderUpdate(*globalOrder)
}
})
2020-10-19 14:23:49 +00:00
wss.OnTradeUpdateEvent(func(e max.TradeUpdateEvent) {
for _, tradeUpdate := range e.Trades {
trade, err := convertWebSocketTrade(tradeUpdate)
if err != nil {
log.WithError(err).Error("websocket trade update convert error")
return
}
stream.EmitTradeUpdate(*trade)
2020-10-19 14:23:49 +00:00
}
})
2020-10-03 10:35:28 +00:00
wss.OnBookEvent(func(e max.BookEvent) {
2020-12-21 07:43:54 +00:00
newBook, err := e.OrderBook()
2020-10-03 10:35:28 +00:00
if err != nil {
logger.WithError(err).Error("book convert error")
return
}
2020-12-21 07:43:54 +00:00
newBook.Symbol = toGlobalSymbol(e.Market)
2020-10-03 10:35:28 +00:00
switch e.Event {
case "snapshot":
2020-12-21 07:43:54 +00:00
stream.EmitBookSnapshot(newBook)
2020-10-03 10:35:28 +00:00
case "update":
2020-12-21 07:43:54 +00:00
stream.EmitBookUpdate(newBook)
2020-10-03 10:35:28 +00:00
}
})
2020-11-07 04:38:57 +00:00
wss.OnConnect(func(conn *websocket.Conn) {
stream.EmitConnect()
})
2020-10-03 11:14:15 +00:00
wss.OnAccountSnapshotEvent(func(e max.AccountSnapshotEvent) {
snapshot := map[string]types.Balance{}
for _, bm := range e.Balances {
balance, err := bm.Balance()
if err != nil {
continue
}
2020-10-19 13:33:12 +00:00
snapshot[toGlobalCurrency(balance.Currency)] = *balance
2020-10-03 11:14:15 +00:00
}
stream.EmitBalanceSnapshot(snapshot)
})
wss.OnAccountUpdateEvent(func(e max.AccountUpdateEvent) {
snapshot := map[string]types.Balance{}
for _, bm := range e.Balances {
balance, err := bm.Balance()
if err != nil {
continue
}
2020-10-19 13:33:12 +00:00
snapshot[toGlobalCurrency(balance.Currency)] = *balance
2020-10-03 11:14:15 +00:00
}
stream.EmitBalanceUpdate(snapshot)
})
2020-12-28 08:24:35 +00:00
wss.OnError(func(err error) {
log.WithError(err).Error("websocket error")
})
2020-10-03 10:35:28 +00:00
return stream
}
2020-12-21 07:43:54 +00:00
// SetPublicOnly sets the stream's publicOnly flag to true. There is no way to
// clear the flag through this method.
func (s *Stream) SetPublicOnly() {
	s.publicOnly = true
}
2020-10-03 11:14:15 +00:00
func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) {
2021-02-22 09:36:23 +00:00
opt := max.SubscribeOptions{}
if len(options.Depth) > 0 {
depth, err := strconv.Atoi(options.Depth)
if err != nil {
panic(err)
}
opt.Depth = depth
}
if len(options.Interval) > 0 {
opt.Resolution = options.Interval
}
s.websocketService.Subscribe(string(channel), toLocalSymbol(symbol), opt)
2020-10-03 11:14:15 +00:00
}
2020-10-03 10:35:28 +00:00
func (s *Stream) Connect(ctx context.Context) error {
err := s.websocketService.Connect(ctx)
if err != nil {
return err
}
s.EmitStart()
return nil
2020-10-03 10:35:28 +00:00
}
// Close closes the underlying websocket service and returns whatever error
// that call reports.
func (s *Stream) Close() error {
	closeErr := s.websocketService.Close()
	return closeErr
}
2020-10-19 14:23:49 +00:00
func convertWebSocketTrade(t max.TradeUpdate) (*types.Trade, error) {
// skip trade ID that is the same. however this should not happen
var side = toGlobalSideType(t.Side)
// trade time
mts := time.Unix(0, t.Timestamp*int64(time.Millisecond))
price, err := strconv.ParseFloat(t.Price, 64)
if err != nil {
return nil, err
}
quantity, err := strconv.ParseFloat(t.Volume, 64)
if err != nil {
return nil, err
}
quoteQuantity := price * quantity
fee, err := strconv.ParseFloat(t.Fee, 64)
if err != nil {
return nil, err
}
return &types.Trade{
ID: int64(t.ID),
2020-11-17 04:46:55 +00:00
OrderID: t.OrderID,
2020-10-19 14:23:49 +00:00
Symbol: toGlobalSymbol(t.Market),
Exchange: types.ExchangeMax,
2020-11-17 07:48:18 +00:00
Price: price,
2020-10-19 14:23:49 +00:00
Quantity: quantity,
Side: side,
IsBuyer: side == types.SideTypeBuy,
2020-10-19 14:23:49 +00:00
IsMaker: t.Maker,
Fee: fee,
FeeCurrency: toGlobalCurrency(t.FeeCurrency),
QuoteQuantity: quoteQuantity,
2021-05-19 17:32:26 +00:00
Time: types.Time(mts),
2020-10-19 14:23:49 +00:00
}, nil
}
func toGlobalOrderUpdate(u max.OrderUpdate) (*types.Order, error) {
executedVolume, err := fixedpoint.NewFromString(u.ExecutedVolume)
if err != nil {
return nil, err
}
remainingVolume, err := fixedpoint.NewFromString(u.RemainingVolume)
if err != nil {
return nil, err
}
return &types.Order{
SubmitOrder: types.SubmitOrder{
ClientOrderID: u.ClientOID,
2020-11-07 04:19:57 +00:00
Symbol: toGlobalSymbol(u.Market),
Side: toGlobalSideType(u.Side),
Type: toGlobalOrderType(u.OrderType),
Quantity: util.MustParseFloat(u.Volume),
Price: util.MustParseFloat(u.Price),
StopPrice: util.MustParseFloat(u.StopPrice),
TimeInForce: "GTC", // MAX only supports GTC
2021-03-16 10:39:18 +00:00
GroupID: u.GroupID,
},
Exchange: types.ExchangeMax,
OrderID: u.ID,
Status: toGlobalOrderStatus(u.State, executedVolume, remainingVolume),
ExecutedQuantity: executedVolume.Float64(),
2021-05-19 17:32:26 +00:00
CreationTime: types.Time(time.Unix(0, u.CreatedAtMs*int64(time.Millisecond))),
}, nil
}