Files

326 lines
11 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
定时调度器
=========
- 工作日 09:00 盘前准备
- 工作日 09:30 开盘检查
- 工作日 每30分钟 止损止盈检查 + 补仓
- 工作日 15:10 日终报告
- 每周六 10:00 周报
同时启动HTTP状态接口8888端口
"""
import os, sys, json, logging, traceback
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
# Directory that contains this script; the project folders below hang off its parent.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
APP_DIR = os.path.dirname(BASE_DIR)
LOG_DIR = os.path.join(APP_DIR, 'logs')
DATA_DIR = os.path.join(APP_DIR, 'data')
CONFIG_DIR = os.path.join(APP_DIR, 'config')
# Put this script's directory first on the import path
# (presumably so the later `from engine import ...` resolves to the sibling module).
sys.path.insert(0, BASE_DIR)
from logging.handlers import TimedRotatingFileHandler
# ─── Logging ───
# One shared format for console and file output.
fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log = logging.getLogger('scheduler')
log.setLevel(logging.INFO)
# Console handler (stdout).
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(fmt)
log.addHandler(ch)
# The file handler opens its file at construction time and raises if the
# directory is missing, so make sure the log directory exists first.
os.makedirs(LOG_DIR, exist_ok=True)
# Rotating file handler: rolls over at midnight, keeps 90 backups, UTF-8 encoded.
fh = TimedRotatingFileHandler(os.path.join(LOG_DIR, 'scheduler.log'), when='midnight', backupCount=90, encoding='utf-8')
fh.setFormatter(fmt)
log.addHandler(fh)
# ─── Engine ───
from engine import TradingEngine, load_config
# Load the configuration once and build the single engine instance that the
# scheduled jobs and the HTTP status handler in this file all use.
cfg = load_config()
engine = TradingEngine(cfg)
# ─── 调度任务 ───
def is_trading_day(now=None):
    """Return True when the given moment falls on Monday through Friday.

    This is a plain weekday check; public holidays are NOT taken into account.

    Args:
        now: Optional ``datetime`` (or ``date``) to test. Defaults to the
            current local time, which is what existing callers rely on.

    Returns:
        bool: True for Monday..Friday, False for Saturday and Sunday.
    """
    if now is None:
        now = datetime.now()
    # weekday(): Monday == 0 ... Sunday == 6
    return now.weekday() < 5
def job_pre_market():
    """Pre-market preparation job.

    Skipped when ``is_trading_day()`` is false. Calls
    ``engine.dm.get_pool_codes()`` and logs the outcome. Any exception is
    logged with its traceback and swallowed so that it never propagates
    to the scheduler.
    """
    if not is_trading_day():
        return
    log.info("━━━ 盘前准备 ━━━")
    try:
        # Called only for its side effect; presumably this refreshes the
        # stock-pool cache inside the data manager (confirm against engine.dm).
        engine.dm.get_pool_codes()
        log.info("✅ 盘前准备完成")
    except Exception as e:
        # log.exception writes the traceback through the logger's handlers,
        # so it lands in the log file instead of only on stderr.
        log.exception(f"盘前准备失败: {e}")
def _scan_and_buy(date, prices):
    """Run a buy scan for ``date`` and execute every order it proposes.

    Loads the last 300 days of history from the data manager. If ``date`` is
    not among the returned trade dates nothing happens. Otherwise the scan
    result is bought item by item and the scan position is recorded in
    ``engine.state`` (``last_scan_date`` / ``last_scan_idx``).

    Args:
        date: Trading date string in ``YYYYMMDD`` form.
        prices: Mapping of stock code to its latest positive close price.
    """
    stock_data, trade_dates = engine.dm.load_all_history(days=300)
    if date not in trade_dates:
        return
    # Per-stock lookup: date string -> row index within that stock's history.
    code_date_idx = {
        code: {dt: i for i, dt in enumerate(sd['dates'])}
        for code, sd in stock_data.items()
    }
    buy_list = engine.scan_buy(date, stock_data, code_date_idx, prices)
    for code, name, price, qty, score in buy_list:
        engine.buy(code, name, price, qty, date, f'因子{score:.3f}', score)
    day_idx = trade_dates.index(date)
    engine.state['last_scan_date'] = date
    engine.state['last_scan_idx'] = day_idx


def job_market_check():
    """Intraday check: refresh prices, apply stop-loss/take-profit, refill.

    Runs only on weekdays and only when the local time is within
    [09:30, 15:00). Steps:

    1. Fetch today's batch quotes and update ``current_price`` of every held
       position that has a positive close.
    2. Ask the engine which positions hit stop-loss / take-profit and sell them.
    3. If anything was sold, or if the portfolio is empty, run a buy scan.
    4. Persist state and log the current net asset value.

    Any exception is logged with its traceback and swallowed so that it never
    propagates to the scheduler.
    """
    if not is_trading_day():
        return
    now = datetime.now()
    if now.hour < 9 or now.hour >= 15:
        return
    if now.hour == 9 and now.minute < 30:
        return
    log.info(f"━━━ 盘中检查 {now.strftime('%H:%M')} ━━━")
    try:
        date = now.strftime('%Y%m%d')
        # Today's quotes for all stocks, fetched in one batch.
        daily = engine.dm.get_daily_batch(date)

        # Update the price of each held position.
        updated = 0
        for code in engine.state['positions']:
            if code in daily:
                p = daily[code]['close']
                if p > 0:
                    engine.state['positions'][code]['current_price'] = p
                    updated += 1
        log.info(f"更新持仓价格: {updated}/{len(engine.state['positions'])}")

        # Stop-loss / take-profit check; non-positive closes are ignored.
        prices = {code: d['close'] for code, d in daily.items() if d['close'] > 0}
        triggered = engine.check_stop_loss_take_profit(date, prices)
        for reason, code, price, pnl_pct in triggered:
            r = '止损' if reason == 'stop_loss' else '止盈'
            engine.sell(code, price, date, f'{r}{pnl_pct:+.1f}%')

        if triggered:
            # Something was sold: try to refill the freed slots.
            log.info("触发卖出,准备补仓扫描...")
            _scan_and_buy(date, prices)
        elif not engine.state['positions']:
            # Nothing held at all: try to open initial positions.
            _scan_and_buy(date, prices)

        engine.save_state()
        nav = engine.get_nav()
        log.info(f"📊 净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%) | "
                 f"持仓 {nav['holdings_count']}")
    except Exception as e:
        # log.exception writes the traceback through the logger's handlers,
        # so it lands in the log file instead of only on stderr.
        log.exception(f"盘中检查失败: {e}")
def job_daily_report():
    """End-of-day report job.

    Skipped when ``is_trading_day()`` is false. Refreshes held positions with
    today's close prices, records the day's NAV via the engine, saves state,
    logs the engine's text report and writes it to
    ``<LOG_DIR>/daily_<YYYYMMDD>.txt``.

    Any exception is logged with its traceback and swallowed so that it never
    propagates to the scheduler.
    """
    if not is_trading_day():
        return
    log.info("━━━ 日终报告 ━━━")
    try:
        date = datetime.now().strftime('%Y%m%d')
        # Refresh held positions with today's closing prices.
        daily = engine.dm.get_daily_batch(date)
        for code in engine.state['positions']:
            if code in daily:
                p = daily[code]['close']
                if p > 0:
                    engine.state['positions'][code]['current_price'] = p
        nav = engine.get_nav()
        engine._log_daily(date, nav)
        engine.save_state()
        report = engine.get_report()
        log.info(f"\n{report}")
        # Write today's report file. The encoding is explicit because the
        # report contains non-ASCII text and the platform default may not be
        # UTF-8; this also matches the UTF-8 log file.
        report_file = os.path.join(LOG_DIR, f"daily_{date}.txt")
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(report)
    except Exception as e:
        # log.exception writes the traceback through the logger's handlers,
        # so it lands in the log file instead of only on stderr.
        log.exception(f"日终报告失败: {e}")
def job_weekly_review():
    """Weekly review job.

    Reads this week's rows (from Monday of the current week onward) from the
    ``trades`` and ``daily_log`` tables of the engine's SQLite database and
    logs: the current NAV, trade count and summed P&L per direction, and the
    first-to-last NAV of the week.

    Any exception is logged with its traceback and swallowed so that it never
    propagates to the scheduler.
    """
    log.info("━━━ 周报 ━━━")
    try:
        import sqlite3
        db_path = engine._db_file()
        # Take "now" once so both parts of the computation see the same instant.
        now = datetime.now()
        week_start = (now - timedelta(days=now.weekday())).strftime('%Y%m%d')
        db = sqlite3.connect(db_path)
        try:
            # Trade statistics for this week.
            trades = db.execute(
                'SELECT direction, COUNT(*), SUM(pnl) FROM trades WHERE date >= ? GROUP BY direction',
                (week_start,)
            ).fetchall()
            # NAV curve for this week.
            navs = db.execute(
                'SELECT date, total_nav, return_pct FROM daily_log WHERE date >= ? ORDER BY date',
                (week_start,)
            ).fetchall()
        finally:
            # Close the connection even when a query fails.
            db.close()
        nav = engine.get_nav()
        log.info(f"本周净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)")
        for direction, count, pnl in trades:
            # SUM(pnl) is NULL/None when no row has a pnl value.
            log.info(f" {direction}: {count}笔, 盈亏 {pnl or 0:+,.0f}")
        if navs:
            log.info(f" 净值变化: ¥{navs[0][1]:,.0f} → ¥{navs[-1][1]:,.0f}")
    except Exception as e:
        # Log the traceback too, consistent with the other jobs.
        log.exception(f"周报失败: {e}")
# ─── HTTP状态接口 ───
def start_http_server():
    """Create (but do not run) the HTTP status server on port 8888.

    Routes (any query string is ignored):
        ``/`` or ``/status``  JSON with NAV, positions and trade count.
        ``/report``           The engine's plain-text report.
        anything else         404.

    If building a response raises, the error is logged and a 500 is returned
    instead of dropping the connection.

    Returns:
        HTTPServer: bound to ``0.0.0.0:8888`` with ``timeout`` set to 1 second.
        The caller is responsible for driving it.
    """
    from http.server import HTTPServer, BaseHTTPRequestHandler
    import json

    class StatusHandler(BaseHTTPRequestHandler):
        def _status_payload(self):
            """Build the dict served by the status route."""
            nav = engine.get_nav()
            positions = []
            # Iterate over a snapshot: scheduler jobs run in other threads and
            # may add or remove positions while this request is being served.
            for code, pos in list(engine.state['positions'].items()):
                cost = pos['avg_cost']
                p = pos.get('current_price', cost)
                positions.append({
                    'code': code, 'name': pos['name'],
                    'qty': pos['qty'], 'avg_cost': cost,
                    'current_price': p,
                    # A zero cost basis would otherwise raise ZeroDivisionError.
                    'pnl_pct': round((p / cost - 1) * 100, 2) if cost else 0.0,
                })
            return {
                'status': 'running',
                'time': datetime.now().isoformat(),
                'nav': nav,
                'positions': positions,
                'trade_count': engine.state.get('trade_count', 0),
            }

        def do_GET(self):
            # Route on the path only, so "/status?ts=123" still matches.
            route = self.path.split('?', 1)[0]
            try:
                if route == '/status' or route == '/':
                    body = json.dumps(self._status_payload(), ensure_ascii=False, indent=2).encode()
                    content_type = 'application/json; charset=utf-8'
                elif route == '/report':
                    body = engine.get_report().encode()
                    content_type = 'text/plain; charset=utf-8'
                else:
                    self.send_response(404)
                    self.end_headers()
                    return
            except Exception as e:
                # Build the body before sending any header, so a failure can
                # still be reported with a clean 500 response.
                log.exception(f"HTTP状态接口失败 {route}: {e}")
                self.send_response(500)
                self.end_headers()
                return
            self.send_response(200)
            self.send_header('Content-Type', content_type)
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format, *args):
            pass  # Silence the default per-request logging.

    # NOTE(review): binding to 0.0.0.0 exposes portfolio data on every network
    # interface without authentication — confirm this is intended.
    server = HTTPServer(('0.0.0.0', 8888), StatusHandler)
    server.timeout = 1
    return server
# ─── 主入口 ───
# ─── Main entry point ───
if __name__ == '__main__':
    log.info("=" * 60)
    log.info("🚀 纯信号交易系统启动")
    log.info("=" * 60)

    # Print the current state.
    nav = engine.get_nav()
    log.info(f"初始资金: ¥{engine.initial_cash:,.0f}")
    log.info(f"当前净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)")
    log.info(f"持仓: {nav['holdings_count']}")

    # Create the HTTP status server (it is started further below).
    http_server = start_http_server()
    log.info("HTTP状态接口: http://0.0.0.0:8888/status")

    # Scheduler.
    sched = BlockingScheduler()
    # Pre-market preparation (weekdays 09:00).
    sched.add_job(job_pre_market, CronTrigger(day_of_week='mon-fri', hour=9, minute=0),
                  id='pre_market', name='盘前准备')
    # Intraday check (weekdays, every 30 minutes from 09:30 to 14:30;
    # the 12:00 and 12:30 slots are deliberately absent from the list).
    for h, m in [(9,30), (10,0), (10,30), (11,0), (11,30), (13,0), (13,30), (14,0), (14,30)]:
        sched.add_job(job_market_check, CronTrigger(day_of_week='mon-fri', hour=h, minute=m),
                      id=f'check_{h}{m:02d}', name=f'盘中检查{h}:{m:02d}')
    # End-of-day report (weekdays 15:10).
    sched.add_job(job_daily_report, CronTrigger(day_of_week='mon-fri', hour=15, minute=10),
                  id='daily_report', name='日终报告')
    # Weekly review (Saturday 10:00).
    sched.add_job(job_weekly_review, CronTrigger(day_of_week='sat', hour=10, minute=0),
                  id='weekly_review', name='周报')
    log.info("调度任务:")
    for job in sched.get_jobs():
        log.info(f" {job.name}: {job.trigger}")

    # Serve HTTP in a background daemon thread. serve_forever() is used rather
    # than a handle_request() loop because shutdown() only returns when a
    # serve_forever() loop is running; otherwise it blocks forever.
    import threading
    http_thread = threading.Thread(target=http_server.serve_forever, daemon=True)
    http_thread.start()

    # Start the scheduler (blocks until interrupted).
    try:
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        log.info("系统关闭")
        http_server.shutdown()
        # Release the listening socket.
        http_server.server_close()