Multithread-Download von Dateien mit FTP-Python-Skript

Warum wird das benötigt?

Einmal stand ich vor der Aufgabe, eine große Anzahl von Dateien von einem FTP-Server zu kopieren. Es war notwendig, ein Backup zu machen. Es scheint, dass es einfacher sein könnte! Aber leider konnte nichts gefunden werden, was bereit war, so schnell für meine Bedingungen zu arbeiten.





Situation

Unter Windows mussten regelmäßig einige hundert Dateien vom FTP-Server abgerufen werden. Viele kleine Dinge und ein paar sehr große Dateien. Insgesamt ca. 500 GB. Der Server ist ein vps, der sich ziemlich weit im Ausland befindet. Tagsüber ist das Auto stark beladen, früh in der Nacht werden routinemäßige Wartungsarbeiten durchgeführt. Insgesamt stehen maximal 5 Stunden zum Herunterladen zur Verfügung.





Keines der von mir überprüften Versorgungsunternehmen war in der Lage, effizient und in der vorgegebenen Zeit damit umzugehen. Nun, es nirgendwo zu gehen ist, ein normales Backup - System noch nicht gekauft worden, das heißt , wir Arm sie mit einem Editor oder Python IDE und gehen! Abenteuer!





Konfig

Wir werden alle Parameter für das Skript der Einfachheit halber in einer separaten Datei ablegen.





Konfigurationsvorlage:





host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder'  # ,        
max_threads = 20 #     
log_path = '\path\to\logfile'
statusfilepath = '\path\to\statusfile'
      
      



Wir speichern die Konfiguration mit der Erweiterung .py und importieren sie am Anfang unseres Skripts. Sie können direkt in den Namespace des Skripts importieren, aber ich habe die Konstruktion im Hauptteil meines Skripts ein bisschen wie eine Krücke gemacht:





if __name__ == "__main__":
    host = config.host
    user = config.user
    passwd = config.passwd
    basepath = config.basepath  # ,        
    max_threads = config.max_threads
    log_path = config.log_path
    statusfilepath = config.statusfilepath
    main()
      
      



Am Anfang gab es eine Liste

ftp , , , ftp- . , , - .





. , , , -.





- ftp:





class MyFtp (ftplib.FTP):
    """  ,        """
    def __init__(self):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.timeout = 1800
        super(MyFtp, self).__init__()

    def connect(self):
        super(MyFtp, self).connect(self.host, timeout=self.timeout)

    def login(self):
        super(MyFtp, self).login(user=self.user, passwd=self.passwd)

    def quit(self):
        super(MyFtp,self).quit()
      
      



. ftplib, .





:





class FileList:
    """      """
    def __init__(self):
        self.ftp = None
        self.file_list = []

    def connect_ftp(self):
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()

    def get_list(self, name):
        """       ftp-."""
        import os
        for dirname in self.ftp.mlsd(str(name), facts=["type"]):
            if dirname[1]["type"] == "file":
                entry_file_list = {}
                entry_file_list['remote_path'] = name  #  
                entry_file_list['filename'] = dirname[0]  # 
                self.file_list.append(entry_file_list)
            else:
                path = os.path.join(name, dirname[0])
                self.get_list(path)

    def get_next_file(self):
        return self.file_list.pop()

    def len(self):
        return len(self.file_list)
      
      



, , , , , .





logging. , .





class MyLogger:
    """   """
    def __init__(self):
        self.logger = None

    def start_file_logging(self, logger_name, log_path):
        """   """
        import logging
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = logging.FileHandler(log_path)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = logging.FileHandler(log_path)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
        """     """
        import logging
        from logging.handlers import RotatingFileHandler
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def add(self, msg):
        self.logger.info(str(msg))

    def add_error(self, msg):
        self.logger.error(str(msg))

      
      



, .





. , :





class BaseFileDownload(threading.Thread):
    """     """
    count = 0

    def __init__(self, rpath, filename, log):
        threading.Thread.__init__(self)
        self.remote_path = rpath
        self.filename = filename
        self.ftp = None
        self.command = None
        self.currentpath = None
        self.log = log
        self.__class__.count += 1 #     

    def __del__(self):
        self.__class__.count -= 1

    def connect(self):
        """    ftp"""
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()


    def run(self):
        """  """
        import os
        self.connect()
        self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
        self.currentpath = os.path.join(basepath, self.remote_path[3:])
        self.ftp.cwd(self.remote_path)
        if not os.path.exists(self.currentpath):
            os.makedirs(self.currentpath, exist_ok=True)
        self.host_file = os.path.join(self.currentpath, self.filename)
        try:
            with open(self.host_file, 'wb') as local_file:
                self.log.add("Start downloading " + self.filename)
                self.ftp.retrbinary(self.command + self.filename, local_file.write)
                self.log.add("Downloading " + self.filename + " complete")
        except ftplib.error_perm:
            self.log.add_error('Perm error')
        self.ftp.quit()
      
      



count. : , , , .





run - threading ( !), .





, os.makedirs.





-

. zabbix, , - , .





Die Klasse für die Arbeit mit diesen Dateien sieht folgendermaßen aus:





class StatusFile:
    """          ."""
    def __init__(self):
        self.msg = ''

    def setstatus(self, msg):
        global statusfilepath
        with open(statusfilepath, 'w') as status_file:
            status_file.write(msg)

      
      



Multithreading

Und schließlich die Hauptskriptfunktion selbst, die mit Download-Streams funktioniert:





def main():
    import os
    import datetime
    import time

    log = MyLogger()
    log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) #  
    now = datetime.datetime.today().strftime("%Y%m%d")
    global basepath
    basepath = os.path.join(basepath, now)  #  ,   
    list_file = FileList()
    list_file.connect_ftp()
    list_file.get_list("..")
    for i in range(list_file.len()):
        flag = True
        while flag:   #         
            if BaseFileDownload.count < max_threads:
                curfile = list_file.get_next_file()
                threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
                threadid.start()
                flag = False
            else:
                time.sleep(20)
    log.add("Downloading files complete")
    statusfile = StatusFile()
    statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
      
      



Hier beginnen wir mit der Protokollierung und erhalten eine Liste der Dateien (die im Speicher gespeichert sind).





In einer ewigen while-Schleife überprüfen wir die Anzahl der gleichzeitig ausgeführten Downloads und starten bei Bedarf zusätzliche Threads.





Den gesamten Quellcode finden Sie hier .












All Articles