Package gofer :: Package agent :: Module plugin
[hide private]
[frames | no frames]

Source Code for Module gofer.agent.plugin

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  """ 
 17  Plugin classes. 
 18  """ 
 19   
 20  import os 
 21  import imp 
 22  import inspect 
 23  from threading import RLock 
 24  from gofer import * 
 25  from gofer.rmi.dispatcher import Dispatcher 
 26  from gofer.rmi.threadpool import ThreadPool 
 27  from gofer.rmi.consumer import RequestConsumer 
 28  from gofer.rmi.decorators import Remote 
 29  from gofer.agent.deplist import DepList 
 30  from gofer.agent.config import Base, Config, nvl 
 31  from gofer.agent.action import Actions 
 32  from gofer.agent.whiteboard import Whiteboard 
 33  from gofer.messaging import Queue 
 34  from gofer.messaging.broker import Broker 
 35  from logging import getLogger 
 36   
 37  log = getLogger(__name__) 
class Plugin(object):
    """
    Represents a plugin.
    @ivar name: The plugin name.
    @type name: str
    @ivar synonyms: The plugin synonyms.
    @type synonyms: list
    @ivar descriptor: The plugin descriptor.
    @type descriptor: L{PluginDescriptor}
    @cvar plugins: The dict of loaded plugins, keyed by name and synonym.
    @type plugins: dict
    """

    plugins = {}

    @classmethod
    def add(cls, plugin):
        """
        Add the plugin.
        The plugin is registered under its name and each of its synonyms.
        @param plugin: The plugin to add.
        @type plugin: L{Plugin}
        @return: The added plugin
        @rtype: L{Plugin}
        """
        cls.plugins[plugin.name] = plugin
        for syn in plugin.synonyms:
            if syn == plugin.name:
                continue
            cls.plugins[syn] = plugin
        return plugin

    @classmethod
    def delete(cls, plugin):
        """
        Delete the plugin.
        Every key (name or synonym) that maps to the plugin is removed.
        @param plugin: The plugin to delete.
        @type plugin: L{Plugin}
        @return: The deleted plugin
        @rtype: L{Plugin}
        """
        # Iterate over a snapshot: deleting from a dict while iterating
        # its live items() view raises RuntimeError on python 3.
        for k, v in list(cls.plugins.items()):
            if v == plugin:
                del cls.plugins[k]
        return plugin

    @classmethod
    def find(cls, name):
        """
        Find a plugin by name or synonym.
        @param name: A plugin name or synonym.
        @type name: str
        @return: The plugin when found, else None.
        @rtype: L{Plugin}
        """
        return cls.plugins.get(name)

    @classmethod
    def all(cls):
        """
        Get a unique list of loaded plugins.
        A plugin registered under several names is listed once.
        @return: A list of plugins
        @rtype: list
        """
        unique = []
        for p in cls.plugins.values():
            if p in unique:
                continue
            unique.append(p)
        return unique

    def __init__(self, name, descriptor, synonyms=()):
        """
        @param name: The plugin name.
        @type name: str
        @param descriptor: The plugin descriptor.
        @type descriptor: L{PluginDescriptor}
        @param synonyms: The plugin synonyms.
        @type synonyms: list
        """
        self.name = name
        self.descriptor = descriptor
        # The name itself is never stored as a synonym.
        self.synonyms = [syn for syn in synonyms if syn != name]
        self.__mutex = RLock()
        self.__pool = None
        self.impl = None
        self.actions = []
        self.dispatcher = Dispatcher([])
        self.whiteboard = Whiteboard()
        self.consumer = None

    def names(self):
        """
        Get I{all} the names by which the plugin can be found.
        @return: A list of name and synonyms.
        @rtype: list
        """
        return [self.name] + list(self.synonyms)

    def enabled(self):
        """
        Get whether the plugin is enabled.
        Any error reading or converting [main].enabled is
        treated as I{not enabled}.
        @return: True (non-zero) if enabled.
        @rtype: int
        """
        cfg = self.cfg()
        try:
            return int(nvl(cfg.main.enabled, 0))
        except Exception:
            return 0

    def getuuid(self):
        """
        Get the plugin's messaging UUID.
        @return: The plugin's messaging UUID.
        @rtype: str
        """
        self.__lock()
        try:
            cfg = self.cfg()
            return nvl(cfg.messaging.uuid)
        finally:
            self.__unlock()

    def geturl(self):
        """
        Get the broker URL.
        The plugin's [messaging].url is preferred; the main
        configuration is used as the fallback.
        @return: The broker URL
        @rtype: str
        """
        main = Config()
        cfg = self.cfg()
        return nvl(cfg.messaging.url,
                   nvl(main.messaging.url))

    def getbroker(self):
        """
        Get the amqp broker for this plugin.  Each plugin can
        connect to a different broker.
        Certificate settings come from the plugin descriptor and
        fall back to the main configuration.
        @return: The broker if configured.
        @rtype: L{Broker}
        """
        cfg = self.cfg()
        main = Config()
        broker = Broker(self.geturl())
        broker.cacert = \
            nvl(cfg.messaging.cacert,
                nvl(main.messaging.cacert))
        broker.clientcert = \
            nvl(cfg.messaging.clientcert,
                nvl(main.messaging.clientcert))
        log.info('broker (qpid) configured: %s', broker)
        return broker

    def getpool(self):
        """
        Get the plugin's thread pool.
        The pool is created on first use and then reused.
        @return: ThreadPool.
        """
        if self.__pool is None:
            n = self.nthreads()
            self.__pool = ThreadPool(1, n, duplex=False)
        return self.__pool

    def setuuid(self, uuid, save=False):
        """
        Set the plugin's UUID.
        A false value (None, '') removes the property.
        @param uuid: The new UUID.
        @type uuid: str
        @param save: Save to plugin descriptor.
        @type save: bool
        """
        self.__setmessaging('uuid', uuid, save)

    def seturl(self, url, save=False):
        """
        Set the plugin's URL.
        A false value (None, '') removes the property.
        @param url: The new URL.
        @type url: str
        @param save: Save to plugin descriptor.
        @type save: bool
        """
        self.__setmessaging('url', url, save)

    def nthreads(self):
        """
        Get the number of threads in the plugin's pool.
        The plugin's [messaging].threads is preferred, then the
        main configuration, then 1.
        @return: number of threads.
        @rtype: int
        """
        main = Config()
        cfg = self.cfg()
        value = \
            nvl(cfg.messaging.threads,
                nvl(main.messaging.threads, 1))
        value = int(value)
        assert(value >= 1)
        return value

    def attach(self, uuid=None):
        """
        Attach (connect) to AMQP broker using the specified uuid.
        @param uuid: The (optional) messaging UUID.
            Defaults to the UUID in the plugin descriptor.
        @type uuid: str
        """
        if not uuid:
            uuid = self.getuuid()
        broker = self.getbroker()
        url = broker.url
        queue = Queue(uuid)
        consumer = RequestConsumer(queue, url=url)
        consumer.start()
        self.consumer = consumer

    def detach(self):
        """
        Detach (disconnect) from AMQP broker (if connected).
        @return: True if a consumer was closed, else False.
        @rtype: bool
        """
        if self.consumer:
            self.consumer.close()
            self.consumer = None
            return True
        else:
            return False

    def cfg(self):
        """
        Get the plugin descriptor.
        @return: The plugin descriptor
        @rtype: L{PluginDescriptor}
        """
        return self.descriptor

    def dispatch(self, request):
        """
        Dispatch (invoke) the specified RMI request.
        @param request: An RMI request
        @type request: L{Envelope}
        @return: The RMI returned.
        """
        return self.dispatcher.dispatch(request)

    def provides(self, name):
        """
        Get whether a plugin provides the specified class.
        @param name: A class (or module) name.
        @type name: str
        @return: True if provides.
        @rtype: bool
        """
        return self.dispatcher.provides(name)

    def export(self, name):
        """
        Export an object defined in the plugin (module).
        The name must reference a class or function object.
        @param name: A name (class|function)
        @type name: str
        @return: The named item.
        @rtype: (class|function)
        @raise NameError: when not found
        @raise TypeError: when found but not a class or function
        """
        try:
            obj = getattr(self.impl, name)
        except AttributeError:
            raise NameError(name)
        if inspect.isclass(obj) or inspect.isfunction(obj):
            return obj
        raise TypeError('(%s) must be class|function' % name)

    def __setmessaging(self, name, value, save):
        """
        Set (or remove) a property in the [messaging] section
        of the plugin descriptor while holding the plugin mutex.
        @param name: The property name.
        @type name: str
        @param value: The new value.  A false value removes the property.
        @type value: str
        @param save: Save to plugin descriptor.
        @type save: bool
        """
        self.__lock()
        try:
            cfg = self.cfg()
            if value:
                setattr(cfg.messaging, name, value)
            else:
                delattr(cfg.messaging, name)
            if save:
                cfg.write()
        finally:
            self.__unlock()

    def __lock(self):
        self.__mutex.acquire()

    def __unlock(self):
        self.__mutex.release()
