Browse Source

Re-write DownloadManager using Thread Pool

Re-write DownloadManager and DownloadThread (now Worker) using
a custom thread pool implementation for speed. We don't
want to create a new thread for each URL when the same
thread can be re-used.
doc-issue-template
MrS0m30n3 10 years ago
parent
commit
3d4b78ad93
4 changed files with 177 additions and 248 deletions
  1. 345
      youtube_dl_gui/dlthread.py
  2. 15
      youtube_dl_gui/downloaders.py
  3. 63
      youtube_dl_gui/mainframe.py
  4. 2
      youtube_dl_gui/updthread.py

345
youtube_dl_gui/dlthread.py

@ -11,225 +11,161 @@ from wx.lib.pubsub import setuparg1
from wx.lib.pubsub import pub as Publisher
from .parsers import OptionsParser
from .updthread import UpdateThread
from .downloaders import YoutubeDLDownloader
from .utils import YOUTUBEDL_BIN
class DownloadManager(Thread):
'''
Manage youtube-dlG download list.
Params
threads_list: Python list that contains DownloadThread objects.
update_thread: UpdateThread.py thread.
Accessible Methods
close()
Params: None
Return: None
add_thread()
Params: DownloadThread object
Return: None
alive_threads()
Params: None
Return: Number of alive threads.
not_finished()
Params: None
Return: Number of threads not finished yet.
Properties
successful_downloads: Number of successful downloads.
time: Time (seconds) it took for all downloads to complete.
'''
""" DownloadManager """
PUBLISHER_TOPIC = 'dlmanager'
MAX_DOWNLOAD_THREADS = 3
def __init__(self, threads_list, update_thread=None):
WORKERS_NUMBER = 3
WAIT_TIME = 0.1
def __init__(self, urls_list, opt_manager, log_manager=None):
super(DownloadManager, self).__init__()
self.threads_list = threads_list
self.update_thread = update_thread
self._successful_downloads = 0
self.opt_manager = opt_manager
self.log_manager = log_manager
self.urls_list = urls_list
self._time_it_took = 0
self._successful = 0
self._running = True
self._time = 0
self._workers = self._init_workers()
self.start()
@property
def successful(self):
    """ Return the number of downloads counted as successful so far.

    The counter lives in self._successful and is only ever bumped
    through increase_succ().
    """
    count = self._successful
    return count
@property
def time_it_took(self):
    """ Return the current value of self._time_it_took.

    run() stores the start timestamp in that attribute and replaces it
    with the elapsed seconds once the download loop has ended, so the
    value is only a duration after the manager has finished.
    """
    elapsed = self._time_it_took
    return elapsed
def increase_succ(self):
    """ Add one to the successful downloads counter.

    The bound method is handed to every Worker by _init_workers(), and
    a worker calls it after a download returns OK or ALREADY.
    """
    self._successful = self._successful + 1
def run(self):
if self.update_thread is not None:
self.update_thread.join()
self._check_youtubedl()
self._time_it_took = time.time()
self._time = time.time()
# Main loop
while self._running and not self._threads_finished():
for thread in self.threads_list:
if not self._running:
break
self._start_thread(thread)
while self._running:
for worker in self._workers:
if worker.available() and self.urls_list:
worker.download(self.urls_list.pop(0))
time.sleep(self.WAIT_TIME)
if not self.urls_list and self._jobs_done():
break
time.sleep(0.1)
# Make sure no child thread is alive
for thread in self.threads_list:
if thread.is_alive():
thread.join()
# Collect thread status
if thread.status == 0:
self._successful_downloads += 1
self._time = time.time() - self._time
# Clean up
for worker in self._workers:
worker.close()
worker.join()
self._time_it_took = time.time() - self._time_it_took
if not self._running:
self._callafter('closed')
self._talk_to_gui('closed')
else:
self._callafter('finished')
@property
def time(self):
''' Return time it took for every download to finish. '''
return self._time
@property
def successful_downloads(self):
''' Return number of successful downloads. '''
return self._successful_downloads
def close(self):
''' Close DownloadManager. '''
self._callafter('closing')
self._running = False
for thread in self.threads_list:
thread.close()
def add_thread(self, thread):
''' Add new DownloadThread on self.threads_list. '''
self.threads_list.append(thread)
def alive_threads(self):
''' Return number of alive threads in self.threads_list. '''
counter = 0
for thread in self.threads_list:
if thread.is_alive():
counter += 1
return counter
def not_finished(self):
''' Return number of threads not finished. '''
self._talk_to_gui('finished')
def active(self):
counter = 0
for thread in self.threads_list:
if thread.ident is None or thread.is_alive():
for worker in self._workers:
if not worker.available():
counter += 1
counter += len(self.urls_list)
return counter
def _start_thread(self, thread):
''' Start given thread if not download queue full. '''
while self.alive_threads() >= self.MAX_DOWNLOAD_THREADS:
time.sleep(1)
if not self._running:
break
# If thread has not started
if thread.ident is None and self._running:
thread.start()
def _threads_finished(self):
''' Return True if all threads in self.threads_list have finished. '''
for thread in self.threads_list:
# If thread has not started or thread is alive
if thread.ident is None or thread.is_alive():
def stop_downloads(self):
    """ Abort the running downloads and make the manager loop exit.

    Publishes 'closing' to the GUI first, then clears the running flag
    polled by run() and finally asks each worker to stop its current
    download.
    """
    self._talk_to_gui('closing')
    self._running = False

    pool = self._workers
    for member in pool:
        member.stop_download()
