Workers are the building blocks of asynchronous task processing. An empty, auto-generated worker looks like this:
class BillingWorker < BackgrounDRb::MetaWorker
  set_worker_name :billing_worker
  def create(args = nil)
    # this method is called when a new instance of the worker is created
  end
end
set_worker_name sets the worker name, which is used later when invoking tasks on the worker.
The create method is called when the worker is loaded for the first time. If you start your worker
from Rails, you can pass arguments to the create method using:
MiddleMan.new_worker(:worker => :billing_worker,
  :worker_key => user_session, :data => current_user.id)
Using Workers
You can invoke arbitrary tasks on workers from Rails, or you can schedule them using the config file. See the Scheduling section for scheduling and the Rails Integration section for invoking worker tasks from Rails.
Built-in instance methods available in your workers:
cache: Stores arbitrary results from the worker so that they can be retrieved later from Rails. For example: cache[key] = some_data
add_timer: See the Scheduling section.
add_periodic_timer: See the Scheduling section.
thread_pool: See below.
connect: See the Advanced section. Used to connect to external TCP/IP servers.
start_server: See the Advanced section. Used to start a TCP/IP server from the worker.
send_data: Sends objects to the master process. You can ignore this method.
job_key: When you invoke a task from Rails and pass a job_key, that key can be accessed inside the worker as job_key. For example, from Rails:
MiddleMan.worker(:foo_worker).async_some_task(:arg => urls, :job_key => current_user[:id])
This job_key can then be accessed inside the worker:
class FooWorker < BackgrounDRb::MetaWorker
  def some_task(urls)
    # .. do some work with urls and assign the outcome to result ..
    cache[job_key] = result
  end
end
Options via class methods:
The following class methods are available for further tuning of workers:
pool_size: Controls the thread pool size. Accepts the pool size as an integer.
set_no_auto_load: Disables auto loading of the worker when BackgrounDRb starts. Accepts true or false.
reload_on_schedule: Enables reloading of the worker at its scheduled execution time. Accepts true or false.
set_worker_name: Sets the worker name. Accepts a symbol as the worker name.
The following snippet demonstrates their usage:
class HelloWorker < BackgrounDRb::MetaWorker
  set_worker_name :hello_worker
  reload_on_schedule true
  pool_size 10
end
When reload_on_schedule is true, the worker is not loaded when BackgrounDRb starts, so you do not need
the set_no_auto_load option in that case.
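Declarations such as pool_size 10 are ordinary Ruby class methods called inside the class body. The sketch below shows that idiom in plain Ruby. SketchWorker, HelloSketchWorker and the default pool size are hypothetical stand-ins chosen for illustration; this is not BackgrounDRb's source, only one way such per-class settings can be recorded and read back.

```ruby
# Hypothetical stand-in for a worker base class; NOT BackgrounDRb::MetaWorker.
class SketchWorker
  class << self
    # Each macro stores its value when called with an argument and
    # returns the stored value (or a default) when called without one.
    def set_worker_name(name = nil)
      @worker_name = name unless name.nil?
      @worker_name
    end

    def pool_size(size = nil)
      @pool_size = Integer(size) unless size.nil?
      @pool_size || 20 # default chosen for this sketch only (assumption)
    end

    def reload_on_schedule(flag = nil)
      @reload_on_schedule = flag unless flag.nil?
      @reload_on_schedule || false
    end
  end
end

# Settings live in class-level instance variables, so every subclass
# keeps its own values.
class HelloSketchWorker < SketchWorker
  set_worker_name :hello_worker
  reload_on_schedule true
  pool_size 10
end

puts HelloSketchWorker.pool_size
puts HelloSketchWorker.set_worker_name.inspect
```

Because the values are stored on the class itself, two workers can declare different pool sizes without affecting each other.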
Remember that BackgrounDRb follows the event model of network programming. Not all networking libraries follow this model, however; many of them rely on blocking IO and threads. BackgrounDRb lets you run such tasks concurrently in threads that are managed internally by the BackgrounDRb thread pool.
Each worker has access to a thread_pool object, which can be used to run a task concurrently in a thread:
thread_pool.defer(:scrap_wikipedia,scrap_url)
Whatever code you write in the scrap_wikipedia method then runs concurrently.
WARNING: Many Ruby libraries are not thread safe and may not work as advertised when used from threads (for example, Mechanize and Scrubyt).
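To make the defer idea concrete, here is a minimal plain-Ruby sketch of a pool that accepts a method name plus arguments and runs the call on a pooled thread. SketchThreadPool, SketchScraper and the shutdown method are hypothetical names invented for this illustration; BackgrounDRb's own thread pool is not shown in this text and may work differently.

```ruby
require "thread"

# Hypothetical miniature pool; NOT BackgrounDRb's thread pool.
class SketchThreadPool
  def initialize(owner, size = 4)
    @owner = owner     # object whose methods get invoked
    @queue = Queue.new # thread-safe job queue
    @threads = Array.new(size) do
      Thread.new do
        # Each pooled thread pulls jobs until it sees the :stop marker.
        while (job = @queue.pop) != :stop
          method_name, args = job
          @owner.send(method_name, *args)
        end
      end
    end
  end

  # Queue a method call; returns immediately without running it.
  def defer(method_name, *args)
    @queue << [method_name, args]
  end

  # Let already queued jobs finish, then stop the threads.
  def shutdown
    @threads.size.times { @queue << :stop }
    @threads.each(&:join)
  end
end

class SketchScraper
  attr_reader :results

  def initialize
    @results = Queue.new # safe to push to from several threads
  end

  def scrap_wikipedia(url)
    @results << "fetched #{url}" # placeholder for blocking IO
  end
end

scraper = SketchScraper.new
pool = SketchThreadPool.new(scraper, 2)
pool.defer(:scrap_wikipedia, "http://example.org/a")
pool.defer(:scrap_wikipedia, "http://example.org/b")
pool.shutdown
puts scraper.results.size
```

Note that the sketch collects results in a Queue rather than a plain Array, which is one way to respect the thread-safety warning above.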
Update: Using Memcache to store result objects is strongly recommended. The built-in cache works, but may give unpredictable results. Memcache also serves as an out-of-process cache that can be queried at any time, whereas the built-in cache may not return a result until the worker picks up the request, for instance while the worker is busy processing.
All workers can cache results using the cache attribute. The result object can then be
queried from Rails using ask_result. For example:
class ProgressWorker < BackgrounDRb::MetaWorker
  set_worker_name :progress_worker
  def create
    @counter = 0
    add_periodic_timer(2) { increment_counter }
  end
  def increment_counter
    @counter += 1
    # some_key stands for whatever key you choose for the result
    cache[some_key] = @counter
  end
end
Using the MiddleMan proxy, you can then keep querying the status of the progress bar:
MiddleMan.worker(:progress_worker).ask_result(some_key)
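The write-then-poll pattern described above can be sketched without BackgrounDRb at all. In the following plain-Ruby illustration, SketchResultCache is a hypothetical stand-in for the worker cache, one thread plays the worker and the main thread plays the Rails side that keeps asking for the latest value. The key :progress and the sleep intervals are arbitrary choices for this sketch.

```ruby
# Hypothetical stand-in for a result cache; no BackgrounDRb classes are used.
class SketchResultCache
  def initialize
    @data = {}
    @lock = Mutex.new
  end

  def []=(key, value)
    @lock.synchronize { @data[key] = value }
  end

  def [](key)
    @lock.synchronize { @data[key] }
  end
end

cache = SketchResultCache.new

# Plays the worker: bump a counter a few times and publish each value.
producer = Thread.new do
  1.upto(3) do |count|
    sleep 0.01
    cache[:progress] = count
  end
end

# Plays the Rails side: keep asking until the final value shows up.
seen = []
until seen.last == 3
  value = cache[:progress]
  seen << value if value && value != seen.last
  sleep 0.005
end
producer.join
puts seen.last
```

The poller may miss intermediate values if it asks less often than the worker writes, which is usually acceptable for a progress display.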
By default, cache is a worker-local, hash-like object used for storing results.
If you plan to store many objects in the cache from your worker, this may not be the
best solution. You can easily replace the in-worker cache with memcache.
To use memcache for object caching, change your backgroundrb.yml file like this:
:backgroundrb:
  :ip: 0.0.0.0
  :port: 11006
  :result_storage: memcache
:memcache: "10.0.0.1:11211,10.0.0.2:11211"
Everything else remains the same.
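As a quick sanity check of such a file, the settings can be loaded with Ruby's standard YAML library. The sketch below assumes the layout in which :memcache sits at the top level next to :backgroundrb, and it only demonstrates how the symbol keys and the comma-separated server list parse. How BackgrounDRb itself reads the file is not covered here.

```ruby
require "yaml"

# Example settings, assuming :memcache is a top-level key (see lead-in).
config_text = <<~CONFIG
  :backgroundrb:
    :ip: 0.0.0.0
    :port: 11006
    :result_storage: memcache
  :memcache: "10.0.0.1:11211,10.0.0.2:11211"
CONFIG

# Keys are written as Ruby symbols, so Symbol must be permitted explicitly.
config = YAML.safe_load(config_text, permitted_classes: [Symbol])

storage = config[:backgroundrb][:result_storage]
servers = config[:memcache].split(",") # one "host:port" entry per server

puts storage
puts servers.inspect
```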
BackgrounDRb now has out-of-the-box support for persistent job queues, which are persisted to the database. The API for adding a task to the job queue is simple:
MiddleMan.worker(:hello_worker).enq_some_task(:arg => "hello_world", :job_key => "boy")
So in your hello worker:
class HelloWorker < BackgrounDRb::MetaWorker
  def some_task(args)
    # .. do some work ..
    persistent_job.finish! #=> marks the job as finished; thread safe
  end
end
persistent_job is a thread-local variable that refers to the currently
running queued task, so it can be used from the thread pool as well. For example:
class HelloWorker < BackgrounDRb::MetaWorker
  def some_task(args)
    thread_pool.defer(:fetch_url, args)
  end
  def fetch_url(tags)
    # .. runs in thread ..
    # .. fetch tasks ..
    persistent_job.finish!
  end
end
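What "thread local" means here can be shown in plain Ruby: a value bound through Thread.current is visible only to the thread that set it (strictly speaking, to that thread's current fiber). In the sketch below, SketchJob and SketchContext are hypothetical names used for illustration; the text does not show how BackgrounDRb stores persistent_job internally.

```ruby
# Hypothetical job record; NOT BackgrounDRb's job queue model.
SketchJob = Struct.new(:name, :finished) do
  def finish!
    self.finished = true
  end
end

module SketchContext
  # Returns the job bound to the calling thread, or nil if none is bound.
  def self.persistent_job
    Thread.current[:persistent_job]
  end
end

jobs = [SketchJob.new("first", false), SketchJob.new("second", false)]

threads = jobs.map do |job|
  Thread.new do
    Thread.current[:persistent_job] = job # bind the job to this thread only
    SketchContext.persistent_job.finish!  # finishes this thread's own job
  end
end
threads.each(&:join)

puts jobs.map(&:finished).inspect
puts SketchContext.persistent_job.inspect # the main thread never bound a job
```

Each thread finishes only the job bound to it, which is why calling finish! from a deferred method cannot mark some other thread's job as done.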
BackgrounDRb comes with a built-in mechanism for writing test cases. First make sure that you
have bdrb_test_helper.rb in the test directory of your Rails app (run rake backgroundrb:setup if you don't have one).
Then put your worker test cases in the test/unit directory of your Rails application and require the helper:
require File.join(File.dirname(__FILE__), "..", "bdrb_test_helper")
require "god_worker"
context "When god worker starts" do
  setup do
    # instance variable, so the worker is visible to the specs in this context
    @god_worker = GodWorker.new
  end
end
All the helper file does is stub out the relevant worker methods that need network IO. Some methods may not be stubbed yet; for any such method, you are encouraged to stub it yourself and send a patch to the backgroundrb mailing list.
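Stubbing a method that needs network IO can be as simple as replacing it on the instance under test. The following plain-Ruby sketch uses a hypothetical SketchGodWorker class with an invented fetch_page method. It does not reproduce the contents of bdrb_test_helper.rb, which are not shown here.

```ruby
# Hypothetical worker; fetch_page stands in for any method needing network IO.
class SketchGodWorker
  def fetch_page(_url)
    raise "network access is not available in tests"
  end

  def page_length(url)
    fetch_page(url).length
  end
end

worker = SketchGodWorker.new

# Stub: replace the network-bound method on this one instance only.
worker.define_singleton_method(:fetch_page) do |_url|
  "<html>stubbed</html>"
end

puts worker.page_length("http://example.org/")
```

Other instances of the class keep the original method, so the stub cannot leak into unrelated tests.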