trade/pkg/ctl/trade.go
2024-06-26 00:44:05 +08:00

170 lines
3.7 KiB
Go

package ctl
import (
"errors"
"fmt"
"git.qtrade.icu/coin-quant/trade/pkg/event"
"git.qtrade.icu/coin-quant/trade/pkg/process/exchange"
"git.qtrade.icu/coin-quant/trade/pkg/process/goscript"
"git.qtrade.icu/coin-quant/trade/pkg/process/notify"
"git.qtrade.icu/coin-quant/trade/pkg/process/rpt"
"sync"
"time"
. "git.qtrade.icu/coin-quant/trade/pkg/core"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
var (
cfg *viper.Viper
)
func SetConfig(c *viper.Viper) {
cfg = c
}
// Trade trade with multi scripts
type Trade struct {
exchangeType string
exchangeName string
symbol string
running bool
stop chan bool
rpt rpt.Reporter
proc *event.Processers
engine *goscript.GoEngine
wg sync.WaitGroup
loadRecent time.Duration
}
// NewTrade constructor of Trade
func NewTrade(exchange, symbol string) (b *Trade, err error) {
b = new(Trade)
b.exchangeName = exchange
b.symbol = symbol
b.exchangeType = cfg.GetString(fmt.Sprintf("exchanges.%s.name", b.exchangeName))
gEngine, err := goscript.NewGoEngine(symbol)
if err != nil {
return
}
b.engine = gEngine
b.loadRecent = time.Hour * 24
return
}
func (b *Trade) SetLoadRecent(recent time.Duration) {
b.loadRecent = recent
}
func (b *Trade) SetStatusCh(ch chan *goscript.Status) {
b.engine.SetStatusCh(ch)
}
func (b *Trade) SetReporter(rpt rpt.Reporter) {
b.rpt = rpt
}
func (b *Trade) AddScript(name, scriptFile, param string) (err error) {
err = b.engine.AddScript(name, scriptFile, param)
return
}
func (b *Trade) ScriptCount() int {
return b.engine.ScriptCount()
}
func (b *Trade) RemoveScript(name string) (err error) {
if !b.running {
err = errors.New("Trade is not working,must start it first")
return
}
err = b.engine.RemoveScript(name)
return
}
// Start start backtest
func (b *Trade) Start() (err error) {
if b.running {
return
}
b.running = true
err = b.init()
if err != nil {
b.running = false
return
}
b.wg.Add(1)
go b.Run()
return
}
// Stop stop backtest
func (b *Trade) Stop() (err error) {
b.proc.Stop()
b.stop <- true
return
}
func (b *Trade) init() (err error) {
b.stop = make(chan bool)
param := event.NewBaseProcesser("param")
ex, err := exchange.GetTradeExchange(b.exchangeType, cfg, b.exchangeName, b.symbol)
if err != nil {
err = fmt.Errorf("creat exchange trade %s failed:%s", b.exchangeName, err.Error())
return
}
notify, err := notify.NewNotify(cfg)
if err != nil {
log.Errorf("creat notify failed:%s", err.Error())
err = nil
}
b.proc = event.NewProcessers()
procs := []event.Processer{param, ex, b.engine}
if notify != nil {
procs = append(procs, notify)
}
if b.rpt != nil {
r := rpt.NewRpt(b.rpt)
procs = append(procs, r)
}
err = b.proc.Adds(procs...)
if err != nil {
log.Error("add processers error:", err.Error())
return
}
err = b.proc.Start()
if err != nil {
log.Error("start processers error:", err.Error())
return
}
candleParam := CandleParam{
Start: time.Now().Add(-1 * b.loadRecent),
Symbol: b.symbol,
BinSize: "1m",
}
log.Info("real trade candle param:", candleParam)
param.Send("candle", EventWatch, NewWatchCandle(&candleParam))
log.Info("real trade watch trade_market")
param.Send("trade", EventWatch, &WatchParam{Type: EventTradeMarket, Extra: b.symbol, Data: map[string]interface{}{"name": "market"}})
log.Info("real trade watch depth")
param.Send("depth", EventWatch, &WatchParam{Type: EventDepth, Extra: b.symbol, Data: map[string]interface{}{"name": "depth"}})
return
}
func (b *Trade) Wait() (err error) {
b.wg.Wait()
return
}
func (b *Trade) Run() (err error) {
defer b.wg.Done()
// TODO wait for finish
<-b.stop
b.proc.WaitClose(time.Second * 10)
return
}