module BackgrounDRb
  # A worker proxy, which uses +method_missing+ for delegating method calls to
  # the workers.
  #
  # Undefined methods called on the proxy are dispatched by name prefix:
  #
  # * <tt>async_foo</tt> - invokes +foo+ on a server connection through
  #   +ask_work+; the call returns +nil+.
  # * <tt>enq_foo</tt> - passes a job for +foo+ to
  #   <tt>BdrbJobQueue.insert_job</tt>; a <tt>:job_key</tt> is required.
  # * <tt>deq_foo</tt> - passes the job for +foo+ to
  #   <tt>BdrbJobQueue.remove_job</tt>; a <tt>:job_key</tt> is required.
  # * any other name - invokes that method on a server connection through
  #   +send_request+ and returns what the connection returned.
  #
  # The first argument of such a call, when given, is a Hash from which the
  # keys <tt>:arg</tt>, <tt>:job_key</tt>, <tt>:host</tt>,
  # <tt>:scheduled_at</tt> and (for +enq_+ calls) <tt>:timeout</tt> are read.
  class RailsWorkerProxy
    attr_accessor :worker_name, :worker_method, :data, :worker_key, :middle_man

    # create new worker proxy
    #
    # p_worker_name:: name of the worker the proxy stands for
    # p_worker_key::  optional key of the worker instance
    # p_middle_man::  object used to look up server connections and the cache
    def initialize(p_worker_name, p_worker_key = nil, p_middle_man = nil)
      @worker_name = p_worker_name
      @middle_man = p_middle_man
      @worker_key = p_worker_key
      # server_info of every connection already attempted by the current
      # failover sequence in #run_method
      @tried_connections = []
    end

    # Delegate any undefined method to the worker. See the class comment for
    # the prefix conventions.
    #
    # Raises NoJobKey when an +enq_+ or +deq_+ call carries no job key.
    def method_missing(method_id, *args)
      worker_method = method_id.to_s
      arguments = args.first

      # With no options hash every one of these is nil.
      arg, job_key, host_info, scheduled_at =
        arguments && arguments.values_at(:arg, :job_key, :host, :scheduled_at)
      # Anything that cannot be converted with #utc falls back to "now".
      new_schedule = (scheduled_at && scheduled_at.respond_to?(:utc)) ? scheduled_at.utc : Time.now.utc

      if worker_method =~ /^async_(\w+)/
        method_name = $1
        worker_options = compact(:worker => worker_name, :worker_key => worker_key,
                                 :worker_method => method_name, :job_key => job_key, :arg => arg)
        run_method(host_info, :ask_work, worker_options)
      elsif worker_method =~ /^enq_(\w+)/i
        raise NoJobKey.new("Must specify a job key with enqueued tasks") if job_key.blank?
        method_name = $1
        marshalled_args = Marshal.dump(arg)
        enqueue_task(compact(:worker_name => worker_name.to_s, :worker_key => worker_key.to_s,
                             :worker_method => method_name.to_s, :job_key => job_key.to_s,
                             :args => marshalled_args,
                             :timeout => arguments ? arguments[:timeout] : nil,
                             :scheduled_at => new_schedule))
      elsif worker_method =~ /^deq_(\w+)/i
        raise NoJobKey.new("Must specify a job key to dequeue tasks") if job_key.blank?
        method_name = $1
        dequeue_task(compact(:worker_name => worker_name.to_s, :worker_key => worker_key.to_s,
                             :worker_method => method_name.to_s, :job_key => job_key.to_s))
      else
        worker_options = compact(:worker => worker_name, :worker_key => worker_key,
                                 :worker_method => worker_method, :job_key => job_key, :arg => arg)
        run_method(host_info, :send_request, worker_options)
      end
    end

    # enqueue tasks to the worker pool
    def enqueue_task(options = {})
      BdrbJobQueue.insert_job(options)
    end

    # remove tasks from the worker pool
    def dequeue_task(options = {})
      BdrbJobQueue.remove_job(options)
    end

    # invoke method on worker
    #
    # host_info::      +:local+, +:all+, a String, or anything else (including
    #                  +nil+); decides which connection(s) are used, see
    #                  #choose_connection
    # method_name::    name of the connection method to call (+:ask_work+ or
    #                  +:send_request+ when coming from #method_missing)
    # worker_options:: options hash handed to the connection
    #
    # Returns +nil+ for +:ask_work+, otherwise the collected results reduced
    # by #return_result.
    #
    # Raises NoServerAvailable when no connection could be chosen, when a
    # connection error occurs while invoking on +:all+ servers, or when every
    # server has been tried without success during failover.
    def run_method(host_info, method_name, worker_options = {})
      result = []
      connection = choose_connection(host_info)
      raise NoServerAvailable.new("No BackgrounDRb server is found running") if connection.blank?

      if host_info == :local || host_info.is_a?(String)
        # An explicitly requested server: connection errors are not rescued.
        result << invoke_on_connection(connection, method_name, worker_options)
      elsif host_info == :all
        succeeded = false
        begin
          connection.each { |conn| result << invoke_on_connection(conn, method_name, worker_options) }
          succeeded = true
        rescue BdrbConnError
          # NOTE(review): the first connection error stops the iteration, so
          # the remaining servers are skipped and NoServerAvailable is raised
          # below even if earlier servers answered - confirm this is intended.
        end
        raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded
      else
        # No specific host requested: start with the chosen server and fail
        # over to servers that have not been tried yet.
        @tried_connections = [connection.server_info]
        begin
          result << invoke_on_connection(connection, method_name, worker_options)
        rescue BdrbConnError
          connection = middle_man.find_next_except_these(@tried_connections)
          # Without this guard a nil connection would surface as a
          # NoMethodError on +server_info+ instead of the intended error.
          raise NoServerAvailable.new("No BackgrounDRb server is found running") unless connection
          @tried_connections << connection.server_info
          retry
        end
      end

      return nil if method_name == :ask_work
      return_result(result)
    end

    # Invoke +method_name+ with +options+ on the given backgroundrb server
    # connection.
    #
    # Raises NoServerAvailable when +connection+ is +nil+ or +false+.
    def invoke_on_connection(connection, method_name, options = {})
      raise NoServerAvailable.new("No BackgrounDRb is found running") unless connection
      connection.send(method_name, options)
    end

    # get results back from the cache. Cache can be in-memory worker cache or
    # memcache based cache
    #
    # When <tt>BDRB_CONFIG[:backgroundrb][:result_storage]</tt> is
    # <tt>'memcache'</tt> the value is read from <tt>middle_man.cache</tt>;
    # otherwise every backend connection is asked and the answers are reduced
    # by #return_result.
    def ask_result(job_key)
      options = compact(:worker => worker_name, :worker_key => worker_key, :job_key => job_key)
      if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
        return_result_from_memcache(options)
      else
        result = middle_man.backend_connections.map { |conn| conn.ask_result(options) }
        return_result(result)
      end
    end

    # return runtime information about worker, as reported by every backend
    # connection and reduced by #return_result
    def worker_info
      t_connections = middle_man.backend_connections
      result = t_connections.map do |conn|
        conn.worker_info(compact(:worker => worker_name, :worker_key => worker_key))
      end
      return_result(result)
    end

    # generate worker key: the non-nil values of <tt>:worker</tt>,
    # <tt>:worker_key</tt> and <tt>:job_key</tt> joined with an underscore
    def gen_key(options)
      [options[:worker], options[:worker_key], options[:job_key]].compact.join('_')
    end

    # return result from memcache, looked up under the key built by #gen_key
    def return_result_from_memcache(options = {})
      middle_man.cache[gen_key(options)]
    end

    # reset result within memcache for given key; returns +value+
    def reset_memcache_result(job_key, value)
      options = compact(:worker => worker_name, :worker_key => worker_key,
                        :job_key => job_key)
      key = gen_key(options)
      middle_man.cache[key] = value
      value
    end

    # Reduce a list of results: an empty list yields +nil+, a single-element
    # list yields that element, longer lists are returned as an Array.
    def return_result(result)
      result = Array(result)
      result.size <= 1 ? result[0] : result
    end

    # delete a worker on every backend connection; returns the worker key
    def delete
      middle_man.backend_connections.each do |connection|
        connection.delete_worker(compact(:worker => worker_name, :worker_key => worker_key))
      end
      worker_key
    end

    # choose a worker connection based on +host_info+:
    # +:all+ gives all backend connections, +:local+ the local one, a String
    # is looked up with +find_connection+, and anything else lets the middle
    # man choose a server.
    def choose_connection(host_info)
      case host_info
      when :all then middle_man.backend_connections
      when :local then middle_man.find_local
      when String then middle_man.find_connection(host_info)
      else middle_man.choose_server
      end
    end

    # helper method to compact a hash and for getting rid of nil parameters
    #
    # The hash passed in is modified in place and returned.
    def compact(options = {})
      options.delete_if { |_key, value| value.nil? }
      options
    end
  end # end of RailsWorkerProxy class
end # end of BackgrounDRb module