MetaWorker class

BackgrounDRb workers are asynchrounous reactors which work using events You are free to use threads in your workers, but be reasonable with them. Following methods are available to all workers from parent classes.

  • BackgrounDRb::MetaWorker#connect

    Above method connects to an external tcp server and integrates the connection within reactor loop of worker. For example:

         class TimeClient
           def receive_data(p_data)
             worker.get_external_data(p_data)
           end
    
           def post_init
             p "***************** : connection completed"
           end
         end
    
         class FooWorker < BackgrounDRb::MetaWorker
           set_worker_name :foo_worker
           def create(args = nil)
             external_connection = nil
             connect("localhost",11009,TimeClient) { |conn| external_connection = conn }
           end
    
           def get_external_data(p_data)
             puts "And external data is : #{p_data}"
           end
         end
    
  • BackgrounDRb::MetaWorker#start_server

    Above method allows you to start a tcp server from your worker, all the accepted connections are integrated with event loop of worker

       class TimeServer
    
         def receive_data(p_data)
         end
    
         def post_init
           add_periodic_timer(2) { say_hello_world }
         end
    
         def connection_completed
         end
    
         def say_hello_world
           p "***************** : invoking hello world #{Time.now}"
           send_data("Hello World\n")
         end
       end
    
       class ServerWorker < BackgrounDRb::MetaWorker
         set_worker_name :server_worker
         def create(args = nil)
           # start the server when worker starts
           start_server("0.0.0.0",11009,TimeServer) do |client_connection|
             client_connection.say_hello_world
           end
         end
       end
    
Methods
Attributes
[RW] config_file
[RW] logger
[RW] my_schedule
[RW] run_time
[RW] thread_pool
[RW] trigger
[RW] trigger_type
Public Instance methods
check_for_timer_events()
     # File server/meta_worker.rb, line 299
299:     def check_for_timer_events
300:       super
301:       return if @worker_method_triggers.nil? or @worker_method_triggers.empty?
302:       @worker_method_triggers.each do |key,value|
303:         if value[:runtime] < Time.now.to_i
304:           (t_data = value[:data]) ? send(key,t_data) : send(key)
305:           value[:runtime] = value[:trigger].fire_time_after(Time.now).to_i
306:         end
307:       end
308:     end
connection_completed()
     # File server/meta_worker.rb, line 297
297:     def connection_completed; end
invoke_worker_method()

we are overriding the function that checks for timers def check_for_timer_events

  super
  return unless @my_schedule
  if @run_time < Time.now.to_i
    # self.send(@my_schedule[:worker_method]) if self.respond_to?(@my_schedule[:worker_method])
    invoke_worker_method
    @run_time = @trigger.fire_time_after(Time.now).to_i
  end

end

     # File server/meta_worker.rb, line 326
326:     def invoke_worker_method
327:       if self.respond_to?(@my_schedule[:worker_method]) && @my_schedule[:data]
328:         self.send(@my_schedule[:worker_method],@my_schedule[:data])
329:       elsif self.respond_to?(@my_schedule[:worker_method])
330:         self.send(@my_schedule[:worker_method])
331:       end
332:     end
load_schedule()
     # File server/meta_worker.rb, line 217
217:     def load_schedule
218:       case @my_schedule[:trigger_args]
219:       when String
220:         @trigger_type = :cron_trigger
221:         cron_args = @my_schedule[:trigger_args] || "0 0 0 0 0"
222:         @trigger = BackgrounDRb::CronTrigger.new(cron_args)
223:       when Hash
224:         @trigger_type = :trigger
225:         @trigger = BackgrounDRb::Trigger.new(@my_schedule[:trigger_args])
226:       end
227:       @run_time = @trigger.fire_time_after(Time.now).to_i
228:     end
load_schedule_from_args()

loads workers schedule from options supplied from rails a user may pass trigger arguments to dynamically define the schedule

     # File server/meta_worker.rb, line 180
180:     def load_schedule_from_args
181:       @my_schedule = @worker_options[:schedule]
182:       new_load_schedule if @my_schedule
183:     end
new_load_schedule()

new experimental scheduler

     # File server/meta_worker.rb, line 231
231:     def new_load_schedule
232:       @worker_method_triggers = { }
233:       @my_schedule.each do |key,value|
234:         case value[:trigger_args]
235:         when String
236:           cron_args = value[:trigger_args] || "0 0 0 0 0"
237:           trigger = BackgrounDRb::CronTrigger.new(cron_args)
238:         when Hash
239:           trigger = BackgrounDRb::Trigger.new(value[:trigger_args])
240:         end
241:         @worker_method_triggers[key] = { :trigger => trigger,:data => value[:data],:runtime => trigger.fire_time_after(Time.now).to_i }
242:       end
243:     end
process_request(p_data)