def add_url(self, url):
    """ Append one more item to the pending download queue.

    Params
        url: Item to download. run() passes queued items to
             Worker.download(), which reads item['url'] and
             item['index'] from them.
    """
    pending = self.urls_list
    pending.append(url)
def _talk_to_gui(self, data):
    """ Hand data to the GUI side.

    The message is not sent directly: Publisher.sendMessage is
    scheduled through CallAfter with this class' PUBLISHER_TOPIC.
    """
    topic = self.PUBLISHER_TOPIC
    CallAfter(Publisher.sendMessage, topic, data)
def _check_youtubedl(self):
    """ Make sure the youtube-dl binary exists before downloading.

    When the binary is missing, an UpdateThread is created for the
    configured 'youtubedl_path' with quiet=True and this method blocks
    until that thread has finished.
    """
    if os.path.exists(self._youtubedl_path()):
        return

    download_dir = self.opt_manager.options['youtubedl_path']
    # NOTE(review): relies on UpdateThread starting itself when it is
    # constructed, since only join() is called here - confirm.
    updater = UpdateThread(download_dir, True)
    updater.join()
def _jobs_done(self):
    """ Return True when every worker reports itself as available.

    Stops asking at the first busy worker.
    """
    return all(member.available() for member in self._workers)
def _youtubedl_path(self):
    """ Return the full path of the youtube-dl binary.

    Joins the 'youtubedl_path' option with YOUTUBEDL_BIN.
    """
    directory = self.opt_manager.options['youtubedl_path']
    return os.path.join(directory, YOUTUBEDL_BIN)
def _init_workers(self):
    """ Build the worker pool and return it as a list.

    Creates WORKERS_NUMBER Worker objects. Every worker gets the options
    manager, the youtube-dl binary path, increase_succ as its success
    callback and the log manager. Worker.__init__ starts the thread, so
    the returned workers are already running.
    """
    binary = self._youtubedl_path()

    pool = []
    for _ in xrange(self.WORKERS_NUMBER):
        pool.append(Worker(self.opt_manager, binary, self.increase_succ, self.log_manager))
    return pool
def _callafter(self, data):
''' CallAfter wrapper. '''
CallAfter(Publisher.sendMessage, self.PUBLISHER_TOPIC, data)
class DownloadThread(Thread):
'''
YoutubeDLDownloader Thread wrapper for youtube-dlg.
Params
url: Video url to download.
index: ListCtrl corresponding row for current thread.
opt_manager: OptionsManager.OptionsManager object.
log_manager: Any logger which implements log().
Accessible Methods
close()
Params: None
Return: None
Properties
status: Thread status.
'''
PUBLISHER_TOPIC = 'dlthread'
def __init__(self, url, index, opt_manager, log_manager=None):
super(DownloadThread, self).__init__()
self.url = url
self.index = index
class Worker(Thread):
PUBLISHER_TOPIC = 'dlworker'
WAIT_TIME = 0.1
def __init__(self, opt_manager, youtubedl, increase_succ, log_manager=None):
super(Worker, self).__init__()
self.increase_succ = increase_succ
self.opt_manager = opt_manager
self.log_manager = log_manager
self._downloader = None
self._status = 0
self._downloader = YoutubeDLDownloader(youtubedl, self._data_hook, log_manager)
self._options_parser = OptionsParser()
self._running = True
self._url = None
self._index = -1
self.start()
def run(self):
self._downloader = YoutubeDLDownloader(
self._get_youtubedl_path(),
self._data_hook,
self.log_manager
)
options = self._options_parser.parse(self.opt_manager.options)
return_code = self._downloader.download(self.url, options)
if (return_code == YoutubeDLDownloader.OK or
return_code == YoutubeDLDownloader.ALREADY):
self._status = 0
else:
self._status = 1
@property
def status(self):
''' Return thread status. Use this property after
thread has joined. (self._status != 0) indicates there was
an error.
'''
return self._status
while self._running:
if self._url is not None:
options = self._options_parser.parse(self.opt_manager.options)
ret_code = self._downloader.download(self._url, options)
if (ret_code == YoutubeDLDownloader.OK or
ret_code == YoutubeDLDownloader.ALREADY):
self.increase_succ()
# Reset
self._url = None
time.sleep(self.WAIT_TIME)
def download(self, item):
    """ Assign a new download job to this worker.

    Params
        item: Mapping with the keys 'url' (address to download) and
              'index' (value attached to every message sent to the GUI
              for this job).

    Return: None

    Raises
        KeyError: If item lacks 'url' or 'index'. Nothing is assigned
                  in that case.
    """
    # Read both keys before touching any state so a malformed item
    # cannot leave the worker with a half-assigned job.
    url = item['url']
    index = item['index']

    # The worker thread polls self._url and starts downloading as soon
    # as it is not None, so self._url must be written last. Otherwise
    # the first progress messages could carry the previous job's index.
    self._index = index
    self._url = url
