class MerrittZK::QueueItem

Base class for Merritt Queue Items that follow similar conventions

Attributes

data[R]
id[R]
status[R]

Public Class Methods

create_id(zk, prefix) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 109
# Create a persistent sequential node and return the name it was given.
#
# @param zk ZooKeeper client; must respond to +create+
# @param prefix [String] node path passed to +create+ with
#   +mode: :persistent_sequential+
# @return [String] last '/'-separated segment of the created node's path
# @raise [MerrittZKNodeInvalid] when +create+ hands back no path
def self.create_id(zk, prefix)
  path = zk.create(prefix, data: nil, mode: :persistent_sequential, ignore: :no_node)
  # +ignore: :no_node+ suppresses the client's own error, so a failed create
  # shows up as nil here; fail with a clear message instead of NoMethodError.
  raise MerrittZKNodeInvalid, "Unable to create sequential node for #{prefix}" if path.nil?

  path.split('/').last
end
new(id, data: nil) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 28
# Build an in-memory queue item; nothing is read from or written to
# ZooKeeper here.
#
# @param id identifier stored in @id
# @param data optional payload stored in @data (defaults to nil)
def initialize(id, data: nil)
  # No status is known until load_status or set_status assigns one.
  @status = nil
  @data = data
  @id = id
end
serialize(v) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 100
# Convert a value into the form written to a node.
#
# * nil and empty hashes become nil
# * integers become their decimal string
# * non-empty hashes become a JSON string
# * anything else is returned untouched
#
# @param v value to serialize
# @return [String, nil, Object]
def self.serialize(v)
  case v
  when nil then nil
  when Integer then v.to_s
  when Hash then v.empty? ? nil : v.to_json
  else v
  end
end

Public Instance Methods

data_prop(_prop, defval) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 143
# Read one property from the item's data payload.
#
# @param prop key to look up in @data
# @param defval value returned when @data is nil or has no such key
# @return the stored value, or +defval+
def data_prop(prop, defval)
  return defval if @data.nil?

  # Look up the requested key; the default is only the fallback value.
  @data.fetch(prop, defval)
end
int_property(zk, key) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 78
# Read the child node +key+ via string_property and coerce it with +to_i+.
#
# @param zk ZooKeeper client, forwarded to string_property
# @param key child node name under this item's path
# @return [Integer] result of +to_i+ on the node's text
def int_property(zk, key)
  raw = string_property(zk, key)
  raw.to_i
end
json_property(zk, key) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 69
# Read the child node +key+ via string_property and parse it as JSON with
# symbolized keys.
#
# @param zk ZooKeeper client, forwarded to string_property
# @param key child node name under this item's path
# @return the parsed JSON value
# @raise [MerrittZKNodeInvalid] when the node's content cannot be parsed
def json_property(zk, key)
  s = string_property(zk, key)
  begin
    JSON.parse(s, symbolize_names: true)
  rescue StandardError
    # Build the node path here: no local named +p+ exists in this method, so
    # interpolating it would call Kernel#p and leave the message without a path.
    raise MerrittZKNodeInvalid, "Node Object for (#{path}/#{key}) does not contain valid json: #{s}"
  end
end
load(zk) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 44
# Populate this item from ZooKeeper: verify the node exists, read its status
# node, then let load_properties read whatever else applies.
#
# @param zk ZooKeeper client; must respond to +exists?+
# @return [self]
# @raise [MerrittZKNodeInvalid] when no node exists at +path+
def load(zk)
  unless zk.exists?(path)
    raise MerrittZKNodeInvalid, "Missing Node #{path}"
  end

  status_json = json_property(zk, ZkKeys::STATUS)
  load_status(zk, status_json)
  load_properties(zk)
  self
end
load_properties(zk) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 57
# Hook called by load once the status has been read. The base implementation
# is intentionally empty and returns nil.
#
# @param zk ZooKeeper client (unused here)
def load_properties(zk)
  # Job will override
end
load_status(_zk, js) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 52
# Translate a parsed status record into a status value and remember it.
#
# @param _zk unused
# @param js [Hash] parsed status record; its :status entry (default 'na') is
#   symbolized and looked up in +states+
# @return the matching entry from +states+, or nil when there is none
def load_status(_zk, js)
  state_key = js.fetch(:status, 'na').to_sym
  @status = states.fetch(state_key) { nil }
