Package gofer :: Package agent :: Module main
[hide private]
[frames] | no frames]

Source Code for Module gofer.agent.main

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  import sys 
 17  import os 
 18  import logging 
 19  from getopt import getopt, GetoptError 
 20  from gofer import * 
 21  from gofer.pam import PAM 
 22  from gofer.agent import * 
 23  from gofer.agent.plugin import PluginLoader, Plugin 
 24  from gofer.agent.lock import Lock, LockFailed 
 25  from gofer.agent.config import Config, nvl 
 26  from gofer.agent.logutil import getLogger 
 27  from gofer.agent.rmi import Scheduler 
 28  from time import sleep 
 29  from threading import Thread 
 30   
 31  log = getLogger(__name__) 
 32  cfg = Config() 
 33   
 34   
35 -class ActionThread(Thread):
36 """ 37 Run actions independently of main thread. 38 @ivar actions: A list of actions to run. 39 @type actions: [L{Action},..] 40 """ 41
42 - def __init__(self, actions):
43 """ 44 @param actions: A list of actions to run. 45 @type actions: [L{Action},..] 46 """ 47 self.actions = actions 48 Thread.__init__(self, name='Actions') 49 self.setDaemon(True)
50
51 - def run(self):
52 """ 53 Run actions. 54 """ 55 while True: 56 for action in self.actions: 57 action() 58 sleep(1)
59 60
61 -class Snapshot(dict):
62 """ 63 Plugin property snapshot. 64 Used to track changes in plugin properties. 65 """ 66 __getattr__ = dict.get 67
68 - def changed(self, **properties):
69 keys = [] 70 for k,v in properties.items(): 71 if self.get(k) != v: 72 keys.append(k) 73 return keys
74 75
76 -class PluginMonitorThread(Thread):
77 """ 78 Run actions independantly of main thread. 79 @ivar plugin: A plugin to monitor. 80 @type plugin: L{Plugin} 81 """ 82
83 - def __init__(self, plugin):
84 """ 85 @param plugin: A plugin to monitor. 86 @type plugin: L{Plugin} 87 """ 88 self.plugin = plugin 89 self.snapshot = Snapshot() 90 Thread.__init__(self, name='%s-monitor' % plugin.name) 91 self.setDaemon(True)
92
93 - def run(self):
94 """ 95 Monitor plugin attach/detach. 96 """ 97 while True: 98 try: 99 self.update() 100 except: 101 log.exception('plugin %s', self.plugin.name) 102 sleep(1)
103
104 - def update(self):
105 """ 106 Update plugin messaging sessions. 107 When a change in URL or UUID is detected the 108 associated plugin is: 109 - detached 110 - attached (URL and UUID specified) 111 """ 112 plugin = self.plugin 113 snapshot = self.snapshot 114 url = plugin.geturl() 115 uuid = plugin.getuuid() 116 if not snapshot.changed(url=url, uuid=uuid): 117 return # unchanged 118 if plugin.detach(): 119 log.info('uuid="%s", detached', snapshot.uuid) 120 snapshot.update(url=url, uuid=uuid) 121 if url and uuid: 122 plugin.attach(uuid) 123 log.info('uuid="%s", attached', uuid)
124 125
126 -class Agent:
127 """ 128 Gofer (main) agent. 129 Starts (2) threads. A thread to run actions and 130 another to monitor/update plugin sessions on the bus. 131 """ 132 133 WAIT = None 134
135 - def __init__(self, plugins):
136 """ 137 @param plugins: A list of loaded plugins 138 @type plugins: list 139 """ 140 self.plugins = plugins 141 PAM.SERVICE = nvl(cfg.pam.service, PAM.SERVICE)
142
143 - def start(self, block=True):
144 """ 145 Start the agent. 146 """ 147 plugins = self.plugins 148 actionThread = self.__startActions(plugins) 149 self.__startScheduler(plugins) 150 self.__startPlugins(plugins) 151 log.info('agent started.') 152 if block: 153 actionThread.join(self.WAIT)
154
155 - def __startActions(self, plugins):
156 """ 157 Start actions on enabled plugins. 158 @param plugins: A list of loaded plugins. 159 @type plugins: list 160 @return: The started action thread. 161 @rtype: L{ActionThread} 162 """ 163 actions = [] 164 for plugin in plugins: 165 actions.extend(plugin.actions) 166 actionThread = ActionThread(actions) 167 actionThread.start() 168 return actionThread
169
170 - def __startScheduler(self, plugins):
171 """ 172 Start the RMI scheduler. 173 @param plugins: A list of loaded plugins. 174 @type plugins: list 175 @return: The started scheduler thread. 176 @rtype: L{Scheduler} 177 """ 178 scheduler = Scheduler(plugins) 179 scheduler.start() 180 return scheduler
181
182 - def __startPlugins(self, plugins):
183 """ 184 Start the plugins. 185 Create and start a plugin monitor thread for each plugin. 186 @param plugins: A list of loaded plugins. 187 @type plugins: list 188 """ 189 for plugin in plugins: 190 if plugin.enabled(): 191 pt = PluginMonitorThread(plugin) 192 pt.start()
193 194
195 -class AgentLock(Lock):
196 """ 197 Agent lock ensure that agent only has single instance running. 198 @cvar PATH: The lock file absolute path. 199 @type PATH: str 200 """ 201 202 PATH = '/var/run/%sd.pid' % NAME 203
204 - def __init__(self):
205 Lock.__init__(self, self.PATH)
206 207
208 -def start(daemon=True):
209 """ 210 Agent main. 211 Add recurring, time-based actions here. 212 All actions must be subclass of L{action.Action}. 213 """ 214 lock = AgentLock() 215 try: 216 lock.acquire(0) 217 except LockFailed, e: 218 raise Exception('Agent already running') 219 if daemon: 220 daemonize(lock) 221 try: 222 pl = PluginLoader() 223 plugins = pl.load(eager()) 224 agent = Agent(plugins) 225 agent.start() 226 finally: 227 lock.release()
228
229 -def eager():
230 return int(nvl(cfg.loader.eager, 0))
231
232 -def usage():
233 """ 234 Show usage. 235 """ 236 s = [] 237 s.append('\n%sd <options>' % NAME) 238 s.append(' -h, --help') 239 s.append(' Show help') 240 s.append(' -c, --console') 241 s.append(' Run in the foreground and not as a daemon.') 242 s.append(' default: 0') 243 s.append(' -p [seconds], --profile [seconds]') 244 s.append(' Run (foreground) and print code profiling statistics.') 245 s.append('\n') 246 print '\n'.join(s)
247
248 -def daemonize(lock):
249 """ 250 Daemon configuration. 251 """ 252 pid = os.fork() 253 if pid == 0: # child 254 os.setsid() 255 os.chdir('/') 256 os.close(0) 257 os.close(1) 258 os.close(2) 259 dn = os.open('/dev/null', os.O_RDWR) 260 os.dup(dn) 261 os.dup(dn) 262 os.dup(dn) 263 else: # parent 264 lock.setpid(pid) 265 os.waitpid(pid, os.WNOHANG) 266 os._exit(0)
267
268 -def setupLogging():
269 """ 270 Set logging levels based on configuration. 271 """ 272 for p in nvl(cfg.logging, []): 273 level = cfg.logging[p] 274 if not level: 275 continue 276 try: 277 L = getattr(logging, level.upper()) 278 logger = logging.getLogger(p) 279 logger.setLevel(L) 280 except: 281 pass
282
283 -def profile():
284 """ 285 Code profiler using YAPPI 286 http://code.google.com/p/yappi 287 """ 288 import yappi 289 yappi.start() 290 start(False) 291 yappi.stop() 292 for pstat in yappi.get_stats(yappi.SORTTYPE_TSUB): 293 print pstat
294
295 -def main():
296 daemon = True 297 setupLogging() 298 try: 299 opts, args = getopt(sys.argv[1:], 'hcp:', ['help','console','prof']) 300 for opt,arg in opts: 301 if opt in ('-h', '--help'): 302 usage() 303 sys.exit(0) 304 if opt in ('-c', '--console'): 305 daemon = False 306 continue 307 if opt in ('-p', '--prof'): 308 __start = profile 309 Agent.WAIT = int(arg) 310 profile() 311 return 312 start(daemon) 313 except GetoptError, e: 314 print e 315 usage()
316 317 if __name__ == '__main__': 318 main() 319