#!/usr/bin/env python3
"""
Scheduled job runner
====================
- Weekdays 09:00          pre-market preparation
- Weekdays 09:30          opening check
- Weekdays every 30 min   stop-loss / take-profit check + position refill
- Weekdays 15:10          end-of-day report
- Saturdays 10:00         weekly report

Also serves an HTTP status endpoint (port 8888).
"""
import json
import logging
import os
import sqlite3
import sys
import threading
import traceback  # retained for backward compatibility; failures are reported via log.exception
from contextlib import closing
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from logging.handlers import TimedRotatingFileHandler

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
APP_DIR = os.path.dirname(BASE_DIR)
LOG_DIR = os.path.join(APP_DIR, 'logs')
DATA_DIR = os.path.join(APP_DIR, 'data')
CONFIG_DIR = os.path.join(APP_DIR, 'config')

# Must happen before importing ``engine`` so the sibling module resolves.
sys.path.insert(0, BASE_DIR)

# ─── Logging ───
# The rotating file handler cannot create its parent directory itself.
os.makedirs(LOG_DIR, exist_ok=True)

fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
log = logging.getLogger('scheduler')
log.setLevel(logging.INFO)

ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(fmt)
log.addHandler(ch)

fh = TimedRotatingFileHandler(os.path.join(LOG_DIR, 'scheduler.log'),
                              when='midnight', backupCount=90,
                              encoding='utf-8')
fh.setFormatter(fmt)
log.addHandler(fh)

# ─── Engine ───
from engine import TradingEngine, load_config  # noqa: E402  (needs sys.path tweak above)

cfg = load_config()
engine = TradingEngine(cfg)


# ─── Shared helpers ───
def _refresh_position_prices(daily):
    """Copy positive closing prices from *daily* onto the held positions.

    Args:
        daily: mapping of stock code -> record exposing a ``'close'`` entry,
            as returned by ``engine.dm.get_daily_batch``.

    Returns:
        Number of positions whose ``current_price`` was updated.
    """
    updated = 0
    for code, position in engine.state['positions'].items():
        if code not in daily:
            continue
        close = daily[code]['close']
        if close > 0:
            position['current_price'] = close
            updated += 1
    return updated


def _scan_and_buy(date, prices):
    """Run a factor scan for *date* and buy every candidate it returns.

    Loads the most recent 300 days of history; does nothing when *date* is
    not among the loaded trade dates. On a successful scan the engine state
    records the scan date and its index within the trade-date list.

    Args:
        date: trading date formatted as ``YYYYMMDD``.
        prices: mapping of stock code -> latest positive price.
    """
    stock_data, trade_dates = engine.dm.load_all_history(days=300)
    if date not in trade_dates:
        return

    # Per-stock lookup from date string to row index in that stock's series.
    code_date_idx = {
        code: {dt: i for i, dt in enumerate(sd['dates'])}
        for code, sd in stock_data.items()
    }
    buy_list = engine.scan_buy(date, stock_data, code_date_idx, prices)
    for code, name, price, qty, score in buy_list:
        engine.buy(code, name, price, qty, date, f'因子{score:.3f}', score)

    engine.state['last_scan_date'] = date
    engine.state['last_scan_idx'] = trade_dates.index(date)


# ─── Scheduled jobs ───
def is_trading_day():
    """Return True on Monday-Friday (public holidays are not considered)."""
    return datetime.now().weekday() < 5


def job_pre_market():
    """Pre-market preparation: refresh the stock-pool cache."""
    if not is_trading_day():
        return
    log.info("━━━ 盘前准备 ━━━")
    try:
        # Refresh the stock-pool cache.
        engine.dm.get_pool_codes()
        log.info("✅ 盘前准备完成")
    except Exception as e:
        # log.exception keeps the traceback in the rotating log file too.
        log.exception(f"盘前准备失败: {e}")