def stop_download(self):
    """ Ask the downloader to stop.

    Only the downloader is told to stop. The worker's own loop keeps
    running and can take further jobs.
    """
    downloader = self._downloader
    downloader.stop()
def close(self):
''' Close download thread. '''
if self._downloader is not None:
self._downloader.stop()
self._running = False
self._downloader.stop()
def available(self):
    """ Return True when no url is assigned to this worker.

    self._url is set by download() and reset to None by run() once the
    job is over.
    """
    idle = self._url is None
    return idle
def _data_hook(self, data):
''' Merge playlist_info with data['status'] and
pass data to self._callafter.
'''
if data['status'] is not None and data['playlist_index'] is not None:
playlist_info = ' '
playlist_info += data['playlist_index']
@ -238,15 +174,18 @@ class DownloadThread(Thread):
data['status'] += playlist_info
self._callafter(data)
def _callafter(self, data):
''' Add self.index on data and send data back to caller. '''
data['index'] = self.index
self._talk_to_gui(data)
def _talk_to_gui(self, data):
    """ Tag data with this worker's current index and send it to the GUI.

    data is modified in place (key 'index') before Publisher.sendMessage
    is scheduled through CallAfter with this class' PUBLISHER_TOPIC.
    """
    data['index'] = self._index
    topic = self.PUBLISHER_TOPIC
    CallAfter(Publisher.sendMessage, topic, data)
def _get_youtubedl_path(self):
''' Retrieve youtube-dl path. '''
path = self.opt_manager.options['youtubedl_path']
path = os.path.join(path, YOUTUBEDL_BIN)
return path
if __name__ == '__main__':
    # Direct call of the module, meant for testing.
    # Change the relative imports before running the tests, otherwise
    # you get [ValueError: Attempted relative import in non-package].
    # The same change is needed in every module you are going to use.
    print("No tests available")

15
youtube_dl_gui/downloaders.py

@ -73,7 +73,7 @@ class YoutubeDLDownloader(object):
''' Download given url using youtube-dl &
return self._return_code.
'''
self._return_code = self.OK
self._reset()
cmd = self._get_cmd(url, options)
self._create_process(cmd)
@ -117,6 +117,19 @@ class YoutubeDLDownloader(object):
self._hook_data()
def _reset(self):
    """ Put the downloader back into its initial state.

    Sets the return code to 0 and replaces self._data with a fresh dict
    in which every progress field is None.
    """
    fields = (
        'playlist_index',
        'playlist_size',
        'filesize',
        'filename',
        'percent',
        'status',
        'speed',
        'eta',
    )
    self._return_code = 0
    # dict.fromkeys maps every key to None by default.
    self._data = dict.fromkeys(fields)
def _sync_data(self, data):
''' Synchronise self._data with data. '''
for key in data:

63
youtube_dl_gui/mainframe.py

