MetaWorker class
BackgrounDRb workers are asynchrounous reactors which work using events You are free to use threads in your workers, but be reasonable with them. Following methods are available to all workers from parent classes.
- BackgrounDRb::MetaWorker#connect
Above method connects to an external tcp server and integrates the connection within reactor loop of worker. For example:
class TimeClient def receive_data(p_data) worker.get_external_data(p_data) end def post_init p "***************** : connection completed" end end class FooWorker < BackgrounDRb::MetaWorker set_worker_name :foo_worker def create(args = nil) external_connection = nil connect("localhost",11009,TimeClient) { |conn| external_connection = conn } end def get_external_data(p_data) puts "And external data is : #{p_data}" end end - BackgrounDRb::MetaWorker#start_server
Above method allows you to start a tcp server from your worker, all the accepted connections are integrated with event loop of worker
class TimeServer def receive_data(p_data) end def post_init add_periodic_timer(2) { say_hello_world } end def connection_completed end def say_hello_world p "***************** : invoking hello world #{Time.now}" send_data("Hello World\n") end end class ServerWorker < BackgrounDRb::MetaWorker set_worker_name :server_worker def create(args = nil) # start the server when worker starts start_server("0.0.0.0",11009,TimeServer) do |client_connection| client_connection.say_hello_world end end end
- check_for_timer_events
- connection_completed
- invoke_worker_method
- load_schedule
- load_schedule_from_args
- new_load_schedule
- process_request
- receive_data
- register_result
- register_status
- run_user_threads
- send_response
- unbind
- worker_init
| [RW] | config_file | |
| [RW] | logger | |
| [RW] | my_schedule | |
| [RW] | run_time | |
| [RW] | thread_pool | |
| [RW] | trigger | |
| [RW] | trigger_type |
[ show source ]
# File server/meta_worker.rb, line 299
299: def check_for_timer_events
300: super
301: return if @worker_method_triggers.nil? or @worker_method_triggers.empty?
302: @worker_method_triggers.each do |key,value|
303: if value[:runtime] < Time.now.to_i
304: (t_data = value[:data]) ? send(key,t_data) : send(key)
305: value[:runtime] = value[:trigger].fire_time_after(Time.now).to_i
306: end
307: end
308: end
[ show source ]
# File server/meta_worker.rb, line 297
297: def connection_completed; end
we are overriding the function that checks for timers def check_for_timer_events
super
return unless @my_schedule
if @run_time < Time.now.to_i
# self.send(@my_schedule[:worker_method]) if self.respond_to?(@my_schedule[:worker_method])
invoke_worker_method
@run_time = @trigger.fire_time_after(Time.now).to_i
end
end
[ show source ]
# File server/meta_worker.rb, line 326
326: def invoke_worker_method
327: if self.respond_to?(@my_schedule[:worker_method]) && @my_schedule[:data]
328: self.send(@my_schedule[:worker_method],@my_schedule[:data])
329: elsif self.respond_to?(@my_schedule[:worker_method])
330: self.send(@my_schedule[:worker_method])
331: end
332: end
[ show source ]
# File server/meta_worker.rb, line 217
217: def load_schedule
218: case @my_schedule[:trigger_args]
219: when String
220: @trigger_type = :cron_trigger
221: cron_args = @my_schedule[:trigger_args] || "0 0 0 0 0"
222: @trigger = BackgrounDRb::CronTrigger.new(cron_args)
223: when Hash
224: @trigger_type = :trigger
225: @trigger = BackgrounDRb::Trigger.new(@my_schedule[:trigger_args])
226: end
227: @run_time = @trigger.fire_time_after(Time.now).to_i
228: end
loads workers schedule from options supplied from rails a user may pass trigger arguments to dynamically define the schedule
[ show source ]
# File server/meta_worker.rb, line 180
180: def load_schedule_from_args
181: @my_schedule = @worker_options[:schedule]
182: new_load_schedule if @my_schedule
183: end
new experimental scheduler
[ show source ]
# File server/meta_worker.rb, line 231
231: def new_load_schedule
232: @worker_method_triggers = { }
233: @my_schedule.each do |key,value|
234: case value[:trigger_args]
235: when String
236: cron_args = value[:trigger_args] || "0 0 0 0 0"
237: trigger = BackgrounDRb::CronTrigger.new(cron_args)
238: when Hash
239: trigger = BackgrounDRb::Trigger.new(value[:trigger_args])
240: end
241: @worker_method_triggers[key] = { :trigger => trigger,:data => value[:data],:runtime => trigger.fire_time_after(Time.now).to_i }
242: end
243: end
method is responsible for invoking appropriate method in user
[ show source ]
# File server/meta_worker.rb, line 198
198: def process_request(p_data)
199: user_input = p_data[:data]
200: logger.info "#{user_input[:worker_method]} #{user_input[:data]}"
201: if (user_input[:worker_method]).nil? or !respond_to?(user_input[:worker_method])
202: logger.info "Undefined method #{user_input[:worker_method]} called on worker #{worker_name}"
203: return
204: end
205: called_method_arity = self.method(user_input[:worker_method]).arity
206: logger.info "Arity of method is #{called_method_arity}"
207: result = nil
208: if called_method_arity != 0
209: result = self.send(user_input[:worker_method],user_input[:data])
210: else
211: result = self.send(user_input[:worker_method])
212: end
213: result = "dummy_result" unless result
214: send_response(p_data,result)
215: end
receives requests/responses from master process or other workers
[ show source ]
# File server/meta_worker.rb, line 186
186: def receive_data p_data
187: if p_data[:data][:worker_method] == :exit
188: exit
189: return
190: end
191: case p_data[:type]
192: when :request: process_request(p_data)
193: when :response: process_response(p_data)
194: end
195: end
[ show source ]
# File server/meta_worker.rb, line 264
264: def register_result p_data
265: result = { :type => :result, :data => p_data }
266: begin
267: send_data(result)
268: rescue TypeError => e
269: logger.info(e.to_s)
270: logger.info(e.backtrace.join("\n"))
271: rescue
272: logger.info($!.to_s)
273: logger.info($!.backtrace.join("\n"))
274: end
275: end
probably this method should be made thread safe, so as a method needs to have a lock or something before it can use the method
[ show source ]
# File server/meta_worker.rb, line 247
247: def register_status p_data
248: status = {:type => :status,:data => p_data}
249: begin
250: send_data(status)
251: rescue TypeError => e
252: status = {:type => :status,:data => "invalid_status_dump_check_log"}
253: send_data(status)
254: logger.info(e.to_s)
255: logger.info(e.backtrace.join("\n"))
256: rescue
257: status = {:type => :status,:data => "invalid_status_dump_check_log"}
258: send_data(status)
259: logger.info($!.to_s)
260: logger.info($!.backtrace.join("\n"))
261: end
262: end
method would allow user threads to run exclusively for a while
[ show source ]
# File server/meta_worker.rb, line 311
311: def run_user_threads
312: @thread_pool.exclusive_run
313: end
[ show source ]
# File server/meta_worker.rb, line 277
277: def send_response input,output
278: input[:data] = output
279: input[:type] = :response
280: begin
281: send_data(input)
282: rescue TypeError => e
283: logger.info(e.to_s)
284: logger.info(e.backtrace.join("\n"))
285: input[:data] = "invalid_result_dump_check_log"
286: send_data(input)
287: rescue
288: logger.info($!.to_s)
289: logger.info($!.backtrace.join("\n"))
290: input[:data] = "invalid_result_dump_check_log"
291: send_data(input)
292: end
293: end
[ show source ]
# File server/meta_worker.rb, line 295
295: def unbind; end
does initialization of worker stuff and invokes create method in user defined worker class
[ show source ]
# File server/meta_worker.rb, line 158
158: def worker_init
159: @thread_pool = ThreadPool.new(20)
160:
161: @config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result)
162: # load_rails_env
163: @logger = PacketLogger.new(self)
164: if(@worker_options && @worker_options[:schedule] && no_auto_load)
165: load_schedule_from_args
166: elsif(@config_file[:schedules] && @config_file[:schedules][worker_name.to_sym])
167: @my_schedule = @config_file[:schedules][worker_name.to_sym]
168: new_load_schedule if @my_schedule
169: end
170: if respond_to?(:create)
171: create_arity = method(:create).arity
172: (create_arity == 0) ? create : create(@worker_options[:data])
173: end
174: @logger.info "#{worker_name} started"
175: @logger.info "Schedules for worker loaded"
176: end