From 3d4b78ad934262042468d064707f2dbd7745d92a Mon Sep 17 00:00:00 2001 From: MrS0m30n3 Date: Sun, 21 Dec 2014 20:15:26 +0200 Subject: [PATCH] Re-write DownloadManager using Thread Pool Re-write DownloadManager, DownloadThread(Worker) using a custom thread pool implementation for speed. We don't want to create a new thread for each url when the same thread can be re-used. --- youtube_dl_gui/dlthread.py | 345 ++++++++++++++-------------------- youtube_dl_gui/downloaders.py | 15 +- youtube_dl_gui/mainframe.py | 63 ++----- youtube_dl_gui/updthread.py | 2 +- 4 files changed, 177 insertions(+), 248 deletions(-) diff --git a/youtube_dl_gui/dlthread.py b/youtube_dl_gui/dlthread.py index 7765afd..a38fb2b 100644 --- a/youtube_dl_gui/dlthread.py +++ b/youtube_dl_gui/dlthread.py @@ -11,225 +11,161 @@ from wx.lib.pubsub import setuparg1 from wx.lib.pubsub import pub as Publisher from .parsers import OptionsParser +from .updthread import UpdateThread from .downloaders import YoutubeDLDownloader from .utils import YOUTUBEDL_BIN class DownloadManager(Thread): - - ''' - Manage youtube-dlG download list. - - Params - threads_list: Python list that contains DownloadThread objects. - - update_thread: UpdateThread.py thread. - - Accessible Methods - close() - Params: None - - Return: None - - add_thread() - Params: DownloadThread object - - Return: None - - alive_threads() - Params: None - - Return: Number of alive threads. - - not_finished() - Params: None - - Return: Number of threads not finished yet. - - Properties - successful_downloads: Number of successful downloads. - time: Time (seconds) it took for all downloads to complete. - ''' - + + """ DownloadManager """ + PUBLISHER_TOPIC = 'dlmanager' - MAX_DOWNLOAD_THREADS = 3 - - def __init__(self, threads_list, update_thread=None): + WORKERS_NUMBER = 3 + WAIT_TIME = 0.1 + + def __init__(self, urls_list, opt_manager, log_manager=None): super(DownloadManager, self).__init__() - self.threads_list = threads_list - self.update_thread = update_thread - self._successful_downloads = 0 + self.opt_manager = opt_manager + self.log_manager = log_manager + self.urls_list = urls_list + + self._time_it_took = 0 + self._successful = 0 self._running = True - self._time = 0 + + self._workers = self._init_workers() self.start() - + + @property + def successful(self): + return self._successful + + @property + def time_it_took(self): + return self._time_it_took + + def increase_succ(self): + self._successful += 1 + def run(self): - if self.update_thread is not None: - self.update_thread.join() + self._check_youtubedl() + self._time_it_took = time.time() - self._time = time.time() - - # Main loop - while self._running and not self._threads_finished(): - for thread in self.threads_list: - if not self._running: - break - - self._start_thread(thread) + while self._running: + for worker in self._workers: + if worker.available() and self.urls_list: + worker.download(self.urls_list.pop(0)) + + time.sleep(self.WAIT_TIME) + + if not self.urls_list and self._jobs_done(): + break - time.sleep(0.1) - - # Make sure no child thread is alive - for thread in self.threads_list: - if thread.is_alive(): - thread.join() - - # Collect thread status - if thread.status == 0: - self._successful_downloads += 1 - - self._time = time.time() - self._time - + # Clean up + for worker in self._workers: + worker.close() + worker.join() + + self._time_it_took = time.time() - self._time_it_took + if not self._running: - self._callafter('closed') + self._talk_to_gui('closed') else: - self._callafter('finished') - - @property - def time(self): - ''' Return time it took for every download to finish. ''' - return self._time - - @property - def successful_downloads(self): - ''' Return number of successful downloads. ''' - return self._successful_downloads - - def close(self): - ''' Close DownloadManager. ''' - self._callafter('closing') - self._running = False - for thread in self.threads_list: - thread.close() - - def add_thread(self, thread): - ''' Add new DownloadThread on self.threads_list. ''' - self.threads_list.append(thread) - - def alive_threads(self): - ''' Return number of alive threads in self.threads_list. ''' - counter = 0 - - for thread in self.threads_list: - if thread.is_alive(): - counter += 1 - - return counter - - def not_finished(self): - ''' Return number of threads not finished. ''' + self._talk_to_gui('finished') + + def active(self): counter = 0 - - for thread in self.threads_list: - if thread.ident is None or thread.is_alive(): + for worker in self._workers: + if not worker.available(): counter += 1 - + + counter += len(self.urls_list) + return counter - - def _start_thread(self, thread): - ''' Start given thread if not download queue full. ''' - while self.alive_threads() >= self.MAX_DOWNLOAD_THREADS: - time.sleep(1) - - if not self._running: - break - - # If thread has not started - if thread.ident is None and self._running: - thread.start() - - def _threads_finished(self): - ''' Return True if all threads in self.threads_list have finished. ''' - for thread in self.threads_list: - # If thread has not started or thread is alive - if thread.ident is None or thread.is_alive(): + + def stop_downloads(self): + self._talk_to_gui('closing') + self._running = False + for worker in self._workers: + worker.stop_download() + + def add_url(self, url): + self.urls_list.append(url) + + def _talk_to_gui(self, data): + CallAfter(Publisher.sendMessage, self.PUBLISHER_TOPIC, data) + + def _check_youtubedl(self): + if not os.path.exists(self._youtubedl_path()): + UpdateThread(self.opt_manager.options['youtubedl_path'], True).join() + + def _jobs_done(self): + for worker in self._workers: + if not worker.available(): return False - + return True + + def _youtubedl_path(self): + path = self.opt_manager.options['youtubedl_path'] + path = os.path.join(path, YOUTUBEDL_BIN) + return path + + def _init_workers(self): + youtubedl = self._youtubedl_path() + return [Worker(self.opt_manager, youtubedl, self.increase_succ, self.log_manager) for i in xrange(self.WORKERS_NUMBER)] - def _callafter(self, data): - ''' CallAfter wrapper. ''' - CallAfter(Publisher.sendMessage, self.PUBLISHER_TOPIC, data) - - -class DownloadThread(Thread): - - ''' - YoutubeDLDownloader Thread wrapper for youtube-dlg. - - Params - url: Video url to download. - index: ListCtrl corresponding row for current thread. - opt_manager: OptionsManager.OptionsManager object. - log_manager: Any logger which implements log(). - - Accessible Methods - close() - Params: None - - Return: None - - Properties - status: Thread status. - ''' - - PUBLISHER_TOPIC = 'dlthread' - - def __init__(self, url, index, opt_manager, log_manager=None): - super(DownloadThread, self).__init__() - self.url = url - self.index = index + +class Worker(Thread): + + PUBLISHER_TOPIC = 'dlworker' + WAIT_TIME = 0.1 + + def __init__(self, opt_manager, youtubedl, increase_succ, log_manager=None): + super(Worker, self).__init__() + self.increase_succ = increase_succ self.opt_manager = opt_manager - self.log_manager = log_manager - self._downloader = None - self._status = 0 + + self._downloader = YoutubeDLDownloader(youtubedl, self._data_hook, log_manager) self._options_parser = OptionsParser() - + self._running = True + self._url = None + self._index = -1 + + self.start() + def run(self): - self._downloader = YoutubeDLDownloader( - self._get_youtubedl_path(), - self._data_hook, - self.log_manager - ) - - options = self._options_parser.parse(self.opt_manager.options) - - return_code = self._downloader.download(self.url, options) - - if (return_code == YoutubeDLDownloader.OK or - return_code == YoutubeDLDownloader.ALREADY): - self._status = 0 - else: - self._status = 1 - - @property - def status(self): - ''' Return thread status. Use this property after - thread has joined. (self._status != 0) indicates there was - an error. - ''' - return self._status - + while self._running: + if self._url is not None: + options = self._options_parser.parse(self.opt_manager.options) + ret_code = self._downloader.download(self._url, options) + + if (ret_code == YoutubeDLDownloader.OK or + ret_code == YoutubeDLDownloader.ALREADY): + self.increase_succ() + + # Reset + self._url = None + + time.sleep(self.WAIT_TIME) + + def download(self, item): + self._url = item['url'] + self._index = item['index'] + + def stop_download(self): + self._downloader.stop() + def close(self): - ''' Close download thread. ''' - if self._downloader is not None: - self._downloader.stop() - + self._running = False + self._downloader.stop() + + def available(self): + return self._url is None + def _data_hook(self, data): - ''' Merge playlist_info with data['status'] and - pass data to self._callafter. - ''' if data['status'] is not None and data['playlist_index'] is not None: playlist_info = ' ' playlist_info += data['playlist_index'] @@ -238,15 +174,18 @@ class DownloadThread(Thread): data['status'] += playlist_info - self._callafter(data) - - def _callafter(self, data): - ''' Add self.index on data and send data back to caller. ''' - data['index'] = self.index + self._talk_to_gui(data) + + def _talk_to_gui(self, data): + data['index'] = self._index CallAfter(Publisher.sendMessage, self.PUBLISHER_TOPIC, data) - def _get_youtubedl_path(self): - ''' Retrieve youtube-dl path. ''' - path = self.opt_manager.options['youtubedl_path'] - path = os.path.join(path, YOUTUBEDL_BIN) - return path + +if __name__ == '__main__': + ''' Direct call of module for testing. Before + you run the tests change relative imports or you will + get [ValueError: Attempted relative import in non-package]. + You need to change relative imports on all the modules + you are gonna use.''' + print "No tests available" + diff --git a/youtube_dl_gui/downloaders.py b/youtube_dl_gui/downloaders.py index 14fb9d2..420bb7f 100644 --- a/youtube_dl_gui/downloaders.py +++ b/youtube_dl_gui/downloaders.py @@ -73,7 +73,7 @@ class YoutubeDLDownloader(object): ''' Download given url using youtube-dl & return self._return_code. ''' - self._return_code = self.OK + self._reset() cmd = self._get_cmd(url, options) self._create_process(cmd) @@ -117,6 +117,19 @@ class YoutubeDLDownloader(object): self._hook_data() + def _reset(self): + self._return_code = 0 + self._data = { + 'playlist_index': None, + 'playlist_size': None, + 'filesize': None, + 'filename': None, + 'percent': None, + 'status': None, + 'speed': None, + 'eta': None + } + def _sync_data(self, data): ''' Synchronise self._data with data. ''' for key in data: diff --git a/youtube_dl_gui/mainframe.py b/youtube_dl_gui/mainframe.py index 85be15c..7221d32 100644 --- a/youtube_dl_gui/mainframe.py +++ b/youtube_dl_gui/mainframe.py @@ -12,7 +12,7 @@ from wx.lib.mixins.listctrl import ListCtrlAutoWidthMixin from .optionsframe import OptionsFrame from .updthread import UpdateThread -from .dlthread import DownloadManager, DownloadThread +from .dlthread import DownloadManager from .utils import ( YOUTUBEDL_BIN, @@ -110,7 +110,7 @@ class MainFrame(wx.Frame): # Set threads wx.CallAfter handlers using subscribe self._set_publisher(self._update_handler, 'update') - self._set_publisher(self._status_list_handler, 'dlthread') + self._set_publisher(self._status_list_handler, 'dlworker') self._set_publisher(self._download_manager_handler, 'dlmanager') def _set_publisher(self, handler, topic): @@ -174,20 +174,11 @@ class MainFrame(wx.Frame): self._panel.SetSizer(hor_sizer) - def _youtubedl_exist(self): - ''' Return True if youtube-dl executable exists. ''' - path = os.path.join(self.opt_manager.options['youtubedl_path'], - YOUTUBEDL_BIN) - - return os.path.exists(path) - - def _update_youtubedl(self, quiet=False): + def _update_youtubedl(self): ''' Update youtube-dl executable. ''' - if not quiet: - self._update_btn.Disable() - self._download_btn.Disable() - - self.update_thread = UpdateThread(self.opt_manager.options['youtubedl_path'], quiet) + self._update_btn.Disable() + self._download_btn.Disable() + self.update_thread = UpdateThread(self.opt_manager.options['youtubedl_path']) def _status_bar_write(self, msg): ''' Write msg to self._status_bar. ''' @@ -201,8 +192,8 @@ class MainFrame(wx.Frame): def _print_stats(self): ''' Print stats to self._status_bar after downloading. ''' - suc_downloads = self.download_manager.successful_downloads - dtime = get_time(self.download_manager.time) + suc_downloads = self.download_manager.successful + dtime = get_time(self.download_manager.time_it_took) days = int(dtime['days']) hours = int(dtime['hours']) @@ -229,7 +220,7 @@ class MainFrame(wx.Frame): self._status_list.write(data) # Report urls been downloaded - msg = self.URL_REPORT_MSG.format(self.download_manager.not_finished()) + msg = self.URL_REPORT_MSG.format(self.download_manager.active()) self._status_bar_write(msg) def _download_manager_handler(self, msg): @@ -271,39 +262,25 @@ class MainFrame(wx.Frame): self.ERROR_LABEL, wx.OK | wx.ICON_EXCLAMATION) else: - if not self._youtubedl_exist(): - self._update_youtubedl(True) - - threads_list = [self._create_thread(item) for item in self._status_list.get_items()] - - self.download_manager = DownloadManager(threads_list, self.update_thread) + self.download_manager = DownloadManager(self._status_list.get_items(), + self.opt_manager, + self.log_manager) self._status_bar_write(self.DOWNLOAD_STARTED) self._download_btn.SetLabel(self.STOP_LABEL) self._update_btn.Disable() - - def _create_thread(self, item): - ''' Return DownloadThread created from item. ''' - return DownloadThread(item['url'], - item['index'], - self.opt_manager, - self.log_manager) - - def _add_thread(self, item): - thread = self._create_thread(item) - self.download_manager.add_thread(thread) def _on_urllist_edit(self, event): ''' Dynamically add url for download.''' if self.download_manager is not None: - self._status_list.load_urls(self._get_urls(), self._add_thread) - + self._status_list.load_urls(self._get_urls(), self.download_manager.add_url) + def _on_download(self, event): ''' Event handler method for self._download_btn. ''' if self.download_manager is None: self._start_download() else: - self.download_manager.close() + self.download_manager.stop_downloads() def _on_update(self, event): ''' Event handler method for self._update_btn. ''' @@ -317,7 +294,7 @@ class MainFrame(wx.Frame): def _on_close(self, event): ''' Event handler method (wx.EVT_CLOSE). ''' if self.download_manager is not None: - self.download_manager.close() + self.download_manager.stop_downloads() self.download_manager.join() if self.update_thread is not None: @@ -374,7 +351,7 @@ class ListCtrl(wx.ListCtrl, ListCtrlAutoWidthMixin): ListCtrlAutoWidthMixin.__init__(self) self.columns = columns self._list_index = 0 - self._url_list = list() + self._url_list = set() self._set_columns() def write(self, data): @@ -405,14 +382,14 @@ class ListCtrl(wx.ListCtrl, ListCtrlAutoWidthMixin): def add_url(self, url): ''' Add url on ListCtrl. ''' self.InsertStringItem(self._list_index, url) - self._url_list.append(url) + self._url_list.add(url) self._list_index += 1 def clear(self): ''' Clear ListCtrl & reset self._list_index. ''' self.DeleteAllItems() self._list_index = 0 - self._url_list = list() + self._url_list = set() def is_empty(self): ''' Return True if list is empty. ''' @@ -422,7 +399,7 @@ class ListCtrl(wx.ListCtrl, ListCtrlAutoWidthMixin): ''' Return list of items in ListCtrl. ''' items = [] - for row in range(self._list_index): + for row in xrange(self._list_index): item = self._get_item(row) items.append(item) diff --git a/youtube_dl_gui/updthread.py b/youtube_dl_gui/updthread.py index 8446daa..f2dd7cd 100644 --- a/youtube_dl_gui/updthread.py +++ b/youtube_dl_gui/updthread.py @@ -32,7 +32,7 @@ class UpdateThread(Thread): PUBLISHER_TOPIC = 'update' DOWNLOAD_TIMEOUT = 20 - def __init__(self, download_path, quiet): + def __init__(self, download_path, quiet=False): super(UpdateThread, self).__init__() self.download_path = download_path self.quiet = quiet