Repoze.who/Repoze.what with Pylons (step by step)
After working through quite a bit of the documentation on the pylons site and the repoze site, I didn’t really find a step by step guide to get repoze.who/repoze.what working with Pylons.
Some of the references used:
Thanks to my nephew Qwait for overloading ActionProtector to intercept 403s and give an additional chance to authenticate.
wget python --no-site-packages pylons cd pylons source bin/activate easy_install repoze.what-quickstart easy_install repoze.what-pylons easy_install mysql-python paster create -t pylons project cd project
Because of the association table and the cascade, you must use MySQL or Postgresql rather than SQLite. You also might need to modify development.ini [server:main] host/port.
cd project
# CUSTOM MIDDLEWARE HERE (filtered by error handling middlewares) from project.lib.auth import add_auth app = add_auth(app, config)
map.connect('/login', controller='login', action='login') map.connect('/login/submit', controller='login', action='login_handler') map.connect('/login/continue', controller='login', action='post_login') map.connect('/logout/continue', controller='login', action='post_logout') map.connect('/logout', controller='login', action='logout_handler')
lib/ – modified to intercept 403 and provide chance to authenticate
from pylons import response, url from pylons.controllers.util import redirect from repoze.what.plugins.quickstart import setup_sql_auth from repoze.what.plugins import pylonshq import project.lib.helpers as h from project.model.meta import Session from project.model.auth import AuthUser, AuthGroup, AuthPermission def add_auth(app, config): return setup_sql_auth( app, AuthUser, AuthGroup, AuthPermission, Session, login_handler = '/login/submit', logout_handler = '/logout', post_login_url = '/login/continue', post_logout_url = '/logout/continue', cookie_secret = 'my_secret_word', translations = { 'user_name' : 'username', 'groups' : 'auth_groups', 'group_name' : 'name', 'permissions' : 'auth_permissions', 'permission_name' : 'name' } ) def redirect_auth_denial(reason): if response.status_int == 401: message = 'You are not logged in.' message_type = 'warning' else: message = 'You do not have the permissions to access this page.' message_type = 'error' h.flash(message, message_type) redirect(url('/login', came_from=url.current())) class ActionProtector(pylonshq.ActionProtector): default_denial_handler = staticmethod(redirect_auth_denial)
from sqlalchemy import * from sqlalchemy.databases import mysql from sqlalchemy.orm import relation, backref, synonym from sqlalchemy.orm.exc import NoResultFound from project.model.meta import Base import os from hashlib import sha1 from datetime import datetime group_permission_table = Table('auth_group_permissions', Base.metadata, Column('group_id', mysql.BIGINT(20, unsigned=True), ForeignKey('', onupdate='CASCADE', ondelete='CASCADE')), Column('permission_id', mysql.BIGINT(20, unsigned=True), ForeignKey('', onupdate='CASCADE', ondelete='CASCADE')) ) user_group_table = Table('auth_user_groups', Base.metadata, Column('user_id', mysql.BIGINT(20, unsigned=True), ForeignKey('', onupdate='CASCADE', ondelete='CASCADE')), Column('group_id', mysql.BIGINT(20, unsigned=True), ForeignKey('', onupdate='CASCADE', ondelete='CASCADE')) ) class AuthGroup(Base): __tablename__ = 'auth_groups' id = Column(mysql.BIGINT(20, unsigned=True), primary_key=True, autoincrement=True) name = Column(Unicode(80), unique=True, nullable=False) created = Column(mysql.DATE()) users = relation('AuthUser', secondary=user_group_table, backref='auth_groups') def __repr__(self): return '<group: name=%s>' % def __unicode__(self): return class AuthUser(Base): __tablename__ = 'auth_users' id = Column(mysql.BIGINT(20, unsigned=True), primary_key=True, autoincrement=True) username = Column(Unicode(80), nullable=False) _password = Column('password', Unicode(80), nullable=False) @property def permissions(self): perms = set() for g in self.groups: perms = perms | set(g.permissions) return perms def _set_password(self, password): hashed_password = password if isinstance(password, unicode): password_8bit = password.encode('UTF-8') else: password_8bit = password salt = sha1() salt.update(os.urandom(60)) hash = sha1() hash.update(password_8bit + salt.hexdigest()) hashed_password = salt.hexdigest() + hash.hexdigest() if not isinstance(hashed_password, unicode): hashed_password = hashed_password.decode('UTF-8') self._password = hashed_password def _get_password(self): return self._password password = synonym('_password', descriptor=property(_get_password, _set_password)) def validate_password(self, password): hashed_pass = sha1() hashed_pass.update(password + self.password[:40]) return self.password[40:] == hashed_pass.hexdigest() def __repr__(self): return '<user: id="%s" username="%s" email="%s">' % (, self.username, def __unicode__(self): return self.username class AuthPermission(Base): __tablename__ = 'auth_permissions' id = Column(mysql.BIGINT(20, unsigned=True), primary_key=True, autoincrement=True) name = Column(Unicode(80), unique=True, nullable=False) description = Column(mysql.TEXT()) groups = relation(AuthGroup, secondary=group_permission_table, backref='auth_permissions') def __unicode__(self): return self.permission_name
from pylons import request, response, session, tmpl_context, config, url from pylons.controllers.util import redirect from project.lib.base import BaseController, render from project.lib.helpers import flash class LoginController(BaseController): def login(self): login_counter = request.environ['repoze.who.logins'] if login_counter > 0: flash('Wrong credentials') tmpl_context.login_counter = login_counter tmpl_context.came_from = request.params.get('came_from') or url('/') return render('login.mako') def login_handler(self): pass def post_login(self): identity = request.environ.get('repoze.who.identity') came_from = str(request.params.get('came_from', '')) or url('/') if not identity: login_counter = request.environ['repoze.who.logins'] + 1 redirect(url('/login', came_from=came_from, __logins=login_counter)) redirect(came_from) def logout_handler(self): pass def post_logout(self): redirect('/')
<% messages = h.flash.pop_messages() %> % if messages: <div class="flash"> % for message in messages: <p class="${message.category}">${message} % endfor </div> % endif <form action="${h.url('/login/submit', came_from=tmpl_context.came_from, __logins=tmpl_context.login_counter)}" method="POST"> <label for="login">Username:<input type="text" id="login" name="login" /><br /> <label for="password">Password:<input type="password" id="password" name="password" /> <input type="submit" value="Login" /> </form>
from pylons import request, response, session, tmpl_context, config #from repoze.what.plugins.pylonshq import ActionProtector, ControllerProtector from project.lib.auth import ActionProtector from repoze.what.predicates import is_user, has_permission, in_group from project.lib.base import BaseController, render class RootController(BaseController): def index(self): return render('index.mako') @ActionProtector(is_user('test')) def user(self): return render('loggedin.mako') @ActionProtector(is_user('nottest')) def notuser(self): return render('loggedin.mako') @ActionProtector(in_group('admin')) def admin(self): return render('loggedin.mako') @ActionProtector(has_permission('edit')) def edit(self): return render('loggedin.mako')
from pylons import url from webhelpers.pylonslib import Flash as _Flash flash = _Flash() – after the Session, Base import:
from project.model.auth import *
Setup/Create the database, start paster
paster setup-app development.ini paster serve --reload development.ini
#!/usr/bin/python2.6 from sqlalchemy import create_engine engine = create_engine('mysql://user:pass@localhost/dbname', echo=True) from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=engine) session = Session() from project.model.auth import * u = AuthUser() u.username = u'test' u.password = u'test' session.add(u) g = AuthGroup() = u'admin' g.users.append(u) session.add(g) p = AuthPermission() = u'edit' p.groups.append(g) session.add(p) session.commit()