Package gofer :: Package messaging :: Module auth
[hide private]
[frames | no frames]

Source Code for Module gofer.messaging.auth

  1  # 
  2  # Copyright (c) 2011 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation; either version 
  6  # 2 of the License (LGPLv2) or (at your option) any later version. 
  7  # There is NO WARRANTY for this software, express or implied, 
  8  # including the implied warranties of MERCHANTABILITY, 
  9  # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should 
 10  # have received a copy of LGPLv2 along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/lgpl-2.0.txt. 
 12  # 
 13  # Jeff Ortel <jortel@redhat.com> 
 14  # 
 15   
 16  import hmac 
 17  from gofer import * 
 18  from threading import RLock 
#
# Decorator
#

def resolved(fn):
    """
    Decorator that resolves a callable result.
    The decorated function is called with the arguments given.  When
    the object it returns is itself callable, that object is called
    with no arguments and I{its} result is returned; otherwise the
    returned object is passed through unchanged.
    @param fn: The function to decorate.
    @return: The wrapper; it carries the name and docstring of I{fn}.
    """
    # Imported here so the decorator does not depend on a module-level import.
    from functools import wraps

    @wraps(fn)
    def _fn(*a, **k):
        # A separate name is used for the result so the keyword
        # arguments are not clobbered.
        result = fn(*a, **k)
        if callable(result):
            return result()
        return result
    return _fn
#
# Exceptions
#

class MessageDigest(Exception):
    """
    A message digest was not matched.
    Constructed with two positional arguments: the digest (args[0])
    and the uuid (args[1]) that are reported by __str__().
    """

    def __str__(self):
        digest, uuid = self.args[0], self.args[1]
        template = 'digest: "%s" for: uuid=%s, not matched'
        return template % (digest, uuid)
44
class KeyRequired(Exception):
    """
    A key was not found for a uuid.
    Constructed with the uuid as the first positional argument (args[0]).
    """

    def __str__(self):
        uuid = self.args[0]
        return 'key for uuid=%s, not found' % uuid
49
#
# KeyChain
#

class Role:
    """
    A key role, stored as an integer id.
    @cvar ROLES: Two-way mapping between role ids (0, 1) and
        role names ('signing', 'validation').
    @ivar id: The role id.
    """

    ROLES = {
        0: 'signing',
        1: 'validation',
        'signing': 0,
        'validation': 1,
    }

    def __init__(self, id):
        """
        @param id: A role name (str), translated through ROLES,
            or any other object, stored as given.
        @raise KeyError: When a str is given that is not in ROLES.
        """
        self.id = self.ROLES[id] if isinstance(id, str) else id

    def __str__(self):
        # The name registered for this id.
        return self.ROLES[self.id]

    def __int__(self):
        return self.id
73
class KeyPair:
    """
    A pair of keys indexed by role id: 0 and 1.
    @ivar dict: The keys by role id.
    """

    def __init__(self, kp):
        """
        @param kp: One of:
            - dict: keyed by anything accepted by L{Role}; each key
              is converted to its integer role id.
            - str or callable: used for both role 0 and role 1.
            - tuple or list: kp[0] is role 0, kp[1] is role 1.
        @raise ValueError: When kp is none of the above.
        """
        if isinstance(kp, dict):
            self.dict = {int(Role(name)): key for name, key in kp.items()}
        elif isinstance(kp, str) or callable(kp):
            self.dict = {0: kp, 1: kp}
        elif isinstance(kp, (tuple, list)):
            first, second = kp[0], kp[1]
            self.dict = {0: first, 1: second}
        else:
            raise ValueError(kp)

    def valid(self):
        """
        Get whether both roles (0 and 1) have a key that is
        either a str or a callable.
        @return: True when both are.
        @rtype: bool
        """
        keys = (self.dict.get(role) for role in (0, 1))
        return all(isinstance(key, str) or callable(key) for key in keys)