def job_market_check():
    """Intraday check: stop-loss / take-profit, then refill positions.

    Runs only on weekdays between 09:30 and 15:00 local time. Updates the
    prices of held positions, sells whatever the engine flags, and triggers
    a buy scan when something was sold or when no position is held.
    """
    if not is_trading_day():
        return
    now = datetime.now()
    if now.hour < 9 or now.hour >= 15:
        return
    if now.hour == 9 and now.minute < 30:
        return

    log.info(f"━━━ 盘中检查 {now.strftime('%H:%M')} ━━━")
    try:
        date = now.strftime('%Y%m%d')

        # Fetch today's prices in one batch and refresh held positions.
        daily = engine.dm.get_daily_batch(date)
        updated = _refresh_position_prices(daily)
        log.info(f"更新持仓价格: {updated}/{len(engine.state['positions'])}只")

        # Stop-loss / take-profit check.
        prices = {code: d['close'] for code, d in daily.items() if d['close'] > 0}
        triggered = engine.check_stop_loss_take_profit(date, prices)
        for reason, code, price, pnl_pct in triggered:
            r = '止损' if reason == 'stop_loss' else '止盈'
            engine.sell(code, price, date, f'{r}{pnl_pct:+.1f}%')

        if triggered:
            # Something was sold: look for replacements.
            log.info("触发卖出,准备补仓扫描...")
            _scan_and_buy(date, prices)
        elif not engine.state['positions']:
            # Nothing held at all: try to open positions.
            _scan_and_buy(date, prices)
        # NOTE(review): no interval-based rescan happens while positions are
        # held; only a sell or an empty book triggers a scan.

        engine.save_state()
        nav = engine.get_nav()
        log.info(f"📊 净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%) | "
                 f"持仓 {nav['holdings_count']}只")
    except Exception as e:
        log.exception(f"盘中检查失败: {e}")


def job_daily_report():
    """End-of-day report: mark positions to close, log and persist the report."""
    if not is_trading_day():
        return
    log.info("━━━ 日终报告 ━━━")
    try:
        date = datetime.now().strftime('%Y%m%d')

        # Update positions with the closing prices.
        daily = engine.dm.get_daily_batch(date)
        _refresh_position_prices(daily)

        nav = engine.get_nav()
        engine._log_daily(date, nav)
        engine.save_state()

        report = engine.get_report()
        log.info(f"\n{report}")

        # Write today's report file; explicit UTF-8 because the report
        # contains non-ASCII text and the locale default may not encode it.
        report_file = os.path.join(LOG_DIR, f"daily_{date}.txt")
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(report)
    except Exception as e:
        log.exception(f"日终报告失败: {e}")


def job_weekly_review():
    """Weekly report: log this week's trade statistics and NAV change."""
    log.info("━━━ 周报 ━━━")
    try:
        now = datetime.now()
        # Monday of the current week, in the same YYYYMMDD form as the DB.
        week_start = (now - timedelta(days=now.weekday())).strftime('%Y%m%d')

        # closing() guarantees the connection is released even if a query fails.
        with closing(sqlite3.connect(engine._db_file())) as db:
            # Trade statistics for this week.
            trades = db.execute(
                'SELECT direction, COUNT(*), SUM(pnl) FROM trades WHERE date >= ? GROUP BY direction',
                (week_start,)
            ).fetchall()
            # NAV curve for this week.
            navs = db.execute(
                'SELECT date, total_nav, return_pct FROM daily_log WHERE date >= ? ORDER BY date',
                (week_start,)
            ).fetchall()

        nav = engine.get_nav()
        log.info(f"本周净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)")
        for direction, count, pnl in trades:
            log.info(f" {direction}: {count}笔, 盈亏 {pnl or 0:+,.0f}")
        if navs:
            log.info(f" 净值变化: ¥{navs[0][1]:,.0f} → ¥{navs[-1][1]:,.0f}")
    except Exception as e:
        log.exception(f"周报失败: {e}")


