class MerrittZK::Job

Merritt Ingest Job Queue item

Constants

DIR
PREFIX

Attributes

bid[R]
priority[R]
space_needed[R]

Public Class Methods

acquire_job(zk, state) click to toggle source
# File lib/merritt_zk_job.rb, line 154
def self.acquire_job(zk, state)
  p = "#{DIR}/states/#{state.name.downcase}"
  return nil unless zk.exists?(p)

  zk.children(p).sort.each do |cp|
    j = Job.new(cp[3..]).load(zk)
    begin
      return j if j.lock(zk)
    rescue ZK::Exceptions::NodeExists
      # no action
    end
  end
  nil
end
create_job(zk, bid, data, priority: 5, identifiers: {}, metadata: {}) click to toggle source
# File lib/merritt_zk_job.rb, line 130
def self.create_job(zk, bid, data, priority: 5, identifiers: {}, metadata: {})
  id = QueueItem.create_id(zk, prefix_path)
  job = Job.new(id, bid: bid, data: data, identifiers: identifiers, metadata: metadata)
  job.set_data(zk, ZkKeys::BID, bid)
  job.set_data(zk, ZkKeys::PRIORITY, job.priority)
  job.set_data(zk, ZkKeys::SPACE_NEEEDED, job.space_needed)
  job.set_data(zk, ZkKeys::CONFIGURATION, data)
  job.set_data(zk, ZkKeys::IDENTIFIERS, identifiers) unless identifiers.empty?
  job.set_data(zk, ZkKeys::METADATA, metadata) unless metadata.empty?
  job.set_status_with_priority(zk, JobState.init, priority)
  job.set_job_state_path(zk)
  job.set_batch_state_path(zk)
  job
end
list_jobs_as_json(zk) click to toggle source

List jobs as a json object that will be consumed by the admin tool. This is a transitional representation that can be compatible with legacy job listings.

# File lib/merritt_zk_job.rb, line 189
def self.list_jobs_as_json(zk)
  jobs = []
  zk.children(DIR).sort.each do |cp|
    next if cp == ZkKeys::STATES

    begin
      job = Job.new(cp)
      job.load_optimized(zk)
      jobjson = job.data
      jobjson[:id] = job.id
      jobjson[:bid] = job.bid
      jobjson[:status] = job.status_name
      jobs.append(jobjson)
    rescue StandardError => e
      puts "List Job #{cp} exception: #{e}"
    end
  end
  jobs
end
new(id, bid: nil, data: nil, identifiers: {}, metadata: {}) click to toggle source
Calls superclass method
# File lib/merritt_zk_job.rb, line 14
def initialize(id, bid: nil, data: nil, identifiers: {}, metadata: {})
  super(id, data: data)
  @bid = bid
  @priority = 5
  @space_needed = 0
  @job_state_path = nil
  @batch_state_path = nil
  @retry_count = 0
  @identifiers = identifiers
  @metadata = metadata
  @inventory = {}
end
prefix_path() click to toggle source
# File lib/merritt_zk_job.rb, line 122
def self.prefix_path
  "#{DIR}/#{PREFIX}"
end

Public Instance Methods

batch_state_subpath() click to toggle source
# File lib/merritt_zk_job.rb, line 82
def batch_state_subpath
  return 'batch-failed' if @status.status == :Failed
  return 'batch-completed' if @status.status == :Completed
  return 'batch-deleted' if @status.status == :Deleted

  'batch-processing'
end
creator() click to toggle source
# File lib/merritt_zk_job.rb, line 213
def creator
  data_prop('creator', '')
end
delete(zk) click to toggle source
# File lib/merritt_zk_job.rb, line 169
def delete(zk)
  raise MerrittZK::MerrittStateError, "Delete invalid #{path}" unless @status.deletable?

  unless @job_state_path.nil?
    # puts "DELETE #{@job_state_path}"
    zk.rm_rf(@job_state_path)
  end
  unless @batch_state_path.nil?
    # puts "DELETE #{@batch_state_path}"
    zk.rm_rf(@batch_state_path)
  end
  return if path.nil? || path.empty?

  # puts "DELETE #{path}"
  zk.rm_rf(path)
end
filename() click to toggle source
# File lib/merritt_zk_job.rb, line 225
def filename
  data_prop('filename', '')
end
load_optimized(zk) click to toggle source

for the admin tool

# File lib/merritt_zk_job.rb, line 33
def load_optimized(zk)
  raise MerrittZKNodeInvalid, "Missing Node #{path}" unless zk.exists?(path)

  load_status(zk, json_property(zk, ZkKeys::STATUS))
  @data = json_property(zk, ZkKeys::CONFIGURATION)
  @bid = string_property(zk, ZkKeys::BID)
  self
