From 5b3592c974861264a820bd83872d2a2731f1e6ff Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Wed, 5 Aug 2009 15:34:45 +0000 Subject: [PATCH] Monotone-Parent: 11e7712d6d915167dfc15be7f67ab23f7f9e7cc7 Monotone-Revision: 1c96c7fdd2f2dab8f4bc3173deb36b069ade3bf1 Monotone-Author: wsourdeau@inverse.ca Monotone-Date: 2009-08-05T15:34:45 Monotone-Branch: ca.inverse.sogo --- Tests/README | 14 +++ Tests/propfind.py | 39 +++++++ Tests/testconfig.py.in | 4 + Tests/webdav-sync.py | 71 +++++++++++++ Tests/webdavlib.py | 223 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 351 insertions(+) create mode 100644 Tests/README create mode 100755 Tests/propfind.py create mode 100644 Tests/testconfig.py.in create mode 100755 Tests/webdav-sync.py create mode 100644 Tests/webdavlib.py diff --git a/Tests/README b/Tests/README new file mode 100644 index 000000000..1dc8e0de9 --- /dev/null +++ b/Tests/README @@ -0,0 +1,14 @@ +setup +----- + +(you need "python-xml" in order to run the scripts) + +1) copy testconfig.py.in to testconfig.py (make sure to never EVER add it to monotone) +2) edit testconfig.py to suit your environment +3) run the test scripts + +other +----- + +propfind.py - a sample implementation of a PROPFIND request using webdavlib + diff --git a/Tests/propfind.py b/Tests/propfind.py new file mode 100755 index 000000000..28594e251 --- /dev/null +++ b/Tests/propfind.py @@ -0,0 +1,39 @@ +#!/usr/bin/python + +from testconfig import hostname, port, username, password + +import webdavlib + +import sys +import getopt + +def parseArguments(): + arguments = {} + +depth = "0" +quiet = False +(opts, args) = getopt.getopt(sys.argv[1:], "d:q", ["depth=", "quiet"]) + +for pair in opts: + print pair + if (pair[0] == "-d" or pair[0] == "--depth"): + depth = pair[1] + elif (pair[0] == "-q" or pair[0] == "--quiet"): + quiet = True + +# print "depth: " + depth + +nargs = len(args) +if (nargs > 0): + resource = args[0] + if (nargs > 1): + properties = args[1:] + else: + properties = [ "allprop" ] +else: + print "resource required" + sys.exit(-1) + +client = webdavlib.WebDAVClient(hostname, port, username, password) +propfind = webdavlib.WebDAVPROPFIND(resource, properties, depth) +client.execute(propfind) diff --git a/Tests/testconfig.py.in b/Tests/testconfig.py.in new file mode 100644 index 000000000..fbbaf2925 --- /dev/null +++ b/Tests/testconfig.py.in @@ -0,0 +1,4 @@ +hostname = "localhost" +port = "80" +username = "myuser" +password = "mypass" diff --git a/Tests/webdav-sync.py b/Tests/webdav-sync.py new file mode 100755 index 000000000..969a181fd --- /dev/null +++ b/Tests/webdav-sync.py @@ -0,0 +1,71 @@ +#!/usr/bin/python + +from testconfig import hostname, port, username, password + +import sys +import unittest +import webdavlib +import xml.xpath +import time + +resource = '/SOGo/dav/%s/Calendar/test-webdavsync/' % username + +class WebdavSyncTest(unittest.TestCase): + def setUp(self): + self.client = webdavlib.WebDAVClient(hostname, port, + username, password) + + def tearDown(self): + delete = webdavlib.WebDAVDELETE(resource) + self.client.execute(delete) + + def _xpath_query(self, query, top_node): + xpath_context = xml.xpath.CreateContext(top_node) + xpath_context.setNamespaces({ "D": "DAV:" }) + return xml.xpath.Evaluate(query, None, xpath_context) + + def test(self): + # missing tests: + # invalid tokens: negative, non-numeric, > current timestamp + # non-empty collections: token validity, status codes for added, + # modified and removed elements + + # preparation + mkcol = webdavlib.WebDAVMKCOL(resource) + self.client.execute(mkcol) + self.assertEquals(mkcol.response["status"], 201, + "preparation: failure creating collection (code != 201)") + + # test queries: + # empty collection: + # without a token (query1) + # with a token (query2) + # non-empty collection: + # without a token (query3) + # with a token (query4) + + query1 = webdavlib.WebDAVSyncQuery(resource, None, [ "getetag" ]) + self.client.execute(query1) + self.assertEquals(query1.response["status"], 207, + ("query1: invalid status code: %d (!= 207)" + % query1.response["status"])) + token_node = self._xpath_query("/D:multistatus/D:sync-token", + query1.response["document"])[0] + # Implicit "assertion": we expect SOGo to return a token node, with a + # non-empty numerical value. Anything else will trigger an exception + token = int(token_node.childNodes[0].nodeValue) + + self.assertTrue(token > 0) + self.assertTrue(token < int(query1.start)) + + # we make sure that any token is invalid when the collection is empty + query2 = webdavlib.WebDAVSyncQuery(resource, "1234", [ "getetag" ]) + self.client.execute(query2) + self.assertEquals(query2.response["status"], 403) + cond_nodes = self._xpath_query("/D:error/D:valid-sync-token", + query2.response["document"]) + self.assertTrue(len(cond_nodes) > 0, + "expected 'valid-sync-token' condition error") + +if __name__ == "__main__": + unittest.main() diff --git a/Tests/webdavlib.py b/Tests/webdavlib.py new file mode 100644 index 000000000..3498e9706 --- /dev/null +++ b/Tests/webdavlib.py @@ -0,0 +1,223 @@ +import httplib +import M2Crypto.httpslib +import time +import xml.sax.saxutils +import xml.dom.ext.reader.Sax2 +import sys + +class WebDAVClient: + def __init__(self, hostname, port, username, password, forcessl = False): + if port == "443" or forcessl: + self.conn = M2Crypto.httpslib.HTTPSConnection(hostname, int(port), + True) + else: + self.conn = httplib.HTTPConnection(hostname, port, True) + + self.simpleauth_hash = (("%s:%s" % (username, password)) + .encode('base64')[:-1]) + + def _prepare_headers(self, query, body): + headers = { "User-Agent": "Mozilla/5.0", + "content-length": len(body), + "authorization": "Basic %s" % self.simpleauth_hash } + if query.__dict__.has_key("query") and query.depth is not None: + headers["depth"] = query.depth + if query.__dict__.has_key("content_type"): + headers["content-type"] = query.content_type + + return headers + + def execute(self, query): + body = query.render() + + query.start = time.time() + self.conn.request(query.method, query.url, + body, self._prepare_headers(query, body)) + query.set_response(self.conn.getresponse()); + query.duration = time.time() - query.start + +class HTTPSimpleQuery: + method = None + + def __init__(self, url): + self.url = url + self.response = None + self.start = -1 + self.duration = -1 + + def render(): + return None + +class HTTPGET(HTTPSimpleQuery): + method = "GET" + +class HTTPQuery(HTTPSimpleQuery): + def __init__(self, url, content_type): + HTTPSimpleQuery.__init__(self, url) + self.content_type = content_type + + def set_response(self, http_response): + headers = {} + for rk, rv in http_response.getheaders(): + k = rk.lower() + headers[k] = rv + self.response = { "headers": headers, + "status": http_response.status, + "version": http_response.version, + "body": http_response.read() } + +class HTTPPUT(HTTPQuery): + method = "PUT" + + def __init__(self, url, content, content_type = "application/octet-stream"): + HTTPQuery.__init__(self, url, content_type) + self.content = content + + def render(self): + return self.content + +class WebDAVQuery(HTTPQuery): + method = None + + def __init__(self, url, depth = None): + HTTPQuery.__init__(self, url, "application/xml; charset=\"utf-8\"") + self.depth = depth + self.ns_mgr = _WD_XMLNS_MGR() + self.top_node = None + self.xml_response = None + + def render(self): + if self.top_node is not None: + text = ("\n%s" + % self.top_node.render(self.ns_mgr.render())) + else: + text = "" + + return text + + def render_tag(self, tag): + cb = tag.find("}") + if cb > -1: + ns = tag[1:cb] + real_tag = tag[cb+1:] + new_tag = self.ns_mgr.register(real_tag, ns) + else: + new_tag = tag + + return new_tag + + def set_response(self, http_response): + HTTPQuery.set_response(self, http_response) + headers = self.response["headers"] + if (headers.has_key("content-type") + and headers.has_key("content-length") + and (headers["content-type"].startswith("application/xml") + or headers["content-type"].startswith("text/xml")) + and int(headers["content-length"]) > 0): + dom_response = xml.dom.ext.reader.Sax2.FromXml(self.response["body"]) + self.response["document"] = dom_response.documentElement + +class WebDAVMKCOL(WebDAVQuery): + method = "MKCOL" + +class WebDAVDELETE(WebDAVQuery): + method = "DELETE" + +class WebDAVREPORT(WebDAVQuery): + method = "REPORT" + +class WebDAVPROPFIND(WebDAVQuery): + method = "PROPFIND" + + def __init__(self, url, properties, depth = None): + WebDAVQuery.__init__(self, url, depth) + self.top_node = _WD_XMLTreeElement("propfind") + props = _WD_XMLTreeElement("prop") + self.top_node.append(props) + for prop in properties: + prop_tag = self.render_tag(prop) + props.append(_WD_XMLTreeElement(prop_tag)) + +class WebDAVSyncQuery(WebDAVREPORT): + def __init__(self, url, token, properties): + WebDAVQuery.__init__(self, url) + self.top_node = _WD_XMLTreeElement("sync-collection") + + sync_token = _WD_XMLTreeElement("sync-token") + self.top_node.append(sync_token) + if token is not None: + sync_token.append(_WD_XMLTreeTextNode(token)) + + props = _WD_XMLTreeElement("prop") + self.top_node.append(props) + for prop in properties: + prop_tag = self.render_tag(prop) + props.append(_WD_XMLTreeElement(prop_tag)) + +# private classes to handle XML stuff +class _WD_XMLNS_MGR: + def __init__(self): + self.xmlns = {} + self.counter = 0 + + def render(self): + text = " xmlns=\"DAV:\"" + for k in self.xmlns: + text = text + " xmlns:%s=\"%s\"" % (self.xmlns[k], k) + + return text + + def create_key(self, namespace): + new_nssym = "n%d" % self.counter + self.counter = self.counter + 1 + self.xmlns[namespace] = new_nssym + + return new_nssym + + def register(self, tag, namespace): + if namespace != "DAV:": + if self.xmlns.has_key(namespace): + key = self.xmlns[namespace] + else: + key = self.create_key(namespace) + else: + key = None + + if key is not None: + newTag = "%s:%s" % (key, tag) + else: + newTag = tag + + return newTag + +class _WD_XMLTreeElement: + def __init__(self, tag): + self.tag = tag + self.children = [] + + def append(self, child): + self.children.append(child) + + def render(self, ns_text = None): + text = "<" + self.tag + + if ns_text is not None: + text = text + ns_text + + count = len(self.children) + if count > 0: + text = text + ">" + for x in range(0, count): + text = text + self.children[x].render() + text = text + "" + else: + text = text + "/>" + + return text + +class _WD_XMLTreeTextNode: + def __init__(self, text): + self.text = xml.sax.saxutils.escape(text) + + def render(self): + return self.text