Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
msimerson committed May 12, 2024
1 parent 2118379 commit ed3d132
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 68 deletions.
103 changes: 52 additions & 51 deletions outbound/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,75 +232,76 @@ exports.send_trans_email = function (transaction, next) {
transaction.results = new ResultStore(connection);
}

connection.pre_send_trans_email_respond = retval => {
connection.pre_send_trans_email_respond = async (retval) => {
const deliveries = get_deliveries(transaction);
const hmails = [];
const ok_paths = [];

let todo_index = 1;

async.forEachSeries(deliveries, (deliv, cb) => {
const todo = new TODOItem(deliv.domain, deliv.rcpts, transaction);
todo.uuid = `${todo.uuid}.${todo_index}`;
todo_index++;
this.process_delivery(ok_paths, todo, hmails, cb);
},
(err) => {
if (err) {
for (let i=0, l=ok_paths.length; i<l; i++) {
fs.unlink(ok_paths[i], () => {});
}
transaction.results.add({ name: 'outbound'}, { err });
if (next) next(constants.denysoft, err);
return;
try {
for (const deliv of deliveries) {
const todo = new TODOItem(deliv.domain, deliv.rcpts, transaction);
todo.uuid = `${todo.uuid}.${todo_index}`;
todo_index++;
await this.process_delivery(ok_paths, todo, hmails);
}

for (const hmail of hmails) {
delivery_queue.push(hmail);
}
catch (err) {
for (let i=0, l=ok_paths.length; i<l; i++) {
fs.unlink(ok_paths[i], () => {});
}
transaction.results.add({ name: 'outbound'}, { err });
if (next) next(constants.denysoft, err);
return;
}

transaction.results.add({ name: 'outbound'}, { pass: "queued" });
if (next) {
next(constants.ok, `Message Queued (${transaction.uuid})`);
}
});
for (const hmail of hmails) {
delivery_queue.push(hmail);
}

transaction.results.add({ name: 'outbound'}, { pass: "queued" });
if (next) next(constants.ok, `Message Queued (${transaction.uuid})`);
}

plugins.run_hooks('pre_send_trans_email', connection);
}

exports.process_delivery = function (ok_paths, todo, hmails, cb) {
logger.info(exports, `Transaction delivery for domain: ${todo.domain}`);
const fname = _qfile.name();
const tmp_path = path.join(queue_dir, `${_qfile.platformDOT}${fname}`);
const ws = new FsyncWriteStream(tmp_path, { flags: constants.WRITE_EXCL });
exports.process_delivery = function (ok_paths, todo, hmails) {
return new Promise((resolve, reject) => {

logger.info(exports, `Transaction delivery for domain: ${todo.domain}`);
const fname = _qfile.name();
const tmp_path = path.join(queue_dir, `${_qfile.platformDOT}${fname}`);
const ws = new FsyncWriteStream(tmp_path, { flags: constants.WRITE_EXCL });

ws.on('close', () => {
const dest_path = path.join(queue_dir, fname);
fs.rename(tmp_path, dest_path, err => {
if (err) {
logger.error(exports, `Unable to rename tmp file!: ${err}`);
fs.unlink(tmp_path, () => {});
reject("Queue error");
}
else {
hmails.push(new HMailItem (fname, dest_path, todo.notes));
ok_paths.push(dest_path);
resolve();
}
})
})

ws.on('close', () => {
const dest_path = path.join(queue_dir, fname);
fs.rename(tmp_path, dest_path, err => {
if (err) {
logger.error(exports, `Unable to rename tmp file!: ${err}`);
fs.unlink(tmp_path, () => {});
cb("Queue error");
}
else {
hmails.push(new HMailItem (fname, dest_path, todo.notes));
ok_paths.push(dest_path);
cb();
}
ws.on('error', err => {
logger.error(exports, `Unable to write queue file (${fname}): ${err}`);
ws.destroy();
fs.unlink(tmp_path, () => {});
reject("Queueing failed");
})
})

ws.on('error', err => {
logger.error(exports, `Unable to write queue file (${fname}): ${err}`);
ws.destroy();
fs.unlink(tmp_path, () => {});
cb("Queueing failed");
this.build_todo(todo, ws, () => {
todo.message_stream.pipe(ws, { dot_stuffing: true });
});
})

this.build_todo(todo, ws, () => {
todo.message_stream.pipe(ws, { dot_stuffing: true });
});
}

exports.build_todo = (todo, ws, write_more) => {
Expand Down
5 changes: 2 additions & 3 deletions outbound/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,15 @@ exports.load_queue = pid => {
}

exports._load_cur_queue = (pid, iteratee, cb) => {
const self = exports;
logger.info(exports, "Loading outbound queue from ", queue_dir);
fs.readdir(queue_dir, (err, files) => {
if (err) {
return logger.error(exports, `Failed to load queue directory (${queue_dir}): ${err}`);
}

self.cur_time = new Date(); // set once so we're not calling it a lot
this.cur_time = new Date(); // set once so we're not calling it a lot

self.load_queue_files(pid, files, iteratee, cb);
this.load_queue_files(pid, files, iteratee, cb);
});
}

Expand Down
25 changes: 11 additions & 14 deletions plugins/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,22 @@ exports.pool_list = cb => {
}

exports.queue_list = function (cb) {
this.outbound.list_queue((err, qlist) => {
if (err) cb(err);

this.outbound.list_queue((err, qlist = []) => {
const result = [];

if (qlist) {
qlist.forEach((todo) => result.push({
for (const todo of qlist) {
result.push({
file: todo.file,
uuid: todo.uuid,
queue_time: todo.queue_time,
domain: todo.domain,
from: todo.mail_from.toString(),
to: todo.rcpt_to.map((r) => r.toString())
}));
})
}

cb(err, result);
});
})
}

exports.queue_stats = function (cb) {
Expand Down Expand Up @@ -164,14 +162,14 @@ exports.queue_push = function (file, cb) {
// cluster IPC

exports.hook_init_master = function (next) {
const self = this;
const plugin = this;

if (!server.cluster) return next();

function message_handler (sender, msg) {
if (msg.event !== 'status.request') return;

self.call_workers(msg, (response) => {
plugin.call_workers(msg, (response) => {
msg.result = response.filter((el) => el != null);
msg.event = 'status.result';
sender.send(msg);
Expand Down Expand Up @@ -213,11 +211,10 @@ exports.call_master = (cmd, cb) => {
}

exports.call_workers = function (cmd, cb) {
const promises = []
for (const w of server.cluster.workers) {
promises.push(this.call_worker(w, cmd))
}
Promise.allSettled(promises).then(r => {
Promise.allSettled(
Object.values(server.cluster.workers).map(w => this.call_worker(w, cmd))
)
.then(r => {
cb(
// r.filter(s => s.status === 'rejected').flatMap(s => s.reason),
r.filter(s => s.status === 'fulfilled').flatMap(s => s.value),
Expand Down

0 comments on commit ed3d132

Please sign in to comment.