105
class KeyChain(object):
    """
    A collection of key pairs indexed by an id.
    Each entry is the dictionary built by L{KeyPair} (keys by role id).
    The entry stored under L{DEFAULT} is used by find() when nothing
    is stored for the requested id.
    Most methods are decorated with I{synchronized}, which is imported
    from gofer; presumably it serializes calls using the private
    mutex created in the constructor - confirm against gofer.
    @cvar DEFAULT: The id of the default entry.
    """

    DEFAULT = None

    def __init__(self, **keychain):
        """
        @param keychain: Initial entries; each keyword is an id and
            each value is anything accepted by L{KeyPair}.
        """
        # The mutex and dictionary must exist before update() is called.
        self.__mutex = RLock()
        self.__keydict = {}
        self.update(keychain)

    def add(self, id, *kp, **roles):
        """
        Same as set().
        """
        self.set(id, *kp, **roles)

    def default(self, *kp, **roles):
        """
        Same as set() using the L{DEFAULT} id.
        """
        self.set(self.DEFAULT, *kp, **roles)

    @synchronized
    def clear(self):
        """
        Discard all entries.
        """
        self.__keydict = {}

    @synchronized
    def set(self, id, *kp, **roles):
        """
        Store the key pair for an id, replacing any existing entry.
        @param id: The entry id.
        @param kp: Either nothing (the I{roles} keywords are used),
            one object accepted by L{KeyPair}, or two keys for
            roles 0 and 1 respectively.
        @param roles: Keys by role; used only when I{kp} is empty.
        @raise ValueError: When more than two positional keys are
            given, or the pair built from I{roles} is not valid.
        """
        count = len(kp)
        if count == 0:
            # Only a pair built from keywords is validated.
            pair = KeyPair(roles)
            if not pair.valid():
                raise ValueError()
        elif count == 1:
            pair = KeyPair(kp[0])
        elif count == 2:
            pair = KeyPair(kp)
        else:
            raise ValueError()
        self.__keydict[id] = pair.dict

    @synchronized
    def unset(self, id):
        """
        Remove the entry for an id; nothing happens when not found.
        """
        self.__keydict.pop(id, None)

    @synchronized
    def update(self, d):
        """
        Merge entries.
        @param d: Another L{KeyChain}, whose entries are copied as
            stored, or a dictionary of id to anything accepted by
            set() as a single key-pair argument.
        """
        if isinstance(d, KeyChain):
            self.__keydict.update(d.dict())
        else:
            for key_id, pair in d.items():
                self.set(key_id, pair)

    @synchronized
    def get(self, id, d=None):
        """
        Get the entry stored for an id.
        @param d: The value returned when not found.
        """
        return self.__keydict.get(id, d)

    @resolved
    @synchronized
    def find(self, role, id, d=None):
        """
        Find the key for a role and id.
        The entry for I{id} is used when there is one (and it is not
        empty), else the L{DEFAULT} entry.  The I{resolved} decorator
        is applied, so a callable key is called and its result returned.
        @param role: The role id used to index the entry.
        @param id: The entry id.
        @param d: Returned when neither entry is found.
        @return: The key; None when the entry has no key for the role.
        """
        pair = self.get(id) or self.get(self.DEFAULT)
        return pair.get(role) if pair else d

    @synchronized
    def dict(self):
        """
        Get a (shallow) copy of the entries as a dictionary.
        """
        return dict(self.__keydict)

    @synchronized
    def __setitem__(self, id, kp):
        self.set(id, kp)

    @synchronized
    def __getitem__(self, id):
        return self.__keydict[id]

    @synchronized
    def __repr__(self):
        return repr(self.__keydict)

    @synchronized
    def __str__(self):
        return str(self.__keydict)
192
#
# Authentication
#

