Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

test_tpfile.py 1.5KB

1 year ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. from io import BytesIO
  4. from twisted.internet import abstract, protocol
  5. from twisted.protocols import basic, loopback
  6. from twisted.trial import unittest
  7. class BufferingServer(protocol.Protocol):
  8. buffer = b""
  9. def dataReceived(self, data):
  10. self.buffer += data
  11. class FileSendingClient(protocol.Protocol):
  12. def __init__(self, f):
  13. self.f = f
  14. def connectionMade(self):
  15. s = basic.FileSender()
  16. d = s.beginFileTransfer(self.f, self.transport, lambda x: x)
  17. d.addCallback(lambda r: self.transport.loseConnection())
  18. class FileSenderTests(unittest.TestCase):
  19. def testSendingFile(self):
  20. testStr = b"xyz" * 100 + b"abc" * 100 + b"123" * 100
  21. s = BufferingServer()
  22. c = FileSendingClient(BytesIO(testStr))
  23. d = loopback.loopbackTCP(s, c)
  24. d.addCallback(lambda x: self.assertEqual(s.buffer, testStr))
  25. return d
  26. def testSendingEmptyFile(self):
  27. fileSender = basic.FileSender()
  28. consumer = abstract.FileDescriptor()
  29. consumer.connected = 1
  30. emptyFile = BytesIO(b"")
  31. d = fileSender.beginFileTransfer(emptyFile, consumer, lambda x: x)
  32. # The producer will be immediately exhausted, and so immediately
  33. # unregistered
  34. self.assertIsNone(consumer.producer)
  35. # Which means the Deferred from FileSender should have been called
  36. self.assertTrue(d.called, "producer unregistered with deferred being called")