fix: 修复中介导入成功条数计算错误

问题:
- 导入成功条数显示为负数
- 原因:成功数量计算使用 validRecords.size() - failures.size()
- 但没有使用实际的数据库操作返回值

修复:
- saveBatchWithUpsert 和 saveBatch 方法现在返回 int
- 累加实际的数据库影响行数
- 使用 actualSuccessCount 变量跟踪真实成功数量

影响范围:
- CcdiIntermediaryPersonImportServiceImpl
- CcdiIntermediaryEntityImportServiceImpl
This commit is contained in:
wkc
2026-02-08 17:18:18 +08:00
parent bb0d68c41d
commit 5ec5913759
2058 changed files with 234134 additions and 269 deletions

View File

@@ -0,0 +1,74 @@
"use strict";
var utils = require("../utils");
var GenericWorker = require("../stream/GenericWorker");
/**
* A worker that use a nodejs stream as source.
* @constructor
* @param {String} filename the name of the file entry for this stream.
* @param {Readable} stream the nodejs stream.
*/
function NodejsStreamInputAdapter(filename, stream) {
GenericWorker.call(this, "Nodejs stream input adapter for " + filename);
this._upstreamEnded = false;
this._bindStream(stream);
}
utils.inherits(NodejsStreamInputAdapter, GenericWorker);
/**
* Prepare the stream and bind the callbacks on it.
* Do this ASAP on node 0.10 ! A lazy binding doesn't always work.
* @param {Stream} stream the nodejs stream to use.
*/
NodejsStreamInputAdapter.prototype._bindStream = function (stream) {
var self = this;
this._stream = stream;
stream.pause();
stream
.on("data", function (chunk) {
self.push({
data: chunk,
meta : {
percent : 0
}
});
})
.on("error", function (e) {
if(self.isPaused) {
this.generatedError = e;
} else {
self.error(e);
}
})
.on("end", function () {
if(self.isPaused) {
self._upstreamEnded = true;
} else {
self.end();
}
});
};
NodejsStreamInputAdapter.prototype.pause = function () {
if(!GenericWorker.prototype.pause.call(this)) {
return false;
}
this._stream.pause();
return true;
};
NodejsStreamInputAdapter.prototype.resume = function () {
if(!GenericWorker.prototype.resume.call(this)) {
return false;
}
if(this._upstreamEnded) {
this.end();
} else {
this._stream.resume();
}
return true;
};
module.exports = NodejsStreamInputAdapter;

View File

@@ -0,0 +1,42 @@
"use strict";
var Readable = require("readable-stream").Readable;
var utils = require("../utils");
utils.inherits(NodejsStreamOutputAdapter, Readable);
/**
* A nodejs stream using a worker as source.
* @see the SourceWrapper in http://nodejs.org/api/stream.html
* @constructor
* @param {StreamHelper} helper the helper wrapping the worker
* @param {Object} options the nodejs stream options
* @param {Function} updateCb the update callback.
*/
function NodejsStreamOutputAdapter(helper, options, updateCb) {
Readable.call(this, options);
this._helper = helper;
var self = this;
helper.on("data", function (data, meta) {
if (!self.push(data)) {
self._helper.pause();
}
if(updateCb) {
updateCb(meta);
}
})
.on("error", function(e) {
self.emit("error", e);
})
.on("end", function () {
self.push(null);
});
}
NodejsStreamOutputAdapter.prototype._read = function() {
this._helper.resume();
};
module.exports = NodejsStreamOutputAdapter;