策略:纯信号模式(不定期换仓,止损止盈卖出后因子选股补仓) 股票池:沪深300+中证500(800+只) 止损-8% / 止盈+25% / 单只20% / 最多5只 5年回测:+371.7%,夏普0.82,年化21.3% 组件: - engine.py: 核心交易引擎 + 因子评分 + 数据管理 - scheduler.py: APScheduler定时调度 + HTTP状态接口 - trade_tool.py: 命令行工具 - config.json: 策略参数配置 - Dockerfile + docker-compose.yml: 容器化部署 日志系统: - 文件日志(按日轮转,保留90天) - SQLite: trades/daily_log/signal_log/system_log
326 lines
11 KiB
Python
326 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
定时调度器
|
||
=========
|
||
- 工作日 09:00 盘前准备
|
||
- 工作日 09:30 开盘检查
|
||
- 工作日 每30分钟 止损止盈检查 + 补仓
|
||
- 工作日 15:10 日终报告
|
||
- 每周六 10:00 周报
|
||
|
||
同时启动HTTP状态接口(8888端口)
|
||
"""
|
||
|
||
import os, sys, json, logging, traceback
|
||
from datetime import datetime, timedelta
|
||
|
||
from apscheduler.schedulers.blocking import BlockingScheduler
|
||
from apscheduler.triggers.cron import CronTrigger
|
||
|
||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
APP_DIR = os.path.dirname(BASE_DIR)
|
||
LOG_DIR = os.path.join(APP_DIR, 'logs')
|
||
DATA_DIR = os.path.join(APP_DIR, 'data')
|
||
CONFIG_DIR = os.path.join(APP_DIR, 'config')
|
||
sys.path.insert(0, BASE_DIR)
|
||
|
||
from logging.handlers import TimedRotatingFileHandler
|
||
|
||
# ─── 日志 ───
|
||
fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
|
||
log = logging.getLogger('scheduler')
|
||
log.setLevel(logging.INFO)
|
||
ch = logging.StreamHandler(sys.stdout)
|
||
ch.setFormatter(fmt)
|
||
log.addHandler(ch)
|
||
fh = TimedRotatingFileHandler(os.path.join(LOG_DIR, 'scheduler.log'), when='midnight', backupCount=90, encoding='utf-8')
|
||
fh.setFormatter(fmt)
|
||
log.addHandler(fh)
|
||
|
||
# ─── 引擎 ───
|
||
from engine import TradingEngine, load_config
|
||
|
||
cfg = load_config()
|
||
engine = TradingEngine(cfg)
|
||
|
||
# ─── 调度任务 ───
|
||
|
||
def is_trading_day():
|
||
"""简单判断是否交易日(周一~周五)"""
|
||
return datetime.now().weekday() < 5
|
||
|
||
|
||
def job_pre_market():
|
||
"""盘前准备"""
|
||
if not is_trading_day():
|
||
return
|
||
log.info("━━━ 盘前准备 ━━━")
|
||
try:
|
||
# 刷新股票池缓存
|
||
engine.dm.get_pool_codes()
|
||
log.info("✅ 盘前准备完成")
|
||
except Exception as e:
|
||
log.error(f"盘前准备失败: {e}")
|
||
traceback.print_exc()
|
||
|
||
|
||
def job_market_check():
|
||
"""盘中检查(止损止盈 + 补仓)"""
|
||
if not is_trading_day():
|
||
return
|
||
now = datetime.now()
|
||
if now.hour < 9 or now.hour >= 15:
|
||
return
|
||
if now.hour == 9 and now.minute < 30:
|
||
return
|
||
|
||
log.info(f"━━━ 盘中检查 {now.strftime('%H:%M')} ━━━")
|
||
try:
|
||
date = now.strftime('%Y%m%d')
|
||
|
||
# 获取今日价格(批量)
|
||
daily = engine.dm.get_daily_batch(date)
|
||
|
||
# 更新持仓价格
|
||
updated = 0
|
||
for code in engine.state['positions']:
|
||
if code in daily:
|
||
p = daily[code]['close']
|
||
if p > 0:
|
||
engine.state['positions'][code]['current_price'] = p
|
||
updated += 1
|
||
log.info(f"更新持仓价格: {updated}/{len(engine.state['positions'])}只")
|
||
|
||
# 止损止盈检查
|
||
prices = {code: d['close'] for code, d in daily.items() if d['close'] > 0}
|
||
triggered = engine.check_stop_loss_take_profit(date, prices)
|
||
|
||
for reason, code, price, pnl_pct in triggered:
|
||
r = '止损' if reason == 'stop_loss' else '止盈'
|
||
engine.sell(code, price, date, f'{r}{pnl_pct:+.1f}%')
|
||
|
||
# 如果有卖出,尝试补仓
|
||
if triggered:
|
||
log.info("触发卖出,准备补仓扫描...")
|
||
# 加载因子所需数据(最近300天)
|
||
stock_data, trade_dates = engine.dm.load_all_history(days=300)
|
||
if date in trade_dates:
|
||
code_date_idx = {}
|
||
for code, sd in stock_data.items():
|
||
code_date_idx[code] = {dt: i for i, dt in enumerate(sd['dates'])}
|
||
|
||
buy_list = engine.scan_buy(date, stock_data, code_date_idx, prices)
|
||
for code, name, price, qty, score in buy_list:
|
||
engine.buy(code, name, price, qty, date, f'因子{score:.3f}', score)
|
||
|
||
day_idx = trade_dates.index(date)
|
||
engine.state['last_scan_date'] = date
|
||
engine.state['last_scan_idx'] = day_idx
|
||
|
||
# 补仓检查(非触发情况下也要看间隔)
|
||
else:
|
||
last_scan = engine.state.get('last_scan_idx', -engine.scan_interval)
|
||
if engine.state['positions']:
|
||
# 有持仓但可能需要扫描
|
||
pass
|
||
elif not engine.state['positions']:
|
||
# 空仓,尝试建仓
|
||
stock_data, trade_dates = engine.dm.load_all_history(days=300)
|
||
if date in trade_dates:
|
||
code_date_idx = {}
|
||
for code, sd in stock_data.items():
|
||
code_date_idx[code] = {dt: i for i, dt in enumerate(sd['dates'])}
|
||
|
||
buy_list = engine.scan_buy(date, stock_data, code_date_idx, prices)
|
||
for code, name, price, qty, score in buy_list:
|
||
engine.buy(code, name, price, qty, date, f'因子{score:.3f}', score)
|
||
|
||
day_idx = trade_dates.index(date)
|
||
engine.state['last_scan_date'] = date
|
||
engine.state['last_scan_idx'] = day_idx
|
||
|
||
engine.save_state()
|
||
|
||
nav = engine.get_nav()
|
||
log.info(f"📊 净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%) | "
|
||
f"持仓 {nav['holdings_count']}只")
|
||
|
||
except Exception as e:
|
||
log.error(f"盘中检查失败: {e}")
|
||
traceback.print_exc()
|
||
|
||
|
||
def job_daily_report():
|
||
"""日终报告"""
|
||
if not is_trading_day():
|
||
return
|
||
log.info("━━━ 日终报告 ━━━")
|
||
try:
|
||
date = datetime.now().strftime('%Y%m%d')
|
||
|
||
# 获取收盘价更新
|
||
daily = engine.dm.get_daily_batch(date)
|
||
for code in engine.state['positions']:
|
||
if code in daily:
|
||
p = daily[code]['close']
|
||
if p > 0:
|
||
engine.state['positions'][code]['current_price'] = p
|
||
|
||
nav = engine.get_nav()
|
||
engine._log_daily(date, nav)
|
||
engine.save_state()
|
||
|
||
report = engine.get_report()
|
||
log.info(f"\n{report}")
|
||
|
||
# 写入今日报告文件
|
||
report_file = os.path.join(LOG_DIR, f"daily_{date}.txt")
|
||
with open(report_file, 'w') as f:
|
||
f.write(report)
|
||
|
||
except Exception as e:
|
||
log.error(f"日终报告失败: {e}")
|
||
traceback.print_exc()
|
||
|
||
|
||
def job_weekly_review():
|
||
"""周报"""
|
||
log.info("━━━ 周报 ━━━")
|
||
try:
|
||
db_path = engine._db_file()
|
||
import sqlite3
|
||
db = sqlite3.connect(db_path)
|
||
|
||
# 本周交易统计
|
||
week_start = (datetime.now() - timedelta(days=datetime.now().weekday())).strftime('%Y%m%d')
|
||
trades = db.execute(
|
||
'SELECT direction, COUNT(*), SUM(pnl) FROM trades WHERE date >= ? GROUP BY direction',
|
||
(week_start,)
|
||
).fetchall()
|
||
|
||
# 净值曲线
|
||
navs = db.execute(
|
||
'SELECT date, total_nav, return_pct FROM daily_log WHERE date >= ? ORDER BY date',
|
||
(week_start,)
|
||
).fetchall()
|
||
|
||
db.close()
|
||
|
||
nav = engine.get_nav()
|
||
log.info(f"本周净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)")
|
||
|
||
for direction, count, pnl in trades:
|
||
log.info(f" {direction}: {count}笔, 盈亏 {pnl or 0:+,.0f}")
|
||
|
||
if navs:
|
||
log.info(f" 净值变化: ¥{navs[0][1]:,.0f} → ¥{navs[-1][1]:,.0f}")
|
||
|
||
except Exception as e:
|
||
log.error(f"周报失败: {e}")
|
||
|
||
|
||
# ─── HTTP状态接口 ───
|
||
|
||
def start_http_server():
|
||
"""简易HTTP状态接口"""
|
||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||
import json
|
||
|
||
class StatusHandler(BaseHTTPRequestHandler):
|
||
def do_GET(self):
|
||
if self.path == '/status' or self.path == '/':
|
||
nav = engine.get_nav()
|
||
positions = []
|
||
for code, pos in engine.state['positions'].items():
|
||
p = pos.get('current_price', pos['avg_cost'])
|
||
positions.append({
|
||
'code': code, 'name': pos['name'],
|
||
'qty': pos['qty'], 'avg_cost': pos['avg_cost'],
|
||
'current_price': p,
|
||
'pnl_pct': round((p / pos['avg_cost'] - 1) * 100, 2),
|
||
})
|
||
data = {
|
||
'status': 'running',
|
||
'time': datetime.now().isoformat(),
|
||
'nav': nav,
|
||
'positions': positions,
|
||
'trade_count': engine.state.get('trade_count', 0),
|
||
}
|
||
self.send_response(200)
|
||
self.send_header('Content-Type', 'application/json; charset=utf-8')
|
||
self.end_headers()
|
||
self.wfile.write(json.dumps(data, ensure_ascii=False, indent=2).encode())
|
||
elif self.path == '/report':
|
||
self.send_response(200)
|
||
self.send_header('Content-Type', 'text/plain; charset=utf-8')
|
||
self.end_headers()
|
||
self.wfile.write(engine.get_report().encode())
|
||
else:
|
||
self.send_response(404)
|
||
self.end_headers()
|
||
|
||
def log_message(self, format, *args):
|
||
pass # 静默HTTP日志
|
||
|
||
server = HTTPServer(('0.0.0.0', 8888), StatusHandler)
|
||
server.timeout = 1
|
||
return server
|
||
|
||
|
||
# ─── 主入口 ───
|
||
|
||
if __name__ == '__main__':
|
||
log.info("=" * 60)
|
||
log.info("🚀 纯信号交易系统启动")
|
||
log.info("=" * 60)
|
||
|
||
# 打印当前状态
|
||
nav = engine.get_nav()
|
||
log.info(f"初始资金: ¥{engine.initial_cash:,.0f}")
|
||
log.info(f"当前净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)")
|
||
log.info(f"持仓: {nav['holdings_count']}只")
|
||
|
||
# 启动HTTP
|
||
http_server = start_http_server()
|
||
log.info("HTTP状态接口: http://0.0.0.0:8888/status")
|
||
|
||
# 调度器
|
||
sched = BlockingScheduler()
|
||
|
||
# 盘前准备(工作日 09:00)
|
||
sched.add_job(job_pre_market, CronTrigger(day_of_week='mon-fri', hour=9, minute=0),
|
||
id='pre_market', name='盘前准备')
|
||
|
||
# 盘中检查(工作日 9:30~15:00,每30分钟)
|
||
for h, m in [(9,30), (10,0), (10,30), (11,0), (11,30), (13,0), (13,30), (14,0), (14,30)]:
|
||
sched.add_job(job_market_check, CronTrigger(day_of_week='mon-fri', hour=h, minute=m),
|
||
id=f'check_{h}{m:02d}', name=f'盘中检查{h}:{m:02d}')
|
||
|
||
# 日终报告(工作日 15:10)
|
||
sched.add_job(job_daily_report, CronTrigger(day_of_week='mon-fri', hour=15, minute=10),
|
||
id='daily_report', name='日终报告')
|
||
|
||
# 周报(周六 10:00)
|
||
sched.add_job(job_weekly_review, CronTrigger(day_of_week='sat', hour=10, minute=0),
|
||
id='weekly_review', name='周报')
|
||
|
||
log.info("调度任务:")
|
||
for job in sched.get_jobs():
|
||
log.info(f" {job.name}: {job.trigger}")
|
||
|
||
# 集成HTTP到调度循环
|
||
import threading
|
||
def run_http():
|
||
while True:
|
||
http_server.handle_request()
|
||
|
||
http_thread = threading.Thread(target=run_http, daemon=True)
|
||
http_thread.start()
|
||
|
||
# 启动调度
|
||
try:
|
||
sched.start()
|
||
except (KeyboardInterrupt, SystemExit):
|
||
log.info("系统关闭")
|
||
http_server.shutdown()
|