Package org.cdlib.mrt.zk


package org.cdlib.mrt.zk

Merritt Queue Design

The Merritt Ingest Queue will be refactored in 2024 to enable a number of goals.
  • Match ingest workload to available resources (compute, memory, working storage)
  • dynamically provision resources to match demand
  • dynamically manage running thread count based on processing load
  • Hold jobs based on temporary holds (collection lock, storage node lock, queue hold)
  • Graceful resumption of processing in progress
  • Allow processing to be resumed on any ingest host. The previous implementation managed state in memory which prevented this capability
  • Accurate notification of ingest completion (including inventory recording)
  • Send accurate summary email on completion of a batch regardless of any interruption that occurred while processing

Batch Queue vs Job Queue

The work of the Merritt Ingest Service takes place at a Job level. Merritt Depositors initiate submissions at a Batch level. The primary function of the Batch Queue is to provide notification to a depositor once all jobs for a batch have completed.

Use of ZooKeeper

Merritt Utilizes ZooKeeper for the following features
  • Creation/validation of distributed (ephemeral node) locks
  • Creation of unique node names across the distributed node structure (sequential nodes)
  • Manage data across the distributed node structure to allow any worker to acquire a job/batch (persistent nodes)

The ZooKeeper documentation advises keeping the payload of shared data relatively small.

The Merritt ZooKeeper design sames read-only data as JSON objects.

More volatile (read/write) fields are saved as Int, Long, String and very small JSON objects.

Code Examples

Create Batch

 ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null);
 JSONObject batchSub = new JSONObject("...");
 Batch batch = Batch.createBatch(zk, batchSub);
 

Consumer Daemon Acquires Batch and Creates Jobs

 // An ephemeral lock is created when the batch is acquired
 // The ephemeral lock will be released when the ZooKeeper connection is closed
 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Batch batch = Batch.acquirePendingBatch(zk)
   JSONObject jobConfig = Job.createJobConfiguration(...);
   Job j = Job.createJob(zk, batch.id(), jobConfig);
 }
 

Consumer Daemon Acquires Pending Job and Moves Job to Estimating

 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Job jj = Job.acquireJob(zk, JobState.Pending);
   jj.setStatus(zk, jj.status().stateChange(JobState.Estimating));
 }
 

Consumer Daemon Acquires Estimating Job and Updates Priority

 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Job jj = Job.acquireJob(zk, JobState.Estimating);
   jj.setPriority(3);
   jj.setStatus(zk, jj.status().success()));
 }
 

Acquire Completed Batch, Perform Reporting

 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Batch batch = Batch.acquireBatchForReporting(zk);
   //Notify depositor of job status
   batch.setStatus(zk, batch.status().success()));
   //Admin thread will perform batch.delete(zk);
 }
 

Create Assembly Request

 ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null);
 JSONObject tokenData = new JSONObject("...");
 JSONObject tokenData = Access.createTokenData(...)
 Access access = Access.createAssembly(zk, Access.Queues.small, tokenData);
 

Consumer Daemon Acquires Assembly Request

 // An ephemeral lock is created when the batch is acquired
 // The ephemeral lock will be released when the ZooKeeper connection is closed
 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Access access = Batch.acquirePendingAssembly(zk, Access.Queues.small);
   //Do stuff
   access.setStatus(zk, AccessState.Processing);
 }
 
See Also:
  • Class
    Description
    Class to manage a Merritt Access Assembly Request.
    Access Assembly Queue names
    Access Assembly State Transitions
    Class to manage a Merritt Ingest Batch in the Batch Queue.
    Batch State Transitions
    Common interface for Ingest Queue State Enums
    Class to manage a Merritt Ingest Job in the Job Queue.
    Job State Transitions
    Lookup key names for properties stored as JSON within ZooKeeper nodes.
    Static methods to set and release Merritt Locks.
    Exception indicates that an illegal operation was invoked on an object with an incompatible state (ie attempting to delete a Job or Batch that is not in a deleteable state)
    Exception thown when attempting to modify the data payload for a ZooKeeper node.
    Base Class for Common functions for Merritt Ingest Batches and Merritt Ingest Jobs.
    Standardized path names for Merritt Zookeeper nodes
    Standardized path names for Merritt Zookeeper nodes
    Standardized prefix names for Merritt Zookeeper sequential nodes
    The static methods in this class also provides a simplified interface for common ZooKeeper API calls.
    Defines relative pathnames to ZooKeeper nodes for a Batch or a Job.