class MerrittZK::Access

Merritt Object Assembly Queue Item

Constants

DIR
LARGE
PREFIX
SMALL

Public Class Methods

acquire_pending_assembly(zk, queue_name) click to toggle source
# File lib/merritt_zk_access.rb, line 49
# Scans the children of the queue's directory in sorted order and returns
# the first item that is Pending and can be locked.
#
# zk         - client used to list children, load each item and take the lock
# queue_name - name of the queue to scan
#
# Returns the locked Access item, or nil when no child qualifies.
def self.acquire_pending_assembly(zk, queue_name)
  zk.children(Access.dir(queue_name)).sort.each do |node_name|
    candidate = Access.new(queue_name, node_name)
    candidate.load(zk)
    next unless candidate.status == AccessState::Pending

    # A NodeExists error from the lock attempt is treated the same as a
    # failed lock: skip this item and keep scanning.
    locked =
      begin
        candidate.lock(zk)
      rescue ZK::Exceptions::NodeExists
        false
      end
    return candidate if locked
  end
  nil
end
create_assembly(zk, queue_name, token) click to toggle source
# File lib/merritt_zk_access.rb, line 41
# Creates a new Access item in the named queue.
#
# An id is obtained from QueueItem.create_id using the queue's prefix path;
# the token is then written under ZkKeys::TOKEN and the status is set to
# AccessState.init, in that order.
#
# zk         - client passed through to every call made here
# queue_name - name of the queue that receives the item
# token      - payload stored as the item's data
#
# Returns the newly built Access item.
def self.create_assembly(zk, queue_name, token)
  new_id = QueueItem.create_id(zk, prefix_path(queue_name))
  Access.new(queue_name, new_id, data: token).tap do |item|
    item.set_data(zk, ZkKeys::TOKEN, token)
    item.set_status(zk, AccessState.init)
  end
end
dir(queue_name) click to toggle source
# File lib/merritt_zk_access.rb, line 29
# Builds the directory path for a queue: DIR and the queue name joined by
# a single slash.
#
# queue_name - name of the queue
#
# Returns the path as a String.
def self.dir(queue_name)
  format('%s/%s', DIR, queue_name)
end
list_jobs_as_json(zk) click to toggle source

Lists jobs as an array of JSON-ready hashes to be consumed by the admin tool. This is a transitional representation intended to stay compatible with legacy job listings.

# File lib/merritt_zk_access.rb, line 76
# Collects one hash per queued item across the SMALL and LARGE queues.
#
# Each item's loaded data hash is extended with :id, :queueNode, :path,
# :queueStatus (the :status value found in the loaded data), :status (taken
# from the item's status object) and :date (the :last_modified value of the
# item's ZkKeys::STATUS property, or '' when absent).
#
# An item that raises a StandardError while being processed is reported on
# stdout and left out of the result; the remaining items are still listed.
#
# zk - client used to list children and read each item
#
# Returns an Array of hashes, SMALL queue entries first, each queue in
# sorted child order.
def self.list_jobs_as_json(zk)
  [SMALL, LARGE].each_with_object([]) do |queue_name, listing|
    zk.children("#{DIR}/#{queue_name}").sort.each do |node_id|
      item = Access.new(queue_name, node_id)
      item.load(zk)
      entry = item.data
      entry[:id] = node_id
      entry[:queueNode] = Access.dir(queue_name)
      entry[:path] = item.path
      # Keep the stored status before it is overwritten on the next line.
      entry[:queueStatus] = entry[:status]
      entry[:status] = item.status.status
      entry[:date] = item.json_property(zk, ZkKeys::STATUS).fetch(:last_modified, '')
      listing << entry
    rescue StandardError => e
      puts "List Access #{node_id} exception: #{e}"
    end
  end
end
new(queue_name, id, data: nil) click to toggle source
Calls superclass method
# File lib/merritt_zk_access.rb, line 16
# Builds an Access item bound to a named queue.
#
# queue_name - name of the queue; kept in @queue_name and used by #path
# id         - item id, forwarded to the superclass initializer
# data       - optional payload (default nil), forwarded to the superclass
#              initializer as the data: keyword
def initialize(queue_name, id, data: nil)
  # The superclass initializer runs first; what it sets up is not visible here.
  super(id, data: data)
  @queue_name = queue_name
end
prefix_path(queue_name) click to toggle source
# File lib/merritt_zk_access.rb, line 33
# Builds the prefix path for a queue: the queue's directory followed by a
# slash and PREFIX. create_assembly hands this path to QueueItem.create_id.
#
# queue_name - name of the queue
#
# Returns the path as a String.
def self.prefix_path(queue_name)
  queue_dir = dir(queue_name)
  format('%s/%s', queue_dir, PREFIX)
end

Public Instance Methods

delete(zk) click to toggle source
# File lib/merritt_zk_access.rb, line 64
# Recursively removes this item's node.
#
# zk - client whose rm_rf is called with this item's path
#
# Returns the result of zk.rm_rf, or nil when the path is nil or empty
# (in which case nothing is removed).
# Raises MerrittZK::MerrittStateError when the current status does not
# report itself as deletable.
def delete(zk)
  raise MerrittZK::MerrittStateError, "Delete invalid #{path}" unless @status.deletable?

  node_path = path
  zk.rm_rf(node_path) unless node_path.nil? || node_path.empty?
end
load_properties(zk) click to toggle source
# File lib/merritt_zk_access.rb, line 21
# Reads the item's ZkKeys::TOKEN property through json_property and keeps
# the result in @data.
#
# zk - client passed through to json_property
#
# Returns the value that was stored in @data.
def load_properties(zk)
  token_payload = json_property(zk, ZkKeys::TOKEN)
  @data = token_payload
end
path() click to toggle source
# File lib/merritt_zk_access.rb, line 37
# Full path of this item: its queue's directory, a slash, then the item id.
#
# Returns the path as a String.
def path
  format('%s/%s', Access.dir(@queue_name), @id)
end
states() click to toggle source
# File lib/merritt_zk_access.rb, line 25
# Returns the state collection for Access items by delegating to
# AccessState.states; the shape of that value is defined by AccessState.
def states
  AccessState.states
end