From 6c96d12d99e77178f83e5fd7bc233c57b0007f27 Mon Sep 17 00:00:00 2001 From: Edwin Date: Fri, 10 Nov 2023 21:56:18 +0800 Subject: [PATCH 1/2] pkg/exchange: add login method --- pkg/exchange/bitget/bitgetapi/client.go | 6 ++--- pkg/exchange/bitget/bitgetapi/v2/client.go | 4 +++ pkg/exchange/bitget/exchange.go | 3 +-- pkg/exchange/bitget/stream.go | 31 +++++++++++++++++++--- pkg/exchange/bitget/stream_test.go | 12 ++++++++- pkg/exchange/bitget/types.go | 8 +++++- 6 files changed, 53 insertions(+), 11 deletions(-) diff --git a/pkg/exchange/bitget/bitgetapi/client.go b/pkg/exchange/bitget/bitgetapi/client.go index 19b145d5d..823f3a7c0 100644 --- a/pkg/exchange/bitget/bitgetapi/client.go +++ b/pkg/exchange/bitget/bitgetapi/client.go @@ -76,7 +76,7 @@ func (c *RestClient) NewAuthenticatedRequest(ctx context.Context, method, refURL } // See https://bitgetlimited.github.io/apidoc/en/spot/#signature - // sign( + // Sign( // timestamp + // method.toUpperCase() + // requestPath + "?" + queryString + @@ -94,7 +94,7 @@ func (c *RestClient) NewAuthenticatedRequest(ctx context.Context, method, refURL } signKey := timestamp + strings.ToUpper(method) + path + string(body) - signature := sign(signKey, c.secret) + signature := Sign(signKey, c.secret) req, err := http.NewRequestWithContext(ctx, method, pathURL.String(), bytes.NewReader(body)) if err != nil { @@ -110,7 +110,7 @@ func (c *RestClient) NewAuthenticatedRequest(ctx context.Context, method, refURL return req, nil } -func sign(payload string, secret string) string { +func Sign(payload string, secret string) string { var sig = hmac.New(sha256.New, []byte(secret)) _, err := sig.Write([]byte(payload)) if err != nil { diff --git a/pkg/exchange/bitget/bitgetapi/v2/client.go b/pkg/exchange/bitget/bitgetapi/v2/client.go index 3a2b2204d..d15cd889b 100644 --- a/pkg/exchange/bitget/bitgetapi/v2/client.go +++ b/pkg/exchange/bitget/bitgetapi/v2/client.go @@ -6,6 +6,10 @@ import ( "github.com/c9s/bbgo/pkg/exchange/bitget/bitgetapi" ) +const ( + PrivateWebSocketURL = "wss://ws.bitget.com/v2/ws/private" +) + type APIResponse = bitgetapi.APIResponse type Client struct { diff --git a/pkg/exchange/bitget/exchange.go b/pkg/exchange/bitget/exchange.go index 05a10bb26..988b626d2 100644 --- a/pkg/exchange/bitget/exchange.go +++ b/pkg/exchange/bitget/exchange.go @@ -83,8 +83,7 @@ func (e *Exchange) PlatformFeeCurrency() string { } func (e *Exchange) NewStream() types.Stream { - // TODO implement me - panic("implement me") + return NewStream(e.key, e.secret, e.passphrase) } func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) { diff --git a/pkg/exchange/bitget/stream.go b/pkg/exchange/bitget/stream.go index 039c65127..4636be4db 100644 --- a/pkg/exchange/bitget/stream.go +++ b/pkg/exchange/bitget/stream.go @@ -5,10 +5,14 @@ import ( "context" "encoding/json" "fmt" - "github.com/gorilla/websocket" + "strconv" "strings" + "time" + + "github.com/gorilla/websocket" "github.com/c9s/bbgo/pkg/exchange/bitget/bitgetapi" + v2 "github.com/c9s/bbgo/pkg/exchange/bitget/bitgetapi/v2" "github.com/c9s/bbgo/pkg/types" ) @@ -21,6 +25,7 @@ var ( type Stream struct { types.StandardStream + key, secret, passphrase string bookEventCallbacks []func(o BookEvent) marketTradeEventCallbacks []func(o MarketTradeEvent) KLineEventCallbacks []func(o KLineEvent) @@ -28,10 +33,13 @@ type Stream struct { lastCandle map[string]types.KLine } -func NewStream() *Stream { +func NewStream(key, secret, passphrase string) *Stream { stream := &Stream{ StandardStream: types.NewStandardStream(), lastCandle: map[string]types.KLine{}, + key: key, + secret: secret, + passphrase: passphrase, } stream.SetEndpointCreator(stream.createEndpoint) @@ -89,7 +97,7 @@ func (s *Stream) createEndpoint(_ context.Context) (string, error) { if s.PublicOnly { url = bitgetapi.PublicWebSocketURL } else { - url = bitgetapi.PrivateWebSocketURL + url = v2.PrivateWebSocketURL } return url, nil } @@ -123,7 +131,22 @@ func (s *Stream) handlerConnect() { // errors are handled in the syncSubscriptions, so they are skipped here. _ = s.syncSubscriptions(WsEventSubscribe) } else { - log.Error("*** PRIVATE API NOT IMPLEMENTED ***") + timestamp := strconv.FormatInt(time.Now().Unix(), 10) + + if err := s.Conn.WriteJSON(WsOp{ + Op: WsEventLogin, + Args: []WsArg{ + { + ApiKey: s.key, + Passphrase: s.passphrase, + Timestamp: timestamp, + Sign: bitgetapi.Sign(fmt.Sprintf("%sGET/user/verify", timestamp), s.secret), + }, + }, + }); err != nil { + log.WithError(err).Error("failed to auth request") + return + } } } diff --git a/pkg/exchange/bitget/stream_test.go b/pkg/exchange/bitget/stream_test.go index b33e6afa2..d05cbd037 100644 --- a/pkg/exchange/bitget/stream_test.go +++ b/pkg/exchange/bitget/stream_test.go @@ -19,7 +19,9 @@ func getTestClientOrSkip(t *testing.T) *Stream { t.Skip("skip test for CI") } - return NewStream() + return NewStream(os.Getenv("BITGET_API_KEY"), + os.Getenv("BITGET_API_SECRET"), + os.Getenv("BITGET_API_PASSPHRASE")) } func TestStream(t *testing.T) { @@ -122,6 +124,14 @@ func TestStream(t *testing.T) { <-c }) + t.Run("private test", func(t *testing.T) { + err := s.Connect(context.Background()) + assert.NoError(t, err) + + c := make(chan struct{}) + <-c + }) + } func TestStream_parseWebSocketEvent(t *testing.T) { diff --git a/pkg/exchange/bitget/types.go b/pkg/exchange/bitget/types.go index a1107cad6..f6fbfa06c 100644 --- a/pkg/exchange/bitget/types.go +++ b/pkg/exchange/bitget/types.go @@ -34,6 +34,11 @@ type WsArg struct { Channel ChannelType `json:"channel"` // InstId Instrument ID. e.q. BTCUSDT, ETHUSDT InstId string `json:"instId"` + + ApiKey string `json:"apiKey"` + Passphrase string `json:"passphrase"` + Timestamp string `json:"timestamp"` + Sign string `json:"sign"` } type WsEventType string @@ -41,6 +46,7 @@ type WsEventType string const ( WsEventSubscribe WsEventType = "subscribe" WsEventUnsubscribe WsEventType = "unsubscribe" + WsEventLogin WsEventType = "login" WsEventError WsEventType = "error" ) @@ -76,7 +82,7 @@ func (w *WsEvent) IsValid() error { case WsEventError: return fmt.Errorf("websocket request error, op: %s, code: %d, msg: %s", w.Op, w.Code, w.Msg) - case WsEventSubscribe, WsEventUnsubscribe: + case WsEventSubscribe, WsEventUnsubscribe, WsEventLogin: // Actually, this code is unnecessary because the events are either `Subscribe` or `Unsubscribe`, But to avoid bugs // in the exchange, we still check. if w.Code != 0 || len(w.Msg) != 0 { From f49b14ac45f6d23708e547baa8216a7f3994ae77 Mon Sep 17 00:00:00 2001 From: Edwin Date: Fri, 10 Nov 2023 22:35:39 +0800 Subject: [PATCH 2/2] pkg/exchange: add balance event --- pkg/exchange/bitget/convert.go | 12 ++++++ pkg/exchange/bitget/convert_test.go | 19 +++++++++ pkg/exchange/bitget/stream.go | 51 +++++++++++++++++++++++++ pkg/exchange/bitget/stream_callbacks.go | 10 +++++ pkg/exchange/bitget/stream_test.go | 7 ++++ pkg/exchange/bitget/types.go | 29 +++++++++++++- 6 files changed, 127 insertions(+), 1 deletion(-) diff --git a/pkg/exchange/bitget/convert.go b/pkg/exchange/bitget/convert.go index 59f335730..b9d1a1ccb 100644 --- a/pkg/exchange/bitget/convert.go +++ b/pkg/exchange/bitget/convert.go @@ -329,3 +329,15 @@ func toLocalSide(side types.SideType) (v2.SideType, error) { return "", fmt.Errorf("side type %s not supported", side) } } + +func toGlobalBalanceMap(balances []Balance) types.BalanceMap { + bm := types.BalanceMap{} + for _, obj := range balances { + bm[obj.Coin] = types.Balance{ + Currency: obj.Coin, + Available: obj.Available, + Locked: obj.Frozen.Add(obj.Locked), + } + } + return bm +} diff --git a/pkg/exchange/bitget/convert_test.go b/pkg/exchange/bitget/convert_test.go index 470c63f31..19e759aa1 100644 --- a/pkg/exchange/bitget/convert_test.go +++ b/pkg/exchange/bitget/convert_test.go @@ -579,3 +579,22 @@ func Test_toGlobalTrade(t *testing.T) { FeeDiscounted: false, }, res) } + +func Test_toGlobalBalanceMap(t *testing.T) { + assert.Equal(t, types.BalanceMap{ + "BTC": { + Currency: "BTC", + Available: fixedpoint.NewFromFloat(0.5), + Locked: fixedpoint.NewFromFloat(0.6 + 0.7), + }, + }, toGlobalBalanceMap([]Balance{ + { + Coin: "BTC", + Available: fixedpoint.NewFromFloat(0.5), + Frozen: fixedpoint.NewFromFloat(0.6), + Locked: fixedpoint.NewFromFloat(0.7), + LimitAvailable: fixedpoint.Zero, + UTime: types.NewMillisecondTimestampFromInt(1699020564676), + }, + })) +} diff --git a/pkg/exchange/bitget/stream.go b/pkg/exchange/bitget/stream.go index 4636be4db..53e696490 100644 --- a/pkg/exchange/bitget/stream.go +++ b/pkg/exchange/bitget/stream.go @@ -30,6 +30,8 @@ type Stream struct { marketTradeEventCallbacks []func(o MarketTradeEvent) KLineEventCallbacks []func(o KLineEvent) + accountEventCallbacks []func(e AccountEvent) + lastCandle map[string]types.KLine } @@ -51,6 +53,9 @@ func NewStream(key, secret, passphrase string) *Stream { stream.OnBookEvent(stream.handleBookEvent) stream.OnMarketTradeEvent(stream.handleMaretTradeEvent) stream.OnKLineEvent(stream.handleKLineEvent) + + stream.OnAuth(stream.handleAuth) + stream.OnAccountEvent(stream.handleAccountEvent) return stream } @@ -108,6 +113,9 @@ func (s *Stream) dispatchEvent(event interface{}) { if err := e.IsValid(); err != nil { log.Errorf("invalid event: %v", err) } + if e.IsAuthenticated() { + s.EmitAuth() + } case *BookEvent: s.EmitBookEvent(*e) @@ -118,6 +126,9 @@ func (s *Stream) dispatchEvent(event interface{}) { case *KLineEvent: s.EmitKLineEvent(*e) + case *AccountEvent: + s.EmitAccountEvent(*e) + case []byte: // We only handle the 'pong' case. Others are unexpected. if !bytes.Equal(e, pongBytes) { @@ -126,6 +137,22 @@ func (s *Stream) dispatchEvent(event interface{}) { } } +func (s *Stream) handleAuth() { + if err := s.Conn.WriteJSON(WsOp{ + Op: WsEventSubscribe, + Args: []WsArg{ + { + InstType: instSpV2, + Channel: ChannelAccount, + Coin: "default", // default all + }, + }, + }); err != nil { + log.WithError(err).Error("failed to send subscription request") + return + } +} + func (s *Stream) handlerConnect() { if s.PublicOnly { // errors are handled in the syncSubscriptions, so they are skipped here. @@ -236,6 +263,17 @@ func parseEvent(in []byte) (interface{}, error) { ch := event.Arg.Channel switch ch { + case ChannelAccount: + var acct AccountEvent + err = json.Unmarshal(event.Data, &acct.Balances) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal data into AccountEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err) + } + + acct.actionType = event.Action + acct.instId = event.Arg.InstId + return &acct, nil + case ChannelOrderBook, ChannelOrderBook5, ChannelOrderBook15: var book BookEvent err = json.Unmarshal(event.Data, &book.Events) @@ -319,3 +357,16 @@ func (s *Stream) handleKLineEvent(k KLineEvent) { s.lastCandle[k.CacheKey()] = kLine } } + +func (s *Stream) handleAccountEvent(m AccountEvent) { + balanceMap := toGlobalBalanceMap(m.Balances) + if len(balanceMap) == 0 { + return + } + + if m.actionType == ActionTypeUpdate { + s.StandardStream.EmitBalanceUpdate(balanceMap) + return + } + s.StandardStream.EmitBalanceSnapshot(balanceMap) +} diff --git a/pkg/exchange/bitget/stream_callbacks.go b/pkg/exchange/bitget/stream_callbacks.go index 82ef7beae..44661171e 100644 --- a/pkg/exchange/bitget/stream_callbacks.go +++ b/pkg/exchange/bitget/stream_callbacks.go @@ -33,3 +33,13 @@ func (s *Stream) EmitKLineEvent(o KLineEvent) { cb(o) } } + +func (s *Stream) OnAccountEvent(cb func(e AccountEvent)) { + s.accountEventCallbacks = append(s.accountEventCallbacks, cb) +} + +func (s *Stream) EmitAccountEvent(e AccountEvent) { + for _, cb := range s.accountEventCallbacks { + cb(e) + } +} diff --git a/pkg/exchange/bitget/stream_test.go b/pkg/exchange/bitget/stream_test.go index d05cbd037..273941e7d 100644 --- a/pkg/exchange/bitget/stream_test.go +++ b/pkg/exchange/bitget/stream_test.go @@ -128,6 +128,13 @@ func TestStream(t *testing.T) { err := s.Connect(context.Background()) assert.NoError(t, err) + s.OnBalanceSnapshot(func(balances types.BalanceMap) { + t.Log("get balances", balances) + }) + s.OnBalanceUpdate(func(balances types.BalanceMap) { + t.Log("get update", balances) + }) + c := make(chan struct{}) <-c }) diff --git a/pkg/exchange/bitget/types.go b/pkg/exchange/bitget/types.go index f6fbfa06c..0d5d5771a 100644 --- a/pkg/exchange/bitget/types.go +++ b/pkg/exchange/bitget/types.go @@ -13,12 +13,14 @@ import ( type InstType string const ( - instSp InstType = "sp" + instSp InstType = "sp" + instSpV2 InstType = "SPOT" ) type ChannelType string const ( + ChannelAccount ChannelType = "account" // ChannelOrderBook snapshot and update might return less than 200 bids/asks as per symbol's orderbook various from // each other; The number of bids/asks is not a fixed value and may vary in the future ChannelOrderBook ChannelType = "books" @@ -34,6 +36,7 @@ type WsArg struct { Channel ChannelType `json:"channel"` // InstId Instrument ID. e.q. BTCUSDT, ETHUSDT InstId string `json:"instId"` + Coin string `json:"coin"` ApiKey string `json:"apiKey"` Passphrase string `json:"passphrase"` @@ -95,6 +98,10 @@ func (w *WsEvent) IsValid() error { } } +func (w *WsEvent) IsAuthenticated() bool { + return w.Event == WsEventLogin && w.Code == 0 +} + type ActionType string const ( @@ -398,3 +405,23 @@ func (k KLineEvent) CacheKey() string { // e.q: candle5m.BTCUSDT return fmt.Sprintf("%s.%s", k.channel, k.instId) } + +type Balance struct { + Coin string `json:"coin"` + Available fixedpoint.Value `json:"available"` + // Amount of frozen assets Usually frozen when the order is placed + Frozen fixedpoint.Value `json:"frozen"` + // Amount of locked assets Locked assests required to become a fiat merchants, etc. + Locked fixedpoint.Value `json:"locked"` + // Restricted availability For spot copy trading + LimitAvailable fixedpoint.Value `json:"limitAvailable"` + UTime types.MillisecondTimestamp `json:"uTime"` +} + +type AccountEvent struct { + Balances []Balance + + // internal use + actionType ActionType + instId string +}