Development of an internal social media platform with personalised dashboards for students
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

mvccadapter.py 7.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. """Adapt IStorage objects to IMVCCStorage
  2. This is a largely internal implementation of ZODB, especially DB and
  3. Connection. It takes the MVCC implementation involving invalidations
  4. and start time and moves it into a storage adapter. This allows ZODB
  5. to treat Relstoage and other storages in pretty much the same way and
  6. also simplifies the implementation of the DB and Connection classes.
  7. """
  8. import zope.interface
  9. from . import interfaces, serialize, POSException
  10. from .utils import p64, u64, Lock
  11. class Base(object):
  12. _copy_methods = (
  13. 'getName', 'getSize', 'history', 'lastTransaction', 'sortKey',
  14. 'loadBlob', 'openCommittedBlobFile',
  15. 'isReadOnly', 'supportsUndo', 'undoLog', 'undoInfo',
  16. 'temporaryDirectory',
  17. )
  18. def __init__(self, storage):
  19. self._storage = storage
  20. if interfaces.IBlobStorage.providedBy(storage):
  21. zope.interface.alsoProvides(self, interfaces.IBlobStorage)
  22. def __getattr__(self, name):
  23. if name in self._copy_methods:
  24. m = getattr(self._storage, name)
  25. setattr(self, name, m)
  26. return m
  27. raise AttributeError(name)
  28. def __len__(self):
  29. return len(self._storage)
  30. class MVCCAdapter(Base):
  31. def __init__(self, storage):
  32. Base.__init__(self, storage)
  33. self._instances = set()
  34. self._lock = Lock()
  35. if hasattr(storage, 'registerDB'):
  36. storage.registerDB(self)
  37. def new_instance(self):
  38. instance = MVCCAdapterInstance(self)
  39. with self._lock:
  40. self._instances.add(instance)
  41. return instance
  42. def before_instance(self, before=None):
  43. return HistoricalStorageAdapter(self._storage, before)
  44. def undo_instance(self):
  45. return UndoAdapterInstance(self)
  46. def _release(self, instance):
  47. with self._lock:
  48. self._instances.remove(instance)
  49. closed = False
  50. def close(self):
  51. if not self.closed:
  52. self.closed = True
  53. self._storage.close()
  54. del self._instances
  55. del self._storage
  56. def invalidateCache(self):
  57. with self._lock:
  58. for instance in self._instances:
  59. instance._invalidateCache()
  60. def invalidate(self, transaction_id, oids):
  61. with self._lock:
  62. for instance in self._instances:
  63. instance._invalidate(oids)
  64. def _invalidate_finish(self, oids, committing_instance):
  65. with self._lock:
  66. for instance in self._instances:
  67. if instance is not committing_instance:
  68. instance._invalidate(oids)
  69. references = serialize.referencesf
  70. transform_record_data = untransform_record_data = lambda self, data: data
  71. def pack(self, pack_time, referencesf):
  72. return self._storage.pack(pack_time, referencesf)
  73. class MVCCAdapterInstance(Base):
  74. _copy_methods = Base._copy_methods + (
  75. 'loadSerial', 'new_oid', 'tpc_vote',
  76. 'checkCurrentSerialInTransaction', 'tpc_abort',
  77. )
  78. def __init__(self, base):
  79. self._base = base
  80. Base.__init__(self, base._storage)
  81. self._lock = Lock()
  82. self._invalidations = set()
  83. self._start = None # Transaction start time
  84. self._sync = getattr(self._storage, 'sync', lambda : None)
  85. def release(self):
  86. self._base._release(self)
  87. close = release
  88. def _invalidateCache(self):
  89. with self._lock:
  90. self._invalidations = None
  91. def _invalidate(self, oids):
  92. with self._lock:
  93. try:
  94. self._invalidations.update(oids)
  95. except AttributeError:
  96. if self._invalidations is not None:
  97. raise
  98. def sync(self, force=True):
  99. if force:
  100. self._sync()
  101. def poll_invalidations(self):
  102. self._start = p64(u64(self._storage.lastTransaction()) + 1)
  103. with self._lock:
  104. if self._invalidations is None:
  105. self._invalidations = set()
  106. return None
  107. else:
  108. result = list(self._invalidations)
  109. self._invalidations.clear()
  110. return result
  111. def load(self, oid):
  112. assert self._start is not None
  113. r = self._storage.loadBefore(oid, self._start)
  114. if r is None:
  115. raise POSException.ReadConflictError(repr(oid))
  116. return r[:2]
  117. def prefetch(self, oids):
  118. try:
  119. self._storage.prefetch(oids, self._start)
  120. except AttributeError:
  121. if not hasattr(self._storage, 'prefetch'):
  122. self.prefetch = lambda *a: None
  123. else:
  124. raise
  125. _modified = None # Used to keep track of oids modified within a
  126. # transaction, so we can invalidate them later.
  127. def tpc_begin(self, transaction):
  128. self._storage.tpc_begin(transaction)
  129. self._modified = set()
  130. def store(self, oid, serial, data, version, transaction):
  131. self._storage.store(oid, serial, data, version, transaction)
  132. self._modified.add(oid)
  133. def storeBlob(self, oid, serial, data, blobfilename, version, transaction):
  134. self._storage.storeBlob(
  135. oid, serial, data, blobfilename, '', transaction)
  136. self._modified.add(oid)
  137. def tpc_finish(self, transaction, func = lambda tid: None):
  138. modified = self._modified
  139. self._modified = None
  140. def invalidate_finish(tid):
  141. self._base._invalidate_finish(modified, self)
  142. func(tid)
  143. return self._storage.tpc_finish(transaction, invalidate_finish)
  144. def read_only_writer(self, *a, **kw):
  145. raise POSException.ReadOnlyError
  146. class HistoricalStorageAdapter(Base):
  147. """Adapt a storage to a historical storage
  148. """
  149. _copy_methods = Base._copy_methods + (
  150. 'loadSerial', 'tpc_begin', 'tpc_finish', 'tpc_abort', 'tpc_vote',
  151. 'checkCurrentSerialInTransaction',
  152. )
  153. def __init__(self, storage, before=None):
  154. Base.__init__(self, storage)
  155. self._before = before
  156. def isReadOnly(self):
  157. return True
  158. def supportsUndo(self):
  159. return False
  160. def release(self):
  161. try:
  162. release = self._storage.release
  163. except AttributeError:
  164. pass
  165. else:
  166. release()
  167. close = release
  168. def sync(self, force=True):
  169. pass
  170. def poll_invalidations(self):
  171. return []
  172. new_oid = pack = store = read_only_writer
  173. def load(self, oid, version=''):
  174. r = self._storage.loadBefore(oid, self._before)
  175. if r is None:
  176. raise POSException.POSKeyError(oid)
  177. return r[:2]
  178. class UndoAdapterInstance(Base):
  179. _copy_methods = Base._copy_methods + (
  180. 'tpc_abort',
  181. )
  182. def __init__(self, base):
  183. self._base = base
  184. Base.__init__(self, base._storage)
  185. def release(self):
  186. pass
  187. close = release
  188. def tpc_begin(self, transaction):
  189. self._storage.tpc_begin(transaction)
  190. self._undone = set()
  191. def undo(self, transaction_id, transaction):
  192. result = self._storage.undo(transaction_id, transaction)
  193. if result:
  194. self._undone.update(result[1])
  195. return result
  196. def tpc_vote(self, transaction):
  197. result = self._storage.tpc_vote(transaction)
  198. if result:
  199. self._undone.update(result)
  200. def tpc_finish(self, transaction, func = lambda tid: None):
  201. def invalidate_finish(tid):
  202. self._base._invalidate_finish(self._undone, None)
  203. func(tid)
  204. self._storage.tpc_finish(transaction, invalidate_finish)