No known key found for this signature in database
GPG Key ID: 2C393E0F18A9236D
4 changed files with 213 additions and 22 deletions
Unified View
Diff Options
-
123test/test_downloader_http.py
-
6youtube_dl/__init__.py
-
101youtube_dl/downloader/http.py
-
5youtube_dl/options.py
@ -0,0 +1,123 @@ |
|||||
|
#!/usr/bin/env python |
||||
|
# coding: utf-8 |
||||
|
from __future__ import unicode_literals |
||||
|
|
||||
|
# Allow direct execution |
||||
|
import os |
||||
|
import re |
||||
|
import sys |
||||
|
import unittest |
||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
||||
|
|
||||
|
from youtube_dl import YoutubeDL |
||||
|
from youtube_dl.compat import compat_http_server |
||||
|
from youtube_dl.downloader.http import HttpFD |
||||
|
from youtube_dl.utils import encodeFilename |
||||
|
import ssl |
||||
|
import threading |
||||
|
|
||||
|
TEST_DIR = os.path.dirname(os.path.abspath(__file__)) |
||||
|
|
||||
|
|
||||
|
def http_server_port(httpd): |
||||
|
if os.name == 'java' and isinstance(httpd.socket, ssl.SSLSocket): |
||||
|
# In Jython SSLSocket is not a subclass of socket.socket |
||||
|
sock = httpd.socket.sock |
||||
|
else: |
||||
|
sock = httpd.socket |
||||
|
return sock.getsockname()[1] |
||||
|
|
||||
|
|
||||
|
TEST_SIZE = 10 * 1024 |
||||
|
|
||||
|
|
||||
|
class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): |
||||
|
def log_message(self, format, *args): |
||||
|
pass |
||||
|
|
||||
|
def send_content_range(self, total=None): |
||||
|
range_header = self.headers.get('Range') |
||||
|
start = end = None |
||||
|
if range_header: |
||||
|
mobj = re.search(r'^bytes=(\d+)-(\d+)', range_header) |
||||
|
if mobj: |
||||
|
start = int(mobj.group(1)) |
||||
|
end = int(mobj.group(2)) |
||||
|
valid_range = start is not None and end is not None |
||||
|
if valid_range: |
||||
|
content_range = 'bytes %d-%d' % (start, end) |
||||
|
if total: |
||||
|
content_range += '/%d' % total |
||||
|
self.send_header('Content-Range', content_range) |
||||
|
return (end - start + 1) if valid_range else total |
||||
|
|
||||
|
def serve(self, range=True, content_length=True): |
||||
|
self.send_response(200) |
||||
|
self.send_header('Content-Type', 'video/mp4') |
||||
|
size = TEST_SIZE |
||||
|
if range: |
||||
|
size = self.send_content_range(TEST_SIZE) |
||||
|
if content_length: |
||||
|
self.send_header('Content-Length', size) |
||||
|
self.end_headers() |
||||
|
self.wfile.write(b'#' * size) |
||||
|
|
||||
|
def do_GET(self): |
||||
|
if self.path == '/regular': |
||||
|
self.serve() |
||||
|
elif self.path == '/no-content-length': |
||||
|
self.serve(content_length=False) |
||||
|
elif self.path == '/no-range': |
||||
|
self.serve(range=False) |
||||
|
elif self.path == '/no-range-no-content-length': |
||||
|
self.serve(range=False, content_length=False) |
||||
|
else: |
||||
|
assert False |
||||
|
|
||||
|
|
||||
|
class FakeLogger(object): |
||||
|
def debug(self, msg): |
||||
|
pass |
||||
|
|
||||
|
def warning(self, msg): |
||||
|
pass |
||||
|
|
||||
|
def error(self, msg): |
||||
|
pass |
||||
|
|
||||
|
|
||||
|
class TestHttpFD(unittest.TestCase): |
||||
|
def setUp(self): |
||||
|
self.httpd = compat_http_server.HTTPServer( |
||||
|
('127.0.0.1', 0), HTTPTestRequestHandler) |
||||
|
self.port = http_server_port(self.httpd) |
||||
|
self.server_thread = threading.Thread(target=self.httpd.serve_forever) |
||||
|
self.server_thread.daemon = True |
||||
|
self.server_thread.start() |
||||
|
|
||||
|
def download(self, params, ep): |
||||
|
params['logger'] = FakeLogger() |
||||
|
ydl = YoutubeDL(params) |
||||
|
downloader = HttpFD(ydl, params) |
||||
|
filename = 'testfile.mp4' |
||||
|
self.assertTrue(downloader.real_download(filename, { |
||||
|
'url': 'http://127.0.0.1:%d/%s' % (self.port, ep), |
||||
|
})) |
||||
|
self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE) |
||||
|
os.remove(encodeFilename(filename)) |
||||
|
|
||||
|
def download_all(self, params): |
||||
|
for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'): |
||||
|
self.download(params, ep) |
||||
|
|
||||
|
def test_regular(self): |
||||
|
self.download_all({}) |
||||
|
|
||||
|
def test_chunked(self): |
||||
|
self.download_all({ |
||||
|
'http_chunk_size': 1000, |
||||
|
}) |
||||
|
|
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
unittest.main() |
Write
Preview
Loading…
Cancel
Save