Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

test_sip.py 25KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. # -*- test-case-name: twisted.test.test_sip -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Session Initialization Protocol tests.
  6. """
  7. from twisted.cred import checkers, portal
  8. from twisted.internet import defer, reactor
  9. from twisted.protocols import sip
  10. from twisted.trial import unittest
  11. try:
  12. from twisted.internet.asyncioreactor import AsyncioSelectorReactor
  13. except BaseException:
  14. AsyncioSelectorReactor = None # type: ignore[assignment,misc]
  15. from zope.interface import implementer
  16. # request, prefixed by random CRLFs
  17. request1 = (
  18. "\n\r\n\n\r"
  19. + """\
  20. INVITE sip:foo SIP/2.0
  21. From: mo
  22. To: joe
  23. Content-Length: 4
  24. abcd""".replace(
  25. "\n", "\r\n"
  26. )
  27. )
  28. # request, no content-length
  29. request2 = """INVITE sip:foo SIP/2.0
  30. From: mo
  31. To: joe
  32. 1234""".replace(
  33. "\n", "\r\n"
  34. )
  35. # request, with garbage after
  36. request3 = """INVITE sip:foo SIP/2.0
  37. From: mo
  38. To: joe
  39. Content-Length: 4
  40. 1234
  41. lalalal""".replace(
  42. "\n", "\r\n"
  43. )
  44. # three requests
  45. request4 = """INVITE sip:foo SIP/2.0
  46. From: mo
  47. To: joe
  48. Content-Length: 0
  49. INVITE sip:loop SIP/2.0
  50. From: foo
  51. To: bar
  52. Content-Length: 4
  53. abcdINVITE sip:loop SIP/2.0
  54. From: foo
  55. To: bar
  56. Content-Length: 4
  57. 1234""".replace(
  58. "\n", "\r\n"
  59. )
  60. # response, no content
  61. response1 = """SIP/2.0 200 OK
  62. From: foo
  63. To:bar
  64. Content-Length: 0
  65. """.replace(
  66. "\n", "\r\n"
  67. )
  68. # short header version
  69. request_short = """\
  70. INVITE sip:foo SIP/2.0
  71. f: mo
  72. t: joe
  73. l: 4
  74. abcd""".replace(
  75. "\n", "\r\n"
  76. )
  77. request_natted = """\
  78. INVITE sip:foo SIP/2.0
  79. Via: SIP/2.0/UDP 10.0.0.1:5060;rport
  80. """.replace(
  81. "\n", "\r\n"
  82. )
  83. # multiline headers (example from RFC 3621).
  84. response_multiline = """\
  85. SIP/2.0 200 OK
  86. Via: SIP/2.0/UDP server10.biloxi.com
  87. ;branch=z9hG4bKnashds8;received=192.0.2.3
  88. Via: SIP/2.0/UDP bigbox3.site3.atlanta.com
  89. ;branch=z9hG4bK77ef4c2312983.1;received=192.0.2.2
  90. Via: SIP/2.0/UDP pc33.atlanta.com
  91. ;branch=z9hG4bK776asdhds ;received=192.0.2.1
  92. To: Bob <sip:bob@biloxi.com>;tag=a6c85cf
  93. From: Alice <sip:alice@atlanta.com>;tag=1928301774
  94. Call-ID: a84b4c76e66710@pc33.atlanta.com
  95. CSeq: 314159 INVITE
  96. Contact: <sip:bob@192.0.2.4>
  97. Content-Type: application/sdp
  98. Content-Length: 0
  99. \n""".replace(
  100. "\n", "\r\n"
  101. )
  102. class TestRealm:
  103. def requestAvatar(self, avatarId, mind, *interfaces):
  104. return sip.IContact, None, lambda: None
  105. class MessageParsingTests(unittest.TestCase):
  106. def setUp(self):
  107. self.l = []
  108. self.parser = sip.MessagesParser(self.l.append)
  109. def feedMessage(self, message):
  110. self.parser.dataReceived(message)
  111. self.parser.dataDone()
  112. def validateMessage(self, m, method, uri, headers, body):
  113. """
  114. Validate Requests.
  115. """
  116. self.assertEqual(m.method, method)
  117. self.assertEqual(m.uri.toString(), uri)
  118. self.assertEqual(m.headers, headers)
  119. self.assertEqual(m.body, body)
  120. self.assertEqual(m.finished, 1)
  121. def testSimple(self):
  122. l = self.l
  123. self.feedMessage(request1)
  124. self.assertEqual(len(l), 1)
  125. self.validateMessage(
  126. l[0],
  127. "INVITE",
  128. "sip:foo",
  129. {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
  130. "abcd",
  131. )
  132. def testTwoMessages(self):
  133. l = self.l
  134. self.feedMessage(request1)
  135. self.feedMessage(request2)
  136. self.assertEqual(len(l), 2)
  137. self.validateMessage(
  138. l[0],
  139. "INVITE",
  140. "sip:foo",
  141. {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
  142. "abcd",
  143. )
  144. self.validateMessage(
  145. l[1], "INVITE", "sip:foo", {"from": ["mo"], "to": ["joe"]}, "1234"
  146. )
  147. def testGarbage(self):
  148. l = self.l
  149. self.feedMessage(request3)
  150. self.assertEqual(len(l), 1)
  151. self.validateMessage(
  152. l[0],
  153. "INVITE",
  154. "sip:foo",
  155. {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
  156. "1234",
  157. )
  158. def testThreeInOne(self):
  159. l = self.l
  160. self.feedMessage(request4)
  161. self.assertEqual(len(l), 3)
  162. self.validateMessage(
  163. l[0],
  164. "INVITE",
  165. "sip:foo",
  166. {"from": ["mo"], "to": ["joe"], "content-length": ["0"]},
  167. "",
  168. )
  169. self.validateMessage(
  170. l[1],
  171. "INVITE",
  172. "sip:loop",
  173. {"from": ["foo"], "to": ["bar"], "content-length": ["4"]},
  174. "abcd",
  175. )
  176. self.validateMessage(
  177. l[2],
  178. "INVITE",
  179. "sip:loop",
  180. {"from": ["foo"], "to": ["bar"], "content-length": ["4"]},
  181. "1234",
  182. )
  183. def testShort(self):
  184. l = self.l
  185. self.feedMessage(request_short)
  186. self.assertEqual(len(l), 1)
  187. self.validateMessage(
  188. l[0],
  189. "INVITE",
  190. "sip:foo",
  191. {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
  192. "abcd",
  193. )
  194. def testSimpleResponse(self):
  195. l = self.l
  196. self.feedMessage(response1)
  197. self.assertEqual(len(l), 1)
  198. m = l[0]
  199. self.assertEqual(m.code, 200)
  200. self.assertEqual(m.phrase, "OK")
  201. self.assertEqual(
  202. m.headers, {"from": ["foo"], "to": ["bar"], "content-length": ["0"]}
  203. )
  204. self.assertEqual(m.body, "")
  205. self.assertEqual(m.finished, 1)
  206. def test_multiLine(self):
  207. """
  208. A header may be split across multiple lines. Subsequent lines begin
  209. with C{" "} or C{"\\t"}.
  210. """
  211. l = self.l
  212. self.feedMessage(response_multiline)
  213. self.assertEqual(len(l), 1)
  214. m = l[0]
  215. self.assertEqual(
  216. m.headers["via"][0],
  217. "SIP/2.0/UDP server10.biloxi.com;"
  218. "branch=z9hG4bKnashds8;received=192.0.2.3",
  219. )
  220. self.assertEqual(
  221. m.headers["via"][1],
  222. "SIP/2.0/UDP bigbox3.site3.atlanta.com;"
  223. "branch=z9hG4bK77ef4c2312983.1;received=192.0.2.2",
  224. )
  225. self.assertEqual(
  226. m.headers["via"][2],
  227. "SIP/2.0/UDP pc33.atlanta.com;"
  228. "branch=z9hG4bK776asdhds ;received=192.0.2.1",
  229. )
  230. class MessageParsingFeedDataCharByCharTests(MessageParsingTests):
  231. """
  232. Same as base class, but feed data char by char.
  233. """
  234. def feedMessage(self, message):
  235. for c in message:
  236. self.parser.dataReceived(c)
  237. self.parser.dataDone()
  238. class MakeMessageTests(unittest.TestCase):
  239. def testRequest(self):
  240. r = sip.Request("INVITE", "sip:foo")
  241. r.addHeader("foo", "bar")
  242. self.assertEqual(r.toString(), "INVITE sip:foo SIP/2.0\r\nFoo: bar\r\n\r\n")
  243. def testResponse(self):
  244. r = sip.Response(200, "OK")
  245. r.addHeader("foo", "bar")
  246. r.addHeader("Content-Length", "4")
  247. r.bodyDataReceived("1234")
  248. self.assertEqual(
  249. r.toString(), "SIP/2.0 200 OK\r\nFoo: bar\r\nContent-Length: 4\r\n\r\n1234"
  250. )
  251. def testStatusCode(self):
  252. r = sip.Response(200)
  253. self.assertEqual(r.toString(), "SIP/2.0 200 OK\r\n\r\n")
  254. class ViaTests(unittest.TestCase):
  255. def checkRoundtrip(self, v):
  256. s = v.toString()
  257. self.assertEqual(s, sip.parseViaHeader(s).toString())
  258. def testExtraWhitespace(self):
  259. v1 = sip.parseViaHeader("SIP/2.0/UDP 192.168.1.1:5060")
  260. v2 = sip.parseViaHeader("SIP/2.0/UDP 192.168.1.1:5060")
  261. self.assertEqual(v1.transport, v2.transport)
  262. self.assertEqual(v1.host, v2.host)
  263. self.assertEqual(v1.port, v2.port)
  264. def test_complex(self):
  265. """
  266. Test parsing a Via header with one of everything.
  267. """
  268. s = (
  269. "SIP/2.0/UDP first.example.com:4000;ttl=16;maddr=224.2.0.1"
  270. " ;branch=a7c6a8dlze (Example)"
  271. )
  272. v = sip.parseViaHeader(s)
  273. self.assertEqual(v.transport, "UDP")
  274. self.assertEqual(v.host, "first.example.com")
  275. self.assertEqual(v.port, 4000)
  276. self.assertIsNone(v.rport)
  277. self.assertIsNone(v.rportValue)
  278. self.assertFalse(v.rportRequested)
  279. self.assertEqual(v.ttl, 16)
  280. self.assertEqual(v.maddr, "224.2.0.1")
  281. self.assertEqual(v.branch, "a7c6a8dlze")
  282. self.assertEqual(v.hidden, 0)
  283. self.assertEqual(
  284. v.toString(),
  285. "SIP/2.0/UDP first.example.com:4000"
  286. ";ttl=16;branch=a7c6a8dlze;maddr=224.2.0.1",
  287. )
  288. self.checkRoundtrip(v)
  289. def test_simple(self):
  290. """
  291. Test parsing a simple Via header.
  292. """
  293. s = "SIP/2.0/UDP example.com;hidden"
  294. v = sip.parseViaHeader(s)
  295. self.assertEqual(v.transport, "UDP")
  296. self.assertEqual(v.host, "example.com")
  297. self.assertEqual(v.port, 5060)
  298. self.assertIsNone(v.rport)
  299. self.assertIsNone(v.rportValue)
  300. self.assertFalse(v.rportRequested)
  301. self.assertIsNone(v.ttl)
  302. self.assertIsNone(v.maddr)
  303. self.assertIsNone(v.branch)
  304. self.assertTrue(v.hidden)
  305. self.assertEqual(v.toString(), "SIP/2.0/UDP example.com:5060;hidden")
  306. self.checkRoundtrip(v)
  307. def testSimpler(self):
  308. v = sip.Via("example.com")
  309. self.checkRoundtrip(v)
  310. def test_deprecatedRPort(self):
  311. """
  312. Setting rport to True is deprecated, but still produces a Via header
  313. with the expected properties.
  314. """
  315. v = sip.Via("foo.bar", rport=True)
  316. warnings = self.flushWarnings(offendingFunctions=[self.test_deprecatedRPort])
  317. self.assertEqual(len(warnings), 1)
  318. self.assertEqual(
  319. warnings[0]["message"], "rport=True is deprecated since Twisted 9.0."
  320. )
  321. self.assertEqual(warnings[0]["category"], DeprecationWarning)
  322. self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport")
  323. self.assertTrue(v.rport)
  324. self.assertTrue(v.rportRequested)
  325. self.assertIsNone(v.rportValue)
  326. def test_rport(self):
  327. """
  328. An rport setting of None should insert the parameter with no value.
  329. """
  330. v = sip.Via("foo.bar", rport=None)
  331. self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport")
  332. self.assertTrue(v.rportRequested)
  333. self.assertIsNone(v.rportValue)
  334. def test_rportValue(self):
  335. """
  336. An rport numeric setting should insert the parameter with the number
  337. value given.
  338. """
  339. v = sip.Via("foo.bar", rport=1)
  340. self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport=1")
  341. self.assertFalse(v.rportRequested)
  342. self.assertEqual(v.rportValue, 1)
  343. self.assertEqual(v.rport, 1)
  344. def testNAT(self):
  345. s = "SIP/2.0/UDP 10.0.0.1:5060;received=22.13.1.5;rport=12345"
  346. v = sip.parseViaHeader(s)
  347. self.assertEqual(v.transport, "UDP")
  348. self.assertEqual(v.host, "10.0.0.1")
  349. self.assertEqual(v.port, 5060)
  350. self.assertEqual(v.received, "22.13.1.5")
  351. self.assertEqual(v.rport, 12345)
  352. self.assertNotEqual(v.toString().find("rport=12345"), -1)
  353. def test_unknownParams(self):
  354. """
  355. Parsing and serializing Via headers with unknown parameters should work.
  356. """
  357. s = "SIP/2.0/UDP example.com:5060;branch=a12345b;bogus;pie=delicious"
  358. v = sip.parseViaHeader(s)
  359. self.assertEqual(v.toString(), s)
  360. class URLTests(unittest.TestCase):
  361. def testRoundtrip(self):
  362. for url in [
  363. "sip:j.doe@big.com",
  364. "sip:j.doe:secret@big.com;transport=tcp",
  365. "sip:j.doe@big.com?subject=project",
  366. "sip:example.com",
  367. ]:
  368. self.assertEqual(sip.parseURL(url).toString(), url)
  369. def testComplex(self):
  370. s = (
  371. "sip:user:pass@hosta:123;transport=udp;user=phone;method=foo;"
  372. "ttl=12;maddr=1.2.3.4;blah;goo=bar?a=b&c=d"
  373. )
  374. url = sip.parseURL(s)
  375. for k, v in [
  376. ("username", "user"),
  377. ("password", "pass"),
  378. ("host", "hosta"),
  379. ("port", 123),
  380. ("transport", "udp"),
  381. ("usertype", "phone"),
  382. ("method", "foo"),
  383. ("ttl", 12),
  384. ("maddr", "1.2.3.4"),
  385. ("other", ["blah", "goo=bar"]),
  386. ("headers", {"a": "b", "c": "d"}),
  387. ]:
  388. self.assertEqual(getattr(url, k), v)
  389. class ParseTests(unittest.TestCase):
  390. def testParseAddress(self):
  391. for address, name, urls, params in [
  392. (
  393. '"A. G. Bell" <sip:foo@example.com>',
  394. "A. G. Bell",
  395. "sip:foo@example.com",
  396. {},
  397. ),
  398. ("Anon <sip:foo@example.com>", "Anon", "sip:foo@example.com", {}),
  399. ("sip:foo@example.com", "", "sip:foo@example.com", {}),
  400. ("<sip:foo@example.com>", "", "sip:foo@example.com", {}),
  401. (
  402. "foo <sip:foo@example.com>;tag=bar;foo=baz",
  403. "foo",
  404. "sip:foo@example.com",
  405. {"tag": "bar", "foo": "baz"},
  406. ),
  407. ]:
  408. gname, gurl, gparams = sip.parseAddress(address)
  409. self.assertEqual(name, gname)
  410. self.assertEqual(gurl.toString(), urls)
  411. self.assertEqual(gparams, params)
  412. @implementer(sip.ILocator)
  413. class DummyLocator:
  414. def getAddress(self, logicalURL):
  415. return defer.succeed(sip.URL("server.com", port=5060))
  416. @implementer(sip.ILocator)
  417. class FailingLocator:
  418. def getAddress(self, logicalURL):
  419. return defer.fail(LookupError())
  420. class ProxyTests(unittest.TestCase):
  421. def setUp(self):
  422. self.proxy = sip.Proxy("127.0.0.1")
  423. self.proxy.locator = DummyLocator()
  424. self.sent = []
  425. self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg))
  426. def testRequestForward(self):
  427. r = sip.Request("INVITE", "sip:foo")
  428. r.addHeader("via", sip.Via("1.2.3.4").toString())
  429. r.addHeader("via", sip.Via("1.2.3.5").toString())
  430. r.addHeader("foo", "bar")
  431. r.addHeader("to", "<sip:joe@server.com>")
  432. r.addHeader("contact", "<sip:joe@1.2.3.5>")
  433. self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060))
  434. self.assertEqual(len(self.sent), 1)
  435. dest, m = self.sent[0]
  436. self.assertEqual(dest.port, 5060)
  437. self.assertEqual(dest.host, "server.com")
  438. self.assertEqual(m.uri.toString(), "sip:foo")
  439. self.assertEqual(m.method, "INVITE")
  440. self.assertEqual(
  441. m.headers["via"],
  442. [
  443. "SIP/2.0/UDP 127.0.0.1:5060",
  444. "SIP/2.0/UDP 1.2.3.4:5060",
  445. "SIP/2.0/UDP 1.2.3.5:5060",
  446. ],
  447. )
  448. def testReceivedRequestForward(self):
  449. r = sip.Request("INVITE", "sip:foo")
  450. r.addHeader("via", sip.Via("1.2.3.4").toString())
  451. r.addHeader("foo", "bar")
  452. r.addHeader("to", "<sip:joe@server.com>")
  453. r.addHeader("contact", "<sip:joe@1.2.3.4>")
  454. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  455. dest, m = self.sent[0]
  456. self.assertEqual(
  457. m.headers["via"],
  458. ["SIP/2.0/UDP 127.0.0.1:5060", "SIP/2.0/UDP 1.2.3.4:5060;received=1.1.1.1"],
  459. )
  460. def testResponseWrongVia(self):
  461. # first via must match proxy's address
  462. r = sip.Response(200)
  463. r.addHeader("via", sip.Via("foo.com").toString())
  464. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  465. self.assertEqual(len(self.sent), 0)
  466. def testResponseForward(self):
  467. r = sip.Response(200)
  468. r.addHeader("via", sip.Via("127.0.0.1").toString())
  469. r.addHeader("via", sip.Via("client.com", port=1234).toString())
  470. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  471. self.assertEqual(len(self.sent), 1)
  472. dest, m = self.sent[0]
  473. self.assertEqual((dest.host, dest.port), ("client.com", 1234))
  474. self.assertEqual(m.code, 200)
  475. self.assertEqual(m.headers["via"], ["SIP/2.0/UDP client.com:1234"])
  476. def testReceivedResponseForward(self):
  477. r = sip.Response(200)
  478. r.addHeader("via", sip.Via("127.0.0.1").toString())
  479. r.addHeader("via", sip.Via("10.0.0.1", received="client.com").toString())
  480. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  481. self.assertEqual(len(self.sent), 1)
  482. dest, m = self.sent[0]
  483. self.assertEqual((dest.host, dest.port), ("client.com", 5060))
  484. def testResponseToUs(self):
  485. r = sip.Response(200)
  486. r.addHeader("via", sip.Via("127.0.0.1").toString())
  487. l = []
  488. self.proxy.gotResponse = lambda *a: l.append(a)
  489. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  490. self.assertEqual(len(l), 1)
  491. m, addr = l[0]
  492. self.assertEqual(len(m.headers.get("via", [])), 0)
  493. self.assertEqual(m.code, 200)
  494. def testLoop(self):
  495. r = sip.Request("INVITE", "sip:foo")
  496. r.addHeader("via", sip.Via("1.2.3.4").toString())
  497. r.addHeader("via", sip.Via("127.0.0.1").toString())
  498. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  499. self.assertEqual(self.sent, [])
  500. def testCantForwardRequest(self):
  501. r = sip.Request("INVITE", "sip:foo")
  502. r.addHeader("via", sip.Via("1.2.3.4").toString())
  503. r.addHeader("to", "<sip:joe@server.com>")
  504. self.proxy.locator = FailingLocator()
  505. self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060))
  506. self.assertEqual(len(self.sent), 1)
  507. dest, m = self.sent[0]
  508. self.assertEqual((dest.host, dest.port), ("1.2.3.4", 5060))
  509. self.assertEqual(m.code, 404)
  510. self.assertEqual(m.headers["via"], ["SIP/2.0/UDP 1.2.3.4:5060"])
  511. class RegistrationTests(unittest.TestCase):
  512. def setUp(self):
  513. self.proxy = sip.RegisterProxy(host="127.0.0.1")
  514. self.registry = sip.InMemoryRegistry("bell.example.com")
  515. self.proxy.registry = self.proxy.locator = self.registry
  516. self.sent = []
  517. self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg))
  518. def tearDown(self):
  519. for d, uri in self.registry.users.values():
  520. d.cancel()
  521. del self.proxy
  522. def register(self):
  523. r = sip.Request("REGISTER", "sip:bell.example.com")
  524. r.addHeader("to", "sip:joe@bell.example.com")
  525. r.addHeader("contact", "sip:joe@client.com:1234")
  526. r.addHeader("via", sip.Via("client.com").toString())
  527. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  528. def unregister(self):
  529. r = sip.Request("REGISTER", "sip:bell.example.com")
  530. r.addHeader("to", "sip:joe@bell.example.com")
  531. r.addHeader("contact", "*")
  532. r.addHeader("via", sip.Via("client.com").toString())
  533. r.addHeader("expires", "0")
  534. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  535. def testRegister(self):
  536. self.register()
  537. dest, m = self.sent[0]
  538. self.assertEqual((dest.host, dest.port), ("client.com", 5060))
  539. self.assertEqual(m.code, 200)
  540. self.assertEqual(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
  541. self.assertEqual(m.headers["to"], ["sip:joe@bell.example.com"])
  542. self.assertEqual(m.headers["contact"], ["sip:joe@client.com:5060"])
  543. #
  544. # XX: See http://tm.tl/8886
  545. #
  546. if type(reactor) != AsyncioSelectorReactor:
  547. self.assertTrue(int(m.headers["expires"][0]) in (3600, 3601, 3599, 3598))
  548. self.assertEqual(len(self.registry.users), 1)
  549. dc, uri = self.registry.users["joe"]
  550. self.assertEqual(uri.toString(), "sip:joe@client.com:5060")
  551. d = self.proxy.locator.getAddress(
  552. sip.URL(username="joe", host="bell.example.com")
  553. )
  554. d.addCallback(lambda desturl: (desturl.host, desturl.port))
  555. d.addCallback(self.assertEqual, ("client.com", 5060))
  556. return d
  557. def testUnregister(self):
  558. self.register()
  559. self.unregister()
  560. dest, m = self.sent[1]
  561. self.assertEqual((dest.host, dest.port), ("client.com", 5060))
  562. self.assertEqual(m.code, 200)
  563. self.assertEqual(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
  564. self.assertEqual(m.headers["to"], ["sip:joe@bell.example.com"])
  565. self.assertEqual(m.headers["contact"], ["sip:joe@client.com:5060"])
  566. self.assertEqual(m.headers["expires"], ["0"])
  567. self.assertEqual(self.registry.users, {})
  568. def addPortal(self):
  569. r = TestRealm()
  570. p = portal.Portal(r)
  571. c = checkers.InMemoryUsernamePasswordDatabaseDontUse()
  572. c.addUser("userXname@127.0.0.1", "passXword")
  573. p.registerChecker(c)
  574. self.proxy.portal = p
  575. def testFailedAuthentication(self):
  576. self.addPortal()
  577. self.register()
  578. self.assertEqual(len(self.registry.users), 0)
  579. self.assertEqual(len(self.sent), 1)
  580. dest, m = self.sent[0]
  581. self.assertEqual(m.code, 401)
  582. def testWrongDomainRegister(self):
  583. r = sip.Request("REGISTER", "sip:wrong.com")
  584. r.addHeader("to", "sip:joe@bell.example.com")
  585. r.addHeader("contact", "sip:joe@client.com:1234")
  586. r.addHeader("via", sip.Via("client.com").toString())
  587. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  588. self.assertEqual(len(self.sent), 0)
  589. def testWrongToDomainRegister(self):
  590. r = sip.Request("REGISTER", "sip:bell.example.com")
  591. r.addHeader("to", "sip:joe@foo.com")
  592. r.addHeader("contact", "sip:joe@client.com:1234")
  593. r.addHeader("via", sip.Via("client.com").toString())
  594. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  595. self.assertEqual(len(self.sent), 0)
  596. def testWrongDomainLookup(self):
  597. self.register()
  598. url = sip.URL(username="joe", host="foo.com")
  599. d = self.proxy.locator.getAddress(url)
  600. self.assertFailure(d, LookupError)
  601. return d
  602. def testNoContactLookup(self):
  603. self.register()
  604. url = sip.URL(username="jane", host="bell.example.com")
  605. d = self.proxy.locator.getAddress(url)
  606. self.assertFailure(d, LookupError)
  607. return d
  608. class Client(sip.Base):
  609. def __init__(self):
  610. sip.Base.__init__(self)
  611. self.received = []
  612. self.deferred = defer.Deferred()
  613. def handle_response(self, response, addr):
  614. self.received.append(response)
  615. self.deferred.callback(self.received)
  616. class LiveTests(unittest.TestCase):
  617. def setUp(self):
  618. self.proxy = sip.RegisterProxy(host="127.0.0.1")
  619. self.registry = sip.InMemoryRegistry("bell.example.com")
  620. self.proxy.registry = self.proxy.locator = self.registry
  621. self.serverPort = reactor.listenUDP(0, self.proxy, interface="127.0.0.1")
  622. self.client = Client()
  623. self.clientPort = reactor.listenUDP(0, self.client, interface="127.0.0.1")
  624. self.serverAddress = (
  625. self.serverPort.getHost().host,
  626. self.serverPort.getHost().port,
  627. )
  628. def tearDown(self):
  629. for d, uri in self.registry.users.values():
  630. d.cancel()
  631. d1 = defer.maybeDeferred(self.clientPort.stopListening)
  632. d2 = defer.maybeDeferred(self.serverPort.stopListening)
  633. return defer.gatherResults([d1, d2])
  634. def testRegister(self):
  635. p = self.clientPort.getHost().port
  636. r = sip.Request("REGISTER", "sip:bell.example.com")
  637. r.addHeader("to", "sip:joe@bell.example.com")
  638. r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p)
  639. r.addHeader("via", sip.Via("127.0.0.1", port=p).toString())
  640. self.client.sendMessage(
  641. sip.URL(host="127.0.0.1", port=self.serverAddress[1]), r
  642. )
  643. d = self.client.deferred
  644. def check(received):
  645. self.assertEqual(len(received), 1)
  646. r = received[0]
  647. self.assertEqual(r.code, 200)
  648. d.addCallback(check)
  649. return d
  650. def test_amoralRPort(self):
  651. """
  652. rport is allowed without a value, apparently because server
  653. implementors might be too stupid to check the received port
  654. against 5060 and see if they're equal, and because client
  655. implementors might be too stupid to bind to port 5060, or set a
  656. value on the rport parameter they send if they bind to another
  657. port.
  658. """
  659. p = self.clientPort.getHost().port
  660. r = sip.Request("REGISTER", "sip:bell.example.com")
  661. r.addHeader("to", "sip:joe@bell.example.com")
  662. r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p)
  663. r.addHeader("via", sip.Via("127.0.0.1", port=p, rport=True).toString())
  664. warnings = self.flushWarnings(offendingFunctions=[self.test_amoralRPort])
  665. self.assertEqual(len(warnings), 1)
  666. self.assertEqual(
  667. warnings[0]["message"], "rport=True is deprecated since Twisted 9.0."
  668. )
  669. self.assertEqual(warnings[0]["category"], DeprecationWarning)
  670. self.client.sendMessage(
  671. sip.URL(host="127.0.0.1", port=self.serverAddress[1]), r
  672. )
  673. d = self.client.deferred
  674. def check(received):
  675. self.assertEqual(len(received), 1)
  676. r = received[0]
  677. self.assertEqual(r.code, 200)
  678. d.addCallback(check)
  679. return d