Methods
Attributes
| [RW] | size | |
| [RW] | threads | |
| [RW] | work_queue |
Public Class methods
[ show source ]
# File server/meta_worker.rb, line 29
29: def initialize(size)
30: @size = size
31: @threads = []
32: @work_queue = Queue.new
33: @running_tasks = Queue.new
34: @size.times { add_thread }
35: end
Public Instance methods
[ show source ]
# File server/meta_worker.rb, line 61
61: def add_thread
62: @threads << Thread.new do
63: while true
64: task = @work_queue.pop
65: @running_tasks << task
66: block_arity = task.block.arity
67: begin
68: block_arity == 0 ? task.block.call : task.block.call(*(task.data))
69: rescue
70: logger.info($!.to_s)
71: logger.info($!.backtrace.join("\n"))
72: end
73: @running_tasks.pop
74: end
75: end
76: end
can be used to make a call in threaded manner passed block runs in a thread from thread pool for example in a worker method you can do:
def fetch_url(url)
puts "fetching url #{url}"
thread_pool.defer(url) do |url|
begin
data = Net::HTTP.get(url,'/')
File.open("#{RAILS_ROOT}/log/pages.txt","w") do |fl|
fl.puts(data)
end
rescue
logger.info "Error downloading page"
end
end
end
you can invoke above method from rails as:
MiddleMan.ask_work(:worker => :rss_worker, :worker_method => :fetch_url, :data => "www.example.com")
assuming method is defined in rss_worker
[ show source ]
# File server/meta_worker.rb, line 57
57: def defer(*args,&block)
58: @work_queue << WorkData.new(args,&block)
59: end
method ensures exclusive run of deferred tasks for 2 seconds, so as they do get a chance to run.
[ show source ]
# File server/meta_worker.rb, line 79
79: def exclusive_run
80: if @running_tasks.empty? && @work_queue.empty?
81: return
82: else
83: # puts "going to sleep for a while"
84: sleep(0.05)
85: return
86: end
87: end