Skip to content

Commit

Permalink
Implement Provisioning Queue Modes and API, various Job related fixes…
Browse files Browse the repository at this point in the history
… (CFM-26, CFM-154, CFM-253, CFM-286)
  • Loading branch information
Benn Oshrin committed Dec 26, 2025
1 parent 0fc877a commit 87dd415
Show file tree
Hide file tree
Showing 18 changed files with 522 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,11 @@ protected function syncEntity(
// We have a currently provisioned record and the subject is not Deleted,
// patch it with $data and try saving.
$patchedEntity = $SpTable->patchEntity($curEntity, $data->toArray(), ['validate' => false]);

// We always force an update to the modified timestamp in order to allow status()
// to report the last time provisioning ran. Cake has a touch() function as part
// of TimestampBehavior to do this, but our use of auto-tables means we can't use that.
$patchedEntity->modified = new \Cake\I18n\DateTime();

$SpTable->saveOrFail(
$patchedEntity,
Expand Down
9 changes: 9 additions & 0 deletions app/config/routes.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,20 @@
$builder->applyMiddleware('bodyparser');
// Use setPass to make parameter show up as function parameter
// Model specific actions, which will usually have more specific URLs:
// Note that while the UI uses dashes in URL paths, because we use {model} for the generic
// CRUD operations below we use underscores for consistency. Also, actions used here must
// be authorized in the relevant model permissions (even though they'll be used by ApiV2Controller
// and not the model's native controller).
$builder->post(
'/api_users/generate/{id}',
['controller' => 'ApiV2', 'action' => 'generateApiKey', 'model' => 'api_users'])
->setPass(['id'])
->setPatterns(['id' => '[0-9]+']);
$builder->post(
'/provisioning_targets/provision/{id}',
['controller' => 'ApiV2', 'action' => 'provision', 'model' => 'provisioning_targets'])
->setPass(['id'])
->setPatterns(['id' => '[0-9]+']);
// These establish the usual CRUD options on all models:
$builder->delete(
'/{model}/{id}', ['controller' => 'ApiV2', 'action' => 'delete'])
Expand Down
3 changes: 3 additions & 0 deletions app/config/schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@
"status": {},
"provisioning_group_id": { "type": "integer", "foreignkey": { "table": "groups", "column": "id" } },
"retry_interval": { "type": "integer" },
"max_retry": { "type": "integer" },
"ordr": {}
},
"indexes": {
Expand Down Expand Up @@ -758,7 +759,9 @@
"parameters": { "type": "text" },
"requeue_interval": { "type": "integer" },
"retry_interval": { "type": "integer" },
"max_retry": { "type": "integer" },
"requeued_from_job_id": { "type": "integer", "foreignkey": { "table": "jobs", "column": "id" }},
"retry_count": { "type": "integer" },
"status": {},
"assigned_host": { "type": "string", "size": 64 },
"assigned_pid": { "type": "integer" },
Expand Down
20 changes: 17 additions & 3 deletions app/plugins/CoreJob/src/Lib/Jobs/ProvisionerJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
use Cake\ORM\TableRegistry;
use \App\Lib\Enum\JobStatusEnum;
use \App\Lib\Enum\ProvisionerModeEnum;
use \App\Lib\Enum\ProvisioningContextEnum;

class ProvisionerJob {
/**
Expand Down Expand Up @@ -97,7 +98,12 @@ protected function processEntity(
int &$lastPct
): bool {
try {
$EntityTable->requestProvisioning($entityId, $model, $target->id);
$EntityTable->requestProvisioning(
id: $entityId,
context: ProvisioningContextEnum::Queue,
provisioningTargetId: $target->id,
job: $job
);

$JobHistoryRecordsTable->record(
jobId: $job->id,
Expand Down Expand Up @@ -186,9 +192,17 @@ public function run(
}

if(!empty($parameters['entities'])) {
// We have one or more explicitly specified entities to process
// We have one or more explicitly specified entities to process.
// Entities might be an int (single request) or comma separated string
// (because PHP).

$ids = [];

$ids = explode(',', $parameters['entities']);
if(is_int($parameters['entities'])) {
$ids[] = $parameters['entities'];
} else {
$ids = explode(',', $parameters['entities']);
}

$count = count($ids);

Expand Down
12 changes: 12 additions & 0 deletions app/resources/locales/en_US/command.po
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,24 @@ msgstr "Queue runner {0} requesting assignment {1} for CO {2}"
msgid "job.run.child.running"
msgstr "Queue runner {0} processing job ID {1}"

msgid "job.run.done.empty"
msgstr "No jobs left in the queue, queue runner done processing CO {0}"

msgid "job.run.done.max"
msgstr "Queue runner reached maximum number of jobs ({0}), exiting"

msgid "job.run.max"
msgstr "Reached max queue runner count ({0}), waiting for one to exit"

msgid "job.run.piddone"
msgstr "Queue runner PID {0} completed"

msgid "job.run.request"
msgstr "Queue runner requesting assignment {0} for CO {1}"

msgid "job.run.running"
msgstr "Queue runner processing job ID {0}"

msgid "job.run.start"
msgstr "Launching queue runner {0} (of {1}) for CO {2}"

Expand Down
3 changes: 3 additions & 0 deletions app/resources/locales/en_US/error.po
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ msgstr "Job Plugin not provided"
msgid "Jobs.failed.abnormal"
msgstr "The Job terminated unexpectedly"

msgid "Jobs.failed.abnormal.count"
msgstr "The Job terminated unexpectedly ({0} open transaction/s)"

msgid "Jobs.plugin.parameter.bool"
msgstr "Boolean values may only be 0 or 1"

Expand Down
18 changes: 18 additions & 0 deletions app/resources/locales/en_US/field.po
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,12 @@ msgstr "Finish Summary"
msgid "Jobs.finish_time"
msgstr "Finished"

msgid "Jobs.max_retry"
msgstr "Max Retries"

msgid "Jobs.max_retry.desc"
msgstr "The maximum number of times this Job will be retried on failure"

msgid "Jobs.percent_complete"
msgstr "Percent Complete"

Expand All @@ -693,6 +699,12 @@ msgstr "After the job successfully completes, it will automatically be requeued
msgid "Jobs.requeued_from_job_id"
msgstr "Requeued From Job"

msgid "Jobs.retry_count"
msgstr "Retry Count"

msgid "Jobs.retry_count.desc"
msgstr "The retry count for this Job instance"

msgid "Jobs.retry_interval"
msgstr "Retry Interval"

Expand Down Expand Up @@ -935,6 +947,12 @@ msgstr "Subject Object Type"
msgid "ProvisioningHistoryRecords.subjectid"
msgstr "Subject Object ID"

msgid "ProvisioningTargets.max_retry"
msgstr "Max Retries"

msgid "ProvisioningTargets.max_retry.desc"
msgstr "For Queue and Queue On Error modes, the maximum number of times to retry on failure (set to 0 to retry indefinitely)"

msgid "ProvisioningTargets.ordr.desc"
msgstr "The order in which this provisioner will be run when provisioning occurs (leave blank to run after all current provisioners)"

Expand Down
6 changes: 6 additions & 0 deletions app/resources/locales/en_US/result.po
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ msgstr "Sent Update Match Attributes Request"
msgid "ProvisioningTargets.queued.ok"
msgstr "Reprovisioning for {0} queued for {1} ({2})"

msgid "ProvisioningTargets.queued.error.ok"
msgstr "Provisioning queued for {0} ({1}): {2}"

msgid "ProvisioningTargets.queued.queue.ok"
msgstr "Provisioning queued for {0} ({1})"

msgid "removed"
msgstr "removed"

Expand Down
118 changes: 98 additions & 20 deletions app/src/Command/JobCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ public function buildOptionParser(ConsoleOptionParser $parser): ConsoleOptionPar
'short' => 'j',
'help' => __d('command', 'opt.job.plugin')
]
)->addOption(
/* )->addOption(
'parallel',
[
'required' => false,
'short' => 'p',
'default' => '1',
'help' => __d('command', 'opt.job.parallel')
]
]*/
)->addOption(
'max',
[
Expand Down Expand Up @@ -143,20 +143,66 @@ public function execute(Arguments $args, ConsoleIo $io)
// The total number of actively running children
$pidcount = 0;

// The maximum number of runners to run at any one time
$max = (int)$args->getOption('max');
// The maximum number of jobs a queue runner will process before exiting
$maxjobs = (int)$args->getOption('max');
// The number of parallel runners for each CO
$parallel = (int)$args->getOption('parallel');
// $parallel = (int)$args->getOption('parallel');

// The maximum number of jobs a queue runner will process before exiting
$maxjobs = 100;
// The number of jobs launched (across all COs we're processing)
$jobcount = 0;

// Initialize the CoIdEventListener, which will be used to set the CO ID for
// (eg) tables with CO specific validation rules
$CoIdEventListener = new CoIdEventListener();
EventManager::instance()->on($CoIdEventListener);

foreach($coIds as $coId) {
// We probably need to do something like this (from synchronous running, below)
// $CoIdEventListener = new CoIdEventListener((int)$args->getOption('co_id'));
// EventManager::instance()->on($CoIdEventListener);
// but this wouldn't remove the previous $coId, so for now Sync Jobs can't be run
// via the queue. See CFM-400.
// Update the CoIdEventListener's CO ID. This works because EventManager
// is calling by reference, not copy.
$CoIdEventListener->updateCoId((int)$args->getOption('co_id'));

while($jobcount <= $maxjobs) {
$jobcount++;

$io->verbose(__d('command', 'job.run.request', [$jobcount, $coId]));

// Request a job to run
$job = $JobTable->assignNext($coId);

if(!$job) {
// Nothing to do, move on to the next CO
$io->verbose(__d('command', 'job.run.done.empty', [$coId]));
continue 2;
}

$io->verbose(__d('command', 'job.run.running', [$job->id]));

try {
$JobTable->process($job);
}
catch(\Exception $e) {
// The only Exception would be if the Job is in an invalid state,
// which shouldn't happen because we just ran assignNext()
$io->error($e->getMessage());
}

// Confirm the Job was properly finished. This was originally intended
// for use with fork()/wait(), but should work well enough here.
$JobTable->confirmFinished(getmypid());
}

if($jobcount > $maxjobs) {
$io->verbose(__d('command', 'job.run.done.max', [$maxjobs]));
break;
}

/* When calling pcntl_fork(), the child process inherits the same
file descriptors as the parent, including (problematically) the
connections to the database server. There doesn't seem to be a
functional way to generate new file descriptors, and requiring
Job plugins to use an alternate database connection creates an
implausible burden (see how complicated CloneCommand is, for
comparison). So for now we'll just disable $parallel.
// We start counting from 1 rather than 0 to simplify console output
for($i = 1;$i <= $parallel;$i++) {
Expand All @@ -179,12 +225,16 @@ public function execute(Arguments $args, ConsoleIo $io)
// configuration and create a new configuration on the fly so we don't have
// to pollute the database config file.
ConnectionManager::setConfig('plugin', ConnectionManager::getConfig('default'));
// XXX this doesn't seem to work, so plugins must always access the 'plugin' database, at least for now (CFM-253)
// ConnectionManager::alias('plugin', 'default');
$cxn = ConnectionManager::get('plugin');
$config = ConnectionManager::getConfig('default');
$JobTable->setConnection($cxn);
// This doesn't work
// ConnectionManager::drop('default');
// ConnectionManager::setConfig('default', $config);
// This doesn't work either, and isn't plausible anyway
// ConnectionManager::alias('plugin', 'default');
// $cxn = ConnectionManager::get('plugin');
// $JobTable->setConnection($cxn);
for($j = 1;$j <= $maxjobs;$j++) {
$io->verbose(__d('command', 'job.run.child.request', [$i, $j, $coId]));
Expand All @@ -201,7 +251,7 @@ public function execute(Arguments $args, ConsoleIo $io)
$io->verbose(__d('command', 'job.run.child.running', [$newPid, $job->id]));
try {
$JobTable->process($job);
$JobTable->process($job, 'plugin');
}
catch(\Exception $e) {
// The only Exception would be if the Job is in an invalid state,
Expand Down Expand Up @@ -232,9 +282,10 @@ public function execute(Arguments $args, ConsoleIo $io)
$io->verbose(__d('command', 'job.run.piddone', [$pid]));
$pidcount--;
}
}
}*/
}

/*
// We are the parent, and we're done launching queue runners. wait() for them.
while($pidcount > 0) {
$io->out(__d('command', 'job.run.waiting', $pidcount));
Expand All @@ -248,7 +299,7 @@ public function execute(Arguments $args, ConsoleIo $io)
$io->verbose(__d('command', 'job.run.piddone', [$pid]));
$pidcount--;
}
}*/
} else {
// We have a specific job to process. Note that JobCommand can't require -j
// since the -r usage doesn't need it, so we have to check for it manually.
Expand Down Expand Up @@ -292,6 +343,33 @@ public function execute(Arguments $args, ConsoleIo $io)

$JobTable->process($job);
}

// Check that the plugin terminated correctly without leaving any open
// transactions. We don't need to do this when running the queue because
// confirmFinished() will check for stragglers.

$cxn = ConnectionManager::get('default');

$txnCount = 0;

while($cxn->inTransaction()) {
// We need to clear out any transactions in order to properly close the job

$txnCount++;
$cxn->rollback();
}

// Note we'e overwriting $job
$job = $JobTable->get($job->id);

if(!$job->isFinished()) {
// Terminate the job
$JobTable->finish(
job: $job,
summary: __d('error', 'Jobs.failed.abnormal.count', [$txnCount]),
result: JobStatusEnum::Failed
);
}
}
}
}
Loading

0 comments on commit 87dd415

Please sign in to comment.