代码如下:
import concurrent.futures
import json
import os
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlsplit

import requests
# URLs whose download failed; filled by the main loop and written to
# error_image.json at the end of the run.
error_url_list = []

# Request headers sent with every image request.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/114.0.0.0 Safari/537.36"
    ),
}
def download_image(image_url, image_name, timeout=30):
    """Download one image and store it in the ``data`` directory.

    The file is saved as ``data/<image_name><suffix>``. The suffix is taken
    from the path component of ``image_url`` so that a query string or
    fragment (``.../a.jpg?w=100``) does not leak into the file name. When the
    URL path has no extension, the file is saved without one.

    Args:
        image_url: Address of the image to fetch.
        image_name: Base name for the saved file; converted with ``str()``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``None`` on success, or a short error description when the server
        answers with a status other than 200. The script's main loop treats
        any non-``None`` result as a failed download.

    Raises:
        requests.RequestException: On connection problems or timeout.
        OSError: If the target file cannot be written.
    """
    # Use only the URL path: splitting the whole URL on "." yields a wrong
    # suffix for query strings and for URLs that have no extension at all.
    file_suffix = os.path.splitext(urlsplit(image_url).path)[1]
    image_path = os.path.join("data", str(image_name) + file_suffix)

    # Without a timeout a stalled server would block this worker forever.
    response = requests.get(image_url, headers=headers, timeout=timeout)
    if response.status_code != 200:
        print("{} - 出错了".format(image_url))
        # Report the failure to the caller instead of silently returning None.
        return "HTTP {}".format(response.status_code)

    with open(image_path, "wb") as image:
        image.write(response.content)
    print("图片{}, 下载成功".format(str(image_name)))
    return None
def main():
    """Download every image listed in ``image.json`` using a thread pool.

    Reads the ``image_link`` list from ``image.json``, submits one
    ``download_image`` call per entry to a pool of eight worker threads
    (images are numbered from 1), and finally writes the URLs that failed to
    ``error_image.json``.
    """
    # Load the list of image URLs.
    with open("image.json", "r", encoding="UTF-8") as file:
        data = json.load(file)
    image_links = data["image_link"]
    total = len(image_links)
    print(total)

    # exist_ok avoids the check-then-create race of os.path.exists().
    os.makedirs("data", exist_ok=True)

    with ThreadPoolExecutor(max_workers=8) as executor:
        # Map each future to the URL it is responsible for.
        future_to_url = {
            executor.submit(download_image, url, number): url
            for number, url in enumerate(image_links, start=1)
        }

        # as_completed yields futures in completion order, so progress is
        # counted here instead of being read from the image number.
        finished_futures = concurrent.futures.as_completed(future_to_url)
        for finished, future in enumerate(finished_futures, start=1):
            url = future_to_url[future]
            print("-------------------------------------------------------")
            print("图片下载进度: {} / {}".format(finished, total))
            print("正在下载: {}".format(url))
            try:
                result = future.result()
                # A non-None result is treated as a failed download.
                if result is not None:
                    raise ValueError("执行出错")
            except Exception as e:
                # Broad on purpose: any failure of one download is recorded
                # and the remaining downloads continue.
                error_url_list.append(url)
                print("下载错误: 图片:{} --- 信息:{}".format(url, str(e)))

    # Save the failed URLs in the same layout as the input file.
    with open("error_image.json", "w", encoding="UTF-8") as error_file:
        json.dump({"image_link": error_url_list}, error_file)


if __name__ == '__main__':
    main()
文章评论