from routers import credit_api

# Endpoints and form values shared by the tests below.
_INITIATE_URL = "/api/service/interface/invokeService/xfeature"
_RESULT_URL = "/api/service/interface/invokeService/xfeatureResult"
_ORG_CODE = "999000"
_RUN_TYPE = "1"
_REMOTE_PATH = "http://127.0.0.1:62318/profile/credit-html/sample.html"


def mapping_fields(response):
    """Return the ``mappingOutputFields`` object nested under ``data`` in the JSON body."""
    body = response.json()
    return body["data"]["mappingOutputFields"]


def mock_remote_html(monkeypatch, sample_credit_html_file):
    """Patch ``credit_api.fetch_remote_html`` so no remote fetch is performed.

    The stub ignores the requested path and returns element 1 of the
    ``sample_credit_html_file`` fixture (presumably the sample HTML content;
    confirm against the fixture definition).
    """

    def _fake_fetch(remote_path):
        return sample_credit_html_file[1]

    monkeypatch.setattr(credit_api, "fetch_remote_html", _fake_fetch)


def _post_form(client, url, **fields):
    """POST ``fields`` as form data to ``url`` and return the response."""
    return client.post(url, data=fields)


def test_credit_parse_should_initiate_and_return_result(client, monkeypatch, sample_credit_html_file):
    """Initiate a parse, then fetch its result using the same serial number."""
    mock_remote_html(monkeypatch, sample_credit_html_file)
    serial = "CCDI_CREDIT_TEST_001"

    # Step 1: initiate the parse.
    started = _post_form(
        client,
        _INITIATE_URL,
        serialNum=serial,
        orgCode=_ORG_CODE,
        runType=_RUN_TYPE,
        remotePath=_REMOTE_PATH,
        model="LXCUSTALL",
    )

    assert started.status_code == 200
    started_body = started.json()
    assert started_body["success"] is True
    assert started_body["code"] == 10000
    started_data = started_body["data"]
    assert started_data["status"] == 1
    assert started_data["reasonCode"] == 200
    started_fields = mapping_fields(started)
    assert "文件写入成功" in started_fields["message"]
    # The initiate response must not carry the parsed payload.
    assert "payload" not in started_fields

    # Step 2: query the result for the serial number used above.
    fetched = _post_form(
        client,
        _RESULT_URL,
        serialNum=serial,
        orgCode=_ORG_CODE,
        runType=_RUN_TYPE,
    )

    assert fetched.status_code == 200
    fetched_data = fetched.json()["data"]
    assert fetched_data["status"] == 1
    assert fetched_data["reasonCode"] == 200

    fields = mapping_fields(fetched)
    assert fields["status_code"] == "0"
    assert fields["message"] == "成功"

    header = fields["payload"]["lx_header"]
    assert header["query_cust_name"] == "测试员工"
    assert header["query_cert_no"] == "320101199001010030"

    debt = fields["payload"]["lx_debt"]
    assert "uncle_credit_card_bal" in debt
    # The "cart" spelling must not be present (presumably a guard against a key typo).
    assert "uncle_credit_cart_bal" not in debt


def test_credit_parse_should_return_err_99999_for_missing_model(client, monkeypatch, sample_credit_html_file):
    """Initiating without a ``model`` field yields ERR_99999 and a message naming ``model``."""
    mock_remote_html(monkeypatch, sample_credit_html_file)

    response = _post_form(
        client,
        _INITIATE_URL,
        serialNum="CCDI_CREDIT_TEST_002",
        orgCode=_ORG_CODE,
        runType=_RUN_TYPE,
        remotePath=_REMOTE_PATH,
    )

    assert response.status_code == 200
    data = response.json()["data"]
    assert data["status"] == 0
    assert data["reasonCode"] == 500
    fields = mapping_fields(response)
    assert fields["status_code"] == "ERR_99999"
    assert "model" in fields["message"]


def test_credit_parse_should_support_debug_error_marker(client, monkeypatch, sample_credit_html_file):
    """A ``model`` value of ``error_ERR_10001`` is answered with status code ERR_10001."""
    mock_remote_html(monkeypatch, sample_credit_html_file)

    response = _post_form(
        client,
        _INITIATE_URL,
        serialNum="CCDI_CREDIT_TEST_003",
        orgCode=_ORG_CODE,
        runType=_RUN_TYPE,
        remotePath=_REMOTE_PATH,
        model="error_ERR_10001",
    )

    assert response.status_code == 200
    assert mapping_fields(response)["status_code"] == "ERR_10001"


def test_credit_result_should_return_err_99999_for_unknown_serial_num(client):
    """Querying a serial number that was never initiated yields ERR_99999."""
    response = _post_form(
        client,
        _RESULT_URL,
        serialNum="CCDI_CREDIT_NOT_EXISTS",
        orgCode=_ORG_CODE,
        runType=_RUN_TYPE,
    )

    assert response.status_code == 200
    fields = mapping_fields(response)
    assert fields["status_code"] == "ERR_99999"
    assert "征信解析结果未返回" in fields["message"]


def test_credit_result_should_return_err_99999_for_missing_serial_num(client):
    """Querying without ``serialNum`` yields ERR_99999 and a message naming ``serialNum``."""
    response = _post_form(
        client,
        _RESULT_URL,
        orgCode=_ORG_CODE,
        runType=_RUN_TYPE,
    )

    assert response.status_code == 200
    fields = mapping_fields(response)
    assert fields["status_code"] == "ERR_99999"
    assert "serialNum" in fields["message"]