@ -12,7 +12,7 @@ from wx.lib.mixins.listctrl import ListCtrlAutoWidthMixin
from .optionsframe import OptionsFrame
from .updthread import UpdateThread
from .dlthread import DownloadManager, DownloadThread
from .dlthread import DownloadManager
from .utils import (
YOUTUBEDL_BIN,
@ -110,7 +110,7 @@ class MainFrame(wx.Frame):
# Set threads wx.CallAfter handlers using subscribe
self._set_publisher(self._update_handler, 'update')
self._set_publisher(self._status_list_handler, 'dlthread')
self._set_publisher(self._status_list_handler, 'dlworker')
self._set_publisher(self._download_manager_handler, 'dlmanager')
def _set_publisher(self, handler, topic):
@ -174,20 +174,11 @@ class MainFrame(wx.Frame):
self._panel.SetSizer(hor_sizer)
def _youtubedl_exist(self):
''' Return True if youtube-dl executable exists. '''
path = os.path.join(self.opt_manager.options['youtubedl_path'],
YOUTUBEDL_BIN)
return os.path.exists(path)
def _update_youtubedl(self, quiet=False):
def _update_youtubedl(self):
''' Update youtube-dl executable. '''
if not quiet:
self._update_btn.Disable()
self._download_btn.Disable()
self.update_thread = UpdateThread(self.opt_manager.options['youtubedl_path'], quiet)
self._update_btn.Disable()
self._download_btn.Disable()
self.update_thread = UpdateThread(self.opt_manager.options['youtubedl_path'])
def _status_bar_write(self, msg):
''' Write msg to self._status_bar. '''
@ -201,8 +192,8 @@ class MainFrame(wx.Frame):
def _print_stats(self):
''' Print stats to self._status_bar after downloading. '''
suc_downloads = self.download_manager.successful_downloads
dtime = get_time(self.download_manager.time)
suc_downloads = self.download_manager.successful
dtime = get_time(self.download_manager.time_it_took)
days = int(dtime['days'])
hours = int(dtime['hours'])
@ -229,7 +220,7 @@ class MainFrame(wx.Frame):
self._status_list.write(data)
# Report urls been downloaded
msg = self.URL_REPORT_MSG.format(self.download_manager.not_finished())
msg = self.URL_REPORT_MSG.format(self.download_manager.active())
self._status_bar_write(msg)
def _download_manager_handler(self, msg):
@ -271,39 +262,25 @@ class MainFrame(wx.Frame):
self.ERROR_LABEL,
wx.OK | wx.ICON_EXCLAMATION)
else:
if not self._youtubedl_exist():
self._update_youtubedl(True)
threads_list = [self._create_thread(item) for item in self._status_list.get_items()]
self.download_manager = DownloadManager(threads_list, self.update_thread)
self.download_manager = DownloadManager(self._status_list.get_items(),
self.opt_manager,
self.log_manager)
self._status_bar_write(self.DOWNLOAD_STARTED)
self._download_btn.SetLabel(self.STOP_LABEL)
self._update_btn.Disable()
def _create_thread(self, item):
''' Return DownloadThread created from item. '''
return DownloadThread(item['url'],
item['index'],
self.opt_manager,
self.log_manager)
def _add_thread(self, item):
thread = self._create_thread(item)
self.download_manager.add_thread(thread)
def _on_urllist_edit(self, event):
''' Dynamically add url for download.'''
if self.download_manager is not None:
self._status_list.load_urls(self._get_urls(), self._add_thread)
self._status_list.load_urls(self._get_urls(), self.download_manager.add_url)
def _on_download(self, event):
''' Event handler method for self._download_btn. '''
if self.download_manager is None:
self._start_download()
else:
self.download_manager.close()
self.download_manager.stop_downloads()
def _on_update(self, event):
''' Event handler method for self._update_btn. '''
@ -317,7 +294,7 @@ class MainFrame(wx.Frame):
def _on_close(self, event):
''' Event handler method (wx.EVT_CLOSE). '''
if self.download_manager is not None:
self.download_manager.close()
self.download_manager.stop_downloads()
self.download_manager.join()
if self.update_thread is not None:
@ -374,7 +351,7 @@ class ListCtrl(wx.ListCtrl, ListCtrlAutoWidthMixin):
ListCtrlAutoWidthMixin.__init__(self)
self.columns = columns
self._list_index = 0
self._url_list = list()
self._url_list = set()
self._set_columns()
def write(self, data):
@ -405,14 +382,14 @@ class ListCtrl(wx.ListCtrl, ListCtrlAutoWidthMixin):
def add_url(self, url):
''' Add url on ListCtrl. '''
self.InsertStringItem(self._list_index, url)
self._url_list.append(url)
self._url_list.add(url)
self._list_index += 1
def clear(self):
''' Clear ListCtrl & reset self._list_index. '''
self.DeleteAllItems()
self._list_index = 0
self._url_list = list()
self._url_list = set()
def is_empty(self):
''' Return True if list is empty. '''
@ -422,7 +399,7 @@ class ListCtrl(wx.ListCtrl, ListCtrlAutoWidthMixin):
''' Return list of items in ListCtrl. '''
items = []
for row in range(self._list_index):
for row in xrange(self._list_index):
item = self._get_item(row)
items.append(item)

2
youtube_dl_gui/updthread.py

@ -32,7 +32,7 @@ class UpdateThread(Thread):
PUBLISHER_TOPIC = 'update'
DOWNLOAD_TIMEOUT = 20
def __init__(self, download_path, quiet):
def __init__(self, download_path, quiet=False):
super(UpdateThread, self).__init__()
self.download_path = download_path
self.quiet = quiet

Loading…
Cancel
Save