commit 4ae80611726248057e3a1fdb14bdfd7008d9aea0 Author: OpenClaw Trading Date: Sun Apr 5 16:59:50 2026 +0800 初始提交:纯信号量化交易系统 v1.0 策略:纯信号模式(不定期换仓,止损止盈卖出后因子选股补仓) 股票池:沪深300+中证500(800+只) 止损-8% / 止盈+25% / 单只20% / 最多5只 5年回测:+371.7%,夏普0.82,年化21.3% 组件: - engine.py: 核心交易引擎 + 因子评分 + 数据管理 - scheduler.py: APScheduler定时调度 + HTTP状态接口 - trade_tool.py: 命令行工具 - config.json: 策略参数配置 - Dockerfile + docker-compose.yml: 容器化部署 日志系统: - 文件日志(按日轮转,保留90天) - SQLite: trades/daily_log/signal_log/system_log diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..97ec344 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# 运行时数据 +data/ +!data/.gitkeep + +# 日志 +logs/ +!logs/.gitkeep + +# Python +__pycache__/ +*.pyc +*.pyo +.venv/ + +# IDE +.vscode/ +.idea/ + +# OS +.DS_Store +Thumbs.db + +# 敏感信息 +*.env diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e2db851 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,31 @@ +FROM python:3.11-slim + +LABEL maintainer="trading-signal" +LABEL description="A股纯信号量化交易系统" + +WORKDIR /app + +# 系统依赖 +RUN apt-get update && apt-get install -y --no-install-recommends \ + tzdata curl && \ + ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \ + echo "Asia/Shanghai" > /etc/timezone && \ + apt-get clean && rm -rf /var/lib/apt/lists/* + +# Python依赖 +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# 应用代码 +COPY scripts/ /app/scripts/ +COPY config/ /app/config/ + +# 数据和日志通过volume挂载 +VOLUME ["/app/data", "/app/logs"] + +# 健康检查 +HEALTHCHECK --interval=60s --timeout=10s --retries=3 \ + CMD test -f /app/data/state.json || exit 1 + +# 启动 +CMD ["python3", "-u", "/app/scripts/scheduler.py"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..f800cf7 --- /dev/null +++ b/README.md @@ -0,0 +1,172 @@ +# A股纯信号量化交易系统 + +## 策略说明 + +纯信号交易策略(固定参数): +- **不做定期换仓**,只根据止损/止盈信号卖出 +- 卖出后立即因子选股补仓,每10天扫描补充新标的 +- 止损 -8% / 止盈 +25% +- 单只仓位 ≤20%,最多持有5只 +- 股票池:沪深300+中证500(约800-900只) + +## 系统架构 + +``` +trading-system/ +├── config/ +│ └── config.json # 策略配置 +├── scripts/ +│ ├── engine.py # 核心交易引擎 + 因子评分 +│ ├── scheduler.py # APScheduler定时调度 +│ ├── trade_tool.py # 命令行工具 +│ ├── start.sh # Host模式启动 +│ └── docker-start.sh # Docker模式启动 +├── data/ # 运行数据(gitignore) +│ ├── state.json # 策略状态(持仓/现金/净值) +│ ├── trading.db # SQLite交易日志 +│ └── pool_codes.json # 股票池缓存 +├── logs/ # 运行日志 +│ ├── engine.log # 引擎日志(按日轮转,保留90天) +│ ├── scheduler.log # 调度日志 +│ └── daily_YYYYMMDD.txt # 日终报告 +├── Dockerfile # Docker构建 +├── docker-compose.yml +└── requirements.txt +``` + +## 快速开始 + +### Host模式(推荐NAS使用) + +```bash +# 启动调度器(后台运行) +nohup bash scripts/start.sh > logs/startup.log 2>&1 & + +# 或手动运行一天 +python3 scripts/trade_tool.py run --date 20260403 +``` + +### Docker模式 + +```bash +# 构建 & 启动 +bash scripts/docker-start.sh + +# 查看日志 +docker logs -f trading-signal +``` + +### 命令行工具 + +```bash +# 查看状态 +python3 scripts/trade_tool.py status + +# 初始化(重置资金) +python3 scripts/trade_tool.py init --cash 100000 + +# 手动买入 +python3 scripts/trade_tool.py buy 600519.SH --price 1700 --qty 100 + +# 手动卖出 +python3 scripts/trade_tool.py sell 600519.SH --price 1750 + +# 交易历史 +python3 scripts/trade_tool.py history --limit 20 + +# 净值曲线 +python3 scripts/trade_tool.py nav +``` + +## 定时任务 + +| 时间 | 任务 | 说明 | +|------|------|------| +| 09:00 | 盘前准备 | 刷新股票池 | +| 9:30-14:30 | 每30分钟 | 止损止盈检查 + 补仓 | +| 15:10 | 日终报告 | 更新收盘价 + 生成报告 | +| 周六 10:00 | 周报 | 本周交易统计 | + +## HTTP接口 + +- `http://localhost:8888/status` — JSON状态 +- `http://localhost:8888/report` — 文本报告 + +## 日志系统 + +### 文件日志 +- `logs/engine.log` — 每日轮转,保留90天 +- `logs/daily_YYYYMMDD.txt` — 每日报告 + +### SQLite日志表 + +| 表名 | 内容 | +|------|------| +| trades | 交易记录(买卖方向/价格/盈亏/原因) | +| daily_log | 每日净值快照 | +| signal_log | 信号记录(买入评分/卖出原因) | +| system_log | 系统事件日志 | + +### 查询示例 + +```python +import sqlite3 +db = sqlite3.connect('data/trading.db') + +# 最近交易 +db.execute('SELECT date,name,direction,qty,price,pnl,pnl_pct,reason FROM trades ORDER BY id DESC LIMIT 10').fetchall() + +# 净值曲线 +db.execute('SELECT date,total_nav,return_pct FROM daily_log ORDER BY date').fetchall() + +# 交易胜率 +db.execute('SELECT COUNT(*), SUM(CASE WHEN pnl>0 THEN 1 END) FROM trades WHERE direction="sell"').fetchall() +``` + +## 配置说明 + +`config/config.json`: + +```json +{ + "initial_cash": 100000, // 初始资金 + "position": { + "max_position_pct": 0.20, // 单只上限20% + "max_holdings": 5, // 最多5只 + "top_n_buy": 3 // 每次最多买3只 + }, + "exit": { + "stop_loss_pct": -0.08, // 止损-8% + "take_profit_pct": 0.25 // 止盈+25% + }, + "scan": { + "interval_days": 10, // 10天扫描间隔 + "factor_pool_size": 50 // 因子Top N + } +} +``` + +## 因子评分 + +10维纯量价因子: +| 因子 | 权重 | 方向 | +|------|------|------| +| 历史波动率(20日) | 20% | 低波动好 | +| ATR比率 | 18% | 低ATR好 | +| 量比(5/20日) | 10% | 缩量好 | +| 放量突破 | 8% | — | +| MACD柱趋势 | 6% | — | +| 60日动量 | 4% | 低动量好 | +| MA20斜率 | 4% | — | +| RSI(14) | 10% | 低RSI好 | +| MA距离 | 10% | 均线下方好 | +| 换手率 | 10% | 适中好 | + +## 回测表现(5年) + +| 指标 | 数值 | +|------|------| +| 总收益 | +371.7% | +| 年化收益 | 21.3% | +| 最大回撤 | -35.0% | +| 夏普比率 | 0.82 | diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..58eac69 --- /dev/null +++ b/config/config.json @@ -0,0 +1,51 @@ +{ + "tushare_token": "${TUSHARE_TOKEN}", + "initial_cash": 100000, + "strategy": "pure_signal", + + "pool": { + "source": "hs300_zz500", + "min_price": 3.0, + "min_history_days": 60 + }, + + "position": { + "max_position_pct": 0.20, + "max_holdings": 5, + "top_n_buy": 3, + "min_buy_amount": 5000 + }, + + "exit": { + "stop_loss_pct": -0.08, + "take_profit_pct": 0.25, + "trailing_stop_pct": 0.05 + }, + + "fee": { + "commission_rate": 0.0003, + "min_commission": 5, + "stamp_tax_rate": 0.001 + }, + + "scan": { + "interval_days": 10, + "factor_pool_size": 50 + }, + + "schedule": { + "pre_market": "09:00", + "market_open": "09:30", + "check_interval_min": 30, + "market_close": "15:00", + "daily_report": "15:10", + "weekly_review": "18:00", + "weekly_day": "fri" + }, + + "notify": { + "enabled": false, + "webhook_url": "", + "feishu_token": "" + } +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..80e699e --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,22 @@ +version: "3.8" + +services: + trading-signal: + build: . + container_name: trading-signal + restart: unless-stopped + volumes: + - ./data:/app/data + - ./logs:/app/logs + - ./config:/app/config:ro + environment: + - TZ=Asia/Shanghai + - TUSHARE_TOKEN=4348b35f0b5c7a85b988264e4962be1ca57e0f46faf6148806e5ebd4 + - LOG_LEVEL=INFO + ports: + - "8888:8888" # 状态HTTP接口 + logging: + driver: json-file + options: + max-size: "50m" + max-file: "5" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d500311 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +tushare>=1.4.0 +numpy>=1.24.0 +pandas>=2.0.0 +APScheduler>=3.10.0 diff --git a/scripts/docker-start.sh b/scripts/docker-start.sh new file mode 100644 index 0000000..8e0194b --- /dev/null +++ b/scripts/docker-start.sh @@ -0,0 +1,32 @@ +#!/bin/bash +# 交易系统 Docker 启动 +# 先构建镜像再运行 + +set -e + +BASE_DIR="$(cd "$(dirname "$0")/.." && pwd)" +cd "$BASE_DIR" + +# 构建镜像(如果不存在) +if ! docker images | grep -q trading-signal; then + echo "构建Docker镜像..." + docker build -t trading-signal:latest . +fi + +# 停止旧容器 +docker rm -f trading-signal 2>/dev/null || true + +# 启动 +docker run -d \ + --name trading-signal \ + --restart unless-stopped \ + -e TZ=Asia/Shanghai \ + -e TUSHARE_TOKEN=4348b35f0b5c7a85b988264e4962be1ca57e0f46faf6148806e5ebd4 \ + -v "$BASE_DIR/data:/app/data" \ + -v "$BASE_DIR/logs:/app/logs" \ + -p 8888:8888 \ + trading-signal:latest + +echo "✅ 交易系统已启动" +echo " 状态: http://localhost:8888/status" +echo " 日志: docker logs -f trading-signal" diff --git a/scripts/engine.py b/scripts/engine.py new file mode 100644 index 0000000..1904ddd --- /dev/null +++ b/scripts/engine.py @@ -0,0 +1,894 @@ +#!/usr/bin/env python3 +""" +纯信号交易引擎 v2.0 +================== +固定参数纯信号策略: + - 不定期换仓,只根据止损/止盈卖出 + - 卖出后立即因子选股补仓 + - 每10天扫描补充新标的 + - 固定止损-8% / 止盈+25% + - 单只20%仓位 / 最多5只 +""" + +import os, sys, json, time, math, sqlite3, logging, traceback +from datetime import datetime, timedelta +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed + +import numpy as np +import tushare as ts + +# ─── 路径 ─── +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +APP_DIR = os.path.dirname(BASE_DIR) +DATA_DIR = os.path.join(APP_DIR, 'data') +LOG_DIR = os.path.join(APP_DIR, 'logs') +CONFIG_DIR = os.path.join(APP_DIR, 'config') + +for d in [DATA_DIR, LOG_DIR]: + os.makedirs(d, exist_ok=True) + +# ─── 日志 ─── +def setup_logger(name, log_file=None, level=logging.INFO): + logger = logging.getLogger(name) + logger.setLevel(level) + fmt = logging.Formatter( + '%(asctime)s [%(levelname)s] %(name)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + # Console + ch = logging.StreamHandler(sys.stdout) + ch.setFormatter(fmt) + logger.addHandler(ch) + # File + if log_file: + from logging.handlers import TimedRotatingFileHandler + fh = TimedRotatingFileHandler( + os.path.join(LOG_DIR, log_file), + when='midnight', backupCount=90, encoding='utf-8' + ) + fh.setFormatter(fmt) + logger.addHandler(fh) + return logger + +log = setup_logger('engine', 'engine.log') + +# ─── 配置加载 ─── +def load_config(): + cfg_path = os.path.join(CONFIG_DIR, 'config.json') + if os.path.exists(cfg_path): + with open(cfg_path) as f: + cfg = json.load(f) + # 环境变量替换 + token = os.environ.get('TUSHARE_TOKEN', '') + if token: + cfg['tushare_token'] = token + return cfg + return { + 'tushare_token': os.environ.get('TUSHARE_TOKEN', ''), + 'initial_cash': 100000, + 'position': { + 'max_position_pct': 0.20, 'max_holdings': 5, + 'top_n_buy': 3, 'min_buy_amount': 5000 + }, + 'exit': {'stop_loss_pct': -0.08, 'take_profit_pct': 0.25}, + 'fee': {'commission_rate': 0.0003, 'min_commission': 5, 'stamp_tax_rate': 0.001}, + 'scan': {'interval_days': 10, 'factor_pool_size': 50}, + } + + +# ===================================================================== +# 因子引擎 +# ===================================================================== +class FactorEngine: + """纯量价因子评分""" + + FACTOR_DEFS = { + 'hist_vol_20d': {'weight': 0.20, 'direction': 1}, + 'atr_ratio': {'weight': 0.18, 'direction': 1}, + 'volume_ratio': {'weight': 0.10, 'direction': -1}, + 'vol_breakout': {'weight': 0.08, 'direction': -1}, + 'macd_hist_trend': {'weight': 0.06, 'direction': -1}, + 'mom_60d': {'weight': 0.04, 'direction': -1}, + 'ma20_slope': {'weight': 0.04, 'direction': 1}, + 'rsi_14': {'weight': 0.10, 'direction': -1}, + 'ma_dist': {'weight': 0.10, 'direction': -1}, + 'turnover_avg': {'weight': 0.10, 'direction': 1}, + } + + def score_all(self, stock_data, date, date_to_idx, pool_size=50): + """评分所有股票,返回 [(code, score)] 排序后Top pool_size""" + raw_scores = {} + for code, sd in stock_data.items(): + idx_map = date_to_idx.get(code, {}) + if date not in idx_map: + continue + eidx = idx_map[date] + if eidx < 60: + continue + + closes = sd['closes'][:eidx+1] + if len(closes) < 60: + continue + + try: + c = np.array(closes, dtype=float) + highs = np.array(sd['highs'][:eidx+1], dtype=float) + lows = np.array(sd['lows'][:eidx+1], dtype=float) + volumes = np.array(sd['volumes'][:eidx+1], dtype=float) + + n = len(c) + factors = {} + + # hist_vol_20d + if n >= 21: + rets = np.diff(c[-21:]) / c[-21:-1] + factors['hist_vol_20d'] = np.std(rets) + + # atr_ratio + if n >= 15: + prev_close = np.append(c[-15], c[-14:-1]) + tr = np.maximum(highs[-14:] - lows[-14:], + np.maximum(np.abs(highs[-14:] - prev_close), + np.abs(lows[-14:] - prev_close))) + atr14 = np.mean(tr) + if c[-1] > 0: + factors['atr_ratio'] = atr14 / c[-1] + + # volume_ratio + if n >= 21: + vol5 = np.mean(volumes[-5:]) + vol20 = np.mean(volumes[-20:]) + if vol20 > 0: + factors['volume_ratio'] = vol5 / vol20 + + # vol_breakout + if n >= 21: + ma_vol = np.mean(volumes[-20:]) + if ma_vol > 0: + factors['vol_breakout'] = volumes[-1] / ma_vol + + # macd_hist_trend + if n >= 35: + ema12 = c[-1] + ema26 = c[-1] + for i in range(max(0, n-35), n): + ema12 = c[i] * (2/13) + ema12 * (11/13) + ema26 = c[i] * (2/27) + ema26 * (25/27) + dif = ema12 - ema26 + factors['macd_hist_trend'] = dif / c[-1] if c[-1] > 0 else 0 + + # mom_60d + if n >= 61: + factors['mom_60d'] = (c[-1] / c[-61] - 1) + + # ma20_slope + if n >= 25: + ma20_now = np.mean(c[-20:]) + ma20_5 = np.mean(c[-25:-5]) + if ma20_5 > 0: + factors['ma20_slope'] = (ma20_now / ma20_5 - 1) + + # rsi_14 + if n >= 16: + deltas = np.diff(c[-15:]) + gains = np.where(deltas > 0, deltas, 0) + losses = np.where(deltas < 0, -deltas, 0) + avg_gain = np.mean(gains) + avg_loss = np.mean(losses) + if avg_loss > 0: + rs = avg_gain / avg_loss + factors['rsi_14'] = 100 - 100 / (1 + rs) + else: + factors['rsi_14'] = 100 + + # ma_dist (距离MA60) + if n >= 61: + ma60 = np.mean(c[-60:]) + if ma60 > 0: + factors['ma_dist'] = (c[-1] / ma60 - 1) + + # turnover_avg + if n >= 11 and len(sd.get('amounts', [])) >= n: + amounts = sd['amounts'][:eidx+1] + avg_turn = np.mean([a/(c[i]*1e7) if c[i] > 0 and i < len(amounts) else 0 + for i, a in enumerate(amounts[-10:])]) + factors['turnover_avg'] = avg_turn + + # 计算综合评分 + if len(factors) >= 5: + score = 0 + for fname, fdef in self.FACTOR_DEFS.items(): + if fname in factors: + val = factors[fname] + if np.isnan(val) or np.isinf(val): + continue + score += val * fdef['weight'] * fdef['direction'] + raw_scores[code] = score + + except Exception: + continue + + ranked = sorted(raw_scores.items(), key=lambda x: -x[1]) + return ranked[:pool_size] + + +# ===================================================================== +# 数据管理 +# ===================================================================== +class DataManager: + """Tushare数据拉取与缓存""" + + def __init__(self, token): + self.pro = ts.pro_api(token) + self._stock_basic = None + self._index_daily = {} + self._pool_codes = None + + def get_stock_basic(self): + if self._stock_basic is None: + log.info("拉取股票基础信息...") + self._stock_basic = self.pro.stock_basic( + exchange='', list_status='L', + fields='ts_code,symbol,name,industry,list_date' + ) + return self._stock_basic + + def get_hs300_codes(self): + log.info("拉取沪深300成分股...") + df = self.pro.index_weight(index_code='399300.SZ', start_date='20250101') + if df is None or df.empty: + df = self.pro.index_weight(index_code='399300.SZ') + codes = df['con_code'].unique().tolist() + log.info(f" 沪深300: {len(codes)}只") + return codes + + def get_zz500_codes(self): + log.info("拉取中证500成分股...") + df = self.pro.index_weight(index_code='000905.SH', start_date='20250101') + if df is None or df.empty: + df = self.pro.index_weight(index_code='000905.SH') + codes = df['con_code'].unique().tolist() + log.info(f" 中证500: {len(codes)}只") + return codes + + def get_pool_codes(self): + """获取股票池(沪深300+中证500)""" + if self._pool_codes is not None: + return self._pool_codes + + # 尝试加载本地缓存 + cache_file = os.path.join(DATA_DIR, 'pool_codes.json') + if os.path.exists(cache_file): + age = time.time() - os.path.getmtime(cache_file) + if age < 7 * 86400: # 7天缓存 + with open(cache_file) as f: + self._pool_codes = json.load(f) + log.info(f"股票池缓存命中: {len(self._pool_codes)}只") + return self._pool_codes + + hs300 = self.get_hs300_codes() + zz500 = self.get_zz500_codes() + all_codes = list(set(hs300 + zz500)) + + # 过滤ST和退市 + basic = self.get_stock_basic() + st_names = basic[basic['name'].str.contains('ST|退', na=False)]['ts_code'].tolist() + all_codes = [c for c in all_codes if c not in st_names] + + self._pool_codes = all_codes + with open(cache_file, 'w') as f: + json.dump(all_codes, f) + log.info(f"股票池: {len(all_codes)}只(去重后)") + return all_codes + + def get_trade_dates(self, start_date=None, end_date=None): + if end_date is None: + end_date = datetime.now().strftime('%Y%m%d') + if start_date is None: + start_date = (datetime.now() - timedelta(days=60)).strftime('%Y%m%d') + df = self.pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date, is_open=1) + return sorted(df['cal_date'].tolist()) + + def get_daily_batch(self, trade_date): + """批量获取某天全市场日线""" + try: + df = self.pro.daily(trade_date=trade_date) + if df is None or df.empty: + return {} + result = {} + for _, row in df.iterrows(): + result[row['ts_code']] = { + 'open': float(row.get('open', 0)), + 'high': float(row.get('high', 0)), + 'low': float(row.get('low', 0)), + 'close': float(row.get('close', 0)), + 'volume': float(row.get('vol', 0)), + 'amount': float(row.get('amount', 0)), + } + return result + except Exception as e: + log.warning(f"拉取{trade_date}日线失败: {e}") + return {} + + def get_stock_daily(self, ts_code, start_date, end_date): + """获取单只股票历史日线""" + try: + df = self.pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date) + if df is None or df.empty: + return None + df = df.sort_values('trade_date') + return { + 'dates': df['trade_date'].tolist(), + 'opens': df['open'].astype(float).tolist(), + 'highs': df['high'].astype(float).tolist(), + 'lows': df['low'].astype(float).tolist(), + 'closes': df['close'].astype(float).tolist(), + 'volumes': df['vol'].astype(float).tolist(), + 'amounts': df['amount'].astype(float).tolist(), + } + except Exception as e: + log.debug(f"拉取{ts_code}失败: {e}") + return None + + def get_index_daily(self, ts_code='000300.SH', days=120): + """获取指数日线""" + end = datetime.now().strftime('%Y%m%d') + start = (datetime.now() - timedelta(days=int(days*1.5))).strftime('%Y%m%d') + try: + df = self.pro.index_daily(ts_code=ts_code, start_date=start, end_date=end) + if df is None or df.empty: + return [], [] + df = df.sort_values('trade_date') + return df['close'].astype(float).tolist(), df['vol'].astype(float).tolist() + except: + return [], [] + + def load_all_history(self, days=300): + """加载所有股票池成分的历史数据""" + pool = self.get_pool_codes() + end = datetime.now().strftime('%Y%m%d') + start = (datetime.now() - timedelta(days=int(days*1.5))).strftime('%Y%m%d') + + trade_dates = self.get_trade_dates(start, end) + if not trade_dates: + log.error("无法获取交易日历") + return {}, [] + + log.info(f"交易日历: {len(trade_dates)}天 ({trade_dates[0]}~{trade_dates[-1]})") + + # 按天批量拉取 + stock_data = {} + for i, td in enumerate(trade_dates): + daily = self.get_daily_batch(td) + if not daily: + continue + for code, row in daily.items(): + if code not in pool: + continue + if code not in stock_data: + stock_data[code] = { + 'dates': [], 'opens': [], 'highs': [], 'lows': [], + 'closes': [], 'volumes': [], 'amounts': [], 'name': code + } + sd = stock_data[code] + sd['dates'].append(td) + sd['opens'].append(row['open']) + sd['highs'].append(row['high']) + sd['lows'].append(row['low']) + sd['closes'].append(row['close']) + sd['volumes'].append(row['volume']) + sd['amounts'].append(row['amount']) + + if (i+1) % 50 == 0: + log.info(f" [{i+1}/{len(trade_dates)}] {len(stock_data)}只") + + log.info(f"数据加载完成: {len(stock_data)}只 × {len(trade_dates)}天") + + # 填充中文名 + try: + basic = self.get_stock_basic() + name_map = dict(zip(basic['ts_code'], basic['name'])) + for code in stock_data: + if code in name_map: + stock_data[code]['name'] = name_map[code] + except: + pass + + return stock_data, trade_dates + + +# ===================================================================== +# 交易引擎 +# ===================================================================== +class TradingEngine: + """纯信号交易引擎""" + + def __init__(self, config=None): + self.cfg = config or load_config() + self.token = self.cfg.get('tushare_token', '') + self.dm = DataManager(self.token) + self.factor = FactorEngine() + + # 策略参数 + pos_cfg = self.cfg.get('position', {}) + exit_cfg = self.cfg.get('exit', {}) + fee_cfg = self.cfg.get('fee', {}) + scan_cfg = self.cfg.get('scan', {}) + + self.initial_cash = float(self.cfg.get('initial_cash', 100000)) + self.max_position_pct = pos_cfg.get('max_position_pct', 0.20) + self.max_holdings = pos_cfg.get('max_holdings', 5) + self.top_n_buy = pos_cfg.get('top_n_buy', 3) + self.stop_loss_pct = exit_cfg.get('stop_loss_pct', -0.08) + self.take_profit_pct = exit_cfg.get('take_profit_pct', 0.25) + self.commission_rate = fee_cfg.get('commission_rate', 0.0003) + self.min_commission = fee_cfg.get('min_commission', 5) + self.stamp_tax_rate = fee_cfg.get('stamp_tax_rate', 0.001) + self.scan_interval = scan_cfg.get('interval_days', 10) + self.pool_size = scan_cfg.get('factor_pool_size', 50) + + # 状态 + self.state = self._load_state() + self._init_db() + + def _state_file(self): + return os.path.join(DATA_DIR, 'state.json') + + def _db_file(self): + return os.path.join(DATA_DIR, 'trading.db') + + def _load_state(self): + path = self._state_file() + if os.path.exists(path): + with open(path) as f: + return json.load(f) + return { + 'cash': self.initial_cash, + 'positions': {}, + 'last_scan_date': None, + 'last_scan_idx': -self.scan_interval, + 'trade_count': 0, + 'created': datetime.now().isoformat(), + 'nav_history': [], + } + + def save_state(self): + self.state['updated'] = datetime.now().isoformat() + with open(self._state_file(), 'w') as f: + json.dump(self.state, f, ensure_ascii=False, indent=2) + + def _init_db(self): + db = sqlite3.connect(self._db_file()) + db.execute('''CREATE TABLE IF NOT EXISTS trades ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + date TEXT NOT NULL, + code TEXT NOT NULL, + name TEXT, + direction TEXT NOT NULL, + qty INTEGER, + price REAL, + amount REAL, + commission REAL, + stamp_tax REAL, + net_amount REAL, + pnl REAL, + pnl_pct REAL, + reason TEXT, + factor_score REAL, + created_at TEXT DEFAULT CURRENT_TIMESTAMP)''') + db.execute('''CREATE TABLE IF NOT EXISTS daily_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + date TEXT NOT NULL UNIQUE, + cash REAL, + market_value REAL, + total_nav REAL, + holdings_count INTEGER, + return_pct REAL, + details TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP)''') + db.execute('''CREATE TABLE IF NOT EXISTS signal_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + date TEXT NOT NULL, + signal_type TEXT, + code TEXT, + name TEXT, + score REAL, + details TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP)''') + db.execute('''CREATE TABLE IF NOT EXISTS system_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + date TEXT NOT NULL, + level TEXT, + module TEXT, + message TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP)''') + db.commit() + db.close() + + def _log_trade(self, trade): + """记录交易到SQLite""" + db = sqlite3.connect(self._db_file()) + db.execute('''INSERT INTO trades + (date,code,name,direction,qty,price,amount,commission,stamp_tax,net_amount,pnl,pnl_pct,reason,factor_score) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', + (trade['date'], trade['code'], trade.get('name',''), + trade['direction'], trade.get('qty',0), trade.get('price',0), + trade.get('amount',0), trade.get('commission',0), + trade.get('stamp_tax',0), trade.get('net_amount',0), + trade.get('pnl'), trade.get('pnl_pct'), trade.get('reason',''), + trade.get('factor_score'))) + db.commit() + db.close() + self.state['trade_count'] = self.state.get('trade_count', 0) + 1 + + def _log_daily(self, date, nav_details): + """记录每日净值""" + db = sqlite3.connect(self._db_file()) + db.execute('''INSERT OR REPLACE INTO daily_log + (date,cash,market_value,total_nav,holdings_count,return_pct,details) + VALUES (?,?,?,?,?,?,?)''', + (date, nav_details['cash'], nav_details['market_value'], + nav_details['total_nav'], nav_details['holdings_count'], + nav_details['return_pct'], json.dumps(nav_details, ensure_ascii=False))) + db.commit() + db.close() + + def _log_signal(self, date, signal_type, code, name, score, details=''): + """记录信号""" + db = sqlite3.connect(self._db_file()) + db.execute('''INSERT INTO signal_log + (date,signal_type,code,name,score,details) + VALUES (?,?,?,?,?,?)''', + (date, signal_type, code, name, score, details)) + db.commit() + db.close() + + def _log_system(self, date, level, module, message): + """记录系统事件""" + db = sqlite3.connect(self._db_file()) + db.execute('''INSERT INTO system_log + (date,level,module,message) + VALUES (?,?,?,?)''', (date, level, module, message)) + db.commit() + db.close() + + # ─── 核心交易逻辑 ─── + + def buy(self, code, name, price, qty, date, reason='', factor_score=0): + """买入""" + amount = qty * price + commission = max(amount * self.commission_rate, self.min_commission) + total_cost = amount + commission + + if self.state['cash'] < total_cost: + log.warning(f"资金不足: 需{total_cost:.0f} 有{self.state['cash']:.0f}") + return False + + self.state['cash'] -= total_cost + self.state['positions'][code] = { + 'name': name, 'qty': qty, + 'avg_cost': price, 'current_price': price, + 'buy_date': date, 'factor_score': factor_score, + } + + trade = { + 'date': date, 'code': code, 'name': name, + 'direction': 'buy', 'qty': qty, 'price': price, + 'amount': round(amount, 2), 'commission': round(commission, 2), + 'net_amount': round(total_cost, 2), + 'reason': reason, 'factor_score': factor_score, + } + self._log_trade(trade) + self._log_signal(date, 'buy', code, name, factor_score, reason) + log.info(f"🟢 买入 {name}({code}) {qty}股@¥{price:.2f} = ¥{amount:,.0f} [{reason}]") + return True + + def sell(self, code, price, date, reason=''): + """卖出""" + if code not in self.state['positions']: + return False + + pos = self.state['positions'][code] + qty = pos['qty'] + amount = qty * price + commission = max(amount * self.commission_rate, self.min_commission) + stamp_tax = amount * self.stamp_tax_rate + net_revenue = amount - commission - stamp_tax + + buy_cost = qty * pos['avg_cost'] + pnl = net_revenue - buy_cost + pnl_pct = (price / pos['avg_cost'] - 1) * 100 + + self.state['cash'] += net_revenue + del self.state['positions'][code] + + trade = { + 'date': date, 'code': code, 'name': pos['name'], + 'direction': 'sell', 'qty': qty, 'price': price, + 'amount': round(amount, 2), 'commission': round(commission, 2), + 'stamp_tax': round(stamp_tax, 2), 'net_amount': round(net_revenue, 2), + 'pnl': round(pnl, 2), 'pnl_pct': round(pnl_pct, 2), + 'reason': reason, 'factor_score': pos.get('factor_score', 0), + } + self._log_trade(trade) + self._log_signal(date, 'sell', code, pos['name'], 0, f'{reason} PnL:{pnl:+.0f}({pnl_pct:+.1f}%)') + + emoji = '✅' if pnl > 0 else '❌' + log.info(f"{emoji} 卖出 {pos['name']}({code}) {qty}股@¥{price:.2f} " + f"PnL ¥{pnl:+,.0f}({pnl_pct:+.1f}%) [{reason}]") + return True + + def get_nav(self): + """计算当前净值""" + mv = sum( + pos.get('current_price', pos['avg_cost']) * pos['qty'] + for pos in self.state['positions'].values() + ) + total = self.state['cash'] + mv + return { + 'cash': round(self.state['cash'], 2), + 'market_value': round(mv, 2), + 'total_nav': round(total, 2), + 'holdings_count': len(self.state['positions']), + 'return_pct': round((total / self.initial_cash - 1) * 100, 4), + } + + def get_today_price(self, code): + """获取实时/最新价格""" + try: + df = self.dm.pro.daily(ts_code=code, + start_date=datetime.now().strftime('%Y%m%d'), + end_date=datetime.now().strftime('%Y%m%d')) + if df is not None and not df.empty: + return float(df.iloc[0]['close']) + except: + pass + # 用前一个交易日 + try: + df = self.dm.pro.daily(ts_code=code, limit=1) + if df is not None and not df.empty: + return float(df.iloc[0]['close']) + except: + pass + # 用持仓成本 + if code in self.state['positions']: + return self.state['positions'][code].get('current_price', 0) + return 0 + + def check_stop_loss_take_profit(self, date, prices=None): + """检查所有持仓的止损止盈""" + triggered = [] + for code in list(self.state['positions'].keys()): + pos = self.state['positions'][code] + p = (prices or {}).get(code, 0) + if p <= 0: + p = self.get_today_price(code) + if p <= 0: + continue + + pos['current_price'] = p + pnl_pct = (p / pos['avg_cost'] - 1) + + if pnl_pct <= self.stop_loss_pct: + triggered.append(('stop_loss', code, p, pnl_pct)) + elif pnl_pct >= self.take_profit_pct: + triggered.append(('take_profit', code, p, pnl_pct)) + + return triggered + + def scan_buy(self, date, stock_data=None, date_to_idx=None, prices=None): + """因子选股扫描,返回推荐买入列表""" + if stock_data and date_to_idx: + scored = self.factor.score_all(stock_data, date, date_to_idx, self.pool_size) + else: + # 实盘模式:需要先加载数据 + return [] + + nav = self.get_nav() + max_new = self.max_holdings - len(self.state['positions']) + if max_new <= 0: + return [] + + buy_list = [] + for code, score in scored: + if len(buy_list) >= min(self.top_n_buy, max_new): + break + if code in self.state['positions']: + continue + p = (prices or {}).get(code, 0) + if p <= 0 or p < 3: + continue + + buy_amt = min(self.state['cash'] * 0.9, nav['total_nav'] * self.max_position_pct) + qty = int(buy_amt / p / 100) * 100 + if qty < 100: + continue + + name = stock_data[code].get('name', code) + buy_list.append((code, name, p, qty, score)) + + return buy_list + + # ─── 完整运行一天 ─── + + def run_daily(self, date=None): + """ + 运行一天的交易逻辑: + 1. 加载数据 + 2. 更新持仓价格 + 3. 检查止损止盈 + 4. 补仓扫描(如果需要) + 5. 记录日志 + """ + if date is None: + date = datetime.now().strftime('%Y%m%d') + + log.info(f"═══════════════════════════════════════") + log.info(f"📋 开始处理 {date}") + + # 1. 加载数据 + log.info("📥 加载历史数据...") + stock_data, trade_dates = self.dm.load_all_history(days=300) + + if date not in trade_dates: + log.info(f"{date} 非交易日,跳过") + return + + day_idx = trade_dates.index(date) + if day_idx < 60: + log.warning("数据不足60天,无法运行") + return + + # 构建索引 + code_date_idx = {} + for code, sd in stock_data.items(): + code_date_idx[code] = {dt: i for i, dt in enumerate(sd['dates'])} + + # 2. 当天价格 + today_prices = {} + for code, sd in stock_data.items(): + idx_map = code_date_idx.get(code, {}) + if date in idx_map: + eidx = idx_map[date] + if eidx < len(sd['closes']): + p = sd['closes'][eidx] + if p > 0: + today_prices[code] = p + + # 更新持仓价格 + for code, pos in self.state['positions'].items(): + if code in today_prices: + pos['current_price'] = today_prices[code] + + # 3. 止损止盈检查 + triggered = self.check_stop_loss_take_profit(date, today_prices) + for reason, code, price, pnl_pct in triggered: + r = '止损' if reason == 'stop_loss' else '止盈' + self.sell(code, price, date, f'{r}{pnl_pct:+.1f}%') + + # 4. 补仓扫描 + last_scan = self.state.get('last_scan_idx', -self.scan_interval) + need_scan = triggered or (day_idx - last_scan) >= self.scan_interval + + if need_scan and len(self.state['positions']) < self.max_holdings: + log.info(f"🔍 扫描补仓 (triggered={len(triggered)>0}, interval={day_idx - last_scan}天)") + buy_list = self.scan_buy(date, stock_data, code_date_idx, today_prices) + for code, name, price, qty, score in buy_list: + self.buy(code, name, price, qty, date, + f'因子{score:.3f}', score) + self.state['last_scan_date'] = date + self.state['last_scan_idx'] = day_idx + + # 5. 记录日志 + nav = self.get_nav() + self._log_daily(date, nav) + + # 更新净值历史(保留最近365天) + self.state.setdefault('nav_history', []).append({ + 'date': date, 'nav': nav['total_nav'], 'ret': nav['return_pct'] + }) + if len(self.state.get('nav_history', [])) > 365: + self.state['nav_history'] = self.state['nav_history'][-365:] + + self.save_state() + + # 日终报告 + log.info(f"📊 日终净值: ¥{nav['total_nav']:,.2f} " + f"({nav['return_pct']:+.2f}%) | " + f"现金 ¥{nav['cash']:,.0f} | " + f"持仓 {nav['holdings_count']}只") + + self._log_system(date, 'INFO', 'engine', + f"日终净值{nav['total_nav']:.2f} 收益{nav['return_pct']:+.2f}% 持仓{nav['holdings_count']}只") + + # ─── 报告 ─── + + def get_report(self): + """生成完整报告""" + nav = self.get_nav() + lines = [ + "=" * 50, + "📊 纯信号交易系统状态", + "=" * 50, + f"日期: {datetime.now().strftime('%Y-%m-%d %H:%M')}", + f"本金: ¥{self.initial_cash:,.0f}", + f"净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)", + f"现金: ¥{nav['cash']:,.0f}", + f"持仓: {nav['holdings_count']}只", + f"交易次数: {self.state.get('trade_count', 0)}", + "", + "策略参数:", + f" 止损: {self.stop_loss_pct*100:.0f}% | 止盈: +{self.take_profit_pct*100:.0f}%", + f" 单只上限: {self.max_position_pct*100:.0f}% | 最多: {self.max_holdings}只", + f" 扫描间隔: {self.scan_interval}天", + "", + ] + + if self.state['positions']: + lines.append("持仓明细:") + for code, pos in self.state['positions'].items(): + p = pos.get('current_price', pos['avg_cost']) + pnl = (p / pos['avg_cost'] - 1) * 100 + lines.append( + f" {pos['name']}({code}) {pos['qty']}股 " + f"@¥{pos['avg_cost']:.2f} 现¥{p:.2f} {pnl:+.1f}%" + ) + + # 最近交易 + db = sqlite3.connect(self._db_file()) + rows = db.execute( + 'SELECT date,code,name,direction,qty,price,pnl,pnl_pct,reason ' + 'FROM trades ORDER BY id DESC LIMIT 10' + ).fetchall() + db.close() + + if rows: + lines.append("\n最近交易:") + for r in rows: + d, code, name, direction, qty, price, pnl, pnl_pct, reason = r + if direction == 'buy': + lines.append(f" 🟢 {d} {name}({code}) {qty}股@¥{price:.2f}") + else: + lines.append( + f" {'✅' if (pnl or 0) > 0 else '❌'} {d} {name}({code}) " + f"{qty}股@¥{price:.2f} {pnl:+.0f}({pnl_pct:+.1f}%) [{reason}]" + ) + + return '\n'.join(lines) + + +# ===================================================================== +# CLI +# ===================================================================== +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser(description='纯信号交易引擎') + parser.add_argument('--action', default='daily', + choices=['daily', 'status', 'report', 'init', 'backfill']) + parser.add_argument('--date', default=None, help='指定日期 YYYYMMDD') + parser.add_argument('--cash', type=float, default=None, help='初始资金') + args = parser.parse_args() + + cfg = load_config() + if args.cash: + cfg['initial_cash'] = args.cash + + engine = TradingEngine(cfg) + + if args.action == 'init': + log.info("初始化交易系统...") + engine.save_state() + log.info(f"初始资金: ¥{engine.initial_cash:,.0f}") + print(engine.get_report()) + + elif args.action == 'daily': + engine.run_daily(args.date) + + elif args.action == 'status' or args.action == 'report': + print(engine.get_report()) + + elif args.action == 'backfill': + """回填历史数据运行(用于初始化持仓)""" + log.info("回填模式:加载近期数据并建仓...") + engine.run_daily(args.date) diff --git a/scripts/scheduler.py b/scripts/scheduler.py new file mode 100644 index 0000000..8702a6a --- /dev/null +++ b/scripts/scheduler.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python3 +""" +定时调度器 +========= +- 工作日 09:00 盘前准备 +- 工作日 09:30 开盘检查 +- 工作日 每30分钟 止损止盈检查 + 补仓 +- 工作日 15:10 日终报告 +- 每周六 10:00 周报 + +同时启动HTTP状态接口(8888端口) +""" + +import os, sys, json, logging, traceback +from datetime import datetime, timedelta + +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.triggers.cron import CronTrigger + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +APP_DIR = os.path.dirname(BASE_DIR) +LOG_DIR = os.path.join(APP_DIR, 'logs') +DATA_DIR = os.path.join(APP_DIR, 'data') +CONFIG_DIR = os.path.join(APP_DIR, 'config') +sys.path.insert(0, BASE_DIR) + +from logging.handlers import TimedRotatingFileHandler + +# ─── 日志 ─── +fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') +log = logging.getLogger('scheduler') +log.setLevel(logging.INFO) +ch = logging.StreamHandler(sys.stdout) +ch.setFormatter(fmt) +log.addHandler(ch) +fh = TimedRotatingFileHandler(os.path.join(LOG_DIR, 'scheduler.log'), when='midnight', backupCount=90, encoding='utf-8') +fh.setFormatter(fmt) +log.addHandler(fh) + +# ─── 引擎 ─── +from engine import TradingEngine, load_config + +cfg = load_config() +engine = TradingEngine(cfg) + +# ─── 调度任务 ─── + +def is_trading_day(): + """简单判断是否交易日(周一~周五)""" + return datetime.now().weekday() < 5 + + +def job_pre_market(): + """盘前准备""" + if not is_trading_day(): + return + log.info("━━━ 盘前准备 ━━━") + try: + # 刷新股票池缓存 + engine.dm.get_pool_codes() + log.info("✅ 盘前准备完成") + except Exception as e: + log.error(f"盘前准备失败: {e}") + traceback.print_exc() + + +def job_market_check(): + """盘中检查(止损止盈 + 补仓)""" + if not is_trading_day(): + return + now = datetime.now() + if now.hour < 9 or now.hour >= 15: + return + if now.hour == 9 and now.minute < 30: + return + + log.info(f"━━━ 盘中检查 {now.strftime('%H:%M')} ━━━") + try: + date = now.strftime('%Y%m%d') + + # 获取今日价格(批量) + daily = engine.dm.get_daily_batch(date) + + # 更新持仓价格 + updated = 0 + for code in engine.state['positions']: + if code in daily: + p = daily[code]['close'] + if p > 0: + engine.state['positions'][code]['current_price'] = p + updated += 1 + log.info(f"更新持仓价格: {updated}/{len(engine.state['positions'])}只") + + # 止损止盈检查 + prices = {code: d['close'] for code, d in daily.items() if d['close'] > 0} + triggered = engine.check_stop_loss_take_profit(date, prices) + + for reason, code, price, pnl_pct in triggered: + r = '止损' if reason == 'stop_loss' else '止盈' + engine.sell(code, price, date, f'{r}{pnl_pct:+.1f}%') + + # 如果有卖出,尝试补仓 + if triggered: + log.info("触发卖出,准备补仓扫描...") + # 加载因子所需数据(最近300天) + stock_data, trade_dates = engine.dm.load_all_history(days=300) + if date in trade_dates: + code_date_idx = {} + for code, sd in stock_data.items(): + code_date_idx[code] = {dt: i for i, dt in enumerate(sd['dates'])} + + buy_list = engine.scan_buy(date, stock_data, code_date_idx, prices) + for code, name, price, qty, score in buy_list: + engine.buy(code, name, price, qty, date, f'因子{score:.3f}', score) + + day_idx = trade_dates.index(date) + engine.state['last_scan_date'] = date + engine.state['last_scan_idx'] = day_idx + + # 补仓检查(非触发情况下也要看间隔) + else: + last_scan = engine.state.get('last_scan_idx', -engine.scan_interval) + if engine.state['positions']: + # 有持仓但可能需要扫描 + pass + elif not engine.state['positions']: + # 空仓,尝试建仓 + stock_data, trade_dates = engine.dm.load_all_history(days=300) + if date in trade_dates: + code_date_idx = {} + for code, sd in stock_data.items(): + code_date_idx[code] = {dt: i for i, dt in enumerate(sd['dates'])} + + buy_list = engine.scan_buy(date, stock_data, code_date_idx, prices) + for code, name, price, qty, score in buy_list: + engine.buy(code, name, price, qty, date, f'因子{score:.3f}', score) + + day_idx = trade_dates.index(date) + engine.state['last_scan_date'] = date + engine.state['last_scan_idx'] = day_idx + + engine.save_state() + + nav = engine.get_nav() + log.info(f"📊 净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%) | " + f"持仓 {nav['holdings_count']}只") + + except Exception as e: + log.error(f"盘中检查失败: {e}") + traceback.print_exc() + + +def job_daily_report(): + """日终报告""" + if not is_trading_day(): + return + log.info("━━━ 日终报告 ━━━") + try: + date = datetime.now().strftime('%Y%m%d') + + # 获取收盘价更新 + daily = engine.dm.get_daily_batch(date) + for code in engine.state['positions']: + if code in daily: + p = daily[code]['close'] + if p > 0: + engine.state['positions'][code]['current_price'] = p + + nav = engine.get_nav() + engine._log_daily(date, nav) + engine.save_state() + + report = engine.get_report() + log.info(f"\n{report}") + + # 写入今日报告文件 + report_file = os.path.join(LOG_DIR, f"daily_{date}.txt") + with open(report_file, 'w') as f: + f.write(report) + + except Exception as e: + log.error(f"日终报告失败: {e}") + traceback.print_exc() + + +def job_weekly_review(): + """周报""" + log.info("━━━ 周报 ━━━") + try: + db_path = engine._db_file() + import sqlite3 + db = sqlite3.connect(db_path) + + # 本周交易统计 + week_start = (datetime.now() - timedelta(days=datetime.now().weekday())).strftime('%Y%m%d') + trades = db.execute( + 'SELECT direction, COUNT(*), SUM(pnl) FROM trades WHERE date >= ? GROUP BY direction', + (week_start,) + ).fetchall() + + # 净值曲线 + navs = db.execute( + 'SELECT date, total_nav, return_pct FROM daily_log WHERE date >= ? ORDER BY date', + (week_start,) + ).fetchall() + + db.close() + + nav = engine.get_nav() + log.info(f"本周净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)") + + for direction, count, pnl in trades: + log.info(f" {direction}: {count}笔, 盈亏 {pnl or 0:+,.0f}") + + if navs: + log.info(f" 净值变化: ¥{navs[0][1]:,.0f} → ¥{navs[-1][1]:,.0f}") + + except Exception as e: + log.error(f"周报失败: {e}") + + +# ─── HTTP状态接口 ─── + +def start_http_server(): + """简易HTTP状态接口""" + from http.server import HTTPServer, BaseHTTPRequestHandler + import json + + class StatusHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == '/status' or self.path == '/': + nav = engine.get_nav() + positions = [] + for code, pos in engine.state['positions'].items(): + p = pos.get('current_price', pos['avg_cost']) + positions.append({ + 'code': code, 'name': pos['name'], + 'qty': pos['qty'], 'avg_cost': pos['avg_cost'], + 'current_price': p, + 'pnl_pct': round((p / pos['avg_cost'] - 1) * 100, 2), + }) + data = { + 'status': 'running', + 'time': datetime.now().isoformat(), + 'nav': nav, + 'positions': positions, + 'trade_count': engine.state.get('trade_count', 0), + } + self.send_response(200) + self.send_header('Content-Type', 'application/json; charset=utf-8') + self.end_headers() + self.wfile.write(json.dumps(data, ensure_ascii=False, indent=2).encode()) + elif self.path == '/report': + self.send_response(200) + self.send_header('Content-Type', 'text/plain; charset=utf-8') + self.end_headers() + self.wfile.write(engine.get_report().encode()) + else: + self.send_response(404) + self.end_headers() + + def log_message(self, format, *args): + pass # 静默HTTP日志 + + server = HTTPServer(('0.0.0.0', 8888), StatusHandler) + server.timeout = 1 + return server + + +# ─── 主入口 ─── + +if __name__ == '__main__': + log.info("=" * 60) + log.info("🚀 纯信号交易系统启动") + log.info("=" * 60) + + # 打印当前状态 + nav = engine.get_nav() + log.info(f"初始资金: ¥{engine.initial_cash:,.0f}") + log.info(f"当前净值: ¥{nav['total_nav']:,.2f} ({nav['return_pct']:+.2f}%)") + log.info(f"持仓: {nav['holdings_count']}只") + + # 启动HTTP + http_server = start_http_server() + log.info("HTTP状态接口: http://0.0.0.0:8888/status") + + # 调度器 + sched = BlockingScheduler() + + # 盘前准备(工作日 09:00) + sched.add_job(job_pre_market, CronTrigger(day_of_week='mon-fri', hour=9, minute=0), + id='pre_market', name='盘前准备') + + # 盘中检查(工作日 9:30~15:00,每30分钟) + for h, m in [(9,30), (10,0), (10,30), (11,0), (11,30), (13,0), (13,30), (14,0), (14,30)]: + sched.add_job(job_market_check, CronTrigger(day_of_week='mon-fri', hour=h, minute=m), + id=f'check_{h}{m:02d}', name=f'盘中检查{h}:{m:02d}') + + # 日终报告(工作日 15:10) + sched.add_job(job_daily_report, CronTrigger(day_of_week='mon-fri', hour=15, minute=10), + id='daily_report', name='日终报告') + + # 周报(周六 10:00) + sched.add_job(job_weekly_review, CronTrigger(day_of_week='sat', hour=10, minute=0), + id='weekly_review', name='周报') + + log.info("调度任务:") + for job in sched.get_jobs(): + log.info(f" {job.name}: {job.trigger}") + + # 集成HTTP到调度循环 + import threading + def run_http(): + while True: + http_server.handle_request() + + http_thread = threading.Thread(target=run_http, daemon=True) + http_thread.start() + + # 启动调度 + try: + sched.start() + except (KeyboardInterrupt, SystemExit): + log.info("系统关闭") + http_server.shutdown() diff --git a/scripts/start.sh b/scripts/start.sh new file mode 100644 index 0000000..bd3cc6c --- /dev/null +++ b/scripts/start.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# 交易系统启动脚本(直接在host上运行) +set -e + +BASE_DIR="$(cd "$(dirname "$0")/.." && pwd)" +SCRIPTS_DIR="$BASE_DIR/scripts" +LOG_DIR="$BASE_DIR/logs" +DATA_DIR="$BASE_DIR/data" + +mkdir -p "$LOG_DIR" "$DATA_DIR" + +# 检查依赖 +python3 -c "import tushare, numpy, pandas" 2>/dev/null || { + echo "安装依赖..." + pip3 install --break-system-packages tushare numpy pandas APScheduler +} + +# 设置环境变量 +export TZ=Asia/Shanghai +export TUSHARE_TOKEN="${TUSHARE_TOKEN:-4348b35f0b5c7a85b988264e4962be1ca57e0f46faf6148806e5ebd4}" + +echo "========================================" +echo "🚀 启动纯信号交易系统" +echo "========================================" +echo "日志目录: $LOG_DIR" +echo "数据目录: $DATA_DIR" +echo "========================================" + +exec python3 -u "$SCRIPTS_DIR/scheduler.py" diff --git a/scripts/trade_tool.py b/scripts/trade_tool.py new file mode 100644 index 0000000..41a0a30 --- /dev/null +++ b/scripts/trade_tool.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +""" +工具脚本:手动执行交易操作 +""" + +import sys, os, argparse +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from engine import TradingEngine, load_config + +def main(): + parser = argparse.ArgumentParser(description='交易工具') + sub = parser.add_subparsers(dest='cmd') + + # 状态 + sub.add_parser('status', help='查看状态') + + # 初始化 + p_init = sub.add_parser('init', help='初始化(重置资金)') + p_init.add_argument('--cash', type=float, default=100000) + + # 手动买入 + p_buy = sub.add_parser('buy', help='手动买入') + p_buy.add_argument('code', help='股票代码 如 600519.SH') + p_buy.add_argument('--name', default='') + p_buy.add_argument('--price', type=float, required=True) + p_buy.add_argument('--qty', type=int, required=True) + p_buy.add_argument('--reason', default='手动买入') + + # 手动卖出 + p_sell = sub.add_parser('sell', help='手动卖出') + p_sell.add_argument('code', help='股票代码') + p_sell.add_argument('--price', type=float, required=True) + p_sell.add_argument('--reason', default='手动卖出') + + # 运行一天 + p_run = sub.add_parser('run', help='运行指定日期') + p_run.add_argument('--date', default=None) + + # 交易历史 + p_hist = sub.add_parser('history', help='交易历史') + p_hist.add_argument('--limit', type=int, default=20) + + # 净值曲线 + sub.add_parser('nav', help='净值历史') + + args = parser.parse_args() + cfg = load_config() + engine = TradingEngine(cfg) + + if args.cmd == 'status' or args.cmd is None: + print(engine.get_report()) + + elif args.cmd == 'init': + engine.state = { + 'cash': args.cash, 'positions': {}, + 'last_scan_date': None, 'last_scan_idx': -engine.scan_interval, + 'trade_count': 0, 'created': engine.state.get('created'), + 'nav_history': [], + } + engine.save_state() + print(f"✅ 已重置,初始资金: ¥{args.cash:,.0f}") + + elif args.cmd == 'buy': + name = args.name or args.code + engine.buy(args.code, name, args.price, args.qty, + datetime.now().strftime('%Y%m%d'), args.reason) + engine.save_state() + + elif args.cmd == 'sell': + engine.sell(args.code, args.price, + datetime.now().strftime('%Y%m%d'), args.reason) + engine.save_state() + + elif args.cmd == 'run': + engine.run_daily(args.date) + + elif args.cmd == 'history': + import sqlite3 + db = sqlite3.connect(engine._db_file()) + rows = db.execute( + 'SELECT date,code,name,direction,qty,price,pnl,pnl_pct,reason ' + 'FROM trades ORDER BY id DESC LIMIT ?', (args.limit,) + ).fetchall() + db.close() + print(f"最近 {args.limit} 笔交易:") + for r in rows: + d, code, name, direction, qty, price, pnl, pnl_pct, reason = r + if direction == 'buy': + print(f" 🟢 {d} {name}({code}) {qty}股@¥{price:.2f}") + else: + print(f" {'✅' if (pnl or 0) > 0 else '❌'} {d} {name}({code}) " + f"{qty}股@¥{price:.2f} ¥{pnl or 0:+,.0f}({pnl_pct or 0:+.1f}%) [{reason}]") + + elif args.cmd == 'nav': + history = engine.state.get('nav_history', []) + if not history: + print("无净值历史") + else: + print("净值历史:") + for h in history: + bar = '█' * max(0, int(h['ret'] / 2)) if h['ret'] > 0 else '░' * max(0, int(-h['ret'] / 2)) + print(f" {h['date']}: ¥{h['nav']:>10,.2f} {h['ret']:>+6.2f}% {bar}") + + +if __name__ == '__main__': + main()