end
load_properties(zk) click to toggle source
# File lib/merritt_zk_job.rb, line 42
def load_properties(zk)
  @data = json_property(zk, ZkKeys::CONFIGURATION)
  @bid = string_property(zk, ZkKeys::BID)
  @priority = int_property(zk, ZkKeys::PRIORITY)
  @space_needed = int_property(zk, ZkKeys::SPACE_NEEEDED)
  @identifiers = json_property(zk, ZkKeys::IDENTIFIERS) if zk.exists?("#{path}/#{ZkKeys::IDENTIFIERS}")
  @metadata = json_property(zk, ZkKeys::METADATA) if zk.exists?("#{path}/#{ZkKeys::METADATA}")
  @inventory = json_property(zk, ZkKeys::INVENTORY) if zk.exists?("#{path}/#{ZkKeys::INVENTORY}")
  set_job_state_path(zk)
  set_batch_state_path(zk)
end
load_status(zk, js) click to toggle source
Calls superclass method
# File lib/merritt_zk_job.rb, line 27
def load_status(zk, js)
  super(zk, js)
  @retry_count = js.fetch(:retry_count, 0)
end
path() click to toggle source
# File lib/merritt_zk_job.rb, line 126
def path
  "#{DIR}/#{@id}"
end
profile() click to toggle source
# File lib/merritt_zk_job.rb, line 217
def profile
  data_prop('profile', '')
end
response_form() click to toggle source
# File lib/merritt_zk_job.rb, line 221
def response_form
  data_prop('responseForm', '')
end
set_batch_state_path(zk) click to toggle source
# File lib/merritt_zk_job.rb, line 90
def set_batch_state_path(zk)
  bs = format('%s/%s/states/%s/%s', Batch.dir, @bid, batch_state_subpath, id)
  return if bs == @batch_state_path

  zk.delete(@batch_state_path) if @batch_state_path
  @batch_state_path = bs
  return if zk.exists?(@batch_state_path)

  p = File.dirname(@batch_state_path)
  pp = File.dirname(p)
  zk.create(pp, data: nil) unless zk.exists?(pp)
  zk.create(p, data: nil) unless zk.exists?(p)
  zk.create(@batch_state_path, data: nil)
end
set_job_state_path(zk) click to toggle source
# File lib/merritt_zk_job.rb, line 105
def set_job_state_path(zk)
  js = format('%s/states/%s/%02d-%s', DIR, status.name.downcase, priority, id)
  return if js == @job_state_path

  zk.delete(@job_state_path) if @job_state_path
  @job_state_path = js
  return if zk.exists?(@job_state_path)

  p = File.dirname(@job_state_path)
  zk.create(p, data: nil) unless zk.exists?(p)
  zk.create(@job_state_path, data: nil)
end
set_priority(zk, priority) click to toggle source
# File lib/merritt_zk_job.rb, line 56
def set_priority(zk, priority)
  return if priority == @priority

  @priority = priority
  set_data(zk, ZkKeys::PRIORITY, priority)
end
set_space_needed(zk, space_needed) click to toggle source
# File lib/merritt_zk_job.rb, line 63
def set_space_needed(zk, space_needed)
  return if space_needed == @space_needed

  @space_needed = space_needed
  set_data(zk, ZkKeys::SPACE_NEEEDED, space_needed)
end
set_status(zk, status, message = '', job_retry: false) click to toggle source
Calls superclass method
# File lib/merritt_zk_job.rb, line 70
def set_status(zk, status, message = '', job_retry: false)
  @retry_count += 1 if job_retry
  super(zk, status, message)
  set_job_state_path(zk)
  set_batch_state_path(zk)
end
set_status_with_priority(zk, status, priority) click to toggle source
# File lib/merritt_zk_job.rb, line 77
def set_status_with_priority(zk, status, priority)
  set_priority(zk, priority)
  set_status(zk, status)
end
states() click to toggle source
# File lib/merritt_zk_job.rb, line 118
def states
  JobState.states
end
status_object(status) click to toggle source
# File lib/merritt_zk_job.rb, line 145
def status_object(status)
  {
    status: status.name,
    last_successful_status: nil,
    last_modified: Time.now.to_s,
    retry_count: @retry_count
  }
end
submitter() click to toggle source
# File lib/merritt_zk_job.rb, line 209
def submitter
  data_prop('submitter', '')
end
title() click to toggle source
# File lib/merritt_zk_job.rb, line 237
def title
  data_prop('title', '')
end
type() click to toggle source
# File lib/merritt_zk_job.rb, line 233
def type
  data_prop('type', '')
end
udpate() click to toggle source
# File lib/merritt_zk_job.rb, line 229
def udpate
  data_prop('update', false)
end