The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
#!/usr/bin/env python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Setup your environment so that `which svn` shows the svn you want to test.
# Just run this file, no parameters. Test files are created in /tmp/...
# To adjust which tests are run, look at 'p = product(' way below...
#
# This runs an insane number of tests of add-vs.-add situations during
# checkout, update and switch. The output scrolls by, and a summary for all
# tests with simple greps and wc.db SELECT results is printed at the very end.
#
# There is no automatic validation. You have to read the results.
#
# To run a gdb in any given place, replace a 'svn()' with 'gdbsvn()',
# presumably in either up() or sw().

from subprocess import Popen, PIPE, call
from types import FunctionType, ListType, TupleType
import tempfile, os
from itertools import product


def run_cmd(cmd, verbose=True, shell=False):
  if verbose:
    if shell:
      print '\n---', cmd
    else:
      print '\n---', ' '.join(cmd)
  p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=shell)
  stdout,stderr = p.communicate()[0:2]
  if verbose and stdout:
    print stdout,
  if verbose and stderr:
    print stderr,
  return stdout,stderr

def qsvn(*args):
  """Run 'svn ARGS...' quietly (nothing echoed); return (stdout, stderr)."""
  argv = ['svn']
  argv.extend(args)
  return run_cmd(argv, verbose=False)

def svn(*args):
  """Run 'svn ARGS...', echoing command and output; return (stdout, stderr)."""
  argv = ['svn']
  argv.extend(args)
  return run_cmd(argv)

def gdbsvn(*args):
  """Run 'svn ARGS...' under gdb instead of capturing its output.

  Meant as a drop-in replacement for svn(): it returns a two-element tuple
  like svn() does, but both elements are just the placeholder string 'gdb'.
  """
  argv = ['gdb', '--args', 'svn']
  argv.extend(args)
  call(argv)
  placeholder = 'gdb'
  return placeholder, placeholder

def shell(script):
  """Run the command string SCRIPT through the shell, echoing command and
  output; return (stdout, stderr)."""
  return run_cmd(script, True, True)

def rewrite_file(path, contents, auto_newline='\n\n'):
  """Overwrite the file at PATH with CONTENTS followed by AUTO_NEWLINE.

  Missing parent directories are created first. Pass a false AUTO_NEWLINE
  (None or '') to write CONTENTS without any suffix.
  """
  dirname = os.path.dirname(path)
  # dirname is '' for a bare filename; os.makedirs('') would raise.
  if dirname and not os.path.lexists(dirname):
    os.makedirs(dirname)
  # 'with' closes the file even if write() raises.
  with open(path, "w") as out:
    out.write(contents + (auto_newline or ''))

def read_file(path):
  """Return the full contents of the file at PATH, or None if it cannot be
  opened or read (IOError)."""
  try:
    # 'with' closes the file even if read() raises.
    with open(path, "r") as src:
      return src.read()
  except IOError:
    return None

def append_file(path, contents):
  """Append CONTENTS to the file at PATH, creating the file and any missing
  parent directories."""
  dirname = os.path.dirname(path)
  # dirname is '' for a bare filename; os.makedirs('') would raise, so only
  # create a directory when there is one to create (as rewrite_file() does).
  if dirname and not os.path.lexists(dirname):
    os.makedirs(dirname)
  # 'with' closes the file even if write() raises.
  with open(path, "a") as out:
    out.write(contents)

def remove_file(path):
  """Delete PATH if it is a regular file; otherwise do nothing."""
  if not os.path.isfile(path):
    return
  os.remove(path)

def tempdir():
  """Create a fresh temporary directory named 'tc_add-*'; return its path."""
  path = tempfile.mkdtemp(prefix='tc_add-')
  return path

# Short alias for os.path.join.
j = os.path.join

# Node kinds. These one-letter strings are what gets passed as the 'kind'
# argument of the action functions (unver, add, cp, ...).
f = 'f' # file
d = 'd' # dir
l = 'l' # symbolic link

class TestContext:
  """A scratch repository plus working copies for one test run.

  Constructing an instance creates a temporary base directory, creates a
  repository in 'repos' below it with svnadmin, and checks that repository
  out into 'wc'. A second working copy 'wc2' only exists after
  create_wc2() has been called.
  """

  def __init__(self):
    base = tempdir()
    repos = j(base, 'repos')
    self.base = base
    self.repos = repos
    self.URL = 'file://' + repos
    shell('svnadmin create "' + repos + '"')
    self.WC = j(base, 'wc')
    svn('checkout', self.URL, self.WC)

  def create_wc2(self):
    """Check the repository out into a second working copy, 'wc2'."""
    self.WC2 = j(self.base, 'wc2')
    svn('checkout', self.URL, self.WC2)

  def wc(self, *relpath):
    """Return the first working copy's root, or the path of the RELPATH
    components joined below it."""
    return j(self.WC, *relpath) if relpath else self.WC

  def wc2(self, *relpath):
    """Like wc(), but for the second working copy (see create_wc2())."""
    return j(self.WC2, *relpath) if relpath else self.WC2

  def url(self, relpath):
    """Return the repository URL, or the URL of RELPATH inside it."""
    if relpath:
      return self.URL + '/' + relpath
    return self.URL

  def head(self):
    """Return the revision number that 'svn info' reports for the repository
    URL, or None if the output contains no 'Revision: ' line."""
    out, err = qsvn('info', self.URL)
    marker = 'Revision: '
    matches = [ln for ln in out.split('\n') if ln.startswith(marker)]
    if matches:
      # Only the first matching line counts.
      return int(matches[0].strip()[len(marker):])
    return None



