跳转至

批量下载

littledl 支持多文件批量下载,针对大量小文件、大文件或混合场景进行了专门优化。

核心特性

  • 自适应并发:根据网络状况动态调整同时下载的文件数
  • 小文件优先:自动识别小文件并优先处理,提升用户体验
  • 连接复用:所有文件共享连接池,减少连接建立开销
  • 批量探测(Probe):并行发送 HEAD 请求获取文件信息
  • 智能分块:根据文件大小自动选择最优分块策略

快速开始

同步批量下载

from littledl import batch_download_sync

results = batch_download_sync(
    urls=[
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
        "https://example.com/file3.zip",
    ],
    save_path="./downloads",
    max_concurrent_files=5,
)

for url, path, error in results:
    if path:
        print(f"✓ {url} -> {path}")
    else:
        print(f"✗ {url}: {error}")

异步批量下载

import asyncio
from littledl import BatchDownloader

async def main():
    downloader = BatchDownloader(
        max_concurrent_files=5,
        max_concurrent_chunks_per_file=4,
        enable_adaptive_concurrency=True,
    )

    await downloader.add_urls([
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
    ], "./downloads")

    await downloader.start()

asyncio.run(main())

进度回调

批量进度回调

import asyncio
from littledl import BatchDownloader

def on_batch_progress(completed: int, total: int, speed: float, eta: int):
    print(f"批量进度: {completed}/{total} | 速度: {speed/1024/1024:.1f} MB/s | 预计剩余: {eta}s")

downloader = BatchDownloader()
downloader.set_progress_callback(on_batch_progress)

单文件完成回调

from littledl import BatchDownloader, FileTask

def on_file_complete(task: FileTask):
    print(f"文件完成: {task.filename} ({task.file_size} bytes)")

downloader = BatchDownloader()
downloader.set_file_complete_callback(on_file_complete)

高级配置

自适应并发控制

默认启用自适应并发控制,系统会根据下载速度和错误率自动调整并发数:

  • 速度持续下降 → 增加并发利用更多带宽
  • 速度稳定上升 → 维持或增加并发
  • 错误率上升 → 自动降低并发

downloader = BatchDownloader(
    enable_adaptive_concurrency=True,
    max_concurrent_files=10,
)

手动并发控制

禁用自适应模式,手动设置固定并发数:

downloader = BatchDownloader(
    enable_adaptive_concurrency=False,
    max_concurrent_files=3,
)

文件优先级

支持手动设置文件下载优先级:

downloader = BatchDownloader()

# 添加文件时可指定优先级(数字越小优先级越高)
await downloader.add_url(url1, priority=0)  # 高优先级
await downloader.add_url(url2, priority=1)  # 普通优先级
await downloader.add_url(url3, priority=2)  # 低优先级

智能分块策略

系统会根据文件大小自动选择最优分块策略:

| 文件大小 | 分块策略 | 说明 |
| --- | --- | --- |
| < 5 MB | 单分块 | 避免分片开销 |
| 5 MB ~ 100 MB | 4 分块 | 平衡并发和开销 |
| > 100 MB | 8 分块 | 最大化吞吐 |

获取下载状态

获取所有任务

downloader = BatchDownloader()
await downloader.add_urls(urls, "./downloads")
await downloader.start()

tasks = downloader.get_all_tasks()
for task in tasks:
    print(f"{task.filename}: {task.status.value} ({task.progress:.1f}%)")

获取统计信息

stats = downloader.get_stats()
print(f"总文件数: {stats['total_files']}")
print(f"已完成: {stats['completed_files']}")
print(f"失败: {stats['failed_files']}")
print(f"当前并发: {stats['current_concurrency']}")
print(f"总进度: {stats['progress_percent']:.1f}%")

获取批量进度

progress = downloader.get_progress()
print(f"总大小: {progress.total_bytes / 1024 / 1024:.1f} MB")
print(f"已下载: {progress.downloaded_bytes / 1024 / 1024:.1f} MB")
print(f"速度: {progress.overall_speed / 1024 / 1024:.1f} MB/s")
print(f"预计剩余: {progress.eta:.0f}s")

暂停、恢复和取消

downloader = BatchDownloader()
await downloader.add_urls(urls, "./downloads")

# 启动下载
task = asyncio.create_task(downloader.start())

# 暂停
await asyncio.sleep(5)
await downloader.pause()

# 恢复
await asyncio.sleep(2)
await downloader.resume()

# 取消
await asyncio.sleep(5)
await downloader.cancel()

# 等待完成
await task

API 参考

BatchDownloader

class BatchDownloader:
    def __init__(
        self,
        config: DownloadConfig | None = None,
        max_concurrent_files: int = 5,
        max_concurrent_chunks_per_file: int = 4,
        enable_adaptive_concurrency: bool = True,
        enable_small_file_priority: bool = True,
    ) -> None:
        ...

    async def add_url(
        self,
        url: str,
        save_path: str | Path = "./downloads",
        filename: str | None = None,
        priority: int = 0,
    ) -> str:
        """添加单个URL到下载队列"""
        ...

    async def add_urls(
        self,
        urls: list[str],
        save_path: str | Path = "./downloads",
    ) -> list[str]:
        """批量添加URL到下载队列"""
        ...

    def set_progress_callback(self, callback) -> None:
        """设置批量进度回调 (completed, total, speed, eta)"""
        ...

    def set_file_complete_callback(self, callback) -> None:
        """设置单文件完成回调 (task: FileTask)"""
        ...

    async def start(self) -> None:
        """启动批量下载"""
        ...

    async def pause(self) -> None:
        """暂停下载"""
        ...

    async def resume(self) -> None:
        """恢复下载"""
        ...

    async def cancel(self) -> None:
        """取消下载"""
        ...

    async def stop(self) -> None:
        """停止下载并关闭连接池"""
        ...

    def get_task(self, task_id: str) -> FileTask | None:
        """根据ID获取任务"""
        ...

    def get_all_tasks(self) -> list[FileTask]:
        """获取所有任务"""
        ...

    def get_progress(self) -> BatchProgress:
        """获取批量下载进度"""
        ...

    def get_stats(self) -> dict:
        """获取统计信息"""
        ...

