You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
195 lines
5.0 KiB
195 lines
5.0 KiB
#! /usr/bin/env python
|
|
|
|
import subprocess
|
|
from time import sleep
|
|
from threading import Thread
|
|
|
|
from wx import CallAfter
|
|
from wx.lib.pubsub import setuparg1
|
|
from wx.lib.pubsub import pub as Publisher
|
|
|
|
from .SignalHandler import (
|
|
DataPack,
|
|
OutputHandler
|
|
)
|
|
|
|
from .Utils import (
|
|
get_encoding,
|
|
encode_list,
|
|
remove_file,
|
|
get_os_type,
|
|
file_exist
|
|
)
|
|
|
|
MAX_DOWNLOAD_THREADS = 3
|
|
PUBLISHER_TOPIC = 'download'
|
|
|
|
class DownloadManager(Thread):
|
|
|
|
def __init__(self, options, downloadlist, clear_dash_files, logmanager=None):
|
|
super(DownloadManager, self).__init__()
|
|
self.clear_dash_files = clear_dash_files
|
|
self.downloadlist = downloadlist
|
|
self.logmanager = logmanager
|
|
self.options = options
|
|
self.running = True
|
|
self.procList = []
|
|
self.procNo = 0
|
|
self.start()
|
|
|
|
def run(self):
|
|
while self.running:
|
|
if self.downloadlist:
|
|
# Extract url, index from data
|
|
url, index = self.extract_data()
|
|
# Wait for your turn if there are not more positions in 'queue'
|
|
while self.procNo >= MAX_DOWNLOAD_THREADS:
|
|
proc = self.check_queue()
|
|
if proc != None:
|
|
self.procList.remove(proc)
|
|
self.procNo -= 1
|
|
sleep(1)
|
|
# If we still running create new ProcessWrapper thread
|
|
if self.running:
|
|
self.procList.append(
|
|
ProcessWrapper(
|
|
self.options,
|
|
url,
|
|
index,
|
|
self.clear_dash_files,
|
|
self.logmanager
|
|
)
|
|
)
|
|
self.procNo += 1
|
|
else:
|
|
# Return True if at least one process is alive else return False
|
|
if not self.downloading():
|
|
self.running = False
|
|
else:
|
|
sleep(0.1)
|
|
# If we reach here close down all child threads
|
|
self.terminate_all()
|
|
CallAfter(Publisher.sendMessage, PUBLISHER_TOPIC, DataPack('finish'))
|
|
|
|
def downloading(self):
|
|
for proc in self.procList:
|
|
if proc.isAlive():
|
|
return True
|
|
return False
|
|
|
|
def _add_download_item(self, downloadItem):
|
|
self.downloadlist.append(downloadItem)
|
|
|
|
def extract_data(self):
|
|
data = self.downloadlist.pop(0)
|
|
url = data['url']
|
|
index = data['index']
|
|
return url, index
|
|
|
|
def terminate_all(self):
|
|
for proc in self.procList:
|
|
if proc.isAlive():
|
|
proc.close()
|
|
proc.join()
|
|
|
|
def check_queue(self):
|
|
for proc in self.procList:
|
|
if not self.running: break
|
|
if not proc.isAlive():
|
|
return proc
|
|
return None
|
|
|
|
def close(self):
|
|
self.procNo = 0
|
|
self.running = False
|
|
CallAfter(Publisher.sendMessage, PUBLISHER_TOPIC, DataPack('close'))
|
|
|
|
class ProcessWrapper(Thread):
|
|
|
|
def __init__(self, options, url, index, clear_dash_files, log=None):
|
|
super(ProcessWrapper, self).__init__()
|
|
self.clear_dash_files = clear_dash_files
|
|
self.options = options
|
|
self.index = index
|
|
self.log = log
|
|
self.url = url
|
|
self.filenames = []
|
|
self.stopped = False
|
|
self.error = False
|
|
self.proc = None
|
|
self.start()
|
|
|
|
def run(self):
|
|
self.proc = self.create_process(self.get_cmd(), self.get_process_info())
|
|
# while subprocess is alive and NOT the current thread
|
|
while self.proc_is_alive():
|
|
# read stdout, stderr from proc
|
|
stdout, stderr = self.read()
|
|
if stdout != '':
|
|
# pass stdout to output handler
|
|
data = OutputHandler(stdout).get_data()
|
|
if self.clear_dash_files: self.add_file(data)
|
|
# add index to data pack
|
|
data.index = self.index
|
|
# send data back to caller
|
|
CallAfter(Publisher.sendMessage, PUBLISHER_TOPIC, data)
|
|
if stderr != '':
|
|
self.error = True
|
|
self.write_to_log(stderr)
|
|
if not self.stopped:
|
|
if self.clear_dash_files:
|
|
self.clear_dash()
|
|
if not self.error:
|
|
CallAfter(Publisher.sendMessage, PUBLISHER_TOPIC, DataPack('finish', self.index))
|
|
else:
|
|
CallAfter(Publisher.sendMessage, PUBLISHER_TOPIC, DataPack('error', self.index))
|
|
|
|
def add_file(self, dataPack):
|
|
if dataPack.header == 'filename':
|
|
self.filenames.append(dataPack.data)
|
|
|
|
def write_to_log(self, data):
|
|
if self.log != None:
|
|
self.log.write(data)
|
|
|
|
def clear_dash(self):
|
|
CallAfter(Publisher.sendMessage, PUBLISHER_TOPIC, DataPack('remove', self.index))
|
|
for f in self.filenames:
|
|
if file_exist(f):
|
|
remove_file(f)
|
|
|
|
def close(self):
|
|
self.proc.kill()
|
|
self.stopped = True
|
|
CallAfter(Publisher.sendMessage, PUBLISHER_TOPIC, DataPack('close', self.index))
|
|
|
|
def proc_is_alive(self):
|
|
return self.proc.poll() == None
|
|
|
|
def read(self):
|
|
stdout = ''
|
|
stderr = ''
|
|
stdout = self.proc.stdout.readline()
|
|
if stdout == '':
|
|
stderr = self.proc.stderr.readline()
|
|
return stdout.rstrip(), stderr.rstrip()
|
|
|
|
def create_process(self, cmd, info):
|
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, startupinfo=info)
|
|
|
|
def get_cmd(self):
|
|
enc = get_encoding()
|
|
if enc != None:
|
|
cmd = encode_list(self.options + [self.url], enc)
|
|
else:
|
|
cmd = self.options + [self.url]
|
|
return cmd
|
|
|
|
def get_process_info(self):
|
|
if get_os_type() == 'nt':
|
|
info = subprocess.STARTUPINFO()
|
|
info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
|
return info
|
|
else:
|
|
return None
|
|
|