Repoze.who/Repoze.what with Pylons (step by step)

After working through quite a bit of the documentation on the Pylons and repoze sites, I couldn’t find a step-by-step guide to getting repoze.who/repoze.what working with Pylons.

Some of the references used:

* http://code.gustavonarea.net/repoze.what-pylons/Manual/Protecting.html
* http://wiki.pylonshq.com/display/pylonscookbook/Authorization+with+repoze.what

Thanks to my nephew Qwait for overloading ActionProtector to intercept 403s and give an additional chance to authenticate.

# Fetch the Pylons 1.0 bootstrap script and use it to create an isolated
# environment in ./pylons, then activate that environment.
wget http://pylonshq.com/download/1.0/go-pylons.py
python go-pylons.py --no-site-packages pylons
cd pylons
source bin/activate
# Install the repoze.what plugins and the MySQL driver.
easy_install repoze.what-quickstart
easy_install repoze.what-pylons
easy_install mysql-python
# Generate a new Pylons application named "project".
paster create -t pylons project
cd project

Because of the association tables and the cascading foreign keys, you must use MySQL or PostgreSQL rather than SQLite. You might also need to modify the host/port settings under [server:main] in development.ini.

cd project

config/middleware.py:

    # CUSTOM MIDDLEWARE HERE (filtered by error handling middlewares)
    # Wrap the application with the authentication middleware built by
    # add_auth() in lib/auth.py.
    from project.lib.auth import add_auth
    app = add_auth(app, config)

config/routing.py:

    # Login form page rendered by LoginController.login.
    map.connect('/login', controller='login', action='login')
    # These four paths must match the login_handler, post_login_url,
    # post_logout_url and logout_handler values given to setup_sql_auth()
    # in lib/auth.py.
    map.connect('/login/submit', controller='login', action='login_handler')
    map.connect('/login/continue', controller='login', action='post_login')
    map.connect('/logout/continue', controller='login', action='post_logout')
    map.connect('/logout', controller='login', action='logout_handler')

lib/auth.py: – modified to intercept 403 and provide chance to authenticate

from pylons import response, url
from pylons.controllers.util import redirect

from repoze.what.plugins.quickstart import setup_sql_auth
from repoze.what.plugins import pylonshq

import project.lib.helpers as h

from project.model.meta import Session
from project.model.auth import AuthUser, AuthGroup, AuthPermission

def add_auth(app, config):
    """Wrap ``app`` with the repoze.what SQL quickstart middleware.

    :param app: the WSGI application to wrap.
    :param config: the application configuration mapping.  When it contains
        an ``auth.cookie_secret`` entry, that value is passed to
        ``setup_sql_auth`` as ``cookie_secret``; otherwise the tutorial
        placeholder ``'my_secret_word'`` is used.  May be ``None`` or empty.
    :return: the value returned by ``setup_sql_auth``.
    """
    # The placeholder is only suitable for following the tutorial; set
    # ``auth.cookie_secret`` in the .ini file for any real deployment.
    cookie_secret = 'my_secret_word'
    if config:
        cookie_secret = config.get('auth.cookie_secret', cookie_secret)

    # Attribute names used by this project's models, keyed by the names the
    # quickstart plugin uses for them.
    translations = {
        'user_name': 'username',
        'groups': 'auth_groups',
        'group_name': 'name',
        'permissions': 'auth_permissions',
        'permission_name': 'name',
    }

    return setup_sql_auth(
        app, AuthUser, AuthGroup, AuthPermission, Session,
        login_handler='/login/submit',
        logout_handler='/logout',
        post_login_url='/login/continue',
        post_logout_url='/logout/continue',
        cookie_secret=cookie_secret,
        translations=translations,
    )

def redirect_auth_denial(reason):
    """Flash an explanation and send the visitor to the login page.

    A 401 response status is reported as "not logged in" (warning); any
    other status is reported as a missing permission (error).  The current
    URL is passed along as ``came_from``.  ``reason`` is accepted but not
    used.
    """
    if response.status_int == 401:
        notice = ('You are not logged in.', 'warning')
    else:
        notice = ('You do not have the permissions to access this page.', 'error')

    text, category = notice
    h.flash(text, category)

    login_url = url('/login', came_from=url.current())
    redirect(login_url)

class ActionProtector(pylonshq.ActionProtector):
    """ActionProtector variant whose default denial handler is
    ``redirect_auth_denial`` (flash a message, then redirect to /login)."""
    default_denial_handler = staticmethod(redirect_auth_denial)

model/auth.py:

from sqlalchemy import *
from sqlalchemy.databases import mysql
from sqlalchemy.orm import relation, backref, synonym
from sqlalchemy.orm.exc import NoResultFound

from project.model.meta import Base

import os
from hashlib import sha1
from datetime import datetime

