You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

203 lines
6.2 KiB

#! /usr/bin/env python
from time import sleep
from threading import Thread
from wx import CallAfter
from wx.lib.pubsub import setuparg1
from wx.lib.pubsub import pub as Publisher
from .YDLOptionsParser import OptionsParser
from .DownloadObject import DownloadObject
from .Utils import (
get_youtubedl_filename,
remove_file,
file_exist,
fix_path
)
class DownloadManager(Thread):
PUBLISHER_TOPIC = 'download_manager'
MAX_DOWNLOAD_THREADS = 3
def __init__(self, download_list, opt_manager, log_manager=None):
super(DownloadManager, self).__init__()
self.download_list = download_list
self.opt_manager = opt_manager
self.log_manager = log_manager
self._threads_lst = []
self._stopped = False
self._running = True
self._kill = False
self.start()
def run(self):
while self._running:
# If download list is not empty
if self.download_list:
dl_item = self.download_list[0]
index = dl_item['index']
url = dl_item['url']
self._check_download_queue()
if self._running:
self._download(url, index)
self.download_list.pop(0)
else:
if not self.downloading():
self._running = False
else:
sleep(0.1)
self._terminate_all()
if not self._kill:
if self._stopped:
self._callafter('closed')
else:
self._callafter('finished')
def downloading(self):
''' Return True if at least one download thread is alive '''
for thread in self._threads_lst:
if thread.is_alive():
return True
return False
def add_download_item(self, item):
''' Add download item on download list '''
self.download_list.append(item)
def get_items_counter(self):
''' Return download videos counter '''
counter = 0
for thread in self._threads_lst:
if thread.is_alive():
counter += 1
return len(self.download_list) + counter
def close(self, kill=False):
self._callafter('closing')
self._running = False
self._stopped = True
self._kill = kill
def _download(self, url, index):
''' Download given url '''
dl_thread = DownloadThread(url, index, self.opt_manager, self.log_manager)
self._threads_lst.append(dl_thread)
def _terminate_all(self):
''' Close down all download threads '''
for thread in self._threads_lst:
if thread.is_alive():
thread.close()
thread.join()
def _check_download_queue(self):
while len(self._threads_lst) >= self.MAX_DOWNLOAD_THREADS:
sleep(1)
for thread in self._threads_lst:
if not self._running:
return
if not thread.is_alive():
self._threads_lst.remove(thread)
def _callafter(self, data):
CallAfter(Publisher.sendMessage, self.PUBLISHER_TOPIC, data)
class DownloadThread(Thread):
'''
Params
url: URL to download.
index: ListCtrl index for the current DownloadThread.
opt_manager: OptionsHandler.OptionsHandler object.
log_manager: Any logger which implements log().
Accessible Methods
close()
Params: None
'''
PUBLISHER_TOPIC = 'download_thread'
def __init__(self, url, index, opt_manager, log_manager=None):
super(DownloadThread, self).__init__()
self.log_manager = log_manager
self.opt_manager = opt_manager
self.index = index
self.url = url
self._dl_object = None
self.start()
def run(self):
youtubedl_path = self._get_youtubedl_path()
options = OptionsParser(self.opt_manager).parse()
self._dl_object = DownloadObject(youtubedl_path, self._data_hook, self.log_manager)
return_code = self._dl_object.download(self.url, options)
if self.opt_manager.options['clear_dash_files']:
self._clear_dash()
if return_code == DownloadObject.OK:
self._callafter({'status': 'Finished'})
elif return_code == DownloadObject.ERROR:
self._callafter({'status': 'Error', 'speed': '', 'eta': ''})
elif return_code == DownloadObject.STOPPED:
self._callafter({'status': 'Stopped', 'speed': '', 'eta': ''})
elif return_code == DownloadObject.ALREADY:
self._callafter({'status': 'Already-Downloaded'})
def close(self):
if self._dl_object is not None:
self._callafter({'status': 'Stopping'})
self._dl_object.stop()
def _clear_dash(self):
''' Clear DASH files after ffmpeg mux '''
for fl in self._dl_object.files_list:
if file_exist(fl):
remove_file(fl)
def _data_hook(self, data):
''' Extract process status and call CallAfter '''
data['status'] = self._get_status(data)
self._callafter(data)
def _callafter(self, data):
''' Add self.index on data and send data back to caller '''
data['index'] = self.index
CallAfter(Publisher.sendMessage, self.PUBLISHER_TOPIC, data)
def _get_status(self, data):
''' Return download process status from data['status'] '''
if data['playlist_index'] is not None:
playlist_info = '%s/%s' % (data['playlist_index'], data['playlist_size'])
else:
playlist_info = ''
if data['status'] == 'pre_process':
msg = 'Pre-Processing %s' % playlist_info
elif data['status'] == 'download':
msg = 'Downloading %s' % playlist_info
elif data['status'] == 'post_process':
msg = 'Post-Processing %s' % playlist_info
else:
msg = ''
return msg
def _get_youtubedl_path(self):
''' Retrieve youtube-dl path '''
path = self.opt_manager.options['youtubedl_path']
path = fix_path(path) + get_youtubedl_filename()
return path