123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- # odbc test suite kindly contributed by Frank Millman.
- import os
- import sys
- import tempfile
- import unittest
-
- import odbc
- import pythoncom
- from pywin32_testutil import TestSkipped, str2bytes, str2memory
- from win32com.client import constants
-
- # We use the DAO ODBC driver
- from win32com.client.gencache import EnsureDispatch
-
-
class TestStuff(unittest.TestCase):
    """Round-trip tests for the ``odbc`` module against a scratch table.

    Each test runs against a freshly created table (``self.tablename``).
    The database is whatever the ``TEST_ODBC_CONNECTION_STRING`` environment
    variable points at; when that variable is not set, a temporary MS Access
    database is created through DAO and removed again in ``tearDown``.
    """

    def setUp(self):
        """Connect to the test database and (re)create the test table.

        Raises:
            TestSkipped: no connection string was supplied and no DAO
                database engine could be dispatched.
        """
        self.tablename = "pywin32test_users"
        self.db_filename = None
        self.conn = self.cur = None

        # Test any database if a connection string is supplied...
        conn_str = os.environ.get("TEST_ODBC_CONNECTION_STRING")
        if conn_str is None:
            # ...otherwise create a local MSAccess DB for testing.
            conn_str = self._create_access_db()

        ## print 'Connection string:', conn_str
        self.conn = odbc.odbc(conn_str)
        # And we expect a 'users' table for these tests.
        self.cur = self.conn.cursor()
        ## self.cur.setoutputsize(1000)
        try:
            # Start from a clean slate; the table may be left over from an
            # earlier run, and it is fine if it does not exist.
            self.cur.execute("""drop table %s""" % self.tablename)
        except (odbc.error, odbc.progError):
            pass

        ## This needs to be adjusted for sql server syntax for unicode fields
        ##  - memo -> TEXT
        ##  - varchar -> nvarchar
        self.assertEqual(
            self.cur.execute(
                """create table %s (
                    userid varchar(25),
                    username varchar(25),
                    bitfield bit,
                    intfield integer,
                    floatfield float,
                    datefield datetime,
                    rawfield varbinary(100),
                    longtextfield memo,
                    longbinaryfield image
                )"""
                % self.tablename
            ),
            -1,
        )

    def _create_access_db(self):
        """Create an empty Access database via DAO; return its connection string.

        Stores the path of the new ``.mdb`` file in ``self.db_filename`` so
        that ``tearDown`` can delete it.

        Raises:
            TestSkipped: none of the probed DAO engine versions is available.
        """
        # Only the unique *name* is wanted here. Close the placeholder file
        # deterministically instead of leaving the handle to the garbage
        # collector; the database itself is created by DAO below.
        with tempfile.NamedTemporaryFile() as placeholder:
            self.db_filename = placeholder.name + ".mdb"

        # Create a brand-new database - what is the story with these?
        for suffix in (".36", ".35", ".30"):
            try:
                dbe = EnsureDispatch("DAO.DBEngine" + suffix)
                break
            except pythoncom.com_error:
                pass
        else:
            raise TestSkipped("Can't find a DB engine")

        workspace = dbe.Workspaces(0)
        newdb = workspace.CreateDatabase(
            self.db_filename, constants.dbLangGeneral, constants.dbEncrypt
        )
        newdb.Close()

        return "Driver={Microsoft Access Driver (*.mdb)};dbq=%s;Uid=;Pwd=;" % (
            self.db_filename,
        )

    def tearDown(self):
        """Drop the test table and release the cursor, connection and temp DB.

        Every cleanup step runs even if an earlier one raises, so a failing
        cursor or connection close cannot leave the temporary database file
        behind.
        """
        try:
            if self.cur is not None:
                try:
                    self.cur.execute("""drop table %s""" % self.tablename)
                except (odbc.error, odbc.progError) as why:
                    # Best effort only: report the problem and keep cleaning up.
                    print("Failed to delete test table %s" % self.tablename, why)
                finally:
                    self.cur.close()
                    self.cur = None
        finally:
            try:
                if self.conn is not None:
                    self.conn.close()
                    self.conn = None
            finally:
                if self.db_filename is not None:
                    try:
                        os.unlink(self.db_filename)
                    except OSError:
                        pass

    def _check_insert_select(self, userid, username):
        """Insert one (userid, username) row, then select on each column.

        The insert is expected to report 1 from ``execute``; each select
        (using the lower-cased value as the parameter) is expected to
        report 0.
        """
        self.assertEqual(
            self.cur.execute(
                "insert into %s (userid, username) values (?,?)" % self.tablename,
                [userid, username],
            ),
            1,
        )
        for column, value in (("userid", userid), ("username", username)):
            self.assertEqual(
                self.cur.execute(
                    "select * from %s where %s = ?" % (self.tablename, column),
                    [value.lower()],
                ),
                0,
            )

    def test_insert_select(self, userid="Frank", username="Frank Millman"):
        """Insert a row and run parameterised selects against it."""
        self._check_insert_select(userid, username)

    def test_insert_select_unicode(self, userid="Frank", username="Frank Millman"):
        """Same check as ``test_insert_select``; also the entry point used by
        ``test_insert_select_unicode_ext`` for non-ASCII values."""
        self._check_insert_select(userid, username)

    def test_insert_select_unicode_ext(self):
        """Run the insert/select check with non-ASCII characters."""
        userid = "t-\xe0\xf2"
        username = "test-\xe0\xf2 name"
        self.test_insert_select_unicode(userid, username)

    def _test_val(self, fieldName, value):
        """Store ``value`` in column ``fieldName`` and check it reads back equal.

        The delete/insert/select cycle for the ``'Frank'`` row is repeated
        100 times.
        """
        for _ in range(100):
            self.cur.execute("delete from %s where userid='Frank'" % self.tablename)
            self.assertEqual(
                self.cur.execute(
                    "insert into %s (userid, %s) values (?,?)"
                    % (self.tablename, fieldName),
                    ["Frank", value],
                ),
                1,
            )
            self.cur.execute(
                "select %s from %s where userid = ?" % (fieldName, self.tablename),
                ["Frank"],
            )
            rows = self.cur.fetchmany()
            self.assertEqual(1, len(rows))
            row = rows[0]
            self.assertEqual(row[0], value)

    def testBit(self):
        """Round-trip both values of the bit column."""
        self._test_val("bitfield", 1)
        self._test_val("bitfield", 0)

    def testInt(self):
        """Round-trip 1, 0 and ``sys.maxsize`` through the integer column."""
        self._test_val("intfield", 1)
        self._test_val("intfield", 0)
        # sys.maxsize exists on every Python version able to parse this file,
        # so no sys.maxint fallback is needed.
        self._test_val("intfield", sys.maxsize)

    def testFloat(self):
        """Round-trip a fractional value and zero through the float column."""
        self._test_val("floatfield", 1.01)
        self._test_val("floatfield", 0)

    def testVarchar(self):
        """Round-trip a short string through a varchar column."""
        self._test_val("username", "foo")

    def testLongVarchar(self):
        """Test a long text field in excess of internal cursor data size (65536)"""
        self._test_val("longtextfield", "abc" * 70000)

    def testLongBinary(self):
        """Test a long raw field in excess of internal cursor data size (65536)"""
        self._test_val("longbinaryfield", str2memory("\0\1\2" * 70000))

    def testRaw(self):
        """Round-trip binary data, including an embedded NUL, through varbinary."""
        ## Test binary data
        # The final two characters are a literal backslash and "8": octal
        # escapes stop at 7, so the backslash is written out explicitly to
        # keep the same value without relying on an invalid escape sequence.
        self._test_val("rawfield", str2memory("\1\2\3\4\0\5\6\7\\8"))

    def test_widechar(self):
        """Test a unicode character that would be mangled if bound as plain character.
        For example, previously the below was returned as ascii 'a'
        """
        self._test_val("username", "\u0101")

    def testDates(self):
        """Round-trip a datetime value through the datetime column."""
        import datetime

        for v in ((1900, 12, 25, 23, 39, 59),):
            d = datetime.datetime(*v)
            self._test_val("datefield", d)

    def test_set_nonzero_length(self):
        """Update username to a shorter string and check the fetched length is 5."""
        self.assertEqual(
            self.cur.execute(
                "insert into %s (userid,username) " "values (?,?)" % self.tablename,
                ["Frank", "Frank Millman"],
            ),
            1,
        )
        self.assertEqual(
            self.cur.execute("update %s set username = ?" % self.tablename, ["Frank"]),
            1,
        )
        self.assertEqual(self.cur.execute("select * from %s" % self.tablename), 0)
        # Column 1 is username, which now holds "Frank".
        self.assertEqual(len(self.cur.fetchone()[1]), 5)

    def test_set_zero_length(self):
        """Insert an empty username (userid passed through ``str2bytes``) and
        check that it is fetched back with length 0."""
        self.assertEqual(
            self.cur.execute(
                "insert into %s (userid,username) " "values (?,?)" % self.tablename,
                [str2bytes("Frank"), ""],
            ),
            1,
        )
        self.assertEqual(self.cur.execute("select * from %s" % self.tablename), 0)
        self.assertEqual(len(self.cur.fetchone()[1]), 0)

    def test_set_zero_length_unicode(self):
        """Insert an empty username (userid passed as ``str``) and check that
        it is fetched back with length 0."""
        self.assertEqual(
            self.cur.execute(
                "insert into %s (userid,username) " "values (?,?)" % self.tablename,
                ["Frank", ""],
            ),
            1,
        )
        self.assertEqual(self.cur.execute("select * from %s" % self.tablename), 0)
        self.assertEqual(len(self.cur.fetchone()[1]), 0)
-
-
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|