# Association table linking groups to permissions (many-to-many).
# Rows follow updates/deletes of the referenced group or permission.
group_permission_table = Table('auth_group_permissions', Base.metadata,
    Column('group_id', mysql.BIGINT(20, unsigned=True), ForeignKey('auth_groups.id', onupdate='CASCADE', ondelete='CASCADE')),
    Column('permission_id', mysql.BIGINT(20, unsigned=True), ForeignKey('auth_permissions.id', onupdate='CASCADE', ondelete='CASCADE'))
)
# Association table linking users to groups (many-to-many), with the same
# cascade rules.
user_group_table = Table('auth_user_groups', Base.metadata,
    Column('user_id', mysql.BIGINT(20, unsigned=True), ForeignKey('auth_users.id', onupdate='CASCADE', ondelete='CASCADE')),
    Column('group_id', mysql.BIGINT(20, unsigned=True), ForeignKey('auth_groups.id', onupdate='CASCADE', ondelete='CASCADE'))
)

class AuthGroup(Base):
    """A uniquely named group of users, stored in ``auth_groups``."""
    __tablename__ = 'auth_groups'

    id = Column(mysql.BIGINT(20, unsigned=True), primary_key=True, autoincrement=True)
    name = Column(Unicode(80), unique=True, nullable=False)
    # No default is declared here, so this stays NULL unless set explicitly.
    created = Column(mysql.DATE())

    # Many-to-many link to users; the backref exposes AuthUser.auth_groups.
    users = relation('AuthUser', secondary=user_group_table, backref='auth_groups')

    def __repr__(self):
        """Return a short debugging representation containing the group name."""
        return '<group: name=%s>' % self.name

    def __unicode__(self):
        """Return the group name."""
        return self.name

class AuthUser(Base):
    """A login account, stored in ``auth_users``.

    The ``password`` column holds 80 hexadecimal characters: a 40-character
    salt followed by the 40-character SHA-1 digest of ``password + salt``.
    """
    __tablename__ = 'auth_users'

    id = Column(mysql.BIGINT(20, unsigned=True), primary_key=True, autoincrement=True)
    username = Column(Unicode(80), nullable=False)
    _password = Column('password', Unicode(80), nullable=False)

    @property
    def permissions(self):
        """Return the set of permissions granted through the user's groups."""
        perms = set()
        # ``auth_groups`` and ``auth_permissions`` are the backref names
        # declared by AuthGroup.users and AuthPermission.groups.
        for group in self.auth_groups:
            perms.update(group.auth_permissions)
        return perms

    def _set_password(self, password):
        """Hash ``password`` with a fresh random salt and store the result.

        :param password: the clear-text password (``unicode`` or ``str``).
        """
        if isinstance(password, unicode):
            password_8bit = password.encode('UTF-8')
        else:
            password_8bit = password

        # NOTE(review): salted SHA-1 is weak for password storage.  It is
        # kept so that already-stored hashes remain valid; consider
        # migrating to a dedicated password-hashing scheme.
        salt = sha1()
        salt.update(os.urandom(60))
        salt_hex = salt.hexdigest()
        digest = sha1()
        digest.update(password_8bit + salt_hex)
        hashed_password = salt_hex + digest.hexdigest()

        if not isinstance(hashed_password, unicode):
            hashed_password = hashed_password.decode('UTF-8')
        self._password = hashed_password

    def _get_password(self):
        """Return the stored salt-plus-digest string."""
        return self._password

    password = synonym('_password', descriptor=property(_get_password, _set_password))

    def validate_password(self, password):
        """Return True when ``password`` matches the stored hash.

        :param password: the clear-text candidate (``unicode`` or ``str``).
        """
        # Encode exactly as _set_password does, so non-ASCII passwords
        # produce the same bytes that were hashed when the password was set.
        if isinstance(password, unicode):
            password = password.encode('UTF-8')

        stored = self.password
        salt_hex = stored[:40]
        if isinstance(salt_hex, unicode):
            # Keep the concatenation in bytes; mixing str and unicode would
            # trigger an implicit ASCII decode of the password.
            salt_hex = salt_hex.encode('UTF-8')

        hashed_pass = sha1()
        hashed_pass.update(password + salt_hex)
        return stored[40:] == hashed_pass.hexdigest()

    def __repr__(self):
        """Return a short debugging representation (id and username)."""
        # The model has no ``email`` column, so it is not included here.
        return '<user: id="%s" username="%s">' % (self.id, self.username)

    def __unicode__(self):
        """Return the username."""
        return self.username

class AuthPermission(Base):
    """A uniquely named permission, stored in ``auth_permissions``."""
    __tablename__ = 'auth_permissions'

    id = Column(mysql.BIGINT(20, unsigned=True), primary_key=True, autoincrement=True)
    name = Column(Unicode(80), unique=True, nullable=False)
    description = Column(mysql.TEXT())

    # Many-to-many link to groups; the backref exposes
    # AuthGroup.auth_permissions.
    groups = relation(AuthGroup, secondary=group_permission_table, backref='auth_permissions')

    def __unicode__(self):
        """Return the permission name."""
        # The column is called ``name``; there is no ``permission_name``
        # attribute on this model.
        return self.name