339
class PluginDescriptor(Base):
    """
    Provides a plugin descriptor
    @cvar ROOT: The directory containing the plugin descriptors.
    @type ROOT: str
    """

    ROOT = '/etc/%s/plugins' % NAME

    @classmethod
    def __mkdir(cls):
        # Ensure the descriptor directory exists before it is listed.
        if not os.path.exists(cls.ROOT):
            os.makedirs(cls.ROOT)

    @classmethod
    def load(cls):
        """
        Load the plugin descriptors.
        A descriptor that cannot be loaded is logged and skipped.
        @return: A list of descriptor tuples (name, descriptor)
            sorted by declared dependencies.
        @rtype: list
        """
        unsorted = []
        cls.__mkdir()
        for name, path in cls.__list():
            try:
                inst = cls(path)
                # Remember where the descriptor came from for write().
                inst.__dict__['__path__'] = path
                unsorted.append((name, inst))
            except Exception:
                log.exception(path)
        return cls.__sort(unsorted)

    @classmethod
    def __list(cls):
        """
        List the descriptor files found in ROOT (sorted by file name).
        Only regular entries named I{<plugin>.conf} are reported.
        @return: A generator of tuples: (plugin-name, path)
        @rtype: generator
        """
        for fn in sorted(os.listdir(cls.ROOT)):
            # Split on the first '.' so 'a.b.conf' is not a descriptor.
            # Names without a '.' are skipped rather than failing the
            # tuple unpack, and the extension must match exactly.
            parts = fn.split('.', 1)
            if len(parts) != 2:
                continue
            plugin, ext = parts
            if not plugin or ext != 'conf':
                continue
            path = os.path.join(cls.ROOT, fn)
            if os.path.isdir(path):
                continue
            yield (plugin, path)

    @classmethod
    def __sort(cls, descriptors):
        """
        Sort descriptors based on defined dependencies.
        Dependencies defined by [main].requires
        @param descriptors: A list of descriptor tuples (name,descriptor)
        @type descriptors: list
        @return: The sorted list
        @rtype: list
        """
        index = {}
        for d in descriptors:
            index[d[0]] = d
        L = DepList()
        for n, d in descriptors:
            r = (n, d.__requires())
            L.add(r)
        # Named 'ordered' so the builtin sorted() is not shadowed.
        ordered = []
        for name in [x[0] for x in L.sort()]:
            ordered.append(index[name])
        return ordered

    def __requires(self):
        """
        Get the list of declared required plugins.
        Declared as a comma separated list in [main].requires.
        Empty names (eg: produced by a trailing comma) are ignored.
        @return: A tuple of plugin names.
        @rtype: tuple
        """
        declared = nvl(self.main.requires)
        if not declared:
            return ()
        names = [s.strip() for s in declared.split(',')]
        return tuple([s for s in names if s])

    def write(self):
        """
        Write the descriptor to the filesystem
        Written to: __path__.
        """
        path = self.__dict__['__path__']
        f = open(path, 'w')
        try:
            f.write(str(self))
        finally:
            # Always release the file handle, even when the write fails.
            f.close()
