diff --git a/tests/test_ditem.py b/tests/test_ditem.py new file mode 100644 index 0000000..5636add --- /dev/null +++ b/tests/test_ditem.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- + +"""Contains test cases for the DownloadItem object.""" + +from __future__ import unicode_literals + +import sys +import os.path +import unittest + +PATH = os.path.realpath(os.path.abspath(__file__)) +sys.path.insert(0, os.path.dirname(os.path.dirname(PATH))) + +try: + from youtube_dl_gui.downloadmanager import DownloadItem +except ImportError as error: + print error + sys.exit(1) + + +class TestItemInit(unittest.TestCase): + + """Test case for DownloadItem init.""" + + def test_init(self): + url = "url" + options = ["-f", "flv"] + + ditem = DownloadItem(url, options) + + self.assertEqual(ditem.stage, "Queued") + self.assertEqual(ditem.url, url) + self.assertEqual(ditem.options, options) + self.assertEqual(ditem.object_id, hash(url + unicode(options))) + + self.assertEqual(ditem.path, "") + self.assertEqual(ditem.filenames, []) + self.assertEqual(ditem.extensions, []) + + self.assertEqual( + ditem.progress_stats, + {"filename": url, + "extension": "-", + "filesize": "-", + "percent": "0%", + "speed": "-", + "eta": "-", + "status": "Queued"} + ) + + +class TestGetFiles(unittest.TestCase): + + """Test case for DownloadItem get_files method.""" + + def setUp(self): + self.ditem = DownloadItem("url", ["-f", "flv"]) + + def test_get_files(self): + path = os.path.join("/home", "user", "downloads") + + self.ditem.path = path + self.ditem.filenames = ["file1", "file2"] + self.ditem.extensions = [".mp4", ".m4a"] + + self.assertEqual(self.ditem.get_files(), [os.path.join(path, "file1", ".mp4"), os.path.join(path, "file2", ".m4a")]) + + def test_get_files_no_data(self): + self.assertEqual(self.ditem.get_files(), []) + + +class TestItemComparison(unittest.TestCase): + + """Test case for DownloadItem __eq__ method.""" + + def test_equal_true(self): + ditem1 = DownloadItem("url", ["-f", "flv"]) + ditem2 = DownloadItem("url", ["-f", "flv"]) + + self.assertTrue(ditem1 == ditem2) + + def test_equal_false(self): + ditem1 = DownloadItem("url", ["-f", "flv"]) + ditem2 = DownloadItem("url2", ["-f", "flv"]) + + self.assertFalse(ditem1 == ditem2) + + ditem1 = DownloadItem("url", ["-f", "flv"]) + ditem2 = DownloadItem("url", ["-f", "mp4"]) + + self.assertFalse(ditem1 == ditem2) + + +class TestSetItemStage(unittest.TestCase): + + """Test case for DownloadItem stage setter.""" + + def setUp(self): + self.ditem = DownloadItem("url", ["-f", "flv"]) + + def test_set_stage_valid(self): + self.ditem.stage = "Queued" + self.assertEqual(self.ditem.stage, "Queued") + + self.ditem.stage = "Active" + self.assertEqual(self.ditem.stage, "Active") + + self.ditem.stage = "Completed" + self.assertEqual(self.ditem.stage, "Completed") + + self.ditem.stage = "Paused" + self.assertEqual(self.ditem.stage, "Paused") + + def test_set_stage_invalid(self): + raised = False + + try: + self.ditem.stage = "some other status" + except ValueError: + raised = True + + self.assertTrue(raised) + + +class TestUpdateStats(unittest.TestCase): + + """Test case for DownloadItem update_stats method.""" + + def setUp(self): + self.ditem = DownloadItem("url", ["-f", "flv"]) + + def test_update_stats(self): + path = os.path.join("/home", "user") + + self.ditem.update_stats({"filename": "somefilename", + "extension": ".mp4", + "filesize": "9.45MiB", + "percent": "2.0%", + "speed": "200.00KiB/s", + "eta": "00:38", + "status": "Downloading", + "path": path}) + + self.assertEqual(self.ditem.path, path) + self.assertEqual(self.ditem.filenames, ["somefilename"]) + self.assertEqual(self.ditem.extensions, [".mp4"]) + self.assertEqual( + self.ditem.progress_stats, + {"filename": "somefilename", + "extension": ".mp4", + "filesize": "9.45MiB", + "percent": "2.0%", + "speed": "200.00KiB/s", + "eta": "00:38", + "status": "Downloading"} + ) + + self.ditem.update_stats({"filename": "someotherfilename", "extension": ".m4a"}) + + self.assertEqual(self.ditem.filenames, ["somefilename", "someotherfilename"]) + self.assertEqual(self.ditem.extensions, [".mp4", ".m4a"]) + self.assertEqual( + self.ditem.progress_stats, + {"filename": "someotherfilename", + "extension": ".m4a", + "filesize": "9.45MiB", + "percent": "2.0%", + "speed": "200.00KiB/s", + "eta": "00:38", + "status": "Downloading"} + ) + + def test_update_stats_invalid_input(self): + self.assertRaises(AssertionError, self.ditem.update_stats, []) + + +class TestDownloadItemPrivate(unittest.TestCase): + + """Test case for private method of the DownloadItem.""" + + def test_set_stage(self): + ditem = DownloadItem("url", ["-f", "flv"]) + + active_status = ["Pre Processing", "Downloading", "Post Processing"] + complete_status = ["Finished", "Error", "Warning", "Stopped", "Already Downloaded", "Filesize Abort"] + + for status in active_status: + ditem._stage = "Queued" + ditem._set_stage(status) + self.assertEqual(ditem.stage, "Active") + + for status in complete_status: + ditem._stage = "Active" + ditem._set_stage(status) + self.assertEqual(ditem.stage, "Completed") + + +def main(): + unittest.main() + + +if __name__ == '__main__': + main() diff --git a/tests/test_dlist.py b/tests/test_dlist.py new file mode 100644 index 0000000..0d5eec8 --- /dev/null +++ b/tests/test_dlist.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- + +"""Contains test cases for the DownloadList object.""" + +import sys +import os.path +import unittest + +PATH = os.path.realpath(os.path.abspath(__file__)) +sys.path.insert(0, os.path.dirname(os.path.dirname(PATH))) + +try: + import mock + from youtube_dl_gui.downloadmanager import DownloadList, synchronized +except ImportError as error: + print error + sys.exit(1) + + +class TestInit(unittest.TestCase): + + """Test case for the DownloadList init.""" + + def test_init(self): + mocks = [mock.Mock(object_id=0), mock.Mock(object_id=1)] + + dlist = DownloadList(mocks) + self.assertEqual(dlist._items_list, [0, 1]) + self.assertEqual(dlist._items_dict, {0: mocks[0], 1: mocks[1]}) + + def test_init_empty(self): + dlist = DownloadList() + self.assertEqual(dlist._items_list, []) + self.assertEqual(dlist._items_dict, {}) + + def test_init_invalid_args(self): + self.assertRaises(AssertionError, DownloadList, {}) + self.assertRaises(AssertionError, DownloadList, ()) + self.assertRaises(AssertionError, DownloadList, False) + + +class TestInsert(unittest.TestCase): + + """Test case for the DownloadList insert method.""" + + def test_insert(self): + mock_ditem = mock.Mock(object_id=0) + + dlist = DownloadList() + dlist.insert(mock_ditem) + + self.assertEqual(dlist._items_list, [0]) + self.assertEqual(dlist._items_dict, {0: mock_ditem}) + + +class TestRemove(unittest.TestCase): + + """Test case for the DownloadList remove method.""" + + def setUp(self): + self.mocks = [mock.Mock(object_id=0), mock.Mock(object_id=1), mock.Mock(object_id=2)] + self.dlist = DownloadList(self.mocks) + + def test_remove(self): + self.assertTrue(self.dlist.remove(1)) + + self.assertEqual(self.dlist._items_list, [0, 2]) + self.assertEqual(self.dlist._items_dict, {0: self.mocks[0], 2: self.mocks[2]}) + + def test_remove_not_exist(self): + self.assertRaises(KeyError, self.dlist.remove, 3) + + def test_remove_active(self): + self.mocks[1].stage = "Active" + + self.assertFalse(self.dlist.remove(1)) + self.assertEqual(self.dlist._items_list, [0, 1, 2]) + self.assertEqual(self.dlist._items_dict, {0: self.mocks[0], 1: self.mocks[1], 2: self.mocks[2]}) + + +class TestFetchNext(unittest.TestCase): + + """Test case for the DownloadList fetch_next method.""" + + def test_fetch_next(self): + items_count = 3 + + mocks = [mock.Mock(object_id=i, stage="Queued") for i in range(items_count)] + + dlist = DownloadList(mocks) + + for i in range(items_count): + self.assertEqual(dlist.fetch_next(), mocks[i]) + mocks[i].stage = "Active" + + self.assertIsNone(dlist.fetch_next()) + + for i in range(items_count): + mocks[i].stage = "Completed" + + self.assertIsNone(dlist.fetch_next()) + + mocks[1].stage = "Queued" # Re-queue item + self.assertEqual(dlist.fetch_next(), mocks[1]) + + def test_fetch_next_empty_list(self): + dlist = DownloadList() + self.assertIsNone(dlist.fetch_next()) + + +class TestMoveUp(unittest.TestCase): + + """Test case for the DownloadList move_up method.""" + + def setUp(self): + mocks = [mock.Mock(object_id=i, stage="Queued") for i in range(3)] + self.dlist = DownloadList(mocks) + + def test_move_up(self): + self.dlist.move_up(1) + self.assertEqual(self.dlist._items_list, [1, 0, 2]) + + def test_move_up_already_on_top(self): + self.dlist.move_up(0) + self.assertEqual(self.dlist._items_list, [0, 1, 2]) + + def test_move_up_not_exist(self): + self.assertRaises(ValueError, self.dlist.move_up, 666) + + +class TestMoveDown(unittest.TestCase): + + """Test case for the DownloadList move_down method.""" + + def setUp(self): + mocks = [mock.Mock(object_id=i, stage="Queued") for i in range(3)] + self.dlist = DownloadList(mocks) + + def test_move_down(self): + self.dlist.move_down(1) + self.assertEqual(self.dlist._items_list, [0, 2, 1]) + + def test_move_down_already_on_bottom(self): + self.dlist.move_down(2) + self.assertEqual(self.dlist._items_list, [0, 1, 2]) + + def test_move_down_not_exist(self): + self.assertRaises(ValueError, self.dlist.move_down, 666) + + +class TestGetItem(unittest.TestCase): + + """Test case for the DownloadList get_item method.""" + + def test_get_item(self): + mocks = [mock.Mock(object_id=i) for i in range(3)] + dlist = DownloadList(mocks) + + self.assertEqual(dlist.get_item(0), mocks[0]) + self.assertEqual(dlist.get_item(2), mocks[2]) + + def test_get_item_not_exist(self): + dlist = DownloadList() + self.assertRaises(KeyError, dlist.get_item, 0) + + +class TestGetLength(unittest.TestCase): + + """Test case for the DownloadList __len__ method.""" + + def test_get_length(self): + dlist = DownloadList([mock.Mock(), mock.Mock()]) + self.assertEqual(len(dlist), 2) + + def test_get_length_empty_list(self): + dlist = DownloadList() + self.assertEqual(len(dlist), 0) + + +class TestHasItem(unittest.TestCase): + + """Test case for the DownloadList has_item method.""" + + def setUp(self): + mock_ditem = mock.Mock(object_id=1337) + self.dlist = DownloadList([mock_ditem]) + + def test_has_item_true(self): + self.assertTrue(self.dlist.has_item(1337)) + + def test_has_item_false(self): + self.assertFalse(self.dlist.has_item(1000)) + + +class TestGetItems(unittest.TestCase): + + """Test case for the DownloadList get_items method.""" + + def test_get_items(self): + mocks = [mock.Mock() for _ in range(3)] + dlist = DownloadList(mocks) + + self.assertEqual(dlist.get_items(), mocks) + + def test_get_items_empty_list(self): + dlist = DownloadList() + self.assertEqual(dlist.get_items(), []) + + +class TestSynchronizeDecorator(unittest.TestCase): + + def test_synchronize(self): + mock_func = mock.Mock() + mock_lock = mock.Mock() + + decorated_func = synchronized(mock_lock)(mock_func) + + self.assertEqual(decorated_func(1, a=2), mock_func.return_value) + + mock_func.assert_called_once_with(1, a=2) + mock_lock.acquire.assert_called_once() + mock_lock.release.assert_called_once() + + +def main(): + unittest.main() + + +if __name__ == '__main__': + main() diff --git a/youtube_dl_gui/downloadmanager.py b/youtube_dl_gui/downloadmanager.py index a97c247..e24ca99 100644 --- a/youtube_dl_gui/downloadmanager.py +++ b/youtube_dl_gui/downloadmanager.py @@ -26,6 +26,7 @@ import os.path from threading import ( Thread, + RLock, Lock ) @@ -39,13 +40,226 @@ from .downloaders import YoutubeDLDownloader from .utils import ( YOUTUBEDL_BIN, - os_path_exists + os_path_exists, + to_string ) MANAGER_PUB_TOPIC = 'dlmanager' WORKER_PUB_TOPIC = 'dlworker' +_SYNC_LOCK = RLock() + +# Decorator that adds thread synchronization to a function +def synchronized(lock): + def _decorator(func): + def _wrapper(*args, **kwargs): + lock.acquire() + ret_value = func(*args, **kwargs) + lock.release() + return ret_value + return _wrapper + return _decorator + + +class DownloadItem(object): + + """Object that represents a download. + + Attributes: + STAGES (tuple): Main stages of the download item. + + ACTIVE_STAGES (tuple): Sub stages of the 'Active' stage. + + COMPLETED_STAGES (tuple): Sub stages of the 'Completed' stage. + + Args: + url (string): URL that corresponds to the download item. + + options (list): Options list to use during the download phase. + + """ + + STAGES = ("Queued", "Active", "Paused", "Completed") + + ACTIVE_STAGES = ("Pre Processing", "Downloading", "Post Processing") + + COMPLETED_STAGES = ("Finished", "Error", "Warning", "Stopped", "Already Downloaded", "Filesize Abort") + + def __init__(self, url, options): + self.url = url + self.options = options + self._stage = self.STAGES[0] + + self.object_id = hash(url + to_string(options)) + + self.path = "" + self.filenames = [] + self.extensions = [] + + self.progress_stats = { + "filename": url, + "extension": "-", + "filesize": "-", + "percent": "0%", + "speed": "-", + "eta": "-", + "status": self.stage + } + + @property + def stage(self): + return self._stage + + @stage.setter + def stage(self, value): + if value not in self.STAGES: + raise ValueError(value) + + self._stage = value + + def get_files(self): + """Returns a list that contains all the system files bind to this object.""" + files = [] + + for index, item in enumerate(self.filenames): + path = os.path.join(self.path, item, self.extensions[index]) + files.append(path) + + return files + + def update_stats(self, stats_dict): + """Updates the progress_stats dict from the given dictionary.""" + assert isinstance(stats_dict, dict) + + for key in stats_dict: + if key in self.progress_stats: + self.progress_stats[key] = stats_dict[key] + + # Extract extra stuff + if key == "filename": + if stats_dict[key] not in self.filenames: + self.filenames.append(stats_dict[key]) + + if key == "extension": + if stats_dict[key] not in self.extensions: + self.extensions.append(stats_dict[key]) + + if key == "path": + self.path = stats_dict[key] + + if key == "status": + self._set_stage(stats_dict[key]) + + def _set_stage(self, status): + if status in self.ACTIVE_STAGES: + self._stage = self.STAGES[1] + + if status in self.COMPLETED_STAGES: + self._stage = self.STAGES[3] + + def __eq__(self, other): + return self.object_id == other.object_id + + + +class DownloadList(object): + + """List like data structure that contains DownloadItems. + + Args: + items (list): List that contains DownloadItems. + + """ + + def __init__(self, items=None): + assert isinstance(items, list) or items is None + + if items is None: + self._items_dict = {} # Speed up lookup + self._items_list = [] # Keep the sequence + else: + self._items_list = [item.object_id for item in items] + self._items_dict = {item.object_id: item for item in items} + + @synchronized(_SYNC_LOCK) + def insert(self, item): + """Inserts the given item to the list. Does not check for duplicates. """ + self._items_list.append(item.object_id) + self._items_dict[item.object_id] = item + + @synchronized(_SYNC_LOCK) + def remove(self, object_id): + """Removes an item from the list. + + Removes the item with the corresponding object_id from + the list if the item is not in 'Active' state. + + Returns: + True on success else False. + + """ + if self._items_dict[object_id].stage != "Active": + self._items_list.remove(object_id) + del self._items_dict[object_id] + + return True + return False + + @synchronized(_SYNC_LOCK) + def fetch_next(self): + """Returns the next queued item on the list. + + Returns: + Next queued item or None if no other item exist. + + """ + for object_id in self._items_list: + cur_item = self._items_dict[object_id] + + if cur_item.stage == "Queued": + return cur_item + + return None + + @synchronized(_SYNC_LOCK) + def move_up(self, object_id): + """Moves the item with the corresponding object_id up to the list.""" + index = self._items_list.index(object_id) + + if index > 0: + self._swap(index, index - 1) + + @synchronized(_SYNC_LOCK) + def move_down(self, object_id): + """Moves the item with the corresponding object_id down to the list.""" + index = self._items_list.index(object_id) + + if index < (len(self._items_list) - 1): + self._swap(index, index + 1) + + @synchronized(_SYNC_LOCK) + def get_item(self, object_id): + """Returns the DownloadItem with the given object_id.""" + return self._items_dict[object_id] + + @synchronized(_SYNC_LOCK) + def has_item(self, object_id): + """Returns True if the given object_id is in the list else False.""" + return object_id in self._items_list + + @synchronized(_SYNC_LOCK) + def get_items(self): + """Returns a list with all the items.""" + return [self._items_dict[object_id] for object_id in self._items_list] + + @synchronized(_SYNC_LOCK) + def __len__(self): + return len(self._items_list) + + def _swap(self, index1, index2): + self._items_list[index1], self._items_list[index2] = self._items_list[index2], self._items_list[index1] + class DownloadManager(Thread):