exchange/okex/okex_ws_public.go
2024-06-26 00:59:56 +08:00

313 lines
7.1 KiB
Go

package okex
import (
"errors"
"fmt"
"github.com/bitly/go-simplejson"
"strconv"
"time"
"git.qtrade.icu/coin-quant/exchange/ws"
log "github.com/sirupsen/logrus"
. "git.qtrade.icu/coin-quant/trademodel"
)
// runPublic creates the public websocket connection via ws.NewWSConn and
// stores it in b.wsPublic.
//
// The callback handed to ws.NewWSConn writes every entry of b.watchPublics to
// the connection, stopping at the first write error; it is meant to restore
// the subscriptions after a reconnect. b.parsePublicMsg is passed as the
// third argument (presumably the per-message handler — confirm against the
// ws package).
//
// The error from ws.NewWSConn is returned unchanged.
func (b *OkxTrader) runPublic() (err error) {
	resubscribe := func(conn *ws.WSConn) error {
		for _, sub := range b.watchPublics {
			if writeErr := conn.WriteMsg(sub); writeErr != nil {
				return writeErr
			}
		}
		return nil
	}

	b.wsPublic, err = ws.NewWSConn(WSOkexPUbilc, resubscribe, b.parsePublicMsg)
	return err
}
// parsePublicMsg handles one raw message received on the public websocket.
//
// Messages that carry an "event" field, or that lack "arg" or "arg.channel",
// are ignored. Otherwise the "data" payload is parsed according to the
// channel name and handed to the matching callback:
//
//   - "books5":   b.depthCb, once per parsed depth
//   - "trades":   b.tradeMarketCb, once per parsed trade
//   - "candle1m": b.klineCb, with the first parsed candle only
//
// A JSON decode failure, a non-string channel, or a payload parse failure is
// logged and returned. Unknown channels, a missing "data" field and unset
// callbacks all yield nil.
func (b *OkxTrader) parsePublicMsg(message []byte) (err error) {
	root, err := simplejson.NewJson(message)
	if err != nil {
		log.Warnf("parse json error:%s", string(message))
		return err
	}

	// Messages with an "event" field are not data pushes; skip them.
	if _, isEvent := root.CheckGet("event"); isEvent {
		return nil
	}
	arg, found := root.CheckGet("arg")
	if !found {
		return nil
	}
	channelValue, found := arg.CheckGet("channel")
	if !found {
		return nil
	}

	// The instrument id is only validated (and a warning logged); it is not
	// used further in this function.
	if _, idErr := arg.Get("instId").String(); idErr != nil {
		log.Warnf("okex public ws message instId not found %s, %s", string(message), idErr.Error())
	}

	channel, err := channelValue.String()
	if err != nil {
		log.Warnf("channelValue %v is not string error:%s", channelValue, err.Error())
		return err
	}

	switch channel {
	case "books5":
		data, hasData := root.CheckGet("data")
		if !hasData || b.depthCb == nil {
			return nil
		}
		depths, parseErr := parseOkexBooks5(data)
		if parseErr != nil {
			log.Warnf("parseOkexBooks5 failed:%s, %s", string(message), parseErr.Error())
			return parseErr
		}
		for _, depth := range depths {
			b.depthCb(depth)
		}

	case "trades":
		data, hasData := root.CheckGet("data")
		if !hasData || b.tradeMarketCb == nil {
			return nil
		}
		trades, parseErr := parseOkexTrade(data)
		if parseErr != nil {
			log.Warnf("parseOkexTrade failed:%s, %s", string(message), parseErr.Error())
			return parseErr
		}
		for _, trade := range trades {
			b.tradeMarketCb(trade)
		}

	case "candle1m":
		data, hasData := root.CheckGet("data")
		if !hasData {
			return nil
		}
		// Candles are parsed before the callback check, so malformed
		// payloads are reported even when no kline callback is set.
		candles, parseErr := parseWsCandle(data)
		if parseErr != nil {
			log.Warnf("parseWsCandle failed:%s, %s", string(message), parseErr.Error())
			return parseErr
		}
		switch count := len(candles); {
		case count == 0:
			return nil
		case count != 1:
			log.Warnf("parseWsCandle candles len warn :%d, %v", count, err)
		}
		if b.klineCb == nil {
			return nil
		}
		b.klineCb(candles[0])
	}
	return nil
}
// parseOkexBooks5 converts the "data" array of a books5 push into Depth
// snapshots, one per array element.
//
// Each element must contain "asks" and "bids" (arrays of price levels, see
// parseOrderBook) and "ts", a string holding a Unix timestamp in
// milliseconds. Bids become Depth.Buys, asks become Depth.Sells and ts
// becomes Depth.UpdateTime.
//
// Any malformed element aborts parsing: the result is nil and the error
// describes which part failed.
//
// Example message:
//
//	{"arg":{"channel":"books5","instId":"BSV-USD-210924"},"data":[{"asks":[["166.35","20","0","1"],["166.4","135","0","2"],["166.42","86","0","1"],["166.45","310","0","1"],["166.46","61","0","2"]],"bids":[["166.14","33","0","1"],["166.07","106","0","1"],["166.05","2","0","1"],["166.04","97","0","1"],["165.98","20","0","1"]],"instId":"BSV-USD-210924","ts":"1621862688397"}]}
func parseOkexBooks5(sj *simplejson.Json) (depths []*Depth, err error) {
	arr, err := sj.Array()
	if err != nil {
		return nil, fmt.Errorf("parseOkexBooks5 not array: %w", err)
	}

	for k := range arr {
		obj := sj.GetIndex(k)

		// simplejson's Get never returns a nil *Json, so a nil comparison
		// cannot detect a missing key; CheckGet reports presence explicitly.
		asks, hasAsks := obj.CheckGet("asks")
		bids, hasBids := obj.CheckGet("bids")
		if !hasAsks || !hasBids {
			return nil, errors.New("data error")
		}

		askInfo, askErr := parseOrderBook(asks)
		if askErr != nil {
			return nil, fmt.Errorf("parseOkexBooks5 asks: %w", askErr)
		}
		bidInfo, bidErr := parseOrderBook(bids)
		if bidErr != nil {
			return nil, fmt.Errorf("parseOkexBooks5 bids: %w", bidErr)
		}

		tsStr, tsErr := obj.Get("ts").String()
		if tsErr != nil {
			return nil, fmt.Errorf("parseOkexBooks5 ts: %w", tsErr)
		}
		nTs, tsErr := strconv.ParseInt(tsStr, 10, 64)
		if tsErr != nil {
			return nil, fmt.Errorf("parseOkexBooks5 ts: %w", tsErr)
		}

		depths = append(depths, &Depth{
			Buys:  bidInfo,
			Sells: askInfo,
			// ts is in milliseconds: split into seconds + nanoseconds.
			UpdateTime: time.Unix(nTs/1000, (nTs%1000)*int64(time.Millisecond)),
		})
	}
	return depths, nil
}
// parseOrderBook converts one side of an order book (a JSON array of price
// levels) into DepthInfo entries.
//
// Every level must itself be an array of exactly four items; item 0 is read
// as the price and item 1 as the amount through getFloat, the remaining two
// are ignored. On the first malformed level an error is returned together
// with the levels collected so far.
func parseOrderBook(sj *simplejson.Json) (datas []DepthInfo, err error) {
	levels, err := sj.Array()
	if err != nil {
		return datas, err
	}

	for _, raw := range levels {
		fields, isList := raw.([]interface{})
		switch {
		case !isList:
			return datas, errors.New("parseOrderBook error")
		case len(fields) != 4:
			return datas, errors.New("parseOrderBook len error")
		}
		datas = append(datas, DepthInfo{
			Price:  getFloat(fields[0]),
			Amount: getFloat(fields[1]),
		})
	}
	return datas, nil
}
// parseOkexTrade converts the "data" array of a trades push into Trade
// values.
//
// Each element is expected to be a JSON object such as
//
//	{"instId":"BSV-USD-210924","tradeId":"1957771","px":"166.5","sz":"22","side":"sell","ts":"1621862533713"}
//
// instId is stored in Trade.Remark, tradeId in ID, side in Side, px in Price,
// sz in Amount, and ts (milliseconds) in Time. Elements that are not objects
// are logged and skipped. Missing or malformed fields are logged by the
// helper functions and left at their zero value.
//
// An error is returned only when sj itself is not an array.
func parseOkexTrade(sj *simplejson.Json) (trades []*Trade, err error) {
	arr, err := sj.Array()
	if err != nil {
		return nil, err
	}

	for _, v := range arr {
		temp, ok := v.(map[string]interface{})
		if !ok {
			// Log the offending element itself: after a failed assertion
			// the asserted map is always nil and carries no information.
			log.Warnf("data error:%#v", v)
			continue
		}

		var t Trade
		t.Remark = getStr(temp, "instId")
		t.ID = getStr(temp, "tradeId")
		t.Side = getStr(temp, "side")
		t.Price = getStrFloat(temp, "px")
		t.Amount = getStrFloat(temp, "sz")
		// ts is in milliseconds: split into seconds + nanoseconds.
		nTs := getStrTs(temp, "ts")
		t.Time = time.Unix(nTs/1000, int64(time.Millisecond)*(nTs%1000))
		trades = append(trades, &t)
	}
	return trades, nil
}
// getStrTs reads dict[key] through getStr and parses it as a base-10 int64.
// Callers in this file use it for millisecond timestamps.
//
// It returns 0, after logging a warning, when the value is missing, empty,
// not a string, or cannot be parsed as an int64 (including out-of-range
// values).
func getStrTs(dict map[string]interface{}, key string) (nTs int64) {
	str := getStr(dict, key)
	if str == "" {
		log.Warnf("getStrTs of %s empty", key)
		return 0
	}

	parsed, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		// On a range error ParseInt returns the saturated min/max int64
		// alongside the error; return 0 so that value never leaks out.
		log.Warnf("getStrTs of %s parse int err: %s", key, err.Error())
		return 0
	}
	return parsed
}
// getStrFloat reads dict[key] through getStr and parses it as a float64.
//
// A missing, empty, non-string or unparsable value is logged as a warning
// and reported as 0.
func getStrFloat(dict map[string]interface{}, key string) float64 {
	raw := getStr(dict, key)
	if raw == "" {
		log.Warnf("getStrFloat of %s empty", key)
		return 0
	}

	value, err := strconv.ParseFloat(raw, 64)
	if err == nil {
		return value
	}
	log.Warnf("getStrFloat of %s failed: %s", key, err.Error())
	return 0
}
// getStr returns dict[key] when it is present and holds a string.
//
// A missing key or a value of any other type is logged as a warning and
// reported as the empty string.
func getStr(dict map[string]interface{}, key string) string {
	raw, present := dict[key]
	if !present {
		log.Warnf("getStr of %s empty", key)
		return ""
	}
	if text, isString := raw.(string); isString {
		return text
	}
	log.Warnf("getStr of %s type error: %#v", key, raw)
	return ""
}
// getFloat parses v as a float64 when v holds a string.
//
// A non-string v yields 0 silently; a string that fails to parse is logged
// as a warning and also yields 0.
func getFloat(v interface{}) float64 {
	text, isString := v.(string)
	if !isString {
		return 0
	}

	parsed, err := strconv.ParseFloat(text, 64)
	if err == nil {
		return parsed
	}
	log.Warnf("getFloat %#v failed: %s", v, err.Error())
	return 0
}
// parseWsCandle converts the "data" array of a candle push into Candle
// values.
//
// Each element must be an array of nine items. The items read here must be
// strings: item 0 is the start time in milliseconds (stored in Candle.Start
// as seconds), items 1-5 are open, high, low, close and volume, item 7 is the
// turnover, and item 8 is a flag where "0" marks an unfinished candle. Item 6
// is not read.
//
// Unfinished candles are skipped. Elements with the wrong shape or with
// non-string fields are logged and skipped. An unparsable timestamp aborts
// parsing: the result is nil and a wrapped error is returned (no panic).
func parseWsCandle(sj *simplejson.Json) (ret []*Candle, err error) {
	arr, err := sj.Array()
	if err != nil {
		return nil, err
	}

	// Positions that are consumed below and therefore must be strings.
	usedIdx := [...]int{0, 1, 2, 3, 4, 5, 7, 8}

	for _, v := range arr {
		values, ok := v.([]interface{})
		if !ok {
			// Log the offending element; the asserted slice is nil here.
			log.Warnf("transWsCandle data error:%#v", v)
			continue
		}
		if len(values) != 9 {
			log.Warnf("transWsCandle data len error:%#v", values)
			continue
		}

		// Checked assertions: a non-string field must not crash the
		// websocket reader.
		var fields [9]string
		allStrings := true
		for _, idx := range usedIdx {
			s, isStr := values[idx].(string)
			if !isStr {
				allStrings = false
				break
			}
			fields[idx] = s
		}
		if !allStrings {
			log.Warnf("transWsCandle data type error:%#v", values)
			continue
		}

		nTs, parseErr := strconv.ParseInt(fields[0], 10, 64)
		if parseErr != nil {
			return nil, fmt.Errorf("trans candle error: %#v: %w", values, parseErr)
		}

		// unfinished kline
		if fields[8] == "0" {
			continue
		}

		ret = append(ret, &Candle{
			Start:    nTs / 1000,
			Open:     parseFloat(fields[1]),
			High:     parseFloat(fields[2]),
			Low:      parseFloat(fields[3]),
			Close:    parseFloat(fields[4]),
			Volume:   parseFloat(fields[5]),
			Turnover: parseFloat(fields[7]),
		})
	}
	return ret, nil
}