# ─── HTTP status endpoint ───
def _status_payload():
    """Build the JSON-serialisable dict served at ``/status``."""
    positions = []
    # Iterate over a snapshot: this runs in the HTTP thread while scheduled
    # jobs may be adding or removing positions.
    for code, pos in list(engine.state['positions'].items()):
        avg_cost = pos['avg_cost']
        price = pos.get('current_price', avg_cost)
        # Guard against a zero cost basis instead of failing the whole page.
        pnl_pct = round((price / avg_cost - 1) * 100, 2) if avg_cost else 0.0
        positions.append({
            'code': code,
            'name': pos['name'],
            'qty': pos['qty'],
            'avg_cost': avg_cost,
            'current_price': price,
            'pnl_pct': pnl_pct,
        })
    return {
        'status': 'running',
        'time': datetime.now().isoformat(),
        'nav': engine.get_nav(),
        'positions': positions,
        'trade_count': engine.state.get('trade_count', 0),
    }


def start_http_server(host='0.0.0.0', port=8888):
    """Create the minimal HTTP status server (not yet serving).

    Routes:
        ``/`` and ``/status``  JSON snapshot of NAV and positions.
        ``/report``            plain-text engine report.
        anything else          404.

    Args:
        host: interface to bind to.
        port: TCP port to listen on.

    Returns:
        The bound ``HTTPServer``; the caller decides how to run it.
    """

    class StatusHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            # Build the whole body before sending any header so a failure
            # yields a clean 500 rather than a truncated 200 response.
            try:
                if self.path == '/status' or self.path == '/':
                    body = json.dumps(_status_payload(), ensure_ascii=False, indent=2).encode()
                    content_type = 'application/json; charset=utf-8'
                elif self.path == '/report':
                    body = engine.get_report().encode()
                    content_type = 'text/plain; charset=utf-8'
                else:
                    self.send_response(404)
                    self.end_headers()
                    return
            except Exception as e:
                log.exception(f"HTTP请求处理失败: {e}")
                self.send_response(500)
                self.end_headers()
                return

            self.send_response(200)
            self.send_header('Content-Type', content_type)
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format, *args):
            pass  # silence per-request HTTP logging

    server = HTTPServer((host, port), StatusHandler)
    server.timeout = 1
    return server


# ─── Entry point ───
def main():
    """Log the current state, register all jobs and block on the scheduler."""
    log.info("=" * 60)
    log.info("🚀 纯信号交易系统启动")
    log.info("=" * 60)

    # Print the current state.
    nav = engine.get_nav()
    log.info(f"初始资金: ¥{engine.initial_cash:,.0f}")
    log.info(f"当前净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)")
    log.info(f"持仓: {nav['holdings_count']}只")

    # Create the HTTP server.
    http_server = start_http_server()
    log.info("HTTP状态接口: http://0.0.0.0:8888/status")

    sched = BlockingScheduler()

    # Pre-market preparation (weekdays 09:00).
    sched.add_job(job_pre_market, CronTrigger(day_of_week='mon-fri', hour=9, minute=0),
                  id='pre_market', name='盘前准备')

    # Intraday checks (weekdays 09:30-15:00, every 30 minutes).
    for h, m in [(9, 30), (10, 0), (10, 30), (11, 0), (11, 30),
                 (13, 0), (13, 30), (14, 0), (14, 30)]:
        sched.add_job(job_market_check, CronTrigger(day_of_week='mon-fri', hour=h, minute=m),
                      id=f'check_{h}{m:02d}', name=f'盘中检查{h}:{m:02d}')

    # End-of-day report (weekdays 15:10).
    sched.add_job(job_daily_report, CronTrigger(day_of_week='mon-fri', hour=15, minute=10),
                  id='daily_report', name='日终报告')

    # Weekly report (Saturday 10:00).
    sched.add_job(job_weekly_review, CronTrigger(day_of_week='sat', hour=10, minute=0),
                  id='weekly_review', name='周报')

    log.info("调度任务:")
    for job in sched.get_jobs():
        log.info(f" {job.name}: {job.trigger}")

    # Serve HTTP from a daemon thread. serve_forever() is required for the
    # shutdown() call below: shutdown() blocks until serve_forever() returns
    # and would hang forever on a bare handle_request() loop.
    http_thread = threading.Thread(target=http_server.serve_forever, daemon=True)
    http_thread.start()

    # Start the (blocking) scheduler.
    try:
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        log.info("系统关闭")
    finally:
        http_server.shutdown()
        http_server.server_close()


if __name__ == '__main__':
    main()