You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

166 lines
5.9 KiB

  1. #!/usr/bin/env python
  2. # coding: utf-8
  3. from __future__ import unicode_literals
  4. # Allow direct execution
  5. import os
  6. import sys
  7. import unittest
  8. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. from test.helper import http_server_port
  10. from youtube_dl import YoutubeDL
  11. from youtube_dl.compat import compat_http_server, compat_urllib_request
  12. import ssl
  13. import threading
  14. TEST_DIR = os.path.dirname(os.path.abspath(__file__))
  15. class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
  16. def log_message(self, format, *args):
  17. pass
  18. def do_GET(self):
  19. if self.path == '/video.html':
  20. self.send_response(200)
  21. self.send_header('Content-Type', 'text/html; charset=utf-8')
  22. self.end_headers()
  23. self.wfile.write(b'<html><video src="/vid.mp4" /></html>')
  24. elif self.path == '/vid.mp4':
  25. self.send_response(200)
  26. self.send_header('Content-Type', 'video/mp4')
  27. self.end_headers()
  28. self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]')
  29. elif self.path == '/302':
  30. if sys.version_info[0] == 3:
  31. # XXX: Python 3 http server does not allow non-ASCII header values
  32. self.send_response(404)
  33. self.end_headers()
  34. return
  35. new_url = 'http://127.0.0.1:%d/中文.html' % http_server_port(self.server)
  36. self.send_response(302)
  37. self.send_header(b'Location', new_url.encode('utf-8'))
  38. self.end_headers()
  39. elif self.path == '/%E4%B8%AD%E6%96%87.html':
  40. self.send_response(200)
  41. self.send_header('Content-Type', 'text/html; charset=utf-8')
  42. self.end_headers()
  43. self.wfile.write(b'<html><video src="/vid.mp4" /></html>')
  44. else:
  45. assert False
  46. class FakeLogger(object):
  47. def debug(self, msg):
  48. pass
  49. def warning(self, msg):
  50. pass
  51. def error(self, msg):
  52. pass
  53. class TestHTTP(unittest.TestCase):
  54. def setUp(self):
  55. self.httpd = compat_http_server.HTTPServer(
  56. ('127.0.0.1', 0), HTTPTestRequestHandler)
  57. self.port = http_server_port(self.httpd)
  58. self.server_thread = threading.Thread(target=self.httpd.serve_forever)
  59. self.server_thread.daemon = True
  60. self.server_thread.start()
  61. def test_unicode_path_redirection(self):
  62. # XXX: Python 3 http server does not allow non-ASCII header values
  63. if sys.version_info[0] == 3:
  64. return
  65. ydl = YoutubeDL({'logger': FakeLogger()})
  66. r = ydl.extract_info('http://127.0.0.1:%d/302' % self.port)
  67. self.assertEqual(r['entries'][0]['url'], 'http://127.0.0.1:%d/vid.mp4' % self.port)
  68. class TestHTTPS(unittest.TestCase):
  69. def setUp(self):
  70. certfn = os.path.join(TEST_DIR, 'testcert.pem')
  71. self.httpd = compat_http_server.HTTPServer(
  72. ('127.0.0.1', 0), HTTPTestRequestHandler)
  73. self.httpd.socket = ssl.wrap_socket(
  74. self.httpd.socket, certfile=certfn, server_side=True)
  75. self.port = http_server_port(self.httpd)
  76. self.server_thread = threading.Thread(target=self.httpd.serve_forever)
  77. self.server_thread.daemon = True
  78. self.server_thread.start()
  79. def test_nocheckcertificate(self):
  80. if sys.version_info >= (2, 7, 9): # No certificate checking anyways
  81. ydl = YoutubeDL({'logger': FakeLogger()})
  82. self.assertRaises(
  83. Exception,
  84. ydl.extract_info, 'https://127.0.0.1:%d/video.html' % self.port)
  85. ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True})
  86. r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port)
  87. self.assertEqual(r['entries'][0]['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port)
  88. def _build_proxy_handler(name):
  89. class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
  90. proxy_name = name
  91. def log_message(self, format, *args):
  92. pass
  93. def do_GET(self):
  94. self.send_response(200)
  95. self.send_header('Content-Type', 'text/plain; charset=utf-8')
  96. self.end_headers()
  97. self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode('utf-8'))
  98. return HTTPTestRequestHandler
  99. class TestProxy(unittest.TestCase):
  100. def setUp(self):
  101. self.proxy = compat_http_server.HTTPServer(
  102. ('127.0.0.1', 0), _build_proxy_handler('normal'))
  103. self.port = http_server_port(self.proxy)
  104. self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
  105. self.proxy_thread.daemon = True
  106. self.proxy_thread.start()
  107. self.geo_proxy = compat_http_server.HTTPServer(
  108. ('127.0.0.1', 0), _build_proxy_handler('geo'))
  109. self.geo_port = http_server_port(self.geo_proxy)
  110. self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever)
  111. self.geo_proxy_thread.daemon = True
  112. self.geo_proxy_thread.start()
  113. def test_proxy(self):
  114. geo_proxy = '127.0.0.1:{0}'.format(self.geo_port)
  115. ydl = YoutubeDL({
  116. 'proxy': '127.0.0.1:{0}'.format(self.port),
  117. 'geo_verification_proxy': geo_proxy,
  118. })
  119. url = 'http://foo.com/bar'
  120. response = ydl.urlopen(url).read().decode('utf-8')
  121. self.assertEqual(response, 'normal: {0}'.format(url))
  122. req = compat_urllib_request.Request(url)
  123. req.add_header('Ytdl-request-proxy', geo_proxy)
  124. response = ydl.urlopen(req).read().decode('utf-8')
  125. self.assertEqual(response, 'geo: {0}'.format(url))
  126. def test_proxy_with_idn(self):
  127. ydl = YoutubeDL({
  128. 'proxy': '127.0.0.1:{0}'.format(self.port),
  129. })
  130. url = 'http://中文.tw/'
  131. response = ydl.urlopen(url).read().decode('utf-8')
  132. # b'xn--fiq228c' is '中文'.encode('idna')
  133. self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
  134. if __name__ == '__main__':
  135. unittest.main()