Basic framework for simultaneous download of multiple formats (#1036)

Authored by: nao20010128nao
This commit is contained in:
The Hatsune Daishi
2021-09-22 23:12:04 +09:00
committed by GitHub
parent c12977bdc4
commit bd50a52b0d
5 changed files with 224 additions and 7 deletions

View File

@@ -16,6 +16,11 @@ from ..utils import (
shell_quote,
timeconvert,
)
from ..minicurses import (
MultilinePrinter,
QuietMultilinePrinter,
BreaklineStatusPrinter
)
class FileDownloader(object):
@@ -68,6 +73,7 @@ class FileDownloader(object):
self.ydl = ydl
self._progress_hooks = []
self.params = params
self._multiline = None
self.add_progress_hook(self.report_progress)
@staticmethod
@@ -236,12 +242,28 @@ class FileDownloader(object):
"""Report destination filename."""
self.to_screen('[download] Destination: ' + filename)
def _report_progress_status(self, msg, is_last_line=False):
def _prepare_multiline_status(self, lines):
if self.params.get('quiet'):
self._multiline = QuietMultilinePrinter()
elif self.params.get('progress_with_newline', False):
self._multiline = BreaklineStatusPrinter(sys.stderr, lines)
elif self.params.get('noprogress', False):
self._multiline = None
else:
self._multiline = MultilinePrinter(sys.stderr, lines)
def _finish_multiline_status(self):
if self._multiline is not None:
self._multiline.end()
def _report_progress_status(self, msg, is_last_line=False, progress_line=None):
fullmsg = '[download] ' + msg
if self.params.get('progress_with_newline', False):
self.to_screen(fullmsg)
elif progress_line is not None and self._multiline is not None:
self._multiline.print_at_line(fullmsg, progress_line)
else:
if compat_os_name == 'nt':
if compat_os_name == 'nt' or not sys.stderr.isatty():
prev_len = getattr(self, '_report_progress_prev_line_length',
0)
if prev_len > len(fullmsg):
@@ -249,7 +271,7 @@ class FileDownloader(object):
self._report_progress_prev_line_length = len(fullmsg)
clear_line = '\r'
else:
clear_line = ('\r\x1b[K' if sys.stderr.isatty() else '\r')
clear_line = '\r\x1b[K'
self.to_screen(clear_line + fullmsg, skip_eol=not is_last_line)
self.to_console_title('yt-dlp ' + msg)
@@ -266,7 +288,8 @@ class FileDownloader(object):
s['_elapsed_str'] = self.format_seconds(s['elapsed'])
msg_template += ' in %(_elapsed_str)s'
self._report_progress_status(
msg_template % s, is_last_line=True)
msg_template % s, progress_line=s.get('progress_idx'))
return
if self.params.get('noprogress'):
return
@@ -311,7 +334,7 @@ class FileDownloader(object):
else:
msg_template = '%(_percent_str)s % at %(_speed_str)s ETA %(_eta_str)s'
self._report_progress_status(msg_template % s)
self._report_progress_status(msg_template % s, progress_line=s.get('progress_idx'))
def report_resuming_byte(self, resume_len):
"""Report attempt to resume at given byte."""

View File

@@ -3,6 +3,7 @@ from __future__ import division, unicode_literals
import os
import time
import json
from math import ceil
try:
import concurrent.futures
@@ -120,6 +121,7 @@ class FragmentFD(FileDownloader):
'url': frag_url,
'http_headers': headers or info_dict.get('http_headers'),
'request_data': request_data,
'ctx_id': ctx.get('ctx_id'),
}
success = ctx['dl'].download(fragment_filename, fragment_info_dict)
if not success:
@@ -219,6 +221,7 @@ class FragmentFD(FileDownloader):
def _start_frag_download(self, ctx, info_dict):
resume_len = ctx['complete_frags_downloaded_bytes']
total_frags = ctx['total_frags']
ctx_id = ctx.get('ctx_id')
# This dict stores the download progress, it's updated by the progress
# hook
state = {
@@ -242,6 +245,12 @@ class FragmentFD(FileDownloader):
if s['status'] not in ('downloading', 'finished'):
return
if ctx_id is not None and s.get('ctx_id') != ctx_id:
return
state['max_progress'] = ctx.get('max_progress')
state['progress_idx'] = ctx.get('progress_idx')
time_now = time.time()
state['elapsed'] = time_now - start
frag_total_bytes = s.get('total_bytes') or 0
@@ -301,6 +310,9 @@ class FragmentFD(FileDownloader):
'filename': ctx['filename'],
'status': 'finished',
'elapsed': elapsed,
'ctx_id': ctx.get('ctx_id'),
'max_progress': ctx.get('max_progress'),
'progress_idx': ctx.get('progress_idx'),
}, info_dict)
def _prepare_external_frag_download(self, ctx):
@@ -347,7 +359,44 @@ class FragmentFD(FileDownloader):
return decrypt_fragment
def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None):
def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
'''
@params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
all args must be either tuple or list
'''
max_progress = len(args)
if max_progress == 1:
return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
self._prepare_multiline_status(max_progress)
def thread_func(idx, ctx, fragments, info_dict, tpe):
ctx['max_progress'] = max_progress
ctx['progress_idx'] = idx
return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)
class FTPE(concurrent.futures.ThreadPoolExecutor):
# has to stop this or it's going to wait on the worker thread itself
def __exit__(self, exc_type, exc_val, exc_tb):
pass
spins = []
for idx, (ctx, fragments, info_dict) in enumerate(args):
tpe = FTPE(ceil(max_workers / max_progress))
job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
spins.append((tpe, job))
result = True
for tpe, job in spins:
try:
result = result and job.result()
finally:
tpe.shutdown(wait=True)
self._finish_multiline_status()
return True
def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
fragment_retries = self.params.get('fragment_retries', 0)
is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
if not pack_func:
@@ -416,7 +465,7 @@ class FragmentFD(FileDownloader):
return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')
self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
ctx['fragment_filename_sanitized'] = frag_filename
ctx['fragment_index'] = frag_index

View File

@@ -310,6 +310,7 @@ class HttpFD(FileDownloader):
'eta': eta,
'speed': speed,
'elapsed': now - ctx.start_time,
'ctx_id': info_dict.get('ctx_id'),
}, info_dict)
if data_len is not None and byte_counter == data_len:
@@ -357,6 +358,7 @@ class HttpFD(FileDownloader):
'filename': ctx.filename,
'status': 'finished',
'elapsed': time.time() - ctx.start_time,
'ctx_id': info_dict.get('ctx_id'),
}, info_dict)
return True