Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
История начинается с несложной задачи и небольшого Python приложения.
Несложная задача это периодическое удаление дубликатов файлов из указанных каталогов. Изначально она возникла из следующих условий. Есть домашнее хранилище фотографии и видео, в котором определен порядок хранения файлов по тематике, датам и т.д. И есть источники для пополнения этого хранилища: смартфоны, фотоаппараты, контент из сети, электронной почты и т.д. Синхронизации источников контента и хранилища нет. Периодически со смартфонов и фотоаппаратов скидываются все хранящиеся там файлы на жесткий диск компьютера, и получается набор каталогов, в которых оказываются как те файлы, что уже есть в хранилище, так и новые файлы. И чтобы поместить в хранилище новые файлы, их нужно каким-то образом отделить их от тех, что уже сохранены. Самый простой способ, который пришел в голову, это удалить дубликаты из каталогов "пополнения", а с остатком уже работать.
С источников файлы не удаляются пока в этом не появится острая необходимость, в первую очередь потому, что это "естественная" резервная копия. Ну и бывает удобно иметь какие-то фотографии и видео у себя под рукой.
В процессе своего повествования, постараюсь пояснить принятые мной решения, некоторые из которых прямо напрашиваются на решение иным способом.
Определение технического задания
Первый шаг на пути к успеху, это формализация условия задачи, которую необходимо решить, и создание "технического задания". Есть каталог с "образцовыми" файлами (source
), то есть это те файлы, дубликаты которых мы хотели бы найти и удалить, так же у нас есть один или более каталогов с файлами, среди которых нужно эти дубликаты найти и удалить (target
). Как каталог source
, так и каталоги target
могут иметь вложенные каталоги. Кроме того, если в процессе удаления дубликатов окажется, что каталог любого уровня вложенности в target
оказывается пустым, то его тоже необходимо удалить.
"Классические" решения по дедубликации файлов, как правило, работают с одним каталогом, и внутри него ищут дубликаты, а дальше механика работы с ними может отличаться, где-то дубликат заменяют на "жесткую" ссылку, где-то удаляют по порядку нахождения, то есть первый найденный остается, остальные удаляются, где-то выбор предлагают сделать пользователю в красивом графическом интерфейсе и т.д.
В нашем же случае, у нас есть каталог, где находятся образцы файлов, и нужно удалить дубликаты, которые находятся в других каталогах, по этому задача несколько упрощается, поскольку мы точно знаем, где мы должны удалить дубликаты.
Интерфейс CLI должен выглядеть следующим образом:
$ dedublicate /home/user/media /home/user/backup/phone /home/user/backup/camera
где /home/user/media
- это медиа хранилище, которое будет источником для образцов файлов, а /home/user/backup/phone
и /home/user/backup/camera
, это каталоги из которых будут удалены дубликаты. Каталогов target
может быть передано любое количество.
Дополнительные ограничения, приложение должно быть написано на чистом Python без использования сторонних зависимостей, только стандартная библиотека.
Определять являются ли файлы дубликатами будем с помощью сравнения контрольных сумм CRC файлов. То есть фактически будем сравнивать файлы по содержимому.
Циклический избыточный код (англ. Cyclic redundancy check, CRC) — алгоритм нахождения контрольной суммы, предназначенный для проверки целостности данных.
Алгоритм работы приложения будет следующим:
обходим дерево каталогов
source
если встретили файл, то вычисляем для него контрольную сумму, и добавляем ее к множеству уже вычисленных контрольных сумм
после завершения обхода дерева каталогов
source
, начинаем обходит деревья каталоговtarget
, но в этот раз идем в глубинуесли встретили файл, вычисляем для него контрольную сумму
проверяем, если ли в нашем множестве вычисленных контрольных сумм контрольная сумма для этого файла, если есть, то удаляем файл считая его дубликатом, если нет, оставляем и продолжаем обход дерева
при обходе в глубину, когда будем возвращаться обратно, нужно понять, являются ли каталоги пустыми, после удаления дубликатов или нет, и если являются, то удалить.
Разработка интерфейса командной строки (CLI)
Процесс разработки будет проходит, если можно так выразиться, "сверху вниз" по шагам.
Начнем с интерфейса командной строки, а потом уже перейдем к деталям. Поскольку есть ограничение на использование только стандартной библиотеки Python, то разработка CLI интерфейса будет с использованием стандартного модуля argparse
#!/usr/bin/env python3
"""Приложение для удаления дубликатов и пустых каталогов"""
import argparse
import sys
from pathlib import Path
from typing import List
def main(source: Path, target: List[Path]):
pass
if __name__ == '__main__':
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('source', type=Path,
help='путь к каталогу с образцами файлов')
parser.add_argument('target', type=Path, nargs='+',
help='пути к каталогам, из которых нужно удалить дубликаты')
args = parser.parse_args()
exit_code = main(args.source, args.target)
sys.exit(exit_code)
Первый вопрос, который может возникнуть по поводу данного кусочка кода, почему типы аргументов функции main
аннотированы с использованием модуля typing
, а не в современном стиле? Особенно учитывая, что начиная с версии 3.9 class typing.List
помечен как deprecated
.
Ответ, в целях обратной совместимости, чтобы приложение можно было запускать на интерпретаторе Python начиная с версии 3.6.
Если приложение не планируется запускать на версиях интерпретатора младше чем 3.9, то объявление функции следует написать следующим образом:
def main(source: Path, target: list[Path]):
pass
Теперь если запустить этот кусочек кода из командной строки с ключом --help
, то будет получен следующий вывод:
usage: dedublicate.py [-h] source target [target ...]
Приложение для удаления дубликатов и пустых каталогов
positional arguments:
source путь к каталогу с образцами файлов
target пути к каталогам, из которых нужно удалить дубликаты
options:
-h, --help show this help message and exit
Очевидный недостаток, это "смесь языков", лучше все же сохранять консистентное состояние, и в данном случае, проще перевести на английский, русские строки используя Google переводчик.
#!/usr/bin/env python3
"""Application to remove duplicates and empty directories"""
import argparse
import sys
from pathlib import Path
from typing import List
def main(source: Path, target: List[Path]) -> int:
return 0
if __name__ == '__main__':
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('source', type=Path,
help='path to the directory with sample files')
parser.add_argument('target', type=Path, nargs='+',
help='paths to directories from which duplicates need to be removed')
args = parser.parse_args()
exit_code = main(args.source, args.target)
sys.exit(exit_code)
Теперь вывод приобретает достойный вид:
usage: dedublicate.py [-h] source target [target ...]
Application to remove duplicates and empty directories
positional arguments:
source path to the directory with sample files
target paths to directories from which duplicates need to be removed
options:
-h, --help show this help message and exit
Настраиваем логирование
Потенциально, разрабатываемое приложение может работать достаточно долго, и было бы не плохо понимать, что оно вообще работает, и в какой стадии находится.
Для информирования пользователя о ходе процесса обработки будет использован модуль стандартной библиотеки logging
. Данные будет выводить на стандартный поток вывода ошибок stderr
.
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(formatter)
logger.addHandler(handler)
В формате записи лога нет наименования файла, а точнее модуля, в котором произошло событие, поскольку модуль будет единственным.
Описываем основную логику приложения
Как мы уже отмечали в начале, базовый алгоритм можно свести к двум шагам:
необходимо создать множество контрольных сумм для "образцовых" файлов
необходимо удалить дубликаты из каталогов
target
На данном этапе этого достаточно, чтобы описать эти действия в коде:
def main(source: Path, targets: List[Path]) -> int:
logger.info('Creating set of hashes for "%s" has begun', source)
hashes = create_set_of_crc(source)
for target in targets:
logger.info('Processing of the target directory %s has begun', target)
if source.resolve() == target.resolve():
logger.error('The source and target directory are the same: %s',
source.resolve())
continue
remove_duplicates(target, hashes)
return 0
Хотелось обратить внимание на один важный момент, который не упоминался ранее.
А что будет если в качестве
target
будет передан путьsource
?В этом случае, наше приложение, которое на первом шаге получило информацию о образцах файлов, просто методично очистит нам каталог
source
, так как сочтет все файлы дубликатами. Именно для этого случая добавлена проверка на совпадение путиsource
иtarget
, причем используется функция получения абсолютного пути, потому что можно легко передать два различных относительных пути, которые будут указывать на один и тот же объект.
Ну и для того, чтобы код можно было вообще запустить, объявим функции, которые мы использовали в этом фрагменте кода, и опишем их сигнатуру:
def create_set_of_crc(source: Path) -> set:
"""Return a set of files hashes for the path"""
def remove_duplicates(target: Path, hashes: set):
"""Remove file duplicates and empty dirs by the path""
Функция создания множества контрольных сумм
Задача по созданию множества контрольных сумм файлов для каталога source
, на самом деле состоит из двух задач:
обойти дерево каталогов
source
вычислить контрольную сумму для каждого файла
И тут возникает вопрос, как это лучше организовать. В голову приходят два варианта:
делать это одновременно, то есть обходим дерево каталогов и как только встретили файл, начинаем вычислять его контрольную сумму;
разделить эти две операции, то есть сначала составить список путей файлов, а потом передать этот список функции для вычисления контрольных сумм.
Следующая задача, как обходить дерево каталогов:
описать свою рекурсивную функцию;
воспользоваться готовой функцией
walk
из модуляos
.
Мой выбор при решении этих двух задач пал на вторые варианты, то есть разделить операции обхода дерева каталогов и вычисление контрольных сумм; обходить дерево каталогов с использование готовой функции walk
.
В результате получилась следующая функция:
import os
def create_set_of_crc(source: Path) -> set:
"""Return a set of files hashes for the path"""
logger.info('Directory tree traversal %s started', source)
paths = [Path(root, filename)
for root, __, files in os.walk(source, topdown=False)
for filename in files]
logger.info('Checksums calculation for a set of paths has begun')
hashes = set(calculate_crc(path) for path in paths)
if None in hashes:
hashes.remove(None)
return hashes
Ну и теперь нужно описать сигнатуру функции вычисления контрольной суммы, при этом сразу же возникает вопрос, а что должна вернуть функция, если не получится вычислить контрольную сумму? Например, нет прав на чтение, или файл поврежден.
Первая мысль, выбросить исключение, и на уровне выше его обработать. Но уровнем выше мы использовали генераторное выражение, и внутри не получится перехватить и обработать исключение. С другой стороны, можно обработать исключение внутри функции, и в качестве значение вместо контрольной суммы вернуть None
:
def calculate_crc(path: Path) -> Optional[str]:
"""Returns CRC file for the path"""
С учетом вышесказанного, нужно внести изменения в функцию create_set_of_crc
, и перед возвращением из нее значений, из множества контрольных сумму удалить None
, если он есть в множестве:
def create_set_of_crc(source: Path) -> set:
"""Return a set of files hashes for the path"""
paths = [Path(root, filename)
for root, __, files in os.walk(source, topdown=False)
for filename in files]
hashes = set(calculate_crc(path) for path in paths)
if None in hashes:
hashes.remove(None)
return hashes
Функция вычисления контрольной суммы
Для вычисление контрольной суммы воспользуемся функцией модуля hashlib
стандартной библиотеки.
import hashlib
READING_BLOCK_SIZE = 4096
def calculate_crc(path: Path) -> Optional[str]:
"""Returns CRC file for the path"""
crc = hashlib.md5()
try:
with open(path, 'rb') as fd:
while True:
chunk = fd.read(READING_BLOCK_SIZE)
if not chunk:
break
crc.update(chunk)
except OSError as err:
logger.error("Processing attempt: %s failed with error: %s", path, err)
return None
return crc.hexdigest()
К этой функции может возникнуть много вопросов, на которые сложно ответить полностью аргументированно.
Например, почему выбран алгоритм MD5, а не какой-либо другой? Если кратко, то этот алгоритм кажется достаточным для решения поставленной задачи, поскольку вероятность коллизии при работе с файлами ничтожна мала.
Другой вопрос, почему не написать такой странный подход к чтению файла по кусочкам? Можно ведь написать короче, например так:
def calculate_crc(path: Path) -> Optional[str]:
"""Returns CRC file for the path"""
crc = hashlib.md5()
try:
crc.update(path.read_bytes())
except OSError as err:
logger.error("Processing attempt: %s failed with error: %s", path, err)
return None
return crc.hexdigest()
Но вот только что произойдет, если вдруг такой функции попадется файл объемом больше, чем доступная оперативная память? А если этот файл в два раза больше оперативной памяти? Хотя в целом, такой вариант остается работоспособным. С высокой долей вероятности интерпретатор Python и операционная система справятся с данной задачей, но я предпочел более безопасный вариант с чтением файлов по кусочкам.
Почему размер одного блока для чтение READING_BLOCK_SIZE
равен 4096 байт? Потому что кажется это количество байт равное размеру одного блока на файловой системе.
Опять же, это демонстрация того, что даже создание простой функции может потребовать принятие множества решений, и поставить вопросы, на которые нет простых ответов.
Функция удаления дубликатов
Остался последний шаг на пути к цели. Нужно реализовать функцию remove_dublicates
.
В этот раз не будет отделение процесса обхода дерева, вычисления контрольной суммы и удаление дубликатов, то есть по мере обхода дерева, будут вычисляться контрольные суммы и приниматься решение о удалении дубликатов, а также будут удаляться пустые каталоги.
def remove_duplicates(target: Path, hashes: set):
"""Remove file duplicates and empty dirs by the path"""
for root, folders, files in os.walk(target, topdown=False):
for filename in files:
target_file = Path(root, filename)
crc = calculate_crc(target_file)
if crc in hashes:
try:
os.remove(target_file)
logger.info('File %s was removed', target_file)
except OSError:
logger.error('Deleting of file %s failed', target_file)
for folder in folders:
target_folder = Path(root, folder)
try:
os.rmdir(target_folder)
logger.info('Empty dir %s was removed', target_folder)
except OSError:
pass
К этому небольшому кусочку кода тоже возникает целый ряд вопросов.
И первый из них - что происходит с обработкой каталогов? Может показаться, что данный фрагмент кода при запуске просто удалит все каталоги, но это нет так. Этот код использует особенность функции os.rmdir
. Если каталог не пустой, то попытка его удаление приведет к выбрасыванию исключения. Таким образом этот кусочек кода действительно пытается удалить все каталоги, но в результате удалит только действительно пустые каталоги. Это один из принципов Python: "Проще просить прощения, чем разрешения". Попытались удалить не пустой каталог, извинились, в форме обработанного исключения, и пошли дальше.
Прекрасно понимаю, что это решение может показаться очень и очень спорным. Скорее всего можно найти более изящное решение.
Заключение
Осталось только взглянуть по полный текст приложения, которое получилось, и можно использовать его при решении поставленных задач.
Полный текст приложения
#!/usr/bin/env python3
"""Application to remove duplicates and empty directories"""
import argparse
import sys
import logging
import os
import hashlib
from pathlib import Path
from typing import List, Optional
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(formatter)
logger.addHandler(handler)
READING_BLOCK_SIZE = 4096
def calculate_crc(path: Path) -> Optional[str]:
"""Returns CRC file for the path"""
crc = hashlib.md5()
try:
with open(path, 'rb') as fd:
while True:
chunk = fd.read(READING_BLOCK_SIZE)
if not chunk:
break
crc.update(chunk)
except OSError as err:
logger.error("Processing attempt: %s failed with error: %s", path, err)
return None
return crc.hexdigest()
def create_set_of_crc(source: Path) -> set:
"""Return a set of files hashes for the path"""
logger.info('Directory tree traversal %s started', source)
paths = [Path(root, filename)
for root, __, files in os.walk(source, topdown=False)
for filename in files]
logger.info('Checksums calculation for a set of paths has begun')
hashes = set(calculate_crc(path) for path in paths)
if None in hashes:
hashes.remove(None)
return hashes
def remove_duplicates(target: Path, hashes: set):
"""Remove file duplicates and empty dirs by the path"""
for root, folders, files in os.walk(target, topdown=False):
for filename in files:
target_file = Path(root, filename)
crc = calculate_crc(target_file)
if crc in hashes:
try:
os.remove(target_file)
logger.info('File %s was removed', target_file)
except OSError:
logger.error('Deleting of file %s failed', target_file)
for folder in folders:
target_folder = Path(root, folder)
try:
os.rmdir(target_folder)
logger.info('Empty dir %s was removed', target_folder)
except OSError:
pass
def main(source: Path, targets: List[Path]) -> int:
logger.info('Creating set of hashes for "%s" has begun', source)
hashes = create_set_of_crc(source)
for target in targets:
logger.info('Processing of the target directory %s has begun', target)
if source.resolve() == target.resolve():
logger.error('The source and target directory are the same: %s',
source.resolve())
continue
remove_duplicates(target, hashes)
return 0
if __name__ == '__main__':
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('source', type=Path,
help='path to the directory with sample files')
parser.add_argument('targets', type=Path, nargs='+',
help='paths to directories from which duplicates need to be removed')
args = parser.parse_args()
exit_code = main(args.source, args.targets)
sys.exit(exit_code)
Остается еще не мало вопросов, оставленных без ответов. Например, почему нет тестов? Каким образом можно поискать узкие места в приложении и ускорить их?
Попытки ускорить выполнение приложения
Ответом на вопрос: "Можно ли сократить время первоначального расчета контрольных сумм для "образцового" каталога, за счет многопроцессной обработки?", - может являться небольшое изменение функции create_set_of_crc
, в результате чего она приобретает следующий вид:
from concurrent.futures import ProcessPoolExecutor
def create_set_of_crc(source: Path) -> set:
"""Return a set of files hashes for the path"""
logger.info('Directory tree traversal %s started', source)
paths = [Path(root, filename)
for root, __, files in os.walk(source, topdown=False)
for filename in files]
logger.info('Checksums calculation for a set of paths has begun')
with ProcessPoolExecutor() as executor:
hashes = set(executor.map(calculate_crc, paths))
if None in hashes:
hashes.remove(None)
return hashes
Вот только как оценить эффективность данного решения? Эта оптимизация исходит из того, что нам нужно выполнять вычисления на CPU, а действительно ли это было узким местом?
В ходе тестирования оказалось, что при большом количестве файлов ProcessPoolExecutor
показывает результат по времени хуже, чем при работе в одни поток. В попытках выяснить причину этого явления, попалась статья Jason Brownlee от 09.02.2022 года ProcessPoolExecutor Example in Python. Оказывается, что прежде чем передать задания на обработку дочерним процессам, родительский процесс производит подготовку, и пока он не подготовит все задачи, он не отдаст ни одного задания в работу. В результате это подготовка для большого числа заданий поглощает всю выгоду от многопроцессной обработки. Автор также предлагает некоторое решение этой проблемы, но я решил попробовать воспользоваться ThreadPoolExecutor
.
Таким образом код функции create_set_of_crc
превратился в следующий:
from concurrent.futures import ProcessPoolExecutor
def create_set_of_crc(source: Path) -> set:
"""Return a set of files hashes for the path"""
logger.info('Directory tree traversal %s started', source)
paths = [Path(root, filename)
for root, __, files in os.walk(source, topdown=False)
for filename in files]
logger.info('Checksums calculation for a set of paths has begun')
with ThreadPoolExecutor() as executor:
hashes = set(executor.map(calculate_crc, paths))
if None in hashes:
hashes.remove(None)
return hashes
В результате получено то ускорение, которое ожидалось. Если говорить очень грубо, поскольку измерения производительности занятие очень непростое, то сократить время обработки удалось в 3,5 раза, при условии, что каталог source
338 039 файлов в среднем размером около 71,5Кб, а вот каталог target
просто пустой.
Опять же, это пример, неправильного подхода к оптимизации, поскольку не проводилось профилирование, не были найдены узкие места. Это не инженерный, а рабоче-крестьянский подход, но он тоже имеет право на существование.
P.S.
В этой статья предпринята попытка показать, что происходит в голове того, кто занимается написанием даже самых простых скриптов. Надеюсь, что мне удалось хоть в какой-то степени показать, что же такое практическое решение задачи.
Первоначально на написание приложения ушло менее 30 минут, но вот для того, чтобы описать этот процесс потребовалось несколько дней.