bbgo_origin/exchange/max/stream.go

95 lines
1.8 KiB
Go
Raw Normal View History

2020-08-31 04:32:51 +00:00
package max
2020-10-03 10:35:28 +00:00
import (
"context"
log "github.com/sirupsen/logrus"
2020-10-05 06:25:58 +00:00
max "github.com/c9s/bbgo/exchange/max/maxapi"
"github.com/c9s/bbgo/types"
2020-10-03 10:35:28 +00:00
)
// logger is the package-level logrus entry; every record written through it
// carries the field exchange=max.
var logger = log.WithField("exchange", "max")
// Stream couples a MAX websocket service with the generic types.StandardStream.
// Instances are built by NewStream, which registers the websocket event
// handlers.
type Stream struct {
// Embedded standard stream; presumably the source of the Emit* methods that
// NewStream's handlers call on the Stream (confirm in package types).
types.StandardStream
// websocketService is the MAX websocket client that Subscribe, Connect and
// Close delegate to.
websocketService *max.WebSocketService
}
func NewStream(key, secret string) *Stream {
wss := max.NewWebSocketService(max.WebSocketURL, key, secret)
2020-10-03 11:14:15 +00:00
2020-10-03 10:35:28 +00:00
stream := &Stream{
websocketService: wss,
}
2020-10-03 11:14:15 +00:00
wss.OnMessage(func(message []byte) {
logger.Infof("M: %s", message)
})
2020-10-03 10:35:28 +00:00
wss.OnBookEvent(func(e max.BookEvent) {
newbook, err := e.OrderBook()
if err != nil {
logger.WithError(err).Error("book convert error")
return
}
switch e.Event {
case "snapshot":
stream.EmitBookSnapshot(newbook)
case "update":
stream.EmitBookUpdate(newbook)
}
})
2020-10-03 11:14:15 +00:00
wss.OnAccountSnapshotEvent(func(e max.AccountSnapshotEvent) {
snapshot := map[string]types.Balance{}
for _, bm := range e.Balances {
balance, err := bm.Balance()
if err != nil {
continue
}
snapshot[balance.Currency] = *balance
}
stream.EmitBalanceSnapshot(snapshot)
})
wss.OnAccountUpdateEvent(func(e max.AccountUpdateEvent) {
snapshot := map[string]types.Balance{}
for _, bm := range e.Balances {
balance, err := bm.Balance()
if err != nil {
continue
}
snapshot[balance.Currency] = *balance
}
stream.EmitBalanceUpdate(snapshot)
})
2020-10-03 10:35:28 +00:00
return stream
}
2020-10-03 11:14:15 +00:00
// Subscribe forwards a subscription for the given channel and symbol to the
// underlying websocket service, passing the channel as a plain string
// (the original note here mentions "book" as a channel name). The options
// argument is accepted but not used.
//
// It panics when channel is types.KLineChannel.
func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.SubscribeOptions) {
	if channel == types.KLineChannel {
		panic("kline channel is not supported in max")
	}

	name := string(channel)
	s.websocketService.Subscribe(name, symbol)
}
2020-10-03 10:35:28 +00:00
func (s *Stream) Connect(ctx context.Context) error {
2020-10-03 11:14:15 +00:00
return s.websocketService.Connect(ctx)
2020-10-03 10:35:28 +00:00
}
// Close delegates to the underlying websocket service's Close and returns
// whatever error it reports.
func (s *Stream) Close() error {
	err := s.websocketService.Close()
	return err
}