The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
#

# $HeadURL: http://svn.apache.org/repos/asf/subversion/branches/1.7.x/tools/hook-scripts/svnperms.py $
# $LastChangedDate: 2011-07-12 18:37:44 +0000 (Tue, 12 Jul 2011) $
# $LastChangedBy: blair $
# $LastChangedRevision: 1145712 $

import sys, os
import getopt
import shlex

try:
  # Python >=3.0
  from subprocess import getstatusoutput as subprocess_getstatusoutput
except ImportError:
  # Python <3.0
  from commands import getstatusoutput as subprocess_getstatusoutput
try:
    my_getopt = getopt.gnu_getopt
except AttributeError:
    my_getopt = getopt.getopt
import re

__author__ = "Gustavo Niemeyer <gustavo@niemeyer.net>"

class Error(Exception): pass

SECTION = re.compile(r'\[([^]]+?)(?:\s+extends\s+([^]]+))?\]')
OPTION = re.compile(r'(\S+)\s*=\s*(.*)$')

class Config:
    def __init__(self, filename):
        # Options are stored in __sections_list like this:
        # [(sectname, [(optname, optval), ...]), ...]
        self._sections_list = []
        self._sections_dict = {}
        self._read(filename)

    def _read(self, filename):
        # Use the same logic as in ConfigParser.__read()
        file = open(filename)
        cursectdict = None
        optname = None
        lineno = 0
        for line in file:
            lineno = lineno + 1
            if line.isspace() or line[0] == '#':
                continue
            if line[0].isspace() and cursectdict is not None and optname:
                value = line.strip()
                cursectdict[optname] = "%s %s" % (cursectdict[optname], value)
                cursectlist[-1][1] = "%s %s" % (cursectlist[-1][1], value)
            else:
                m = SECTION.match(line)
                if m:
                    sectname = m.group(1)
                    parentsectname = m.group(2)
                    if parentsectname is None:
                        # No parent section defined, so start a new section
                        cursectdict = self._sections_dict.setdefault \
                            (sectname, {})
                        cursectlist = []
                    else:
                        # Copy the parent section into the new section
                        parentsectdict = self._sections_dict.get \
                            (parentsectname, {})
                        cursectdict = self._sections_dict.setdefault \
                            (sectname, parentsectdict.copy())
                        cursectlist = self.walk(parentsectname)
                    self._sections_list.append((sectname, cursectlist))
                    optname = None
                elif cursectdict is None:
                    raise Error("%s:%d: no section header" % \
                                 (filename, lineno))
                else:
                    m = OPTION.match(line)
                    if m:
                        optname, optval = m.groups()
                        optval = optval.strip()
                        cursectdict[optname] = optval
                        cursectlist.append([optname, optval])
                    else:
                        raise Error("%s:%d: parsing error" % \
                                     (filename, lineno))

    def sections(self):
        return list(self._sections_dict.keys())

    def options(self, section):
        return list(self._sections_dict.get(section, {}).keys())

    def get(self, section, option, default=None):
        return self._sections_dict.get(option, default)

    def walk(self, section, option=None):
        ret = []
        for sectname, options in self._sections_list:
            if sectname == section:
                for optname, value in options:
                    if not option or optname == option:
                        ret.append((optname, value))
        return ret


class Permission:
    def __init__(self):
        self._group = {}
        self._permlist = []

    def parse_groups(self, groupsiter):
        for option, value in groupsiter:
            groupusers = []
            for token in shlex.split(value):
                # expand nested groups in place; no forward decls
                if token[0] == "@":
                    try:
                        groupusers.extend(self._group[token[1:]])
                    except KeyError:
                        raise Error, "group '%s' not found" % token[1:]
                else:
                    groupusers.append(token)
            self._group[option] = groupusers

    def parse_perms(self, permsiter):
        for option, value in permsiter:
            # Paths never start with /, so remove it if provided
            if option[0] == "/":
                option = option[1:]
            pattern = re.compile("^%s$" % option)
            for entry in value.split():
                openpar, closepar = entry.find("("), entry.find(")")
                groupsusers = entry[:openpar].split(",")
                perms = entry[openpar+1:closepar].split(",")
                users = []
                for groupuser in groupsusers:
                    if groupuser[0] == "@":
                        try:
                            users.extend(self._group[groupuser[1:]])
                        except KeyError:
                            raise Error("group '%s' not found" % \
                                         groupuser[1:])
                    else:
                        users.append(groupuser)
                self._permlist.append((pattern, users, perms))

    def get(self, user, path):
        ret = []
        for pattern, users, perms in self._permlist:
            if pattern.match(path) and (user in users or "*" in users):
                ret = perms
        return ret