def unver(ctx, target, kind, content=None):
  """Create an unversioned node of KIND at TARGET.

  A file gets CONTENT (default: 'content of <basename>') and is then shown
  with 'cat'; a symlink points at 'symlink'; any other kind becomes an empty
  directory. CTX is not used; it is accepted so that all action functions
  can be called the same way.
  """
  if kind == f:
    text = content or ('content of ' + os.path.basename(target))
    rewrite_file(target, text)
    shell('cat ' + target)
    return
  if kind == l:
    os.symlink('symlink', target)
    return
  os.mkdir(target)



def add(ctx, target, kind, content):
  """Create a node of KIND at TARGET, 'svn add' it and set a property on it
  whose name and value both encode CONTENT and KIND."""
  unver(ctx, target, kind, content)
  svn('add', target)
  tag = 'add_' + content + '_' + kind
  svn('ps', 'PROP_' + tag, 'content_' + tag, target)



def cp(ctx, suffix, target, kind, content=None):
  """Copy the copy source matching KIND and SUFFIX to TARGET.

  The source is the repository URL of 'file', 'symlink' or 'dir' plus
  SUFFIX, pegged with '@1'. After copying, a property encoding CONTENT
  (default: 'modified <basename>'), KIND and SUFFIX is set on TARGET and its
  status is shown.
  """
  if not content:
    content = 'modified ' + os.path.basename(target)

  # Anything that is neither a file nor a symlink is treated as a directory.
  source_names = {f: 'file', l: 'symlink'}
  src = ctx.url(source_names.get(kind, 'dir') + suffix) + '@1'

  svn('copy', src, target)
  tag = content + '_' + kind + suffix
  svn('ps', 'PROP_copied_' + tag, 'content_' + tag, target)
  svn('status', target)


def cp1(ctx, target, kind, content=None):
  """Copy from copy source number 1; see cp()."""
  return cp(ctx, suffix='1', target=target, kind=kind, content=content)

def cp2(ctx, target, kind, content=None):
  """Copy from copy source number 2; see cp()."""
  return cp(ctx, suffix='2', target=target, kind=kind, content=content)

def prepare_cp(ctx, kind, suffix):
  """Schedule the copy source for KIND and SUFFIX in the first working copy.

  The source is named 'file', 'symlink' or 'dir' plus SUFFIX and gets a
  'PROP_copy_source_*' property. Nothing is committed here.

  If the source path already exists, nothing is done. This happens when the
  local and the incoming action both copy from the same source, so that
  prepare() asks for it twice. Directories are guarded the same way as files
  and symlinks, so 'svn mkdir' is not run a second time on an existing path.
  """
  if kind == f:
    target = ctx.wc('file' + suffix)
  elif kind == l:
    target = ctx.wc('symlink' + suffix)
  else:
    target = ctx.wc('dir' + suffix)

  if os.path.lexists(target):
    return

  if kind == f:
    rewrite_file(target, 'copy source ' + suffix)
    svn('add', target)
  elif kind == l:
    os.symlink('copy_source' + suffix, target)
    svn('add', target)
  else:
    # 'svn mkdir' creates the directory and schedules it for addition.
    svn('mkdir', target)

  svn('ps', 'PROP_copy_source_' + kind + suffix,
      'content_copy_source_' + kind + suffix,
      target)

def postpare_cp(ctx, kind, suffix):
  """Locally modify the copy source for KIND and SUFFIX in the first
  working copy.

  Every kind gets a 'PROP_local_copy_src_mod_*' property; a file source
  additionally has its text rewritten first.
  """
  if kind == f:
    target = ctx.wc('file' + suffix)
    rewrite_file(target, 'local mod on copy source ' + suffix)
  elif kind == l:
    target = ctx.wc('symlink' + suffix)
  else:
    target = ctx.wc('dir' + suffix)

  tag = 'local_copy_src_mod_' + kind + suffix
  svn('ps', 'PROP_' + tag, 'content_' + tag, target)


