Files
ccdi/lsfx-mock-server/tests/test_credit_api.py

125 lines
4.4 KiB
Python

from routers import credit_api
def mapping_fields(response):
return response.json()["data"]["mappingOutputFields"]
def mock_remote_html(monkeypatch, sample_credit_html_file):
monkeypatch.setattr(credit_api, "fetch_remote_html", lambda remote_path: sample_credit_html_file[1])
def test_credit_parse_should_initiate_and_return_result(client, monkeypatch, sample_credit_html_file):
mock_remote_html(monkeypatch, sample_credit_html_file)
serial_num = "CCDI_CREDIT_TEST_001"
initiate_response = client.post(
"/api/service/interface/invokeService/xfeature",
data={
"serialNum": serial_num,
"orgCode": "999000",
"runType": "1",
"remotePath": "http://127.0.0.1:62318/profile/credit-html/sample.html",
"model": "LXCUSTALL",
},
)
assert initiate_response.status_code == 200
initiate_data = initiate_response.json()
assert initiate_data["success"] is True
assert initiate_data["code"] == 10000
assert initiate_data["data"]["status"] == 1
assert initiate_data["data"]["reasonCode"] == 200
assert "文件写入成功" in mapping_fields(initiate_response)["message"]
assert "payload" not in mapping_fields(initiate_response)
result_response = client.post(
"/api/service/interface/invokeService/xfeatureResult",
data={
"serialNum": serial_num,
"orgCode": "999000",
"runType": "1",
},
)
assert result_response.status_code == 200
result_data = result_response.json()
assert result_data["data"]["status"] == 1
assert result_data["data"]["reasonCode"] == 200
result_fields = mapping_fields(result_response)
assert result_fields["status_code"] == "0"
assert result_fields["message"] == "成功"
assert result_fields["payload"]["lx_header"]["query_cust_name"] == "测试员工"
assert result_fields["payload"]["lx_header"]["query_cert_no"] == "320101199001010030"
assert "uncle_credit_card_bal" in result_fields["payload"]["lx_debt"]
assert "uncle_credit_cart_bal" not in result_fields["payload"]["lx_debt"]
def test_credit_parse_should_return_err_99999_for_missing_model(client, monkeypatch, sample_credit_html_file):
mock_remote_html(monkeypatch, sample_credit_html_file)
response = client.post(
"/api/service/interface/invokeService/xfeature",
data={
"serialNum": "CCDI_CREDIT_TEST_002",
"orgCode": "999000",
"runType": "1",
"remotePath": "http://127.0.0.1:62318/profile/credit-html/sample.html",
},
)
assert response.status_code == 200
assert response.json()["data"]["status"] == 0
assert response.json()["data"]["reasonCode"] == 500
assert mapping_fields(response)["status_code"] == "ERR_99999"
assert "model" in mapping_fields(response)["message"]
def test_credit_parse_should_support_debug_error_marker(client, monkeypatch, sample_credit_html_file):
mock_remote_html(monkeypatch, sample_credit_html_file)
response = client.post(
"/api/service/interface/invokeService/xfeature",
data={
"serialNum": "CCDI_CREDIT_TEST_003",
"orgCode": "999000",
"runType": "1",
"remotePath": "http://127.0.0.1:62318/profile/credit-html/sample.html",
"model": "error_ERR_10001",
},
)
assert response.status_code == 200
assert mapping_fields(response)["status_code"] == "ERR_10001"
def test_credit_result_should_return_err_99999_for_unknown_serial_num(client):
response = client.post(
"/api/service/interface/invokeService/xfeatureResult",
data={
"serialNum": "CCDI_CREDIT_NOT_EXISTS",
"orgCode": "999000",
"runType": "1",
},
)
assert response.status_code == 200
result_fields = mapping_fields(response)
assert result_fields["status_code"] == "ERR_99999"
assert "征信解析结果未返回" in result_fields["message"]
def test_credit_result_should_return_err_99999_for_missing_serial_num(client):
response = client.post(
"/api/service/interface/invokeService/xfeatureResult",
data={
"orgCode": "999000",
"runType": "1",
},
)
assert response.status_code == 200
result_fields = mapping_fields(response)
assert result_fields["status_code"] == "ERR_99999"
assert "serialNum" in result_fields["message"]