end
lock(zk) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 135
# Create the lock child node beneath this item as an ephemeral node.
#
# @param zk ZooKeeper client; must respond to +create+
# @return whatever +zk.create+ returns
def lock(zk)
  lock_node = "#{path}/#{ZkKeys::LOCK}"
  zk.create(lock_node, data: nil, mode: :ephemeral)
end
path() click to toggle source
# File lib/merritt_zk_queue_item.rb, line 92
# Node path of this item. The base class only returns the placeholder 'na'.
# NOTE(review): subclasses presumably override this with a real path — confirm.
def path
  'na'
end
set_data(zk, key, data) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 82
# Write +data+ to the child node +key+, creating the node when it is absent.
# The value is passed through QueueItem.serialize first.
#
# @param zk ZooKeeper client; must respond to +exists?+, +set+ and +create+
# @param key child node name under this item's path
# @param data value to store
# @return whatever the underlying +set+ or +create+ call returns
def set_data(zk, key, data)
  node = "#{path}/#{key}"
  payload = QueueItem.serialize(data)
  return zk.set(node, payload) if zk.exists?(node)

  zk.create(node, data: payload)
end
set_status(zk, status, message = '') click to toggle source
# File lib/merritt_zk_queue_item.rb, line 121
# Record a new status for this item, both in ZooKeeper and in @status.
# Does nothing (and returns nil) when +status+ equals the current status.
#
# @param zk ZooKeeper client; must respond to +create+ and +set+
# @param status new status; must respond to +name+ (used by status_object)
# @param message text stored under :message in the status record
# @return the new status, or nil when nothing changed
def set_status(zk, status, message = '')
  return if status == @status

  record = status_object(status).tap { |obj| obj[:message] = message }
  payload = QueueItem.serialize(record)
  # With no status recorded on this object the node is created; otherwise it
  # is overwritten.
  @status.nil? ? zk.create(status_path, payload) : zk.set(status_path, payload)
  @status = status
end
states() click to toggle source
# File lib/merritt_zk_queue_item.rb, line 40
# Table that load_status consults to turn a status symbol into a status value.
# It is empty in the base class, so load_status always resolves to nil here.
# NOTE(review): presumably overridden by subclasses — confirm.
def states
  {}
end
status_name() click to toggle source
# File lib/merritt_zk_queue_item.rb, line 36
# Name of the current status, obtained by calling +name+ on +status+.
# NOTE(review): initialize leaves the status nil, so calling this before a
# status has been loaded or set would raise NoMethodError — confirm callers
# never do that.
def status_name
  status.name
end
status_object(status) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 114
# Build the hash that represents +status+ in the status record.
#
# @param status object responding to +name+
# @return [Hash] :status (the status name) and :last_modified (the current
#   time as a string)
def status_object(status)
  label = status.name
  stamp = Time.now.to_s
  { status: label, last_modified: stamp }
end
status_path() click to toggle source
# File lib/merritt_zk_queue_item.rb, line 96
# Path of the status child node: this item's +path+ followed by "/status".
#
# @return [String]
def status_path
  format('%s/status', path)
end
string_property(zk, key) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 61
# Fetch the child node +key+ and return the first element of what the client
# returns for it.
#
# @param zk ZooKeeper client; must respond to +get+
# @param key child node name under this item's path
# @return first element of the +zk.get+ result
# @raise [MerrittZKNodeInvalid] when +zk.get+ returns nil
def string_property(zk, key)
  node = "#{path}/#{key}"
  result = zk.get(node)
  if result.nil?
    raise MerrittZKNodeInvalid, "Node Object for (#{node}) missing"
  end

  result[0]
end
unlock(zk) click to toggle source
# File lib/merritt_zk_queue_item.rb, line 139
# Delete the lock child node beneath this item.
#
# @param zk ZooKeeper client; must respond to +delete+
# @return whatever +zk.delete+ returns
def unlock(zk)
  lock_node = "#{path}/#{ZkKeys::LOCK}"
  zk.delete(lock_node)
end