Многопоточное скачивание файлов с ftp python-скриптом

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Зачем это нужно?

Однажды передо мной встала задача копирования большого количества файлов с ftp-сервера. Нужно было делать бэкап. Казалось бы, что может быть проще! Но увы, ничего готового работающего так же быстро для моих условий найти не удалось.

Ситуация

Нужно было забирать периодически пару сотен файлов с ftp-сервера под Windows. Много мелочи и несколько очень крупных по размеру файлов. Суммарно примерно на 500 Гб. Сервер представляет собой vps, расположенный довольно далеко за рубежом. Днем машина высоко нагружена, рано ночью выполняются регламентные работы, итого на скачивание часов 5 максимум.

Ни одна из рассмотренных мной утилит не смогла справиться качественно и за отведенное время. Ну что ж, деваться некуда, нормальную систему резервного копирования ещё не купили, а значит ноги в руки вооружаемся редактором или IDE Python и вперёд! За приключениями!

Конфиг

Все параметры для скрипта вынесем в отдельный файл для удобства.

Шаблон конфига:

host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder'  # Папка, в которой будут созданы подпапки со скачанными файлами
max_threads = 20 # максимальное количество одновременных процессов загрузки
log_path = '\path\to\logfile'
statusfilepath = '\path\to\statusfile'

Конфиг сохраняем с расширением .py и импортируем в начале нашего скрипта. Импортировать можно непосредственно в пространство имён скрипта, но я сделал конструкцию слегка напоминающую костыль в основной части моего скрипта:

if __name__ == "__main__":
    host = config.host
    user = config.user
    passwd = config.passwd
    basepath = config.basepath  # Папка, в которой будут созданы подпапки со скачанными файлами
    max_threads = config.max_threads
    log_path = config.log_path
    statusfilepath = config.statusfilepath
    main()

В начале был список

Скачать файлы с ftp сама по себе задача не сложная, но путём недолгих экспериментов было выяснено, что скачивание файлов занимает время, а таймаут ftp-соединения приходит к нам гораздо быстрее. Следовательно, качать нужно каждый файл в новом соединении, иначе велик риск чего-то потом не досчитаться в скачанных файлах.

Для этого нам нужен список этих самых файлов. Ни о каком статичном списке файлов, конечно, речи не идет, значит нам его при каждом выполнении скрипта получать с сервера по-новой.

Удобства ради и чтобы не таскать параметры по всему коду - переопределим параметры стандартного класса ftp:

class MyFtp (ftplib.FTP):
    """Класс переопределяет стандартный, чтобы задать все параметры соединение в одном месте"""
    def __init__(self):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.timeout = 1800
        super(MyFtp, self).__init__()

    def connect(self):
        super(MyFtp, self).connect(self.host, timeout=self.timeout)

    def login(self):
        super(MyFtp, self).login(user=self.user, passwd=self.passwd)

    def quit(self):
        super(MyFtp,self).quit()

Параметры берутся из конфига. Конечно же в нужно не забыть импортировать библиотеку ftplib, чтобы этот кусок заработал.

Список файлов с сервера мы получим с помощью следующего класса:

class FileList:
    """Класс для работы со списком загружаемых файлов"""
    def __init__(self):
        self.ftp = None
        self.file_list = []

    def connect_ftp(self):
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()

    def get_list(self, name):
        """Метод для получения списка всех файлов с ftp-сервера."""
        import os
        for dirname in self.ftp.mlsd(str(name), facts=["type"]):
            if dirname[1]["type"] == "file":
                entry_file_list = {}
                entry_file_list['remote_path'] = name  #путь до файла
                entry_file_list['filename'] = dirname[0]  #имя файла
                self.file_list.append(entry_file_list)
            else:
                path = os.path.join(name, dirname[0])
                self.get_list(path)

    def get_next_file(self):
        return self.file_list.pop()

    def len(self):
        return len(self.file_list)

Помимо методов соединения с сервером, получения списка файлов и определения его длины здесь имеет метод, который возвращает нам следующий файл для скачивания, из списка он при этом, конечно, удаляется.

Логирование

Для ведения логов скачивания будет использовать стандартную библиотеку logging. Создадим класс, который будет заниматься логированием.

class MyLogger:
    """Класс для логирования событий"""
    def __init__(self):
        self.logger = None

    def start_file_logging(self, logger_name, log_path):
        """Обычное логирование в файл"""
        import logging
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = logging.FileHandler(log_path)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = logging.FileHandler(log_path)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
        """Логирование в файл с ротацией логов"""
        import logging
        from logging.handlers import RotatingFileHandler
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def add(self, msg):
        self.logger.info(str(msg))

    def add_error(self, msg):
        self.logger.error(str(msg))