def prepare(ctx, action, kind):
  """Set up the copy source that ACTION needs; only cp1 and cp2 need one,
  every other action is a no-op here."""
  for copier, suffix in ((cp1, '1'), (cp2, '2')):
    if action == copier:
      prepare_cp(ctx, kind, suffix)
      break


def postpare(ctx, action, kind):
  """Locally modify the copy source that ACTION used; only cp1 and cp2 have
  one, every other action is a no-op here."""
  for copier, suffix in ((cp1, '1'), (cp2, '2')):
    if action == copier:
      postpare_cp(ctx, kind, suffix)
      break



def co(name, local_action, local_kind, incoming_action, incoming_kind):
  """Run one scenario in which the incoming change arrives through
  'svn checkout' into the already existing working copy.

  NAME is the node both sides create. The incoming action is committed from
  a second working copy; the local action is then applied to the first one,
  which has not seen that commit yet.

  Returns (checkout stdout, checkout stderr, status stdout, status stderr,
  sqlite3 stdout, sqlite3 stderr).
  """
  ctx = TestContext()

  sides = ((local_action, local_kind), (incoming_action, incoming_kind))
  for action, kind in sides:
    prepare(ctx, action, kind)

  svn('commit', '-mm', ctx.WC)
  svn('up', ctx.WC)

  print(ctx.head())

  # Incoming side: made and committed in the second working copy.
  ctx.create_wc2()
  incoming_action(ctx, ctx.wc2(name), incoming_kind, 'incoming')
  svn('commit', '-mm', ctx.WC2)

  # Local side: made in the first working copy.
  local_action(ctx, ctx.wc(name), local_kind, 'local')

  for action, kind in sides:
    postpare(ctx, action, kind)

  # get conflicts
  # 'yes p' feeds the answer 'p' to every prompt svn may issue.
  checkout_cmd = 'yes p | svn checkout "%s" "%s"' % (ctx.URL, ctx.WC)
  co_out, co_err = shell(checkout_cmd)
  st_out, st_err = svn('status', ctx.WC)
  query = ('select local_relpath,properties from base_node; '
           'select local_relpath,properties from working_node; '
           'select local_relpath,properties from actual_node; ')
  db_out, db_err = run_cmd(['sqlite3', ctx.wc('.svn', 'wc.db'), query])
  return co_out, co_err, st_out, st_err, db_out, db_err


def up(name, local_action, local_kind, incoming_action, incoming_kind):
  """Run one scenario in which the incoming change arrives through
  'svn update'.

  NAME is the node both sides create. The incoming action is committed from
  the working copy, which is then updated back to the revision recorded
  before that commit. The local action is applied there, and a final update
  with --accept=postpone brings the incoming change back in.

  Returns (update stdout, update stderr, status stdout, status stderr,
  sqlite3 stdout, sqlite3 stderr).
  """
  ctx = TestContext()

  sides = ((local_action, local_kind), (incoming_action, incoming_kind))
  for action, kind in sides:
    prepare(ctx, action, kind)

  svn('commit', '-mm', ctx.WC)
  svn('up', ctx.WC)

  # Remember the revision that precedes the incoming change.
  base_rev = ctx.head()
  print(base_rev)

  target = ctx.wc(name)
  incoming_action(ctx, target, incoming_kind, 'incoming')
  svn('commit', '-mm', ctx.WC)

  # time warp
  svn('update', '-r', str(base_rev), ctx.WC)

  local_action(ctx, target, local_kind, 'local')

  for action, kind in sides:
    postpare(ctx, action, kind)

  # get conflicts
  up_out, up_err = svn('update', '--accept=postpone', ctx.WC)
  st_out, st_err = svn('status', ctx.WC)
  query = ('select local_relpath,properties from base_node; '
           'select local_relpath,properties from working_node; '
           'select local_relpath,properties from actual_node; ')
  db_out, db_err = run_cmd(['sqlite3', ctx.wc('.svn', 'wc.db'), query])
  return up_out, up_err, st_out, st_err, db_out, db_err