method is responsible for invoking appropriate method in user

     # File server/meta_worker.rb, line 198
198:     def process_request(p_data)
199:       user_input = p_data[:data]
200:       logger.info "#{user_input[:worker_method]} #{user_input[:data]}"
201:       if (user_input[:worker_method]).nil? or !respond_to?(user_input[:worker_method])
202:         logger.info "Undefined method #{user_input[:worker_method]} called on worker #{worker_name}"
203:         return
204:       end
205:       called_method_arity = self.method(user_input[:worker_method]).arity
206:       logger.info "Arity of method is #{called_method_arity}"
207:       result = nil
208:       if called_method_arity != 0
209:         result = self.send(user_input[:worker_method],user_input[:data])
210:       else
211:         result = self.send(user_input[:worker_method])
212:       end
213:       result = "dummy_result" unless result
214:       send_response(p_data,result)
215:     end
receive_data(p_data)

receives requests/responses from master process or other workers

     # File server/meta_worker.rb, line 186
186:     def receive_data p_data
187:       if p_data[:data][:worker_method] == :exit
188:         exit
189:         return
190:       end
191:       case p_data[:type]
192:       when :request: process_request(p_data)
193:       when :response: process_response(p_data)
194:       end
195:     end
register_result(p_data)
     # File server/meta_worker.rb, line 264
264:     def register_result p_data
265:       result = { :type => :result, :data => p_data }
266:       begin
267:         send_data(result)
268:       rescue TypeError => e
269:         logger.info(e.to_s)
270:         logger.info(e.backtrace.join("\n"))
271:       rescue
272:         logger.info($!.to_s)
273:         logger.info($!.backtrace.join("\n"))
274:       end
275:     end
register_status(p_data)

probably this method should be made thread safe, so as a method needs to have a lock or something before it can use the method

     # File server/meta_worker.rb, line 247
247:     def register_status p_data
248:       status = {:type => :status,:data => p_data}
249:       begin
250:         send_data(status)
251:       rescue TypeError => e
252:         status = {:type => :status,:data => "invalid_status_dump_check_log"}
253:         send_data(status)
254:         logger.info(e.to_s)
255:         logger.info(e.backtrace.join("\n"))
256:       rescue
257:         status = {:type => :status,:data => "invalid_status_dump_check_log"}
258:         send_data(status)
259:         logger.info($!.to_s)
260:         logger.info($!.backtrace.join("\n"))
261:       end
262:     end
run_user_threads()

method would allow user threads to run exclusively for a while

     # File server/meta_worker.rb, line 311
311:     def run_user_threads
312:       @thread_pool.exclusive_run
313:     end
send_response(input,output)
     # File server/meta_worker.rb, line 277
277:     def send_response input,output
278:       input[:data] = output
279:       input[:type] = :response
280:       begin
281:         send_data(input)
282:       rescue TypeError => e
283:         logger.info(e.to_s)
284:         logger.info(e.backtrace.join("\n"))
285:         input[:data] = "invalid_result_dump_check_log"
286:         send_data(input)
287:       rescue
288:         logger.info($!.to_s)
289:         logger.info($!.backtrace.join("\n"))
290:         input[:data] = "invalid_result_dump_check_log"
291:         send_data(input)
292:       end
293:     end
unbind()
     # File server/meta_worker.rb, line 295
295:     def unbind; end
worker_init()

does initialization of worker stuff and invokes create method in user defined worker class

     # File server/meta_worker.rb, line 158
158:     def worker_init
159:       @thread_pool = ThreadPool.new(20)
160: 
161:       @config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result)
162:       # load_rails_env
163:       @logger = PacketLogger.new(self)
164:       if(@worker_options && @worker_options[:schedule] && no_auto_load)
165:         load_schedule_from_args
166:       elsif(@config_file[:schedules] && @config_file[:schedules][worker_name.to_sym])
167:         @my_schedule = @config_file[:schedules][worker_name.to_sym]
168:         new_load_schedule if @my_schedule
169:       end
170:       if respond_to?(:create)
171:         create_arity = method(:create).arity
172:         (create_arity == 0) ? create : create(@worker_options[:data])
173:       end
174:       @logger.info "#{worker_name} started"
175:       @logger.info "Schedules for worker loaded"
176:     end