controllers/login.py:

from pylons import request, response, session, tmpl_context, config, url
from pylons.controllers.util import redirect

from project.lib.base import BaseController, render
from project.lib.helpers import flash

class LoginController(BaseController):
    """Actions behind the login/logout routes."""

    def login(self):
        """Render the login form, flashing an error after a failed attempt."""
        attempts = request.environ['repoze.who.logins']
        if attempts > 0:
            flash('Wrong credentials')
        tmpl_context.login_counter = attempts

        # Fall back to the site root when no (or an empty) came_from is given.
        origin = request.params.get('came_from')
        if not origin:
            origin = url('/')
        tmpl_context.came_from = origin

        return render('login.mako')

    def login_handler(self):
        """Do nothing.

        NOTE(review): presumably the auth middleware handles this URL
        before the controller is reached -- confirm.
        """

    def post_login(self):
        """Redirect to ``came_from`` on success, or back to the form on failure."""
        who_identity = request.environ.get('repoze.who.identity')

        target = str(request.params.get('came_from', ''))
        if not target:
            target = url('/')

        if not who_identity:
            # Carry the incremented attempt count back to the login form.
            attempts = request.environ['repoze.who.logins'] + 1
            retry_url = url('/login', came_from=target, __logins=attempts)
            redirect(retry_url)
        redirect(target)

    def logout_handler(self):
        """Do nothing.

        NOTE(review): presumably the auth middleware handles this URL
        before the controller is reached -- confirm.
        """

    def post_logout(self):
        """Redirect to the site root after logging out."""
        redirect('/')

templates/login.mako:

## Show any queued flash messages, then the login form.  The form posts to
## /login/submit and carries came_from and the attempt counter in the URL.
<% messages = h.flash.pop_messages() %>
% if messages:
<div class="flash">
    % for message in messages:
      <p class="${message.category}">${message}</p>
    % endfor
</div>
% endif

  <form action="${h.url('/login/submit', came_from=tmpl_context.came_from, __logins=tmpl_context.login_counter)}" method="POST">
    <label for="login">Username:<input type="text" id="login" name="login" /></label><br />
    <label for="password">Password:<input type="password" id="password" name="password" /></label>
    <input type="submit" value="Login" />
  </form>

controllers/root.py:

from pylons import request, response, session, tmpl_context, config

#from repoze.what.plugins.pylonshq import ActionProtector, ControllerProtector
from project.lib.auth import ActionProtector
from repoze.what.predicates import is_user, has_permission, in_group

from project.lib.base import BaseController, render

class RootController(BaseController):
    """Example controller: one public action and four protected ones."""

    def index(self):
        """Render the unprotected index page."""
        return render('index.mako')

    @ActionProtector(is_user('test'))
    def user(self):
        """Render the page guarded by the ``is_user('test')`` predicate."""
        return render('loggedin.mako')

    @ActionProtector(is_user('nottest'))
    def notuser(self):
        """Render the page guarded by the ``is_user('nottest')`` predicate."""
        return render('loggedin.mako')

    @ActionProtector(in_group('admin'))
    def admin(self):
        """Render the page guarded by the ``in_group('admin')`` predicate."""
        return render('loggedin.mako')

    @ActionProtector(has_permission('edit'))
    def edit(self):
        """Render the page guarded by the ``has_permission('edit')`` predicate."""
        return render('loggedin.mako')

lib/helpers.py:

from pylons import url

from webhelpers.pylonslib import Flash as _Flash
# Module-level Flash instance; other modules reach it as ``h.flash`` or
# import it directly from project.lib.helpers.
flash = _Flash()

websetup.py: – after the Session, Base import:

from project.model.auth import *

Set up/create the database, then start paster:

paster setup-app development.ini
paster serve --reload development.ini

createuser.py:

#!/usr/bin/python2.6

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from project.model.auth import AuthUser, AuthGroup, AuthPermission

# One-off script: create a "test" user, an "admin" group containing that
# user, and an "edit" permission granted to that group.
# Replace user/pass/dbname in the URL with real values before running.
engine = create_engine('mysql://user:pass@localhost/dbname', echo=True)
Session = sessionmaker(bind=engine)
session = Session()

try:
    u = AuthUser()
    u.username = u'test'
    u.password = u'test'
    session.add(u)

    g = AuthGroup()
    g.name = u'admin'
    g.users.append(u)
    session.add(g)

    p = AuthPermission()
    p.name = u'edit'
    p.groups.append(g)
    session.add(p)

    session.commit()
except Exception:
    # Undo the partial work so a failed run leaves no half-created rows,
    # then let the error surface.
    session.rollback()
    raise
finally:
    # Always release the session, whether or not the commit succeeded.
    session.close()

Tags: ,

Leave a Reply

You must be logged in to post a comment.

Entries (RSS) and Comments (RSS).
Cluster host: li