implement orderbook updater

This commit is contained in:
c9s 2020-10-02 21:15:00 +08:00
parent 5619deffb2
commit d29725119f
3 changed files with 198 additions and 4 deletions

View File

@ -6,6 +6,9 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
"github.com/c9s/bbgo/pkg/bbgo/fixedpoint"
"github.com/c9s/bbgo/pkg/bbgo/types"
) )
var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length") var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length")
@ -105,6 +108,28 @@ func (e *BookEvent) Time() time.Time {
return time.Unix(0, e.Timestamp*int64(time.Millisecond)) return time.Unix(0, e.Timestamp*int64(time.Millisecond))
} }
func (e *BookEvent) OrderBook() (snapshot types.OrderBook, err error) {
for _, bid := range e.Bids {
pv, err := bid.PriceVolumePair()
if err != nil {
return snapshot, err
}
snapshot.Bids = append(snapshot.Bids, pv)
}
for _, ask := range e.Asks {
pv, err := ask.PriceVolumePair()
if err != nil {
return snapshot, err
}
snapshot.Asks = append(snapshot.Asks, pv)
}
return snapshot, nil
}
func parseBookEvent(val *fastjson.Value) (*BookEvent, error) { func parseBookEvent(val *fastjson.Value) (*BookEvent, error) {
event := BookEvent{ event := BookEvent{
Event: string(val.GetStringBytes("e")), Event: string(val.GetStringBytes("e")),
@ -136,6 +161,20 @@ type BookEntry struct {
Volume string Volume string
} }
func (e *BookEntry) PriceVolumePair() (pv types.PriceVolumePair, err error) {
pv.Price, err = fixedpoint.NewFromString(e.Price)
if err != nil {
return pv, err
}
pv.Volume, err = fixedpoint.NewFromString(e.Volume)
if err != nil {
return pv, err
}
return pv, err
}
// parseBookEntries parses JSON struct like `[["233330", "0.33"], ....]` // parseBookEntries parses JSON struct like `[["233330", "0.33"], ....]`
func parseBookEntries(vals []*fastjson.Value, side int, t time.Time) (entries []BookEntry, err error) { func parseBookEntries(vals []*fastjson.Value, side int, t time.Time) (entries []BookEntry, err error) {
for _, entry := range vals { for _, entry := range vals {
@ -151,8 +190,8 @@ func parseBookEntries(vals []*fastjson.Value, side int, t time.Time) (entries []
entries = append(entries, BookEntry{ entries = append(entries, BookEntry{
Side: side, Side: side,
Time: t, Time: t,
Price: pv[0].String(), Price: string(pv[0].GetStringBytes()),
Volume: pv[1].String(), Volume: string(pv[1].GetStringBytes()),
}) })
} }
@ -211,5 +250,3 @@ func parseSubscriptionEvent(val *fastjson.Value) (*SubscriptionEvent, error) {
return &event, nil return &event, nil
} }

View File

@ -65,6 +65,20 @@ func NewWebSocketService(wsURL string, key, secret string) *WebSocketService {
} }
func (s *WebSocketService) Connect(ctx context.Context) error { func (s *WebSocketService) Connect(ctx context.Context) error {
s.OnConnect(func(c *websocket.Conn) {
if err := s.SendSubscriptionRequest(SubscribeAction); err != nil {
s.EmitError(err)
logger.WithError(err).Error("failed to subscribe")
}
})
s.OnConnect(func(conn *websocket.Conn) {
if err := s.Auth(); err != nil {
s.EmitError(err)
logger.WithError(err).Error("failed to send auth request")
}
})
// pre-allocate the websocket client, the websocket client can be used for reconnecting. // pre-allocate the websocket client, the websocket client can be used for reconnecting.
if err := s.connect(ctx) ; err != nil { if err := s.connect(ctx) ; err != nil {
return err return err
@ -94,6 +108,9 @@ func (s *WebSocketService) connect(ctx context.Context) error {
s.conn = conn s.conn = conn
s.EmitConnect(conn) s.EmitConnect(conn)
return nil return nil
} }

140
bbgo/types/orderbook.go Normal file
View File

@ -0,0 +1,140 @@
package types
import (
"fmt"
"sort"
"github.com/c9s/bbgo/pkg/bbgo/fixedpoint"
)
type PriceVolumePair struct {
Price fixedpoint.Value
Volume fixedpoint.Value
}
func (p PriceVolumePair) String() string {
return fmt.Sprintf("PriceVolumePair{ price: %f, volume: %f }", p.Price.Float64(), p.Volume.Float64())
}
type PriceVolumePairSlice []PriceVolumePair
func (ps PriceVolumePairSlice) Len() int { return len(ps) }
func (ps PriceVolumePairSlice) Less(i, j int) bool { return ps[i].Price < ps[j].Price }
func (ps PriceVolumePairSlice) Swap(i, j int) { ps[i], ps[j] = ps[j], ps[i] }
// Trim removes the pairs that volume = 0
func (ps PriceVolumePairSlice) Trim() (newps PriceVolumePairSlice) {
for _, pv := range ps {
if pv.Volume > 0 {
newps = append(newps, pv)
}
}
return newps
}
func (ps PriceVolumePairSlice) Copy() PriceVolumePairSlice {
// this is faster than make
return append(ps[:0:0], ps...)
}
func (ps *PriceVolumePairSlice) UpdateOrInsert(newPair PriceVolumePair, descending bool) {
var newps = UpdateOrInsertPriceVolumePair(*ps, newPair, descending)
*ps = newps
}
func (ps *PriceVolumePairSlice) RemoveByPrice(price fixedpoint.Value, descending bool) {
var newps = RemovePriceVolumePair(*ps, price, descending)
*ps = newps
}
// FindPriceVolumePair finds the pair by the given price, this function is a read-only
// operation, so we use the value receiver to avoid copy value from the pointer
// If the price is not found, it will return the index where the price can be inserted at.
// true for descending (bid orders), false for ascending (ask orders)
func FindPriceVolumePair(slice []PriceVolumePair, price fixedpoint.Value, descending bool) (int, PriceVolumePair) {
idx := sort.Search(len(slice), func(i int) bool {
if descending {
return slice[i].Price <= price
}
return slice[i].Price >= price
})
if idx >= len(slice) || slice[idx].Price != price {
return idx, PriceVolumePair{}
}
return idx, slice[idx]
}
//true for descending (bid orders), false for ascending (ask orders)
func UpdateOrInsertPriceVolumePair(slice []PriceVolumePair, pvPair PriceVolumePair, descending bool) []PriceVolumePair {
price := pvPair.Price
if len(slice) == 0 {
return append(slice, pvPair)
}
idx, _ := FindPriceVolumePair(slice, price, descending)
if idx >= len(slice) || slice[idx].Price != price {
return InsertPriceVolumePairAt(slice, pvPair, idx)
} else {
slice[idx].Volume = pvPair.Volume
return slice
}
}
func InsertPriceVolumePairAt(slice []PriceVolumePair, pvPair PriceVolumePair, idx int) []PriceVolumePair {
rear := append([]PriceVolumePair{}, slice[idx:]...)
slice = append(slice[:idx], pvPair)
return append(slice, rear...)
}
func RemovePriceVolumePair(slice []PriceVolumePair, price fixedpoint.Value, descending bool) []PriceVolumePair {
idx, matched := FindPriceVolumePair(slice, price, descending)
if matched.Price != price {
return slice
}
return append(slice[:idx], slice[idx+1:]...)
}
type OrderBook struct {
Symbol string
Bids PriceVolumePairSlice
Asks PriceVolumePairSlice
}
func (b *OrderBook) UpdateAsks(pvs PriceVolumePairSlice) {
for _, pv := range pvs {
b.Asks.UpdateOrInsert(pv, false)
}
}
func (b *OrderBook) UpdateBids(pvs PriceVolumePairSlice) {
for _, pv := range pvs {
b.Bids.UpdateOrInsert(pv, true)
}
}
func (b *OrderBook) Load(book OrderBook) {
b.Bids = nil
b.Asks = nil
b.Update(book)
}
func (b *OrderBook) Update(book OrderBook) {
b.UpdateBids(book.Bids)
b.UpdateAsks(book.Asks)
}
func (b *OrderBook) Print() {
fmt.Printf("BOOK %s\n", b.Symbol)
fmt.Printf("ASKS:\n")
for i := len(b.Asks) - 1 ; i >= 0 ; i-- {
fmt.Printf("- ASK: %s\n", b.Asks[i].String())
}
fmt.Printf("BIDS:\n")
for _, bid := range b.Bids {
fmt.Printf("- BID: %s\n", bid.String())
}
}