class MerrittZK::Batch

Merritt Batch Queue Items

Constants

BATCH_UUIDS
DIR
PREFIX

Attributes

has_failure[R]

Public Class Methods

acquire_batch_for_reporting_batch(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 91
# Find and lock the first batch that is ready to be reported on.
#
# Children of DIR are visited in sorted name order. A child is a candidate
# when its "states/batch-processing" node exists and has no children, and
# its loaded status is neither BatchState::Completed nor BatchState::Failed.
# The first candidate whose +lock+ call returns a truthy value has its status
# set to BatchState::Reporting and is returned.
#
# @param zk client handle; must respond to +children+ and +exists?+
# @return [Batch, nil] the locked batch, or nil when none could be acquired
def self.acquire_batch_for_reporting_batch(zk)
  zk.children(DIR).sort.each do |batch_id|
    processing_path = "#{DIR}/#{batch_id}/states/batch-processing"
    next unless zk.exists?(processing_path) && zk.children(processing_path).empty?

    candidate = Batch.new(batch_id)
    candidate.load(zk)
    begin
      finished = (candidate.status == BatchState::Completed) || (candidate.status == BatchState::Failed)
      next if finished
      next unless candidate.lock(zk)

      candidate.set_status(zk, BatchState::Reporting)
      return candidate
    rescue ZK::Exceptions::NodeExists
      # Deliberately ignored (presumably the lock node is already held by
      # another client -- confirm against Batch#lock); move on to the next batch.
    end
  end
  nil
end
acquire_pending_batch(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 72
# Find and lock the first batch that has not started processing yet.
#
# Children of DIR are visited in sorted name order; any child that already
# has a ZkKeys::STATES node is skipped. The first remaining batch whose
# +lock+ call returns a truthy value gets its ZkKeys::STATES data set to nil
# and its status set to BatchState::Processing, and is returned.
#
# @param zk client handle; must respond to +children+ and +exists?+
# @return [Batch, nil] the locked batch, or nil when none could be acquired
def self.acquire_pending_batch(zk)
  zk.children(DIR).sort.each do |batch_id|
    states_path = "#{DIR}/#{batch_id}/#{ZkKeys::STATES}"
    next if zk.exists?(states_path)

    candidate = Batch.new(batch_id)
    candidate.load(zk)
    begin
      next unless candidate.lock(zk)

      candidate.set_data(zk, ZkKeys::STATES, nil)
      candidate.set_status(zk, BatchState::Processing)
      return candidate
    rescue ZK::Exceptions::NodeExists
      # Deliberately ignored (presumably the lock node is already held by
      # another client -- confirm against Batch#lock); move on to the next batch.
    end
  end
  nil
end
batch_uuid_path(uuid) click to toggle source
# File lib/merritt_zk_batch.rb, line 52
# Build the path of the lookup node for a batch uuid.
#
# @param uuid [String] batch uuid
# @return [String] BATCH_UUIDS and +uuid+ joined with "/"
def self.batch_uuid_path(uuid)
  [BATCH_UUIDS, uuid].join('/')
end
create_batch(zk, submission) click to toggle source
# File lib/merritt_zk_batch.rb, line 62
# Create a new batch from a submission.
#
# Steps, in order: obtain an id from QueueItem.create_id under prefix_path;
# build the Batch; when the submission carries a non-empty :batchID, create
# the uuid lookup node holding the new id; store the submission under
# ZkKeys::SUBMISSION; set the status to BatchState.init.
#
# @param zk client handle; must respond to +create+
# @param submission [Hash] submission data; :batchID is read with +fetch+
# @return [Batch] the newly created batch
def self.create_batch(zk, submission)
  new_id = QueueItem.create_id(zk, prefix_path)
  Batch.new(new_id, data: submission).tap do |created|
    submitted_uuid = submission.fetch(:batchID, '')
    zk.create(batch_uuid_path(submitted_uuid), new_id) unless submitted_uuid.empty?
    created.set_data(zk, ZkKeys::SUBMISSION, submission)
    created.set_status(zk, BatchState.init)
  end
end
delete_completed_batches(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 112
# Delete every finished batch and report which ones were removed.
#
# Children of DIR are visited in sorted name order. A batch is deleted when
# its "states/batch-processing" node exists and has no children, and its
# loaded status is BatchState::Completed or BatchState::Deleted.
# A ZK::Exceptions::NodeExists raised while handling a batch is ignored and
# that batch's id is not reported.
#
# @param zk client handle; must respond to +children+ and +exists?+
# @return [Array] ids of the batches that were deleted
def self.delete_completed_batches(zk)
  zk.children(DIR).sort.each_with_object([]) do |batch_id, deleted_ids|
    processing_path = "#{DIR}/#{batch_id}/states/batch-processing"
    next unless zk.exists?(processing_path) && zk.children(processing_path).empty?

    batch = Batch.new(batch_id)
    batch.load(zk)
    begin
      removable = (batch.status == BatchState::Completed) || (batch.status == BatchState::Deleted)
      if removable
        batch.delete(zk)
        deleted_ids << batch.id
      end
    rescue ZK::Exceptions::NodeExists
      # Deliberately ignored; continue with the next batch.
    end
  end
end
dir() click to toggle source
# File lib/merritt_zk_batch.rb, line 40
# Return the batch directory path.
#
# @return [String] DIR converted with +to_s+
def self.dir
  DIR.to_s
end
find_batch_by_uuid(zk, uuid) click to toggle source
# File lib/merritt_zk_batch.rb, line 159
# Look up a batch by its uuid via the uuid lookup node.
#
# The lookup node's data is taken as the batch id. The returned Batch is
# only constructed from that id; this method does not call +load+ on it.
#
# A nil or empty +uuid+, a missing lookup node, and a lookup node whose data
# is nil or empty are all treated as "not found".
#
# @param zk client handle; must respond to +exists?+ and +get+
# @param uuid [String, nil] batch uuid to look up
# @return [Batch, nil] the batch for +uuid+, or nil when it cannot be resolved
def self.find_batch_by_uuid(zk, uuid)
  return if uuid.nil? || uuid.empty?

  uuid_path = batch_uuid_path(uuid)
  return unless zk.exists?(uuid_path)

  node = zk.get(uuid_path)
  # First element of the +get+ result is the node data; it can be nil when
  # the node was created without data, so guard before calling +empty?+.
  batch_id = node && node[0]
  return if batch_id.nil? || batch_id.empty?

  Batch.new(batch_id)
end
new(id, data: nil) click to toggle source
Calls superclass method
# File lib/merritt_zk_batch.rb, line 15
# Build a batch object; +id+ and +data+ are forwarded unchanged to the
# superclass initializer.
#
# @param id batch id
# @param data submission data, or nil when not supplied
def initialize(id, data: nil)
  super(id, data: data)
  # Starts out false; load_has_failure recomputes it.
  @has_failure = false
end
prefix_path() click to toggle source
# File lib/merritt_zk_batch.rb, line 44
# Build the path prefix handed to QueueItem.create_id when creating a batch.
#
# @return [String] DIR and PREFIX joined with "/"
def self.prefix_path
  [DIR, PREFIX].join('/')
end

Public Instance Methods

batch_uuid() click to toggle source
# File lib/merritt_zk_batch.rb, line 56
# Return the uuid recorded in this batch's data.
#
# @return [String] the :batchID entry of @data, or '' when @data is nil or
#   has no :batchID key
def batch_uuid
  @data.nil? ? '' : @data.fetch(:batchID, '')
end
delete(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 174
# Delete this batch, the jobs listed under its state nodes, and its uuid
# lookup node.
#
# Order of operations:
# 1. For each of the batch-processing, batch-failed and batch-completed state
#    nodes that exists, load and delete every job listed under it.
# 2. Load this batch if @data is nil, then delete the uuid lookup node when
#    the batch uuid is non-empty.
# 3. Recursively remove the batch's own path.
#
# @param zk client handle; must respond to +exists?+, +children+, +delete+
#   and +rm_rf+
# @raise [MerrittZK::MerrittStateError] unless @status.deletable?
def delete(zk)
  raise MerrittZK::MerrittStateError, "Delete invalid #{path}" unless @status.deletable?

  %w[batch-processing batch-failed batch-completed].each do |state_name|
    state_path = "#{path}/states/#{state_name}"
    if zk.exists?(state_path)
      zk.children(state_path).each do |job_id|
        job = MerrittZK::Job.new(job_id)
        job.load(zk).delete(zk)
      end
    end
  end

  # The uuid is read from @data, so make sure it has been loaded first.
  load(zk) if @data.nil?
  uuid = batch_uuid
  zk.delete(Batch.batch_uuid_path(uuid)) unless uuid.empty?

  batch_path = path
  zk.rm_rf(batch_path) unless batch_path.nil? || batch_path.empty?
end
get_completed_jobs(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 132
# List the jobs found under this batch's "batch-completed" state node.
#
# @param zk client handle, passed through to get_jobs
# @return [Array] whatever get_jobs returns for the 'batch-completed' state
def get_completed_jobs(zk)
  get_jobs(zk, 'batch-completed')
end
get_deleted_jobs(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 136
# List the jobs found under this batch's "batch-deleted" state node.
#
# @param zk client handle, passed through to get_jobs
# @return [Array] whatever get_jobs returns for the 'batch-deleted' state
def get_deleted_jobs(zk)
  get_jobs(zk, 'batch-deleted')
end
get_failed_jobs(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 140
# List the jobs found under this batch's "batch-failed" state node.
#
# @param zk client handle, passed through to get_jobs
# @return [Array] whatever get_jobs returns for the 'batch-failed' state
def get_failed_jobs(zk)
  get_jobs(zk, 'batch-failed')
end
get_jobs(zk, state) click to toggle source
# File lib/merritt_zk_batch.rb, line 148
# List the jobs recorded under one of this batch's state nodes.
#
# Each child name of "<path>/states/<state>" becomes a Job constructed with
# that name and this batch's id. The jobs are only constructed here; this
# method does not call +load+ on them.
#
# @param zk client handle; must respond to +exists?+ and +children+
# @param state [String] state node name, e.g. 'batch-failed'
# @return [Array<Job>] one Job per child, or [] when the state node is absent
def get_jobs(zk, state)
  state_path = "#{path}/states/#{state}"
  return [] unless zk.exists?(state_path)

  zk.children(state_path).map { |job_id| Job.new(job_id, bid: id) }
end
get_processing_jobs(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 144
# List the jobs found under this batch's "batch-processing" state node.
#
# @param zk client handle, passed through to get_jobs
# @return [Array] whatever get_jobs returns for the 'batch-processing' state
def get_processing_jobs(zk)
  get_jobs(zk, 'batch-processing')
end
load_has_failure(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 22
# Recompute @has_failure from the batch's "batch-failed" state node.
#
# The flag is reset to false first, then set to true only when
# "<path>/states/batch-failed" exists and has at least one child.
#
# @param zk client handle; must respond to +exists?+ and +children+
# @return [true, nil] true when a failure was found, nil otherwise
def load_has_failure(zk)
  @has_failure = false
  failed_path = "#{path}/states/batch-failed"
  @has_failure = true if zk.exists?(failed_path) && !zk.children(failed_path).empty?
end
load_properties(zk) click to toggle source
# File lib/merritt_zk_batch.rb, line 31
# Load this batch's properties: the submission data and the failure flag.
#
# @param zk client handle, passed to json_property and load_has_failure
# @return the result of load_has_failure
def load_properties(zk)
  # @data holds whatever json_property returns for the ZkKeys::SUBMISSION key.
  @data = json_property(zk, ZkKeys::SUBMISSION)
  load_has_failure(zk)
end
path() click to toggle source
# File lib/merritt_zk_batch.rb, line 48
# Build the path of this batch's node.
#
# @return [String] DIR and this batch's id joined with "/"
def path
  [DIR, @id].join('/')
end
states() click to toggle source
# File lib/merritt_zk_batch.rb, line 36
# Return the set of states that apply to a batch.
#
# @return whatever BatchState.states returns
def states
  BatchState.states
end