428
class PluginLoader:
    """
    Agent plugins loader.
    @cvar PATH: A list of paths to directories containing plugins.
    @type PATH: list
    """

    PATH = [
        '/usr/share/%s/plugins' % NAME,
        '/usr/lib/%s/plugins' % NAME,
        '/usr/lib64/%s/plugins' % NAME,
        '/opt/%s/plugins' % NAME,
    ]

    def load(self, eager=True):
        """
        Load the plugins.
        Plugins that fail to import are skipped.  Disabled plugins
        that are loaded anyway are reported with a warning.
        @param eager: Load disabled plugins.
        @type eager: bool
        @return: A list of loaded plugins
        @rtype: list
        """
        loaded = []
        for plugin, cfg in PluginDescriptor.load():
            if self.__noload(cfg, eager):
                continue
            p = self.__import(plugin, cfg)
            if not p:
                continue  # load failed
            if not p.enabled():
                log.warning('plugin: %s, DISABLED', p.name)
            loaded.append(p)
        return loaded

    def __noload(self, cfg, eager):
        """
        Determine whether the plugin should be loaded.
        @param cfg: A plugin descriptor.
        @type cfg: L{PluginDescriptor}
        @param eager: The I{eager} load flag.
        @type eager: bool
        @return: True when not loaded.
        @rtype: bool
        """
        try:
            return not (eager or int(cfg.main.enabled))
        except Exception:
            # [main].enabled missing or not an integer: do not
            # veto the load here.
            return False

    def __import(self, plugin, cfg):
        """
        Import a module by file name.
        The plugin is registered before the module is imported and
        is unregistered again when the import fails.
        @param plugin: The plugin (module) name.
        @type plugin: str
        @param cfg: A plugin descriptor.
        @type cfg: L{PluginDescriptor}
        @return: The loaded plugin or None when the import failed.
        @rtype: L{Plugin}
        """
        Remote.clear()
        Actions.clear()
        syn = self.__mangled(plugin)
        p = Plugin(plugin, cfg, (syn,))
        Plugin.add(p)
        try:
            path = self.__findplugin(plugin)
            mod = imp.load_source(syn, path)
            p.impl = mod
            log.info('plugin "%s", imported as: "%s"', plugin, syn)
            for fn in Remote.find(syn):
                fn.gofer.plugin = p
            if p.enabled():
                collated = Remote.collated()
                p.dispatcher = Dispatcher(collated)
                p.actions = Actions.collated()
            return p
        except Exception:
            # Catch Exception (not a bare except) so KeyboardInterrupt
            # and SystemExit are not swallowed by a failed import.
            Plugin.delete(p)
            log.exception('plugin "%s", import failed', plugin)
            return None

    def __findplugin(self, plugin):
        """
        Find a plugin module.
        The directories in PATH are searched in order and the
        first match is used.
        @param plugin: The plugin name.
        @type plugin: str
        @return: The fully qualified path to the plugin module.
        @rtype: str
        @raise Exception: When not found.
        """
        mod = '%s.py' % plugin
        for root in self.PATH:
            path = os.path.join(root, mod)
            if os.path.exists(path):
                log.info('using: %s', path)
                return path
        raise Exception('%s, not found in:%s' % (mod, self.PATH))

    def __mangled(self, plugin):
        """
        Get the module name for the specified plugin.
        When a module with the same name can already be found on the
        python path, I{_plugin} is appended to avoid the collision.
        @param plugin: The name of the plugin.
        @type plugin: str
        @return: The (mangled if necessary) plugin's module name.
        @rtype: str
        """
        try:
            found = imp.find_module(plugin)
        except Exception:
            # Not found on the python path: no collision possible.
            return plugin
        # find_module() returns an open file for file based modules;
        # only its existence matters here so close it right away.
        fp = found[0]
        if fp is not None:
            fp.close()
        log.warning('"%s" found in python-path', plugin)
        log.info('"%s" mangled to avoid collisions', plugin)
        return '%s_plugin' % plugin
546