Changeset 209

Show
Ignore:
Timestamp:
11/21/07 11:53:11 (1 year ago)
Author:
gethema..@gmail.com
Message:

check in log worker code

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/version10/framework/core.rb

    r207 r209  
    204204        @timer_hash.each do |key,timer| 
    205205          if timer.run_now? 
    206             #begin 
    207206            timer.run 
    208             #             rescue 
    209             #               @timer_hash.delete(key) 
    210             #             end 
    211207            @timer_hash.delete(key) if !timer.respond_to?(:interval) 
    212208          end 
  • branches/version10/framework/packet.rb

    r207 r209  
    1 require "rubygems" 
    21require "socket" 
    3 require "base64" 
    42require "yaml" 
    53require "forwardable" 
     
    2826require "connection" 
    2927require "worker" 
    30 #require "cpu_worker" 
    3128 
    3229# This file is just a runner of things and hence does basic initialization of thingies required for running 
     
    3431 
    3532 
    36 PACKET_APP=File.expand_path'../' unless defined?(PACKET_APP) 
     33PACKET_APP = File.expand_path'../' unless defined?(PACKET_APP) 
    3734 
    3835module Packet 
  • branches/version10/framework/packet_master.rb

    r207 r209  
    1111    def self.run 
    1212      master_reactor_instance = new 
     13      master_reactor_instance.live_workers = DoubleKeyedHash.new 
    1314      yield(master_reactor_instance) 
    1415      master_reactor_instance.load_workers 
     
    5455          dump_object(p_object,connection) 
    5556        end 
    56         def_delegators :@reactor, :start_server, :connect, :add_periodic_timer, :add_timer, :cancel_timer,:reconnect 
     57        def_delegators :@reactor, :start_server, :connect, :add_periodic_timer, :add_timer, :cancel_timer,:reconnect, :start_worker 
    5758      end 
    5859      handler_instance.workers = @live_workers 
     
    8081    # where worker is actually required in master as well as in worker. 
    8182    def load_workers 
    82       @live_workers = DoubleKeyedHash.new 
     83 
    8384      if defined?(WORKER_ROOT) 
    8485        worker_root = WORKER_ROOT 
     
    9293        require worker_name 
    9394        worker_klass = Object.const_get(worker_name.classify) 
     95        next if worker_klass.no_auto_load 
    9496        fork_and_load(worker_klass) 
    9597      end 
    9698 
    97       # FIXME: easiest and yet perhaps a bit ugly 
     99      # FIXME: easiest and yet perhaps a bit ugly, its just to make sure that from each 
     100      # worker proxy one can access other workers 
    98101      @live_workers.each do |key,worker_instance| 
    99102        worker_instance.workers = @live_workers 
     
    101104    end 
    102105 
     106    def start_worker(worker_name,options = {}) 
     107      require worker_name.to_s 
     108      worker_klass = Object.const_get(worker_name.classify) 
     109      fork_and_load(worker_klass,options) 
     110    end 
     111 
    103112    # method forks given worker file in a new process 
    104     def fork_and_load(worker_klass
     113    def fork_and_load(worker_klass,worker_options = { }
    105114      t_worker_name = worker_klass.worker_name 
    106115      worker_pimp = worker_klass.worker_proxy.to_s 
     
    110119      # socket to which master process is going to write 
    111120      worker_read_end,master_write_end = UNIXSocket.pair(Socket::SOCK_STREAM) 
     121      worker_read_fd,master_write_fd = UNIXSocket.pair 
    112122 
    113       worker_read_fd,master_write_fd = UNIXSocket.pair 
    114123      if((pid = fork()).nil?) 
    115         # close file handles which are not required in child 
    116124        $0 = "ruby #{worker_klass.worker_name}" 
    117125        master_write_end.close 
     
    119127        master_write_fd.close 
    120128        # master_write_end.close if master_write_end 
    121         worker_klass.start_worker(:write_end => worker_write_end,:read_end => worker_read_end,:read_fd => worker_read_fd
     129        worker_klass.start_worker(:write_end => worker_write_end,:read_end => worker_read_end,:read_fd => worker_read_fd,:options => worker_options
    122130      end 
    123131      Process.detach(pid) 
    124       # if no pimp exists for the given class then we should create a pimp class for the worker 
    125       # meta programmatically. 
     132 
    126133      unless worker_pimp.blank? 
    127134        require worker_pimp 
  • branches/version10/framework/worker.rb

    r207 r209  
    55    iattr_accessor :fd_reader,:msg_writer,:msg_reader,:worker_name 
    66    iattr_accessor :worker_proxy 
    7     attr_accessor :worker_started 
     7    iattr_accessor :no_auto_load 
     8    attr_accessor :worker_started, :worker_options 
    89    after_connection :provide_workers 
    910 
     
    1516      @fd_reader = messengers[:read_fd] 
    1617      t_instance = new 
     18      t_instance.worker_options = messengers[:options] 
    1719      t_instance.worker_init if t_instance.respond_to?(:worker_init) 
    1820      t_instance.start_reactor 
     
    3638        send_data(:data => t_data,:function => options[:function],:callback_signature => t_callback.signature) 
    3739      else 
    38         send_data(:data => t_data,:function => options[:function]
     40        send_data(:data => t_data,:function => options[:function],:requested_worker => options[:worker],:requesting_worker => worker_name,:type => :request
    3941      end 
    4042    end 
  • branches/version10/server/master_worker.rb

    r206 r209  
    1010require "cron_trigger" 
    1111require "trigger" 
     12require "log_worker" 
    1213 
    1314require File.expand_path("#{RAILS_HOME}/config/environment.rb") 
     
    5758  end 
    5859  def connection_completed; end 
    59  
    60   def log log_data 
    61     workers[:log_worker].send_request(:type => :request, :data => log_data) 
    62   end 
    6360end 
    6461 
     
    6865    config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 
    6966    Packet::Reactor.run do |t_reactor| 
     67      t_reactor.start_worker("log_worker") 
    7068      t_reactor.start_server(config_file[:backgroundrb][:ip],config_file[:backgroundrb][:port],MasterWorker) 
    7169    end 
  • branches/version10/server/meta_worker.rb

    r196 r209  
    11# FIXME: need to wrap workers within a namespace 
    22module BackgrounDRb 
     3  # this class is a dummy class that implements things required for passing data to 
     4  # actual logger worker 
     5  class PacketLogger 
     6    def initialize(worker) 
     7      @worker = worker 
     8    end 
     9    def info(p_data) 
     10      @worker.send_request(:worker => :log_worker, :data => p_data) 
     11    end 
     12 
     13    def debug 
     14      @worker.send_request(:worker => :log_worker, :data => p_data) 
     15    end 
     16  end 
     17 
    318  class MetaWorker < Packet::Worker 
    419    attr_accessor :config_file, :my_schedule, :run_time, :trigger_type, :trigger 
     20    attr_accessor :logger 
    521 
    622    def worker_init 
    723      @config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 
     24      @logger = PacketLogger.new(self) 
    825      if @config_file[:schedules] 
    926        @my_schedule = @config_file[:schedules][worker_name.to_sym] 
     
    1128      end 
    1229      create if respond_to?(:create) 
     30    end 
     31 
     32    # a user may pass trigger arguments to dynamically define the schedule 
     33    def load_schedule_from_args 
     34 
    1335    end 
    1436