You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

759 lines
23 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. #!/usr/bin/env python2
  2. # -*- coding: utf-8 -*-
  3. """Youtubedlg module for managing the download process.
  4. This module is responsible for managing the download process
  5. and update the GUI interface.
  6. Attributes:
  7. MANAGER_PUB_TOPIC (string): wxPublisher subscription topic of the
  8. DownloadManager thread.
  9. WORKER_PUB_TOPIC (string): wxPublisher subscription topic of the
  10. Worker thread.
  11. Note:
  12. It's not the actual module that downloads the urls;
  13. that's the job of the 'downloaders' module.
  14. """
  15. from __future__ import unicode_literals
  16. import time
  17. import os.path
  18. from threading import (
  19. Thread,
  20. RLock,
  21. Lock
  22. )
  23. from wx import CallAfter
  24. from wx.lib.pubsub import setuparg1
  25. from wx.lib.pubsub import pub as Publisher
  26. from .parsers import OptionsParser
  27. from .updatemanager import UpdateThread
  28. from .downloaders import YoutubeDLDownloader
  29. from .utils import (
  30. YOUTUBEDL_BIN,
  31. os_path_exists,
  32. format_bytes,
  33. to_string,
  34. to_bytes
  35. )
  # wxPublisher subscription topic of the DownloadManager thread.
  MANAGER_PUB_TOPIC = "dlmanager"

  # wxPublisher subscription topic of the Worker threads.
  WORKER_PUB_TOPIC = "dlworker"

  # Re-entrant lock handed to the synchronized() decorator by the
  # DownloadList methods of this module.
  _SYNC_LOCK = RLock()
  def synchronized(lock):
      """Return a decorator that adds thread synchronization to a function.

      Args:
          lock: Lock object that supports the context manager protocol
              (e.g. threading.Lock or threading.RLock).

      Returns:
          A decorator. Every call of the decorated function runs while
          holding the given lock and returns the function's own value.

      Note:
          The lock is released even when the decorated function raises,
          so a failing call can not leave the lock held forever.

      """
      # Local import so this block does not depend on the file's import list
      from functools import wraps

      def _decorator(func):
          @wraps(func)  # Keep the name & docstring of the wrapped function
          def _wrapper(*args, **kwargs):
              # 'with' guarantees the release on both return and exception
              with lock:
                  return func(*args, **kwargs)

          return _wrapper

      return _decorator
  class DownloadItem(object):

      """Object that represents a download.

      Attributes:
          STAGES (tuple): Main stages of the download item.
          ACTIVE_STAGES (tuple): Sub stages of the 'Active' stage.
          COMPLETED_STAGES (tuple): Sub stages of the 'Completed' stage.
          ERROR_STAGES (tuple): Sub stages of the 'Error' stage.

      Args:
          url (string): URL that corresponds to the download item.
          options (list): Options list to use during the download phase.

      """

      STAGES = ("Queued", "Active", "Paused", "Completed", "Error")

      ACTIVE_STAGES = ("Pre Processing", "Downloading", "Post Processing")

      COMPLETED_STAGES = ("Finished", "Warning", "Already Downloaded")

      ERROR_STAGES = ("Error", "Stopped", "Filesize Abort")

      def __init__(self, url, options):
          self.url = url
          self.options = options
          # The id is derived from the url and the options of the item
          self.object_id = hash(url + to_string(options))

          self.reset()

      @property
      def stage(self):
          """Main stage of the item, one of STAGES."""
          return self._stage

      @stage.setter
      def stage(self, value):
          """Set the main stage and the matching default 'status' text.

          Raises:
              ValueError: If value is not one of STAGES.

          """
          if value not in self.STAGES:
              raise ValueError(value)

          # Stages with sub stages show their first sub stage as status,
          # 'Queued' and 'Paused' show the stage name itself
          if value == "Active":
              status = self.ACTIVE_STAGES[0]
          elif value == "Completed":
              status = self.COMPLETED_STAGES[0]
          elif value == "Error":
              status = self.ERROR_STAGES[0]
          else:
              status = value

          self.progress_stats["status"] = status
          self._stage = value

      def reset(self):
          """Bring the item back to its initial 'Queued' state.

          Raises:
              RuntimeError: If the item is currently in the 'Active' stage.

          """
          # hasattr: reset() is also called from __init__ before _stage exists
          if hasattr(self, "_stage") and self._stage == self.STAGES[1]:
              raise RuntimeError("Cannot reset an 'Active' item")

          self._stage = self.STAGES[0]
          self.path = ""
          self.filenames = []
          self.extensions = []
          self.filesizes = []

          self.default_values = {
              "filename": self.url,
              "extension": "-",
              "filesize": "-",
              "percent": "0%",
              "speed": "-",
              "eta": "-",
              "status": self.stage,
              "playlist_size": "",
              "playlist_index": ""
          }

          self.progress_stats = dict(self.default_values)

          # Keep track when the 'playlist_index' changes
          self.playlist_index_changed = False

      def get_files(self):
          """Returns a list that contains all the system files bind to this object."""
          return [
              os.path.join(self.path, filename + self.extensions[index])
              for index, filename in enumerate(self.filenames)
          ]

      def update_stats(self, stats_dict):
          """Updates the progress_stats dict from the given dictionary.

          Args:
              stats_dict (dict): Updates for the item. Keys that also exist
                  in progress_stats are copied (empty or non string values
                  fall back to the default value). The keys 'filename',
                  'extension', 'path', 'filesize', 'percent', 'status' and
                  'playlist_index' additionally update the file lists and
                  the stage of the item.

          """
          assert isinstance(stats_dict, dict)

          for key in stats_dict:
              if key in self.progress_stats:
                  value = stats_dict[key]

                  if not isinstance(value, basestring) or not value:
                      self.progress_stats[key] = self.default_values[key]
                  else:
                      self.progress_stats[key] = value

          # Extract extra stuff
          if "playlist_index" in stats_dict:
              self.playlist_index_changed = True

          if "filename" in stats_dict:

              # Reset filenames, extensions & filesizes lists when changing playlist item
              if self.playlist_index_changed:
                  self.filenames = []
                  self.extensions = []
                  self.filesizes = []

                  self.playlist_index_changed = False

              self.filenames.append(stats_dict["filename"])

          if "extension" in stats_dict:
              self.extensions.append(stats_dict["extension"])

          if "path" in stats_dict:
              self.path = stats_dict["path"]

          if "filesize" in stats_dict:
              # .get(): an update may carry a filesize without a percent
              download_done = stats_dict.get("percent") == "100%"

              # Store one size per file, once the file is fully downloaded
              if download_done and len(self.filesizes) < len(self.filenames):
                  filesize = stats_dict["filesize"].lstrip("~")  # HLS downloader etc
                  self.filesizes.append(to_bytes(filesize))

          if "status" in stats_dict:
              # If we are post processing try to calculate the size of
              # the output file since youtube-dl does not
              if stats_dict["status"] == self.ACTIVE_STAGES[2] and len(self.filesizes) == 2:
                  post_proc_filesize = self.filesizes[0] + self.filesizes[1]

                  self.filesizes.append(post_proc_filesize)
                  self.progress_stats["filesize"] = format_bytes(post_proc_filesize)

              self._set_stage(stats_dict["status"])

      def _set_stage(self, status):
          """Derive the main stage from a sub stage (status) string.

          A status that is not a known sub stage leaves the stage untouched.

          """
          if status in self.ACTIVE_STAGES:
              self._stage = self.STAGES[1]

          if status in self.COMPLETED_STAGES:
              self._stage = self.STAGES[3]

          if status in self.ERROR_STAGES:
              self._stage = self.STAGES[4]

      def __eq__(self, other):
          """Two items are equal when their object_id is equal."""
          if not isinstance(other, DownloadItem):
              # Let Python fall back to its default comparison instead of
              # failing with an AttributeError on foreign objects
              return NotImplemented

          return self.object_id == other.object_id

      def __ne__(self, other):
          """Inverse of __eq__ (Python 2 does not derive it automatically)."""
          result = self.__eq__(other)

          if result is NotImplemented:
              return result

          return not result

      def __hash__(self):
          """Hash consistent with __eq__: equal items hash the same."""
          return hash(self.object_id)
  class DownloadList(object):

      """Ordered container of DownloadItems addressed by their object_id.

      The order of the items is kept in a list of object ids while the
      items themselves live in a dictionary keyed by object id.

      Args:
          items (list): List that contains DownloadItems.

      """

      def __init__(self, items=None):
          assert isinstance(items, list) or items is None

          initial_items = [] if items is None else items

          # Keep the sequence
          self._items_list = [entry.object_id for entry in initial_items]
          # Speed up lookup
          self._items_dict = dict((entry.object_id, entry) for entry in initial_items)

      @synchronized(_SYNC_LOCK)
      def clear(self):
          """Drop every item of the list, 'Active' items included."""
          self._items_dict = {}
          self._items_list = []

      @synchronized(_SYNC_LOCK)
      def insert(self, item):
          """Append the given item at the end of the list.

          No duplicate check takes place.

          """
          item_id = item.object_id

          self._items_dict[item_id] = item
          self._items_list.append(item_id)

      @synchronized(_SYNC_LOCK)
      def remove(self, object_id):
          """Remove the item with the given object_id unless it is 'Active'.

          Returns:
              True if the item was removed, False if it is 'Active'.

          """
          if self._items_dict[object_id].stage == "Active":
              return False

          self._items_list.remove(object_id)
          del self._items_dict[object_id]

          return True

      @synchronized(_SYNC_LOCK)
      def fetch_next(self):
          """Return the first item of the list that is in the 'Queued' stage.

          Returns:
              The first queued item in list order or None if there is none.

          """
          ordered_items = (self._items_dict[item_id] for item_id in self._items_list)
          queued_items = (entry for entry in ordered_items if entry.stage == "Queued")

          return next(queued_items, None)

      @synchronized(_SYNC_LOCK)
      def move_up(self, object_id):
          """Move the item one position towards the start of the list.

          Returns:
              True if the item moved, False if it is already the first one.

          """
          position = self._items_list.index(object_id)

          if position <= 0:
              return False

          self._swap(position, position - 1)
          return True

      @synchronized(_SYNC_LOCK)
      def move_down(self, object_id):
          """Move the item one position towards the end of the list.

          Returns:
              True if the item moved, False if it is already the last one.

          """
          position = self._items_list.index(object_id)
          last_position = len(self._items_list) - 1

          if position >= last_position:
              return False

          self._swap(position, position + 1)
          return True

      @synchronized(_SYNC_LOCK)
      def get_item(self, object_id):
          """Return the DownloadItem stored under the given object_id."""
          return self._items_dict[object_id]

      @synchronized(_SYNC_LOCK)
      def has_item(self, object_id):
          """Return True when the given object_id is part of the list."""
          return object_id in self._items_list

      @synchronized(_SYNC_LOCK)
      def get_items(self):
          """Return all the items as a new list, in list order."""
          lookup = self._items_dict
          return [lookup[item_id] for item_id in self._items_list]

      @synchronized(_SYNC_LOCK)
      def change_stage(self, object_id, new_stage):
          """Assign new_stage to the 'stage' of the item with the given id."""
          target = self._items_dict[object_id]
          target.stage = new_stage

      @synchronized(_SYNC_LOCK)
      def index(self, object_id):
          """Return the zero based position of the item or -1 if missing."""
          try:
              return self._items_list.index(object_id)
          except ValueError:
              return -1

      @synchronized(_SYNC_LOCK)
      def __len__(self):
          return len(self._items_list)

      def _swap(self, index1, index2):
          """Exchange the ids stored at the two given positions."""
          sequence = self._items_list

          held_id = sequence[index1]
          sequence[index1] = sequence[index2]
          sequence[index2] = held_id
  class DownloadManager(Thread):

      """Manages the download process.

      The thread starts itself on creation. It keeps handing the queued
      items of the download list to free workers until no queued item is
      left and all the workers are idle, or until stop_downloads() is
      called.

      Attributes:
          WAIT_TIME (float): Time in seconds to sleep.

      Args:
          parent: Object that owns the manager. Its 'update_thread'
              attribute is read & written by _check_youtubedl().

          download_list (DownloadList): List that contains items to download.

          opt_manager (optionsmanager.OptionsManager): Object responsible for
              managing the youtubedlg options.

          log_manager (logmanager.LogManager): Object responsible for writing
              errors to the log.

      """

      WAIT_TIME = 0.1

      def __init__(self, parent, download_list, opt_manager, log_manager=None):
          super(DownloadManager, self).__init__()
          self.parent = parent
          self.opt_manager = opt_manager
          self.log_manager = log_manager
          self.download_list = download_list

          self._time_it_took = 0
          self._successful = 0
          self._running = True

          # Init the custom workers thread pool
          # All the workers share one lock for the log manager
          log_lock = None if log_manager is None else Lock()
          wparams = (opt_manager, self._youtubedl_path(), log_manager, log_lock)
          workers_number = opt_manager.options["workers_number"]
          self._workers = [Worker(*wparams) for _ in range(workers_number)]

          self.start()

      @property
      def successful(self):
          """Returns number of successful downloads. """
          return self._successful

      @property
      def time_it_took(self):
          """Returns time(seconds) it took for the download process
          to complete. """
          return self._time_it_took

      def run(self):
          """Dispatch queued items to the workers until done or stopped."""
          if not self.opt_manager.options["disable_update"]:
              self._check_youtubedl()

          # Holds the start timestamp until the duration is computed below
          self._time_it_took = time.time()

          while self._running:
              item = self.download_list.fetch_next()

              if item is not None:
                  worker = self._get_worker()

                  # With no free worker the item stays 'Queued' and is
                  # fetched again on the next iteration
                  if worker is not None:
                      worker.download(item.url, item.options, item.object_id)
                      self.download_list.change_stage(item.object_id, "Active")

              if item is None and self._jobs_done():
                  break

              time.sleep(self.WAIT_TIME)

          # Close all the workers
          for worker in self._workers:
              worker.close()

          # Join and collect
          for worker in self._workers:
              worker.join()
              self._successful += worker.successful

          self._time_it_took = time.time() - self._time_it_took

          # _running is only cleared by stop_downloads()
          if not self._running:
              self._talk_to_gui('closed')
          else:
              self._talk_to_gui('finished')

      def active(self):
          """Returns the number of items in the download list.

          Note:
              The value is len(download_list), it does not look at the
              state of the workers.

          """
          return len(self.download_list)

      def stop_downloads(self):
          """Stop the download process. Also send 'closing'
          signal back to the GUI.

          Note:
              It does NOT kill the workers thats the job of the
              clean up task in the run() method.

          """
          self._talk_to_gui('closing')
          self._running = False

      def add_url(self, url):
          """Add the given item to the download_list.

          Args:
              url: Item to add. It is passed as is to the insert() method
                  of the download_list, which for a DownloadList expects
                  an object with an 'object_id' attribute (DownloadItem).

          """
          # DownloadList offers insert(), it has no append() method
          self.download_list.insert(url)

      def send_to_worker(self, data):
          """Send data to the Workers.

          Args:
              data (dict): Python dictionary that holds the 'index'
                  which is used to identify the Worker thread and the data which
                  can be any of the Worker's class valid data. For a list of valid
                  data keys see __init__() under the Worker class.

          """
          if 'index' in data:
              for worker in self._workers:
                  if worker.has_index(data['index']):
                      worker.update_data(data)

      def _talk_to_gui(self, data):
          """Send data back to the GUI using wxCallAfter and wxPublisher.

          Args:
              data (string): Unique signal string that informs the GUI for the
                  download process.

          Note:
              This class sends 3 signals.
                  1) closing: The download process is closing.
                  2) closed: The download process has closed.
                  3) finished: The download process was completed normally.

          """
          CallAfter(Publisher.sendMessage, MANAGER_PUB_TOPIC, data)

      def _check_youtubedl(self):
          """Check if youtube-dl binary exists. If not try to download it. """
          binary_missing = not os_path_exists(self._youtubedl_path())

          # Do nothing when the parent already has an update thread
          if binary_missing and self.parent.update_thread is None:
              self.parent.update_thread = UpdateThread(self.opt_manager.options['youtubedl_path'], True)
              self.parent.update_thread.join()
              self.parent.update_thread = None

      def _get_worker(self):
          """Returns the first available worker or None if all are busy. """
          return next((worker for worker in self._workers if worker.available()), None)

      def _jobs_done(self):
          """Returns True if the workers have finished their jobs else False. """
          return all(worker.available() for worker in self._workers)

      def _youtubedl_path(self):
          """Returns the path to youtube-dl binary. """
          path = self.opt_manager.options['youtubedl_path']
          return os.path.join(path, YOUTUBEDL_BIN)
  class Worker(Thread):

      """Simple worker which downloads the given url using a downloader
      from the downloaders.py module.

      The thread starts itself on creation and polls self._data['url']:
      a value other than None is the signal that a job is waiting.

      Attributes:
          WAIT_TIME (float): Time in seconds to sleep.

      Args:
          opt_manager (optionsmanager.OptionsManager): Check DownloadManager
              description.

          youtubedl (string): Absolute path to youtube-dl binary.

          log_manager (logmanager.LogManager): Check DownloadManager
              description.

          log_lock (threading.Lock): Synchronization lock for the log_manager.
              If the log_manager is set (not None) then the caller has to make
              sure that the log_lock is also set.

      Note:
          For available data keys see self._data under the __init__() method.

      """

      WAIT_TIME = 0.1

      def __init__(self, opt_manager, youtubedl, log_manager=None, log_lock=None):
          super(Worker, self).__init__()
          self.opt_manager = opt_manager
          self.log_manager = log_manager
          self.log_lock = log_lock

          self._downloader = YoutubeDLDownloader(youtubedl, self._data_hook, self._log_data)
          self._options_parser = OptionsParser()
          self._successful = 0
          self._running = True
          self._options = None

          self._wait_for_reply = False

          self._data = {
              'playlist_index': None,
              'playlist_size': None,
              'new_filename': None,
              'extension': None,
              'filesize': None,
              'filename': None,
              'percent': None,
              'status': None,
              'index': None,
              'speed': None,
              'path': None,
              'eta': None,
              'url': None
          }

          self.start()

      def run(self):
          """Download every url that gets assigned until close() is called."""
          while self._running:
              if self._data['url'] is not None:
                  ret_code = self._downloader.download(self._data['url'], self._options)

                  # These three return codes count as a successful download
                  if (ret_code == YoutubeDLDownloader.OK or
                          ret_code == YoutubeDLDownloader.ALREADY or
                          ret_code == YoutubeDLDownloader.WARNING):
                      self._successful += 1

                  # Mark the worker as available again
                  self._reset()

              time.sleep(self.WAIT_TIME)

          # Call the destructor function of YoutubeDLDownloader object
          self._downloader.close()

      def download(self, url, options, object_id):
          """Assign a download job to the worker.

          Args:
              url (string): URL to download.

              options (list): Options to pass to the downloader together
                  with the url.

              object_id: Identifier of the item. It is stored under the
                  'index' key and sent back to the GUI with every update.

          """
          self._options = options
          self._data['index'] = object_id
          # Set the url last: run() starts the download as soon as it sees
          # a url, so the options & index must already be in place
          self._data['url'] = url

      def stop_download(self):
          """Stop the download process of the worker. """
          self._downloader.stop()

      def close(self):
          """Kill the worker after stopping the download process. """
          self._running = False
          self._downloader.stop()

      def available(self):
          """Return True if the worker has no job else False. """
          return self._data['url'] is None

      def has_index(self, index):
          """Return True if index is equal to self._data['index'] else False. """
          return self._data['index'] == index

      def update_data(self, data):
          """Update self._data from the given data. """
          if self._wait_for_reply:
              # Update data only if a receive request has been issued
              for key in data:
                  self._data[key] = data[key]

              self._wait_for_reply = False

      @property
      def successful(self):
          """Return the number of successful downloads for current worker. """
          return self._successful

      def _reset(self):
          """Reset self._data back to the original state. """
          for key in self._data:
              if key != 'url':
                  self._data[key] = None

          # Clear the url last: available() turns True at this point, so a
          # job assigned right after it can not be wiped by this reset
          self._data['url'] = None

      def _log_data(self, data):
          """Callback method for self._downloader.

          This method is used to write the given data in a synchronized way
          to the log file using the self.log_manager and the self.log_lock.

          Args:
              data (string): String to write to the log file.

          """
          if self.log_manager is not None:
              # 'with' releases the lock even if log() raises
              with self.log_lock:
                  self.log_manager.log(data)

      def _data_hook(self, data):
          """Callback method for self._downloader.

          This method forwards the given data to the GUI using the
          self._talk_to_gui() method with the 'send' signal.

          Args:
              data (dict): Python dictionary which contains information
                  about the download process. For more info see the
                  extract_data() function under the downloaders.py module.

          """
          # REFACTOR building a '{status} {index}/{size}' playlist status
          # could be re-implemented on DownloadItem or ListCtrl level
          self._talk_to_gui('send', data)

      def _talk_to_gui(self, signal, data):
          """Communicate with the GUI using wxCallAfter and wxPublisher.

          Send/Ask data to/from the GUI. If the signal is 'receive' the
          self._wait_for_reply flag is raised so that the next call of
          update_data() is accepted.

          Args:
              signal (string): Unique string that informs the GUI about the
                  communication procedure.

              data (dict): Python dictionary which holds the data to be sent
                  back to the GUI. If the signal is 'send' then the dictionary
                  contains the updates for the GUI (e.g. percentage, eta). If
                  the signal is 'receive' then the dictionary contains exactly
                  three keys. The 'index' (row) from which we want to retrieve
                  the data, the 'source' which identifies a column in the
                  wxListCtrl widget and the 'dest' which tells the wxListCtrl
                  under which key to store the retrieved data. The 'index'
                  key is written into the given dictionary by this method.

          Note:
              Worker class supports 2 signals.
                  1) send: The Worker sends data back to the GUI
                           (e.g. Send status updates).
                  2) receive: The Worker asks data from the GUI
                              (e.g. Receive the name of a file).

          Structure:
              ('send', {'index': <item_row>, data_to_send*})

              ('receive', {'index': <item_row>, 'source': 'source_key', 'dest': 'destination_key'})

          """
          data['index'] = self._data['index']

          if signal == 'receive':
              self._wait_for_reply = True

          CallAfter(Publisher.sendMessage, WORKER_PUB_TOPIC, (signal, data))