class SVNLook:
    def __init__(self, repospath, txn=None, rev=None):
        self.repospath = repospath
        self.txn = txn
        self.rev = rev

    def _execcmd(self, *cmd, **kwargs):
        cmdstr = " ".join(cmd)
        status, output = subprocess_getstatusoutput(cmdstr)
        if status != 0:
            sys.stderr.write(cmdstr)
            sys.stderr.write("\n")
            sys.stderr.write(output)
            raise Error("command failed: %s\n%s" % (cmdstr, output))
        return status, output

    def _execsvnlook(self, cmd, *args, **kwargs):
        execcmd_args = ["svnlook", cmd, self.repospath]
        self._add_txnrev(execcmd_args, kwargs)
        execcmd_args += args
        execcmd_kwargs = {}
        keywords = ["show", "noerror"]
        for key in keywords:
            if key in kwargs:
                execcmd_kwargs[key] = kwargs[key]
        return self._execcmd(*execcmd_args, **execcmd_kwargs)

    def _add_txnrev(self, cmd_args, received_kwargs):
        if "txn" in received_kwargs:
            txn = received_kwargs.get("txn")
            if txn is not None:
                cmd_args += ["-t", txn]
        elif self.txn is not None:
            cmd_args += ["-t", self.txn]
        if "rev" in received_kwargs:
            rev = received_kwargs.get("rev")
            if rev is not None:
                cmd_args += ["-r", rev]
        elif self.rev is not None:
            cmd_args += ["-r", self.rev]

    def changed(self, **kwargs):
        status, output = self._execsvnlook("changed", **kwargs)
        if status != 0:
            return None
        changes = []
        for line in output.splitlines():
            line = line.rstrip()
            if not line: continue
            entry = [None, None, None]
            changedata, changeprop, path = None, None, None
            if line[0] != "_":
                changedata = line[0]
            if line[1] != " ":
                changeprop = line[1]
            path = line[4:]
            changes.append((changedata, changeprop, path))
        return changes

    def author(self, **kwargs):
        status, output = self._execsvnlook("author", **kwargs)
        if status != 0:
            return None
        return output.strip()


def check_perms(filename, section, repos, txn=None, rev=None, author=None):
    svnlook = SVNLook(repos, txn=txn, rev=rev)
    if author is None:
        author = svnlook.author()
    changes = svnlook.changed()
    try:
        config = Config(filename)
    except IOError:
        raise Error("can't read config file "+filename)
    if not section in config.sections():
        raise Error("section '%s' not found in config file" % section)
    perm = Permission()
    perm.parse_groups(config.walk("groups"))
    perm.parse_groups(config.walk(section+" groups"))
    perm.parse_perms(config.walk(section))
    permerrors = []
    for changedata, changeprop, path in changes:
        pathperms = perm.get(author, path)
        if changedata == "A" and "add" not in pathperms:
            permerrors.append("you can't add "+path)
        elif changedata == "U" and "update" not in pathperms:
            permerrors.append("you can't update "+path)
        elif changedata == "D" and "remove" not in pathperms:
            permerrors.append("you can't remove "+path)
        elif changeprop == "U" and "update" not in pathperms:
            permerrors.append("you can't update properties of "+path)
        #else:
        #    print "cdata=%s cprop=%s path=%s perms=%s" % \
        #          (str(changedata), str(changeprop), path, str(pathperms))
    if permerrors:
        permerrors.insert(0, "you don't have enough permissions for "
                             "this transaction:")
        raise Error("\n".join(permerrors))


# Command:

USAGE = """\
Usage: svnperms.py OPTIONS

Options:
    -r PATH    Use repository at PATH to check transactions
    -t TXN     Query transaction TXN for commit information
    -f PATH    Use PATH as configuration file (default is repository
               path + /conf/svnperms.conf)
    -s NAME    Use section NAME as permission section (default is
               repository name, extracted from repository path)
    -R REV     Query revision REV for commit information (for tests)
    -A AUTHOR  Check commit as if AUTHOR had committed it (for tests)
    -h         Show this message
"""

class MissingArgumentsException(Exception):
    "Thrown when required arguments are missing."
    pass

def parse_options():
    try:
        opts, args = my_getopt(sys.argv[1:], "f:s:r:t:R:A:h", ["help"])
    except getopt.GetoptError, e:
        raise Error(e.msg)
    class Options: pass
    obj = Options()
    obj.filename = None
    obj.section = None
    obj.repository = None
    obj.transaction = None
    obj.revision = None
    obj.author = None
    for opt, val in opts:
        if opt == "-f":
            obj.filename = val
        elif opt == "-s":
            obj.section = val
        elif opt == "-r":
            obj.repository = val
        elif opt == "-t":
            obj.transaction = val
        elif opt == "-R":
            obj.revision = val
        elif opt == "-A":
            obj.author = val
        elif opt in ["-h", "--help"]:
            sys.stdout.write(USAGE)
            sys.exit(0)
    missingopts = []
    if not obj.repository:
        missingopts.append("repository")
    if not (obj.transaction or obj.revision):
        missingopts.append("either transaction or a revision")
    if missingopts:
        raise MissingArgumentsException("missing required option(s): " + ", ".join(missingopts))
    obj.repository = os.path.abspath(obj.repository)
    if obj.filename is None:
        obj.filename = os.path.join(obj.repository, "conf", "svnperms.conf")
    if obj.section is None:
        obj.section = os.path.basename(obj.repository)
    if not (os.path.isdir(obj.repository) and
            os.path.isdir(os.path.join(obj.repository, "db")) and
            os.path.isdir(os.path.join(obj.repository, "hooks")) and
            os.path.isfile(os.path.join(obj.repository, "format"))):
        raise Error("path '%s' doesn't look like a repository" % \
                     obj.repository)

    return obj

def main():
    try:
        opts = parse_options()
        check_perms(opts.filename, opts.section,
                    opts.repository, opts.transaction, opts.revision,
                    opts.author)
    except MissingArgumentsException, e:
        sys.stderr.write("%s\n" % str(e))
        sys.stderr.write(USAGE)
        sys.exit(1)
    except Error, e:
        sys.stderr.write("error: %s\n" % str(e))
        sys.exit(1)

if __name__ == "__main__":
    main()

# vim:et:ts=4:sw=4