FileTask

@dataclass
class FileTask:
    task_id: str
    url: str
    save_path: Path
    filename: str | None
    status: FileTaskStatus
    file_size: int
    downloaded: int
    speed: float
    error: str | None
    retry_count: int
    priority: int
    supports_range: bool
    chunks: int

    @property
    def progress(self) -> float: ...
    @property
    def is_active(self) -> bool: ...
    @property
    def is_completed(self) -> bool: ...
    @property
    def is_failed(self) -> bool: ...
    @property
    def is_small_file(self) -> bool: ...
    @property
    def is_large_file(self) -> bool: ...

BatchProgress

@dataclass
class BatchProgress:
    total_files: int
    completed_files: int
    failed_files: int
    active_files: int
    total_bytes: int
    downloaded_bytes: int
    overall_speed: float
    eta: float

    @property
    def progress(self) -> float: ...
    @property
    def files_completed_ratio(self) -> float: ...

便捷函数

async def batch_download(
    urls: list[str],
    save_path: str = "./downloads",
    config: DownloadConfig | None = None,
    max_concurrent_files: int = 5,
    max_concurrent_chunks_per_file: int = 4,
    progress_callback=None,
    file_complete_callback=None,
) -> list[tuple[str, Path | None, str | None]]:
    """异步批量下载,返回 [(url, path, error), ...]"""
    ...

def batch_download_sync(
    urls: list[str],
    save_path: str = "./downloads",
    config: DownloadConfig | None = None,
    **kwargs,
) -> list[tuple[str, Path | None, str | None]]:
    """同步批量下载"""
    ...

高速下载模式 (EnhancedBatchDownloader)

EnhancedBatchDownloader 是基于 aria2 风格优化的高性能批量下载器,提供更智能的下载调度。

核心特性

| 特性 | 说明 |
| --- | --- |
| 智能风格选择 | 根据文件大小、服务器支持、网络状况自动选择最优下载风格 |
| 动态线程分配 | 全局线程池统一调度,避免资源浪费 |
| 多源备份 | 支持多个备用URL,故障自动切换 |
| 文件复用 | 内容感知匹配,避免重复下载 |

风格选择算法

系统会自动分析并选择最佳下载风格:

from littledl import DownloadStyle, StrategySelector

selector = StrategySelector(
    default_style=DownloadStyle.ADAPTIVE,
    enable_single=True,
    enable_multi=True,
)

# 文件分析
profile = selector.analyze_file(
    url="https://example.com/file.zip",
    size=100 * 1024 * 1024,  # 100MB
    supports_range=True,
)

# 风格决策
decision = selector.select_style(profile)
print(f"推荐风格: {decision.style.value}")
print(f"推荐分块: {decision.recommended_chunks}")
print(f"预估加速: {decision.estimated_speedup:.1f}x")

使用示例

import asyncio
from littledl import EnhancedBatchDownloader

async def main():
    downloader = EnhancedBatchDownloader(
        max_concurrent_files=5,
        max_total_threads=15,
        enable_existing_file_reuse=True,
        enable_multi_source=True,
    )

    # 支持备份URL
    await downloader.add_url(
        "https://example.com/file.zip",
        backup_urls=["https://backup.com/file.zip"]
    )

    await downloader.start()

asyncio.run(main())

动态风格分配

多文件下载时,系统会根据全局资源动态分配风格:

from littledl import DynamicStyleAllocator, DownloadStyle

allocator = DynamicStyleAllocator(
    selector=selector,
    max_concurrent_files=5,
    max_total_chunks=16,
)

# 添加文件并获取分配
decision = await allocator.add_file(
    file_id="file1",
    url="https://example.com/file.zip",
    size=100 * 1024 * 1024,
    supports_range=True,
    priority=1,
)
print(f"分配风格: {decision.style.value}")

文件复用统计

reuse_stats = downloader.get_file_reuse_stats()
print(f"检查次数: {reuse_stats['checks']}")
print(f"命中次数: {reuse_stats['hits']}")
print(f"节省流量: {reuse_stats['bytes_saved_formatted']}")

API 参考

from littledl import (
    EnhancedBatchDownloader,
    StrategySelector,
    DynamicStyleAllocator,
    DownloadStyle,
)

# 策略选择器
selector = StrategySelector(
    default_style=DownloadStyle.ADAPTIVE,
    enable_single=True,
    enable_multi=True,
    max_chunks=16,
)

# 动态分配器
allocator = DynamicStyleAllocator(
    selector=selector,
    max_concurrent_files=5,
    max_total_chunks=16,
)

# EnhancedBatchDownloader
class EnhancedBatchDownloader:
    def __init__(
        self,
        config: DownloadConfig | None = None,
        max_concurrent_files: int = 5,
        max_total_threads: int = 15,
        small_file_threshold: int = 1 * 1024 * 1024,
        enable_existing_file_reuse: bool = True,
        enable_multi_source: bool = True,
        enable_adaptive_speed: bool = True,
    ) -> None:
        ...