Скрипт будет поддерживать просто логирование в файл и ротацию файловых логов, ибо логи имеют свойство расти непомерно и это надо бы держать под контролем.

Скачивание файла

Каждый файл будет скачиваться в отдельном потоке. Класс, скачивающий один конкретный файл с сервера выглядит следующим образом:

class BaseFileDownload(threading.Thread):
    """ Объект для копируемого файла """
    count = 0

    def __init__(self, rpath, filename, log):
        threading.Thread.__init__(self)
        self.remote_path = rpath
        self.filename = filename
        self.ftp = None
        self.command = None
        self.currentpath = None
        self.log = log
        self.__class__.count += 1 # для подсчета одновременно запущенных закачек

    def __del__(self):
        self.__class__.count -= 1

    def connect(self):
        """Метод для соединения с ftp"""
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()


    def run(self):
        """Запуск потока скачивания"""
        import os
        self.connect()
        self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
        self.currentpath = os.path.join(basepath, self.remote_path[3:])
        self.ftp.cwd(self.remote_path)
        if not os.path.exists(self.currentpath):
            os.makedirs(self.currentpath, exist_ok=True)
        self.host_file = os.path.join(self.currentpath, self.filename)
        try:
            with open(self.host_file, 'wb') as local_file:
                self.log.add("Start downloading " + self.filename)
                self.ftp.retrbinary(self.command + self.filename, local_file.write)
                self.log.add("Downloading " + self.filename + " complete")
        except ftplib.error_perm:
            self.log.add_error('Perm error')
        self.ftp.quit()

Для подсчета количества одновременно скачиваемых файлов мы будет использовать свойство класса count. В нём у нас будет количество существующих экземпляров класса: в конструкторе счетчик наращивается, в деструкторе, соответственно, уменьшается.

Метод для запуска скачивания должен обязательно называться run - это требование библиотеки threading (не забываем её импортировать!), которую мы будем использовать для параллельного запуска нескольких процессов скачивания.

При сохранении списка файлов скрипт сохраняет также путь до этого файла, этот путь мы воссоздаем при скачивании с помощью os.makedirs.

Статус-файл

По завершению скачивания скрипт будет записывать в файл уведомление об этом. Это уведомление можно мониторить zabbix, чтобы понимать когда бэкап не отработал или, как сделал я - написать простого бота, чтобы периодически проверять статус.

Класс для работы с этим файлов выглядит так:

class StatusFile:
    """По окончанию задачи скрипт пишет в файл уведомление о корректном выполнении."""
    def __init__(self):
        self.msg = ''

    def setstatus(self, msg):
        global statusfilepath
        with open(statusfilepath, 'w') as status_file:
            status_file.write(msg)

Многопоточность

Ну и, наконец, сама основная функция скрипта, которая осуществляет работу с потоками скачивания:

def main():
    import os
    import datetime
    import time

    log = MyLogger()
    log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) # запускаем логирование
    now = datetime.datetime.today().strftime("%Y%m%d")
    global basepath
    basepath = os.path.join(basepath, now)  # модифицируем путь, добавляя текущую дату
    list_file = FileList()
    list_file.connect_ftp()
    list_file.get_list("..")
    for i in range(list_file.len()):
        flag = True
        while flag:   # цикл внутри которого поддерживается нужное количество одновременно запущенных загрузок
            if BaseFileDownload.count < max_threads:
                curfile = list_file.get_next_file()
                threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
                threadid.start()
                flag = False
            else:
                time.sleep(20)
    log.add("Downloading files complete")
    statusfile = StatusFile()
    statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")

Здесь мы запускаем логирование, получаем список файлов ( он хранится в памяти).

В вечном цикле while мы проверяем количество одновременно запущенных скачиваний и, при необходимости, запускаем дополнительные потоки.

Исходный код целиком можно найти здесь.

Источник: https://habr.com/ru/post/537774/


Интересные статьи

Интересные статьи

Маркетплейс – это сервис от 1С-Битрикс, который позволяет разработчикам делиться своими решениями с широкой аудиторией, состоящей из клиентов и других разработчиков.
Пользователи UNIX-подобных ОС, чтобы достичь желаемого результата, привыкли организовывать совместную работу различных программ, рассчитанных на решение какой-то одной задачи. Эти команды...
Бизнес-смыслы появились в Битриксе в начале 2016 года, но мало кто понимает, как их правильно использовать для удобной настройки интернет-магазинов.
Всем привет! Мы продолжаем запуски новых потоков по уже полюбившимся вам курсам и сейчас спешим сообщить о том, что у нас стартует новый набор по курсу «Администратор Linux», который запустится в...