def sw(name, local_action, local_kind, incoming_action, incoming_kind):
  """Run one scenario in which the incoming change arrives through
  'svn switch'.

  'trunk' is created in the repository and copied to 'branch'. The incoming
  action is applied to branch/NAME and committed, the local action is
  applied to trunk/NAME, and trunk is then switched to the branch URL with
  --accept=postpone.

  Returns (switch stdout, switch stderr, status stdout, status stderr,
  sqlite3 stdout, sqlite3 stderr).
  """
  ctx = TestContext()

  sides = ((local_action, local_kind), (incoming_action, incoming_kind))
  for action, kind in sides:
    prepare(ctx, action, kind)
  svn('commit', '-mm', ctx.WC)

  trunk_url = ctx.url('trunk')
  branch_url = ctx.url('branch')
  svn('mkdir', '-mm', trunk_url)
  svn('copy', '-mm', trunk_url, branch_url)

  svn('up', ctx.WC)

  # Incoming side lives on the branch.
  incoming_action(ctx, ctx.wc('branch', name), incoming_kind, 'incoming')
  svn('commit', '-mm', ctx.WC)
  svn('up', ctx.WC)

  # Local side lives on trunk.
  local_action(ctx, ctx.wc('trunk', name), local_kind, 'local')

  for action, kind in sides:
    postpare(ctx, action, kind)

  # get conflicts
  sw_out, sw_err = svn('switch', '--accept=postpone',
                       ctx.url('branch'), ctx.wc('trunk'))
  st_out, st_err = svn('status', ctx.WC)
  # Unlike co() and up(), this queries the wc.db below trunk and leaves out
  # actual_node.
  query = ('select local_relpath,properties from base_node; '
           'select local_relpath,properties from working_node;')
  db_out, db_err = run_cmd(['sqlite3', ctx.wc('trunk', '.svn', 'wc.db'),
                            query])
  # This is a bit stupid. Someone rewire this.
  return sw_out, sw_err, st_out, st_err, db_out, db_err


# This controls which tests are run. All possible combinations are tested.
# The elements are functions for co,up,sw and add,cp1,cp2,unver, and they are
# simple strings for f (file), l (symlink), d (directory).
#
# Each row is (cmd, local action, local kind, incoming action, incoming kind),
# i.e. row[0] .. row[4].
#
#            cmd        local action and kind     incoming action and kind
p = product((co,up,sw), (add,cp1,unver), (f,l,d), (add,cp2,cp1), (f,l,d))

# Incoming cp1 is meant to match up only with local cp1. Also, cp1-cp1 is
# supposed to perform identical copies in both incoming and local, so they
# only make sense with matching kinds. Skip all rows that don't match this.
#
# skip(row) returns a true value for rows that must NOT be run. The
# commented-out lines inside the expression are optional extra filters; to
# use one, uncomment it.
# NOTE(review): product() yields tuples, while the rows listed in the
# commented-out 'row not in (...)' filter are lists, which never compare
# equal to a tuple -- confirm before enabling that filter.
skip = lambda row: (row[3] == cp1 and (row[4] != row[2] or row[1] != cp1)
                    #Select subsets if desired
                    #or (row[0] != up or row[3] not in [cp1, cp2])
                    #or (row[2] != l and row[4] != l)
                    #
                    #or row not in (
                    ##[up, cp1, l, add, l],
                    ##[up, cp1, f, cp2, l],
                    #[up, cp1, f, cp2, f],
                    ##[up, cp1, f, add, f],
                    ##[up, cp1, f, add, l],
                    ##[up, add, l, add, l],
                    #)
                    )




def nameof(thing):
  """Return a printable name for THING.

  A function yields its __name__; a list or tuple yields the names of its
  elements joined with '_' (applied recursively); anything else yields
  str(THING).
  """
  if isinstance(thing, FunctionType):
    return thing.__name__
  # Test against the builtin types directly rather than the Python-2-only
  # aliases types.ListType / types.TupleType.
  if isinstance(thing, (list, tuple)):
    return '_'.join(nameof(item) for item in thing)
  return str(thing)


def analyze(name, outs):
  """Pick the noteworthy lines out of the captured outputs of one test.

  OUTS is a sequence of output strings. A line is kept if it starts with
  'svn: ', starts with '      > ', or contains NAME. The kept lines are
  returned as a list, in their original order.
  """
  def noteworthy(line):
    return (line.startswith('svn: ')
            or line.startswith('      > ')
            or name in line)

  stats = []
  for out in outs:
    stats.extend(ln for ln in out.split('\n') if noteworthy(ln))
  return stats

#os.environ['SVN_I_LOVE_CORRUPTED_WORKING_COPIES_SO_DISABLE_SLEEP_FOR_TIMESTAMPS'] = 'yes'

# Driver: run every row of 'p' that is not skipped, then print the collected
# summary and also write it to 'tree-conflicts-add-vs-add.py.results'.
results = []
name = None
try:
  for row in p:
    if skip(row):
      continue
    name = nameof(row)
    print(name)
    # row[0] is the test function (co, up or sw); the rest are its arguments.
    test_func = row[0]
    outs = test_func(name, *row[1:])
    results.append((name, analyze(name, outs)))
except:
  # Say which test was running, then let the error propagate.
  if name:
    print('Error during ' + name)
  raise
finally:
  # The summary is produced even when a test raised.
  lines = []
  for name, stats in results:
    if stats:
      lines.append('----- ' + name)
      lines.extend(stats)
    else:
      lines.append('----- ' + name + ': nothing.')
  dump = '\n'.join(lines)
  print(dump)
  rewrite_file('tree-conflicts-add-vs-add.py.results', dump)