You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

161 lines
6.0 KiB

12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
  1. #!/usr/bin/env python
  2. import errno
  3. import hashlib
  4. import io
  5. import os
  6. import json
  7. import unittest
  8. import sys
  9. import socket
  10. import binascii
  11. # Allow direct execution
  12. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. import youtube_dl.FileDownloader
  14. import youtube_dl.InfoExtractors
  15. from youtube_dl.utils import *
  16. DEF_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tests.json')
  17. PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "parameters.json")
  18. RETRIES = 3
  19. # General configuration (from __init__, not very elegant...)
  20. jar = compat_cookiejar.CookieJar()
  21. cookie_processor = compat_urllib_request.HTTPCookieProcessor(jar)
  22. proxy_handler = compat_urllib_request.ProxyHandler()
  23. opener = compat_urllib_request.build_opener(proxy_handler, cookie_processor, YoutubeDLHandler())
  24. compat_urllib_request.install_opener(opener)
  25. socket.setdefaulttimeout(10)
  26. def _try_rm(filename):
  27. """ Remove a file if it exists """
  28. try:
  29. os.remove(filename)
  30. except OSError as ose:
  31. if ose.errno != errno.ENOENT:
  32. raise
  33. md5 = lambda s: hashlib.md5(s.encode('utf-8')).hexdigest()
  34. class FileDownloader(youtube_dl.FileDownloader):
  35. def __init__(self, *args, **kwargs):
  36. self.to_stderr = self.to_screen
  37. self.processed_info_dicts = []
  38. return youtube_dl.FileDownloader.__init__(self, *args, **kwargs)
  39. def report_warning(self, message):
  40. # Don't accept warnings during tests
  41. raise ExtractorError(message)
  42. def process_info(self, info_dict):
  43. self.processed_info_dicts.append(info_dict)
  44. return youtube_dl.FileDownloader.process_info(self, info_dict)
  45. def _file_md5(fn):
  46. with open(fn, 'rb') as f:
  47. return hashlib.md5(f.read()).hexdigest()
  48. with io.open(DEF_FILE, encoding='utf-8') as deff:
  49. defs = json.load(deff)
  50. with io.open(PARAMETERS_FILE, encoding='utf-8') as pf:
  51. parameters = json.load(pf)
  52. class TestDownload(unittest.TestCase):
  53. maxDiff = None
  54. def setUp(self):
  55. self.parameters = parameters
  56. self.defs = defs
  57. ### Dynamically generate tests
  58. def generator(test_case):
  59. def test_template(self):
  60. ie = youtube_dl.InfoExtractors.get_info_extractor(test_case['name'])
  61. if not ie._WORKING:
  62. print('Skipping: IE marked as not _WORKING')
  63. return
  64. if 'playlist' not in test_case and not test_case['file']:
  65. print('Skipping: No output file specified')
  66. return
  67. if 'skip' in test_case:
  68. print('Skipping: {0}'.format(test_case['skip']))
  69. return
  70. params = self.parameters.copy()
  71. params.update(test_case.get('params', {}))
  72. fd = FileDownloader(params)
  73. for ie in youtube_dl.InfoExtractors.gen_extractors():
  74. fd.add_info_extractor(ie)
  75. finished_hook_called = set()
  76. def _hook(status):
  77. if status['status'] == 'finished':
  78. finished_hook_called.add(status['filename'])
  79. fd.add_progress_hook(_hook)
  80. test_cases = test_case.get('playlist', [test_case])
  81. for tc in test_cases:
  82. _try_rm(tc['file'])
  83. _try_rm(tc['file'] + '.part')
  84. _try_rm(tc['file'] + '.info.json')
  85. try:
  86. for retry in range(1, RETRIES + 1):
  87. try:
  88. fd.download([test_case['url']])
  89. except (DownloadError, ExtractorError) as err:
  90. if retry == RETRIES: raise
  91. # Check if the exception is not a network related one
  92. if not err.exc_info[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError):
  93. raise
  94. print('Retrying: {0} failed tries\n\n##########\n\n'.format(retry))
  95. else:
  96. break
  97. for tc in test_cases:
  98. if not test_case.get('params', {}).get('skip_download', False):
  99. self.assertTrue(os.path.exists(tc['file']), msg='Missing file ' + tc['file'])
  100. self.assertTrue(tc['file'] in finished_hook_called)
  101. self.assertTrue(os.path.exists(tc['file'] + '.info.json'))
  102. if 'md5' in tc:
  103. md5_for_file = _file_md5(tc['file'])
  104. self.assertEqual(md5_for_file, tc['md5'])
  105. with io.open(tc['file'] + '.info.json', encoding='utf-8') as infof:
  106. info_dict = json.load(infof)
  107. for (info_field, value) in tc.get('info_dict', {}).items():
  108. if isinstance(value, compat_str) and value.startswith('md5:'):
  109. self.assertEqual(value, 'md5:' + md5(info_dict.get(info_field)))
  110. else:
  111. self.assertEqual(value, info_dict.get(info_field))
  112. # If checkable fields are missing from the test case, print the info_dict
  113. test_info_dict = dict((key, value if not isinstance(value, compat_str) or len(value) < 250 else 'md5:' + md5(value))
  114. for key, value in info_dict.items()
  115. if value and key in ('title', 'description', 'uploader', 'upload_date', 'uploader_id', 'location'))
  116. if not all(key in tc.get('info_dict', {}).keys() for key in test_info_dict.keys()):
  117. sys.stderr.write(u'\n"info_dict": ' + json.dumps(test_info_dict, ensure_ascii=False, indent=2) + u'\n')
  118. # Check for the presence of mandatory fields
  119. for key in ('id', 'url', 'title', 'ext'):
  120. self.assertTrue(key in info_dict.keys() and info_dict[key])
  121. finally:
  122. for tc in test_cases:
  123. _try_rm(tc['file'])
  124. _try_rm(tc['file'] + '.part')
  125. _try_rm(tc['file'] + '.info.json')
  126. return test_template
  127. ### And add them to TestDownload
  128. for test_case in defs:
  129. test_method = generator(test_case)
  130. test_method.__name__ = "test_{0}".format(test_case["name"])
  131. setattr(TestDownload, test_method.__name__, test_method)
  132. del test_method
  133. if __name__ == '__main__':
  134. unittest.main()