mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 06:53:52 +00:00
pkg/exchange: add balance event
This commit is contained in:
parent
6c96d12d99
commit
f49b14ac45
|
@ -329,3 +329,15 @@ func toLocalSide(side types.SideType) (v2.SideType, error) {
|
|||
return "", fmt.Errorf("side type %s not supported", side)
|
||||
}
|
||||
}
|
||||
|
||||
func toGlobalBalanceMap(balances []Balance) types.BalanceMap {
|
||||
bm := types.BalanceMap{}
|
||||
for _, obj := range balances {
|
||||
bm[obj.Coin] = types.Balance{
|
||||
Currency: obj.Coin,
|
||||
Available: obj.Available,
|
||||
Locked: obj.Frozen.Add(obj.Locked),
|
||||
}
|
||||
}
|
||||
return bm
|
||||
}
|
||||
|
|
|
@ -579,3 +579,22 @@ func Test_toGlobalTrade(t *testing.T) {
|
|||
FeeDiscounted: false,
|
||||
}, res)
|
||||
}
|
||||
|
||||
func Test_toGlobalBalanceMap(t *testing.T) {
|
||||
assert.Equal(t, types.BalanceMap{
|
||||
"BTC": {
|
||||
Currency: "BTC",
|
||||
Available: fixedpoint.NewFromFloat(0.5),
|
||||
Locked: fixedpoint.NewFromFloat(0.6 + 0.7),
|
||||
},
|
||||
}, toGlobalBalanceMap([]Balance{
|
||||
{
|
||||
Coin: "BTC",
|
||||
Available: fixedpoint.NewFromFloat(0.5),
|
||||
Frozen: fixedpoint.NewFromFloat(0.6),
|
||||
Locked: fixedpoint.NewFromFloat(0.7),
|
||||
LimitAvailable: fixedpoint.Zero,
|
||||
UTime: types.NewMillisecondTimestampFromInt(1699020564676),
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ type Stream struct {
|
|||
marketTradeEventCallbacks []func(o MarketTradeEvent)
|
||||
KLineEventCallbacks []func(o KLineEvent)
|
||||
|
||||
accountEventCallbacks []func(e AccountEvent)
|
||||
|
||||
lastCandle map[string]types.KLine
|
||||
}
|
||||
|
||||
|
@ -51,6 +53,9 @@ func NewStream(key, secret, passphrase string) *Stream {
|
|||
stream.OnBookEvent(stream.handleBookEvent)
|
||||
stream.OnMarketTradeEvent(stream.handleMaretTradeEvent)
|
||||
stream.OnKLineEvent(stream.handleKLineEvent)
|
||||
|
||||
stream.OnAuth(stream.handleAuth)
|
||||
stream.OnAccountEvent(stream.handleAccountEvent)
|
||||
return stream
|
||||
}
|
||||
|
||||
|
@ -108,6 +113,9 @@ func (s *Stream) dispatchEvent(event interface{}) {
|
|||
if err := e.IsValid(); err != nil {
|
||||
log.Errorf("invalid event: %v", err)
|
||||
}
|
||||
if e.IsAuthenticated() {
|
||||
s.EmitAuth()
|
||||
}
|
||||
|
||||
case *BookEvent:
|
||||
s.EmitBookEvent(*e)
|
||||
|
@ -118,6 +126,9 @@ func (s *Stream) dispatchEvent(event interface{}) {
|
|||
case *KLineEvent:
|
||||
s.EmitKLineEvent(*e)
|
||||
|
||||
case *AccountEvent:
|
||||
s.EmitAccountEvent(*e)
|
||||
|
||||
case []byte:
|
||||
// We only handle the 'pong' case. Others are unexpected.
|
||||
if !bytes.Equal(e, pongBytes) {
|
||||
|
@ -126,6 +137,22 @@ func (s *Stream) dispatchEvent(event interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Stream) handleAuth() {
|
||||
if err := s.Conn.WriteJSON(WsOp{
|
||||
Op: WsEventSubscribe,
|
||||
Args: []WsArg{
|
||||
{
|
||||
InstType: instSpV2,
|
||||
Channel: ChannelAccount,
|
||||
Coin: "default", // default all
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
log.WithError(err).Error("failed to send subscription request")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stream) handlerConnect() {
|
||||
if s.PublicOnly {
|
||||
// errors are handled in the syncSubscriptions, so they are skipped here.
|
||||
|
@ -236,6 +263,17 @@ func parseEvent(in []byte) (interface{}, error) {
|
|||
|
||||
ch := event.Arg.Channel
|
||||
switch ch {
|
||||
case ChannelAccount:
|
||||
var acct AccountEvent
|
||||
err = json.Unmarshal(event.Data, &acct.Balances)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal data into AccountEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err)
|
||||
}
|
||||
|
||||
acct.actionType = event.Action
|
||||
acct.instId = event.Arg.InstId
|
||||
return &acct, nil
|
||||
|
||||
case ChannelOrderBook, ChannelOrderBook5, ChannelOrderBook15:
|
||||
var book BookEvent
|
||||
err = json.Unmarshal(event.Data, &book.Events)
|
||||
|
@ -319,3 +357,16 @@ func (s *Stream) handleKLineEvent(k KLineEvent) {
|
|||
s.lastCandle[k.CacheKey()] = kLine
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stream) handleAccountEvent(m AccountEvent) {
|
||||
balanceMap := toGlobalBalanceMap(m.Balances)
|
||||
if len(balanceMap) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if m.actionType == ActionTypeUpdate {
|
||||
s.StandardStream.EmitBalanceUpdate(balanceMap)
|
||||
return
|
||||
}
|
||||
s.StandardStream.EmitBalanceSnapshot(balanceMap)
|
||||
}
|
||||
|
|
|
@ -33,3 +33,13 @@ func (s *Stream) EmitKLineEvent(o KLineEvent) {
|
|||
cb(o)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stream) OnAccountEvent(cb func(e AccountEvent)) {
|
||||
s.accountEventCallbacks = append(s.accountEventCallbacks, cb)
|
||||
}
|
||||
|
||||
func (s *Stream) EmitAccountEvent(e AccountEvent) {
|
||||
for _, cb := range s.accountEventCallbacks {
|
||||
cb(e)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -128,6 +128,13 @@ func TestStream(t *testing.T) {
|
|||
err := s.Connect(context.Background())
|
||||
assert.NoError(t, err)
|
||||
|
||||
s.OnBalanceSnapshot(func(balances types.BalanceMap) {
|
||||
t.Log("get balances", balances)
|
||||
})
|
||||
s.OnBalanceUpdate(func(balances types.BalanceMap) {
|
||||
t.Log("get update", balances)
|
||||
})
|
||||
|
||||
c := make(chan struct{})
|
||||
<-c
|
||||
})
|
||||
|
|
|
@ -13,12 +13,14 @@ import (
|
|||
type InstType string
|
||||
|
||||
const (
|
||||
instSp InstType = "sp"
|
||||
instSp InstType = "sp"
|
||||
instSpV2 InstType = "SPOT"
|
||||
)
|
||||
|
||||
type ChannelType string
|
||||
|
||||
const (
|
||||
ChannelAccount ChannelType = "account"
|
||||
// ChannelOrderBook snapshot and update might return less than 200 bids/asks as per symbol's orderbook various from
|
||||
// each other; The number of bids/asks is not a fixed value and may vary in the future
|
||||
ChannelOrderBook ChannelType = "books"
|
||||
|
@ -34,6 +36,7 @@ type WsArg struct {
|
|||
Channel ChannelType `json:"channel"`
|
||||
// InstId Instrument ID. e.q. BTCUSDT, ETHUSDT
|
||||
InstId string `json:"instId"`
|
||||
Coin string `json:"coin"`
|
||||
|
||||
ApiKey string `json:"apiKey"`
|
||||
Passphrase string `json:"passphrase"`
|
||||
|
@ -95,6 +98,10 @@ func (w *WsEvent) IsValid() error {
|
|||
}
|
||||
}
|
||||
|
||||
func (w *WsEvent) IsAuthenticated() bool {
|
||||
return w.Event == WsEventLogin && w.Code == 0
|
||||
}
|
||||
|
||||
type ActionType string
|
||||
|
||||
const (
|
||||
|
@ -398,3 +405,23 @@ func (k KLineEvent) CacheKey() string {
|
|||
// e.q: candle5m.BTCUSDT
|
||||
return fmt.Sprintf("%s.%s", k.channel, k.instId)
|
||||
}
|
||||
|
||||
type Balance struct {
|
||||
Coin string `json:"coin"`
|
||||
Available fixedpoint.Value `json:"available"`
|
||||
// Amount of frozen assets Usually frozen when the order is placed
|
||||
Frozen fixedpoint.Value `json:"frozen"`
|
||||
// Amount of locked assets Locked assests required to become a fiat merchants, etc.
|
||||
Locked fixedpoint.Value `json:"locked"`
|
||||
// Restricted availability For spot copy trading
|
||||
LimitAvailable fixedpoint.Value `json:"limitAvailable"`
|
||||
UTime types.MillisecondTimestamp `json:"uTime"`
|
||||
}
|
||||
|
||||
type AccountEvent struct {
|
||||
Balances []Balance
|
||||
|
||||
// internal use
|
||||
actionType ActionType
|
||||
instId string
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user