import logging
import syslog
from passlib.hash import sha512_crypt
import ajenti
import ajenti.usersync
from ajenti.api import *
[docs]def restrict(permission):
"""
Marks a decorated function as requiring ``permission``.
If the invoking user doesn't have one, :class:`SecurityError` is raised.
"""
def decorator(fx):
def wrapper(*args, **kwargs):
UserManager.get().require_permission(extract_context(), permission)
return fx(*args, **kwargs)
return wrapper
return decorator
[docs]class SecurityError (Exception):
"""
Indicates that user didn't have a required permission.
.. attribute:: permission
permission ID
"""
def __init__(self, permission):
self.permission = permission
def __str__(self):
return 'Permission "%s" required' % self.permission
@plugin
@persistent
@rootcontext
[docs]class UserManager (BasePlugin):
default_classconfig = {'sync-provider': ''}
classconfig_root = True
[docs] def check_password(self, username, password, env=None):
"""
Verifies the given username/password combo
:type username: str
:type password: str
:rtype: bool
"""
if not username or not password:
return False
provider = self.get_sync_provider(fallback=True)
if username == 'root' and not provider.syncs_root:
provider = ajenti.usersync.AjentiSyncProvider.get()
if not username in ajenti.config.tree.users:
return False
try:
provider.sync()
except Exception as e:
logging.error(str(e))
result = provider.check_password(username, password)
provider_name = type(provider).__name__
ip_notion = ''
ip = env.get('REMOTE_ADDR', None) if env else None
if ip:
ip_notion = ' from %s' % ip
if not result:
msg = 'failed login attempt for %s ("%s") through %s%s' % \
(username, password, provider_name, ip_notion)
syslog.syslog(syslog.LOG_WARNING, msg)
logging.warn(msg)
else:
msg = 'user %s logged in through %s%s' % (username, provider_name, ip_notion)
syslog.syslog(syslog.LOG_INFO, msg)
logging.info(msg)
return result
[docs] def hash_password(self, password):
"""
:type password: str
:rtype: str
"""
if not password.startswith('sha512|'):
password = 'sha512|%s' % sha512_crypt.encrypt(password)
return password
[docs] def hash_passwords(self):
for user in ajenti.config.tree.users.values():
if not '|' in user.password:
user.password = self.hash_password(user.password)
[docs] def has_permission(self, context, permission):
"""
Checks whether the current user has a permission
:type permission: str
:rtype: bool
"""
if context.user.name == 'root':
return True
if not permission in context.user.permissions:
return False
return True
[docs] def require_permission(self, context, permission):
"""
Checks current user for given permission and
raises :class:`SecurityError` if he doesn't have one
:type permission: str
:raises: SecurityError
"""
if not self.has_permission(context, permission):
raise SecurityError(permission)
[docs] def get_sync_provider(self, fallback=False):
"""
:type fallback: bool
:rtype: ajenti.usersync.UserSyncProvider
"""
for p in ajenti.usersync.UserSyncProvider.get_classes():
p.get()
if p.id == self.classconfig['sync-provider']:
try:
p.get().test()
except:
if fallback:
return ajenti.usersync.AjentiSyncProvider.get()
return p.get()
[docs] def set_sync_provider(self, provider_id):
self.classconfig['sync-provider'] = provider_id
self.save_classconfig()
[docs] def set_password(self, username, password):
ajenti.config.tree.users[username].password = self.hash_password(password)
@interface
[docs]class PermissionProvider (object):
"""
Override to create your own set of permissions
"""
[docs] def get_permissions(self):
"""
Should return a list of permission names
:rtype: list
"""
return []
[docs] def get_name(self):
"""
Should return a human-friendly name for this set
of permissions (displayed in Configurator)
:rtype: str
"""
return ''
__all__ = ['restrict', 'PermissionProvider', 'SecurityError', 'UserManager']