module BackgrounDRb
  # Plain data holder describing one unit of work queued on a ThreadPool.
  class WorkData
    # :block is exposed as an accessor but is not assigned by the constructor.
    attr_accessor :args, :block, :job_method, :persistent_job_id, :job_key

    # args              - single argument handed to the job method (may be nil)
    # job_key           - job key captured from the thread that queued the work
    # job_method        - name of the method to invoke on the pool's master
    # persistent_job_id - persistent job id captured from the queuing thread
    def initialize(args, job_key, job_method, persistent_job_id)
      @args = args
      @job_key = job_key
      @job_method = job_method
      @persistent_job_id = persistent_job_id
    end
  end

  # Fixed-size pool of threads that invoke methods of +master+ for every
  # WorkData item pushed onto +work_queue+.
  class ThreadPool
    attr_accessor :size, :threads, :work_queue, :logger
    # :result_queue is only declared here; this class never assigns it.
    attr_accessor :result_queue, :master

    # master - object whose methods are invoked by the pool threads
    # size   - number of threads started immediately
    # logger - object responding to #info; receives failure details
    def initialize(master, size, logger)
      @master = master
      @logger = logger
      @size = size
      @threads = []
      @work_queue = Queue.new
      @size.times { add_thread }
    end

    # Queues a call to the method named +method_name+ on master; the call is
    # executed by one of the pool's threads. +args+ is handed to the method as
    # a single argument unless the method takes no arguments.
    #
    # The :job_key and :persistent_job_id thread-locals of the calling thread
    # are captured and restored in the thread that runs the call.
    #
    # For example, in a worker method you can do:
    #
    #   def user_tags(url)
    #     thread_pool.defer(:fetch_url, url)
    #   end
    #
    #   def fetch_url(url)
    #     begin
    #       data = Net::HTTP.get(url, '/')
    #       File.open("#{RAILS_ROOT}/log/pages.txt", "w") do |fl|
    #         fl.puts(data)
    #       end
    #     rescue
    #       logger.info "Error downloading page"
    #     end
    #   end
    #
    # You can invoke the above method from Rails as follows (assuming it is
    # defined in rss_worker):
    #
    #   MiddleMan.worker(:rss_worker).async_user_tags(:arg => "en.wikipedia.org")
    def defer(method_name, args = nil)
      job_key = Thread.current[:job_key]
      persistent_job_id = Thread.current[:persistent_job_id]
      @work_queue << WorkData.new(args, job_key, method_name, persistent_job_id)
    end

    # Starts one more pool thread. The thread blocks on the work queue and
    # runs each task it pops, forever.
    def add_thread
      @threads << Thread.new do
        Thread.current[:job_key] = nil
        Thread.current[:persistent_job_id] = nil
        loop do
          task = @work_queue.pop
          # Expose the originating job's identifiers to the code being run.
          Thread.current[:job_key] = task.job_key
          Thread.current[:persistent_job_id] = task.persistent_job_id
          run_task(task)
        end
      end
    end

    # Runs a task popped off the queue and returns the job method's result.
    # Any StandardError (including an unknown job method) is logged through
    # +logger+ and turned into a nil result, so a failing task never
    # terminates the pool thread that ran it.
    def run_task(task)
      # Skipped when ActiveRecord is not loaded or does not offer this call,
      # so that its absence does not fail every task.
      if defined?(ActiveRecord::Base) && ActiveRecord::Base.respond_to?(:verify_active_connections!)
        ActiveRecord::Base.verify_active_connections!
      end

      # The lookup happens inside the rescued section: a bad method name must
      # be logged like any other task failure instead of escaping.
      if master.method(task.job_method).arity.zero?
        master.send(task.job_method)
      else
        master.send(task.job_method, task.args)
      end
    rescue => e
      logger.info(e.to_s)
      logger.info(e.backtrace.join("\n"))
      nil
    end
  end # end of class ThreadPool
end # end of module BackgrounDRb