class HMAC:
    """
    Message authentication using a keyed-hash digest of the envelope.
    The digest is computed with the I{hmac} module over repr(envelope);
    no digestmod is passed to hmac.new(), so the module default applies.
    @cvar PROPERTY: The envelope property that holds the digest.
    @ivar keychain: Provides the keys; must have find(role, uuid).
    @ivar policy: When 2, a missing key raises L{KeyRequired}.
        With any other value a missing key means the message is
        passed through without being signed or validated.
    """

    PROPERTY = 'digest'

    def __init__(self, keychain=None, policy=2):
        """
        @param keychain: The key chain; a new (empty) L{KeyChain} is
            created for this object when not specified.
        @param policy: The missing-key policy (see class notes).
        """
        # A KeyChain() default in the signature is evaluated once and
        # would be shared (and mutated) by every object created without one.
        if keychain is None:
            keychain = KeyChain()
        self.keychain = keychain
        self.policy = policy

    def outbound(self, envelope):
        """
        Sign an outbound envelope.
        Any existing digest is removed, then a new one is set using
        the role 0 (signing) key found for uuid envelope.routing[1].
        @return: The same envelope.
        @raise KeyRequired: When no key is found and policy is 2.
        """
        envelope.pop(self.PROPERTY, None)
        uuid = envelope.routing[1]
        key = self.__key(0, uuid)
        if key:
            envelope.digest = self.__sign(key, envelope)
        return envelope

    def inbound(self, envelope):
        """
        Validate an inbound envelope.
        The digest is removed from the envelope and compared with one
        computed using the role 1 (validation) key found for uuid
        envelope.routing[0].
        @return: The same envelope (without the digest).
        @raise MessageDigest: When the digests do not match.
        @raise KeyRequired: When no key is found and policy is 2.
        """
        received = envelope.pop(self.PROPERTY, None)
        if not received:
            # NOTE(review): an envelope without a digest is accepted
            # without validation, whatever the policy - confirm intended.
            return envelope
        uuid = envelope.routing[0]
        key = self.__key(1, uuid)
        if key:
            computed = self.__sign(key, envelope)
            if not self.__matched(computed, received):
                raise MessageDigest(computed, uuid)
        return envelope

    def __sign(self, key, envelope):
        # Compute the hex digest of repr(envelope) using the key.
        hash = hmac.new(key)
        hash.update(repr(envelope))
        return hash.hexdigest()

    def __matched(self, computed, received):
        # Compare digests; uses hmac.compare_digest() when the running
        # python has it so the time taken does not reveal how much of
        # the digest matched.  It rejects some argument types with
        # TypeError, in which case plain equality is used.
        compare = getattr(hmac, 'compare_digest', None)
        if compare is not None:
            try:
                return compare(computed, received)
            except TypeError:
                pass
        return computed == received

    def __key(self, role, uuid):
        # Find the key; None when not found and not required by policy.
        key = self.keychain.find(role, uuid)
        if key:
            return key
        if self.policy == 2:
            raise KeyRequired(uuid)
        return None
236 237 # 238 # Testing 239 # 240 241 from gofer.messaging import Envelope
def test1():
    """
    Sign an envelope routed ('A','B') with the keys stored for B, then
    validate it with the (reversed) keys stored for A.
    """
    envelope = Envelope(routing=('A', 'B'))
    signer = HMAC(KeyChain(B=('0xAA', '0xBB')))
    outbound = signer.outbound(envelope)
    print(outbound)
    validator = HMAC(KeyChain(A=('0xBB', '0xAA')))
    validator.inbound(outbound)
    print('test1:validated')
253
def test2():
    """
    Exercise the different forms accepted by KeyChain.set() and
    KeyChain.add(), then sign (as A) and validate (as B) an envelope.
    """
    # A
    envelope = Envelope(routing=('A', 'B'))
    sender_keys = KeyChain()
    sender_keys.set('B', '0xAA', '0xBB')
    sender_keys.set('C', ('0xXX', '0xYY'))
    sender_keys.set('D', {0: '0xXX', 1: '0xYY'})
    # Positional form; replaces the entry stored for C above.
    replacement = ('C', '0xGG', '0xZZ')
    sender_keys.set(*replacement)
    outbound = HMAC(sender_keys).outbound(envelope)
    print(outbound)
    # B
    receiver_keys = KeyChain()
    receiver_keys.add('A', signing='0xBB', validation='0xAA')
    HMAC(receiver_keys).inbound(outbound)
    print('test2:validated')
274
def test3():
    """
    Sign and validate an envelope using a default key that is
    provided by a function.
    """
    def keyfn():
        return '0xDEADBEAF'

    envelope = Envelope(routing=('A', 'B'))
    keychain = KeyChain()
    keychain.default(keyfn)
    auth = HMAC(keychain)
    signed = auth.outbound(envelope)
    auth.inbound(signed)
    print('test3:validated')
# Run the tests, in order, when executed as a script.
if __name__ == '__main__':
    for _selftest in (test1, test2, test3):
        _selftest()