#!/usr/bin/env python from __future__ import with_statement import sys, os if __name__ == '__main__': sys.path = ([os.path.dirname(os.path.dirname(os.path.dirname( os.path.realpath(__file__))))] + sys.path) __requires__='Bazki' import pkg_resources pkg_resources.require('Bazki') import unittest, warnings import subprocess, shutil import tempfile from paste.script import command from bazbase import custom, model, testing, db import bazsvn verbose = '-v' in sys.argv def run(cmd): if not verbose: cmd.append('-q') p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() if p.returncode != 0: raise EnvironmentError(p.returncode, err) here = os.path.abspath(os.path.dirname(__file__)) class BazsvnWithBazbase(unittest.TestCase): def expectRunGetsStatus(self, cmd, status, message): p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() self.assertEquals(status, p.returncode) self.assertTrue(message in err, err) def setUp(self): self.cwd = os.getcwd() testing.set_up_for_tests() self.old_vcs_hook = custom.version_control_hook custom.version_control_hook = bazsvn.hook # Do this directly to bypass the bazsvn hook. model.session.begin() model.clear() model.session.commit() model.session.close() bazsvn.hook.clear_checkouts() self.repo = tempfile.mkdtemp(suffix='.testrepo') bazsvn.custom.REPOSITORY = self.repo subprocess.check_call(['svnadmin', 'create', self.repo]) os.symlink(os.path.join(here, 'pre-commit'), os.path.join(self.repo, 'hooks', 'pre-commit')) import_cmd = ['svn', 'import', '--username', 'bazki', '-m', 'Bootstrapping...', os.path.join(here, 'db'), 'file://' + self.repo] run(import_cmd) self.checkout = tempfile.mkdtemp(suffix='.testcheckout') run(['svn', 'co', "file://%s" % self.repo, self.checkout]) os.chdir(self.checkout) def tearDown(self): shutil.rmtree(self.checkout) shutil.rmtree(self.repo) os.chdir(self.cwd) custom.version_control_hook = self.old_vcs_hook def test_add_child(self): path = 'Object/Test/UnitTest.yaml' with open(path, 'w') as outf: outf.write("""comment: | This was created in a unit test. name: George """) run(['svn', 'add', path]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): e = model.Element.get(u"UnitTest") self.assertEquals(u"George", unicode(e[u'name'].value, 'utf-8')) self.assertEquals(u"This was created in a unit test.\n", unicode(e[u'comment'].value, 'utf-8')) self.assertEquals(u"Test", e.parent.ename) def test_add_propval(self): path = 'Object/Test/Test.yaml' with open(path, 'a') as outf: outf.write("""name: Paul """) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): e = model.Element.get(u"Test") self.assertEquals(u"Paul", unicode(e[u'name'].value, 'utf-8')) def test_remove_propval(self): path = 'Object/Test/MergeTest.yaml' with open(path, 'w') as outf: outf.write("""comment: This test edits the comment and removes the 'test' prop value. """) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): e = model.Element.get(u"MergeTest") self.assertTrue(u'comment' in e) self.assertEquals(u"This test edits the comment and removes the 'test' prop value.", unicode(e[u'comment'].value, 'utf-8')) self.assertFalse(u'test' in e) def test_add_child_with_leaf_directory_fails(self): dirpath = 'Object/Test/UnitTest' os.mkdir(dirpath) path = dirpath + '/UnitTest.yaml' with open(path, 'w') as outf: outf.write("""comment: | This was created in a unit test. name: George """) run(['svn', 'add', dirpath]) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "If UnitTest has no kids, it needs to be in the form UnitTest.yaml, not UnitTest/UnitTest.yaml!") with db.begin_transaction(): self.assertRaises(model.NoResultFound, model.Element.get, u"UnitTest") def test_remove_child(self): path = 'Object/Test/MergeTest.yaml' run(['svn', 'rm', path]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): self.assertRaises(model.NoResultFound, model.Element.get, u"MergeTest") def test_remove_last_child(self): path = 'Object/User/Admin/Daemon.yaml' parent_from = 'Object/User/Admin/Admin.yaml' parent_to = 'Object/User/Admin.yaml' parent_dir = 'Object/User/Admin/' run(['svn', 'rm', path]) run(['svn', 'mv', parent_from, parent_to]) run(['svn', 'rm', parent_dir]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): self.assertEquals("User", model.Element.get(u"Admin").parent.ename) self.assertRaises(model.NoResultFound, model.Element.get, u"Daemon") def test_remove_last_child_without_removing_leaf_directory_fails(self): path = 'Object/User/Admin/Daemon.yaml' run(['svn', 'rm', path]) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "If Admin has no kids, it needs to be in the form Admin.yaml, not Admin/Admin.yaml!") # These should still work. with db.begin_transaction(): self.assertEquals(u"Admin", model.Element.get(u"Daemon").parent.ename) def test_move_subtree(self): path_from = 'Object/User/Admin' path_to = 'Object/Test/Admin' parent_dir = 'Object/User' parent_from = 'Object/User/User.yaml' parent_to = 'Object/User.yaml' run(['svn', 'mv', path_from, path_to]) run(['svn', 'mv', parent_from, parent_to]) run(['svn', 'rm', parent_dir]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): self.assertEquals("Test", model.Element.get(u"Admin").parent.ename) self.assertEquals("Admin", model.Element.get(u"Daemon").parent.ename) def test_remove_subtree(self): path = 'Object/Test' run(['svn', 'rm', path]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): self.assertRaises(model.NoResultFound, model.Element.get, u"Test") self.assertRaises(model.NoResultFound, model.Element.get, u"Sandbox") def test_remove_last_subtree(self): path = 'Object/User/Admin' parent_dir = 'Object/User' parent_from = 'Object/User/User.yaml' parent_to = 'Object/User.yaml' run(['svn', 'rm', path]) run(['svn', 'mv', parent_from, parent_to]) run(['svn', 'rm', parent_dir]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): self.assertRaises(model.NoResultFound, model.Element.get, u"Admin") self.assertEquals(0, model.Element.get(u"User").countDescendants()) def test_remove_last_subtree_without_removing_leaf_directory_fails(self): path = 'Object/User/Admin' run(['svn', 'rm', path]) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "If User has no kids, it needs to be in the form User.yaml, not User/User.yaml!") with db.begin_transaction(): # These should still work. self.assertEquals(u"User", model.Element.get(u"Admin").parent.ename) self.assertEquals(u"Admin", model.Element.get(u"Daemon").parent.ename) def test_add_subtree(self): dir = 'Object/Test/UnitTest/' os.mkdir(dir) path = 'Object/Test/UnitTest/UnitTest.yaml' with open(path, 'w') as outf: outf.write("""comment: | This was created in a unit test. name: George """) path2 = 'Object/Test/UnitTest/TestTwo.yaml' with open(path2, 'w') as outf: outf.write("""comment: | Also created in a unit test. name: Ringo """) run(['svn', 'add', dir]) run(['svn', 'add', path]) run(['svn', 'add', path2]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): e = model.Element.get(u"UnitTest") self.assertEquals(u"George", unicode(e[u'name'].value, 'utf-8')) e2 = model.Element.get(u"TestTwo") self.assertEquals(u"Ringo", unicode(e2[u'name'].value, 'utf-8')) self.assertEquals(u"This was created in a unit test.\n", unicode(e[u'comment'].value, 'utf-8')) self.assertEquals(u"Test", e.parent.ename) self.assertEquals(e, e2.parent) def test_add_include(self): path = 'Object/Test/Test.yaml' include = 'Object/Test/existent.txt' with open(path, 'a') as outf: outf.write("""name: !include existent.txt """) with open(include, 'w') as incf: incf.write("""Paul""") run(['svn', 'add', include]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): e = model.Element.get(u"Test") self.assertEquals(u"Paul", unicode(e[u'name'].value, 'utf-8')) def test_add_broken_include_fails(self): path = 'Object/Test/Test.yaml' with open(path, 'a') as outf: outf.write("""name: !include nonexistent.txt """) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "Couldn't read include 'nonexistent.txt'") def test_remove_include(self): path = 'Object/Test/IncludeTest.yaml' include = 'Object/Test/product.txt' with open(path, 'w') as outf: outf.write("""comment: No include.""") run(['svn', 'rm', include]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): self.assertTrue(u"comment" in model.Element.get(u"IncludeTest")) self.assertFalse(u"product" in model.Element.get(u"IncludeTest")) def test_remove_include_without_removing_file(self): path = 'Object/Test/IncludeTest.yaml' include = 'Object/Test/product.txt' with open(path, 'w') as outf: outf.write("""comment: No include.""") run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): self.assertTrue(u"comment" in model.Element.get(u"IncludeTest")) self.assertFalse(u"product" in model.Element.get(u"IncludeTest")) def test_remove_file_without_removing_include_fails(self): path = 'Object/Test/IncludeTest.yaml' include = 'Object/Test/product.txt' run(['svn', 'rm', include]) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "Couldn't read include 'product.txt' for " "IncludeTest.product!") def test_change_include(self): path = 'Object/Test/IncludeTest.yaml' include = 'Object/Test/product.txt' with open(include, 'w') as incf: incf.write("""42""") run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): e = model.Element.get(u"IncludeTest") self.assertEquals("42", unicode(e[u"product"].value, 'utf-8')) def test_include_to_inline(self): path = 'Object/Test/IncludeTest.yaml' include = 'Object/Test/product.txt' with open(path, 'w') as outf: outf.write("""comment: Now inline. product: 17 """) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): e = model.Element.get(u"IncludeTest") self.assertEquals("17", unicode(e[u"product"].value, 'utf-8')) def test_inline_to_include(self): path = 'Object/Test/MergeTest.yaml' include = 'Object/Test/existent.txt' with open(path, 'w') as outf: outf.write("""comment: Now with an include! test: !include existent.txt """) with open(include, 'w') as incf: incf.write("""The banner should be Blue!""") run(['svn', 'add', include]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): e = model.Element.get(u"MergeTest") self.assertEquals(u"The banner should be Blue!", unicode(e[u'test'].value, 'utf-8')) def test_yaml_not_under_object(self): dir = 'Not-Object/' os.mkdir(dir) path = 'Not-Object/Whatever.yaml' with open(path, 'w') as outf: outf.write("It shouldn't matter that this isn't valid yaml.") run(['svn', 'add', dir]) run(['svn', 'add', path]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): self.assertRaises(model.NoResultFound, model.Element.get, u"Whatever") def test_top_level_yaml(self): path = 'Whatever.yaml' with open(path, 'w') as outf: outf.write("It shouldn't matter that this isn't valid yaml.") run(['svn', 'add', path]) run(['svn', 'ci', '-m', 'unit test']) with db.begin_transaction(): self.assertRaises(model.NoResultFound, model.Element.get, u"Whatever") def test_overly_nested_element_fails(self): dir = 'Object/Test/Dir/' os.mkdir(dir) path = 'Object/Test/Dir/Something.yaml' with open(path, 'w') as outf: outf.write("""comment: Whatever. """) run(['svn', 'add', dir]) run(['svn', 'add', path]) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "Parent 'Dir' for element 'Something' " "undefined!") def test_moving_root_fails(self): object_from = 'Object' object_to = 'Pizza' run(['svn', 'mv', object_from, object_to]) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "You can't delete the root element.") def test_moving_root_and_renaming_def_fails(self): object_from = 'Object' object_to = 'Pizza' def_from = 'Pizza/Object.yaml' def_to = 'Pizza/Pizza.yaml' run(['svn', 'mv', object_from, object_to]) run(['svn', 'mv', def_from, def_to]) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "You can't delete the root element.") def test_deleting_root_dir_fails(self): path = 'Object' run(['svn', 'rm', path]) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "You can't delete the root element.") def test_deleting_root_def_fails(self): path = 'Object/Object.yaml' run(['svn', 'rm', path]) self.expectRunGetsStatus(['svn', 'ci', '-m', 'unit test'], 1, "You can't delete the root element.") def test_rename_from_bazbase(self): with db.begin_transaction(): sb = model.Element.get(u'Sandbox') sb.ename = u'Dustbox' run(['svn', 'up']) self.assertTrue(os.path.exists('Object/Test/Dustbox.yaml')) self.assertFalse(os.path.exists('Object/Test/Sandbox.yaml')) if __name__ == "__main__": unittest.main()