Files
openclaw-trading/scripts/scheduler.py
OpenClaw Trading 4ae8061172 初始提交:纯信号量化交易系统 v1.0
策略:纯信号模式(不定期换仓,止损止盈卖出后因子选股补仓)
股票池:沪深300+中证500(800+只)
止损-8% / 止盈+25% / 单只20% / 最多5只
5年回测:+371.7%,夏普0.82,年化21.3%

组件:
- engine.py: 核心交易引擎 + 因子评分 + 数据管理
- scheduler.py: APScheduler定时调度 + HTTP状态接口
- trade_tool.py: 命令行工具
- config.json: 策略参数配置
- Dockerfile + docker-compose.yml: 容器化部署

日志系统:
- 文件日志(按日轮转,保留90天)
- SQLite: trades/daily_log/signal_log/system_log
2026-04-05 16:59:50 +08:00

326 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Scheduler
=========
- Weekdays 09:00: pre-market preparation
- Weekdays 09:30: opening check
- Weekdays every 30 minutes: stop-loss / take-profit check + position refill
- Weekdays 15:10: end-of-day report
- Saturdays 10:00: weekly report
Also starts the HTTP status endpoint on port 8888.
"""
import os, sys, json, logging, traceback
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
# Directory layout: this script's folder, its parent (the application root),
# and the logs/data/config folders that sit directly under the root.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
APP_DIR = os.path.dirname(BASE_DIR)
LOG_DIR, DATA_DIR, CONFIG_DIR = (
    os.path.join(APP_DIR, subdir) for subdir in ('logs', 'data', 'config')
)
# Put the script folder first on the import path so sibling modules resolve.
sys.path.insert(0, BASE_DIR)
from logging.handlers import TimedRotatingFileHandler
# ─── Logging ───
# The file handler opens its log file as soon as it is constructed, so the
# directory must exist first; otherwise startup fails with FileNotFoundError.
os.makedirs(LOG_DIR, exist_ok=True)
fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log = logging.getLogger('scheduler')
log.setLevel(logging.INFO)
# Console output (stdout)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(fmt)
log.addHandler(ch)
# File output: rotated at midnight, 90 rotated files kept
fh = TimedRotatingFileHandler(os.path.join(LOG_DIR, 'scheduler.log'), when='midnight', backupCount=90, encoding='utf-8')
fh.setFormatter(fmt)
log.addHandler(fh)
# ─── Engine ───
# `engine` is a project-local module; presumably it is resolved through the
# BASE_DIR entry that was inserted into sys.path earlier in this file.
from engine import TradingEngine, load_config
# Module-level singletons shared by every scheduled job and the HTTP handler.
cfg = load_config()
engine = TradingEngine(cfg)
# ─── 调度任务 ───
def is_trading_day():
    """Return True when today (local clock) is Monday through Friday.

    This is a plain weekday test; exchange holidays are not taken into account.
    """
    weekday = datetime.now().weekday()  # Monday == 0 ... Sunday == 6
    return weekday not in (5, 6)
def job_pre_market():
    """Pre-market preparation job.

    Calls ``engine.dm.get_pool_codes()`` to refresh the stock-pool cache.
    Returns immediately when ``is_trading_day()`` is false. Any exception is
    logged together with its traceback and then swallowed so that the
    scheduler keeps running.
    """
    if not is_trading_day():
        return
    log.info("━━━ 盘前准备 ━━━")
    try:
        # Refresh the stock-pool cache
        engine.dm.get_pool_codes()
        log.info("✅ 盘前准备完成")
    except Exception as e:
        # log.exception sends the traceback through the logger's handlers
        # (console + file) instead of only printing it to stderr.
        log.exception(f"盘前准备失败: {e}")
def _scan_and_buy(date, prices):
    """Run a factor scan for *date* and buy every candidate it returns.

    Loads the most recent 300 days of history, and does nothing further when
    *date* is not among the returned trade dates. Otherwise it builds a
    per-stock ``{date: row index}`` lookup, asks the engine for a buy list,
    executes each buy, and records the scan date/index in ``engine.state``.

    Args:
        date: Trading date as a ``YYYYMMDD`` string.
        prices: Mapping of stock code to today's positive close price.
    """
    stock_data, trade_dates = engine.dm.load_all_history(days=300)
    if date not in trade_dates:
        # No history row for today: skip the scan silently, as before.
        return
    code_date_idx = {
        code: {dt: i for i, dt in enumerate(sd['dates'])}
        for code, sd in stock_data.items()
    }
    buy_list = engine.scan_buy(date, stock_data, code_date_idx, prices)
    for code, name, price, qty, score in buy_list:
        engine.buy(code, name, price, qty, date, f'因子{score:.3f}', score)
    engine.state['last_scan_date'] = date
    engine.state['last_scan_idx'] = trade_dates.index(date)


def job_market_check():
    """Intraday check: stop-loss / take-profit sells, then refill positions.

    Runs only when ``is_trading_day()`` is true and the local time is within
    09:30–15:00. Steps:

    1. Fetch today's batch quotes and refresh ``current_price`` of holdings.
    2. Sell every position reported by ``engine.check_stop_loss_take_profit``.
    3. If anything was sold, or if there are no positions at all, run a
       factor scan and buy the returned candidates.
    4. Save engine state and log the current NAV.

    Any exception is logged with its traceback and swallowed so that the
    scheduler keeps running.
    """
    if not is_trading_day():
        return
    now = datetime.now()
    # Outside the 09:30–15:00 window there is nothing to do.
    if now.hour < 9 or now.hour >= 15:
        return
    if now.hour == 9 and now.minute < 30:
        return
    log.info(f"━━━ 盘中检查 {now.strftime('%H:%M')} ━━━")
    try:
        date = now.strftime('%Y%m%d')
        # Today's quotes (batch)
        daily = engine.dm.get_daily_batch(date)
        # Refresh the price of every held position that has a positive close
        updated = 0
        for code in engine.state['positions']:
            if code in daily:
                p = daily[code]['close']
                if p > 0:
                    engine.state['positions'][code]['current_price'] = p
                    updated += 1
        log.info(f"更新持仓价格: {updated}/{len(engine.state['positions'])}")
        # Stop-loss / take-profit check
        prices = {code: d['close'] for code, d in daily.items() if d['close'] > 0}
        triggered = engine.check_stop_loss_take_profit(date, prices)
        for reason, code, price, pnl_pct in triggered:
            r = '止损' if reason == 'stop_loss' else '止盈'
            engine.sell(code, price, date, f'{r}{pnl_pct:+.1f}%')
        if triggered:
            # Something was sold: look for replacements
            log.info("触发卖出,准备补仓扫描...")
            _scan_and_buy(date, prices)
        elif not engine.state['positions']:
            # No positions at all: try to open the initial ones.
            # (With existing positions and no sell, no scan is performed.)
            _scan_and_buy(date, prices)
        # NOTE(review): if the scan raises after sells were executed, this
        # save is skipped until the next successful run — confirm whether
        # engine.sell() persists state on its own.
        engine.save_state()
        nav = engine.get_nav()
        log.info(f"📊 净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%) | "
                 f"持仓 {nav['holdings_count']}")
    except Exception as e:
        # log.exception routes the traceback to the log file as well as stdout
        log.exception(f"盘中检查失败: {e}")
def job_daily_report():
    """End-of-day report job.

    Returns immediately when ``is_trading_day()`` is false. Otherwise it
    refreshes the ``current_price`` of held positions from today's batch
    quotes, records the NAV via ``engine._log_daily``, saves engine state,
    logs the report text and writes it to ``LOG_DIR/daily_<YYYYMMDD>.txt``.

    Any exception is logged with its traceback and swallowed so that the
    scheduler keeps running.
    """
    if not is_trading_day():
        return
    log.info("━━━ 日终报告 ━━━")
    try:
        date = datetime.now().strftime('%Y%m%d')
        # Update holdings with the closing prices
        daily = engine.dm.get_daily_batch(date)
        for code in engine.state['positions']:
            if code in daily:
                p = daily[code]['close']
                if p > 0:
                    engine.state['positions'][code]['current_price'] = p
        nav = engine.get_nav()
        engine._log_daily(date, nav)
        engine.save_state()
        report = engine.get_report()
        log.info(f"\n{report}")
        # Write today's report file. The encoding is explicit because the
        # locale default (e.g. ASCII under a C/POSIX locale) cannot encode
        # non-ASCII report text.
        report_file = os.path.join(LOG_DIR, f"daily_{date}.txt")
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(report)
    except Exception as e:
        # log.exception routes the traceback to the log file as well as stdout
        log.exception(f"日终报告失败: {e}")
def job_weekly_review():
    """Weekly report job.

    Queries the engine's SQLite database for this week's rows (from Monday
    of the current week onward): trade count and summed P&L per direction
    from ``trades``, and the NAV series from ``daily_log``. Logs the current
    NAV, the per-direction statistics and the first/last NAV of the week.

    Any exception is logged with its traceback and swallowed so that the
    scheduler keeps running.
    """
    log.info("━━━ 周报 ━━━")
    try:
        import sqlite3
        db_path = engine._db_file()
        # Read the clock once so both operands refer to the same day.
        today = datetime.now()
        week_start = (today - timedelta(days=today.weekday())).strftime('%Y%m%d')
        db = sqlite3.connect(db_path)
        try:
            # This week's trade statistics
            trades = db.execute(
                'SELECT direction, COUNT(*), SUM(pnl) FROM trades WHERE date >= ? GROUP BY direction',
                (week_start,)
            ).fetchall()
            # NAV curve
            navs = db.execute(
                'SELECT date, total_nav, return_pct FROM daily_log WHERE date >= ? ORDER BY date',
                (week_start,)
            ).fetchall()
        finally:
            # Close the connection even when a query fails.
            db.close()
        nav = engine.get_nav()
        log.info(f"本周净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)")
        for direction, count, pnl in trades:
            # SUM(pnl) is NULL/None when every pnl in the group is NULL
            log.info(f" {direction}: {count}笔, 盈亏 {pnl or 0:+,.0f}")
        if navs:
            log.info(f" 净值变化: ¥{navs[0][1]:,.0f} → ¥{navs[-1][1]:,.0f}")
    except Exception as e:
        # log.exception keeps the traceback, which the plain error log dropped
        log.exception(f"周报失败: {e}")
# ─── HTTP状态接口 ───
def start_http_server():
    """Create (but do not run) the minimal HTTP status server.

    Routes:
        ``/`` and ``/status``: JSON with NAV, positions and trade count.
        ``/report``: the engine's plain-text report.
        anything else: 404.

    Returns:
        An ``HTTPServer`` bound to ``0.0.0.0:8888`` with ``timeout`` set to
        1 second. The caller is responsible for serving requests.
    """
    from http.server import HTTPServer, BaseHTTPRequestHandler
    import json
    import logging

    class StatusHandler(BaseHTTPRequestHandler):
        def _status_body(self):
            """Build the JSON payload for the status route as UTF-8 bytes."""
            nav = engine.get_nav()
            positions = []
            # Iterate over a snapshot: the scheduler thread may add or remove
            # positions while this request is being served.
            for code, pos in list(engine.state['positions'].items()):
                cost = pos['avg_cost']
                p = pos.get('current_price', cost)
                positions.append({
                    'code': code, 'name': pos['name'],
                    'qty': pos['qty'], 'avg_cost': cost,
                    'current_price': p,
                    # Guard against a zero cost basis instead of raising
                    'pnl_pct': round((p / cost - 1) * 100, 2) if cost else 0.0,
                })
            data = {
                'status': 'running',
                'time': datetime.now().isoformat(),
                'nav': nav,
                'positions': positions,
                'trade_count': engine.state.get('trade_count', 0),
            }
            return json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')

        def do_GET(self):
            """Dispatch on the request path and send the response."""
            # Build the whole body before sending any header, so that a
            # failure becomes a clean 500 rather than a truncated 200.
            try:
                if self.path == '/status' or self.path == '/':
                    body = self._status_body()
                    content_type = 'application/json; charset=utf-8'
                elif self.path == '/report':
                    body = engine.get_report().encode('utf-8')
                    content_type = 'text/plain; charset=utf-8'
                else:
                    self.send_response(404)
                    self.end_headers()
                    return
            except Exception as e:
                logging.getLogger('scheduler').exception(f"HTTP状态接口失败: {e}")
                self.send_error(500)
                return
            self.send_response(200)
            self.send_header('Content-Type', content_type)
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format, *args):
            pass  # Silence the per-request HTTP log

    server = HTTPServer(('0.0.0.0', 8888), StatusHandler)
    server.timeout = 1
    return server
# ─── 主入口 ───
if __name__ == '__main__':
    # Script entry point: log the current state, start the HTTP status
    # server in a background thread, register the cron jobs and block in
    # the scheduler until interrupted.
    log.info("=" * 60)
    log.info("🚀 纯信号交易系统启动")
    log.info("=" * 60)
    # Log the current state
    nav = engine.get_nav()
    log.info(f"初始资金: ¥{engine.initial_cash:,.0f}")
    log.info(f"当前净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)")
    log.info(f"持仓: {nav['holdings_count']}")
    # Create the HTTP status server
    http_server = start_http_server()
    log.info("HTTP状态接口: http://0.0.0.0:8888/status")
    # Scheduler
    # NOTE(review): no timezone is passed, so cron times follow the
    # scheduler's default (local) timezone — confirm the host/container TZ.
    sched = BlockingScheduler()
    # Pre-market preparation (weekdays 09:00)
    sched.add_job(job_pre_market, CronTrigger(day_of_week='mon-fri', hour=9, minute=0),
                  id='pre_market', name='盘前准备')
    # Intraday check (weekdays, every 30 minutes during 09:30–11:30 and 13:00–14:30)
    for h, m in [(9,30), (10,0), (10,30), (11,0), (11,30), (13,0), (13,30), (14,0), (14,30)]:
        sched.add_job(job_market_check, CronTrigger(day_of_week='mon-fri', hour=h, minute=m),
                      id=f'check_{h}{m:02d}', name=f'盘中检查{h}:{m:02d}')
    # End-of-day report (weekdays 15:10)
    sched.add_job(job_daily_report, CronTrigger(day_of_week='mon-fri', hour=15, minute=10),
                  id='daily_report', name='日终报告')
    # Weekly report (Saturdays 10:00)
    sched.add_job(job_weekly_review, CronTrigger(day_of_week='sat', hour=10, minute=0),
                  id='weekly_review', name='周报')
    log.info("调度任务:")
    for job in sched.get_jobs():
        log.info(f" {job.name}: {job.trigger}")
    # Serve HTTP in a daemon thread. serve_forever() is used rather than a
    # handle_request() loop because shutdown() only returns once a
    # serve_forever() loop has acknowledged it; otherwise it blocks forever.
    import threading
    http_thread = threading.Thread(target=http_server.serve_forever, daemon=True)
    http_thread.start()
    # Run the scheduler (blocks until interrupted)
    try:
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        log.info("系统关闭")
    finally:
        # Stop the serving loop, then release the listening socket.
        http_server.shutdown()
        http_server.server_close()