Python : Secure File Transfer Manager BatchMode

Erwartete Lesezeit: 3 minuten

Hier mal ein kleines Snippet für den SFTP Dateitransfer für einen einiger maßen sicheren Datentransfer.

In der Version 1.0 könnt ihr Dateien hoch- und herunterladen. Gesteuert wird das ganze über eine Kontrolldatei , in der die Operationen niedergeschrieben werden (aber ohne die Security Informationen). Username und Passwort werden über eine Secret Datei eingelesen .

Über https://github.com/MOderkerk/securefiletransfersuite/releases/download/v1.0.0/securefilemanager.rar könnt ihr euch alles notwendige herunterladen oder hier in den Code schauen.

"""
small application for sending or receiving files via sftp
version: 1.0
date: 09/2021
licence : apache 2.0

Explanations for usage see readme.md file
"""

import datetime
import logging
import os
import sys
from re import search
import paramiko

paramiko.util.log_to_file(os.getenv('sftmLogFile'))
logging.getLogger("paramiko").setLevel(logging.INFO)


def log_console(level: str, message: str):
    """
    Log to the console
    :param level Loglever to display
    :param message message to display
    """
    print(str(datetime.datetime.now()) + ' [' + level.upper() + '] ' + message)


def open_connection(server, port):
    """
    Opens  connection to the specific server and port and returns the sftpclient
    :param server server address to use
    :param port port to the sftp server normally 22
    """
    print('Open sftp connection to ' + server)
    credentials = read_credentials(server)
    if str(3) != str(len(credentials)):
        print(
            'Something went wrong reading the credentials . Found {0} entries for server'.format(str(len(credentials))))
        raise PermissionError('No Username/Password or multiple entries found for server ' + server)

    else:

        host = server + ':' + port
        transporter = paramiko.Transport(host)
        username, password = credentials[1], credentials[2]
        transporter.connect(None, username, password)
        return paramiko.SFTPClient.from_transport(transporter)


def read_credentials(server):
    """ Read the credentials for the specific server from a file with the format
     <server>;<user>;<password>;
     :param server url of the server which is also used in the secret file
    """
    log_console('INFO', 'Searching for the credentials for server ' + server)
    credential_file = os.getenv('sftmCredentialFile')
    if credential_file is None:
        raise AttributeError('Environment variable sftmCredentialFile not found')
    file = open(credential_file, "r")
    lines = file.readlines()
    credentials = []
    file.close()
    for line in lines:
        if search(server, line):
            credentials = line.split(';')
            log_console('INFO', 'Credentials found ')
    return credentials


def close_connection(sftp_client: paramiko.SFTPClient):
    """
    Close the connection to the server
    :param sftp_client sftclient object to use
    """
    log_console('INFO', 'Closing the connection')
    sftp_client.close()


def download_file(source, target, sftp_client: paramiko.SFTPClient):
    """ Download the given file (source) to the target folder incl the target file name
    :param: source sourcefile
    :param: target target path with target filename
    :param: sftp_client client to use
    """
    log_console('INFO', 'Start downloading ' + source + ' to ' + target)
    log_console('INFO', 'Stats of remote file: ' + str(sftp_client.lstat(source)))
    sftp_client.get(source, target)
    log_console('INFO', 'File transferred')


def upload_file(source, target, sftp_client: paramiko.SFTPClient):
    """
    upload the given file (source) to the target folder
    :param: source sourcefile
    :param: target target path with target filename
    :param: sftp_client client to use
    """
    log_console('INFO', 'Start downloading ' + source + ' to ' + target)
    sftp_client.put(source, target)
    log_console('INFO', 'File transferred')


def print_dir(directory, sftp_client: paramiko.SFTPClient):
    """
    print the content of a directory
    :param: directory directory to show
    :param: sftp_client sftpclient to use
    """
    log_console('INFO', 'Printing content of folder ' + directory)
    print(sftp_client.listdir(directory))


def read_control_file(file):
    """
    Reads the control file with the information of the sftp session
    :param: file path to the control file from the args
    :return: lines read controlfile
    """
    file = open(file, 'r')
    lines = file.readlines()
    file.close()
    log_console('INFO', 'Control file contains {0} lines '.format(str(len(lines))))
    return lines


def print_header_info():
    """
    print the application header
    :return:
    """
    print('Secure File Transfer Version 1.0')
    print('--------------------------------')


def validate_arguments():
    """
    validating the arguments from commandline
    :returns:
    """
    if str(2) != str(len(sys.argv)):
        log_console('ERROR', 'Found Parameters ' + str(sys.argv))
        raise AttributeError('[ERROR] You must specify a ftp control file. Please check manual ')


def execute_control_file_commands(controllines):
    """
    executing the sftp control file
    :param: controlllines information read from the controllfile
    """
    log_console('INFO', 'Starting executing control file')

    for line in controllines:
        sftp = None
        splitted_line = line.split(';')
        # log_console('DEBUG',str(splitted_line))
        command = splitted_line[0].upper()
        if command == 'OPEN':
            sftp = open_connection(splitted_line[1], splitted_line[2])

        if command == 'CLOSE':
            close_connection(sftp)
        if command == 'DOWN':
            download_file(splitted_line[1], splitted_line[2], sftp)
        if command == 'UP':
            upload_file(splitted_line[1], splitted_line[2], sftp)
        if command == 'DIR':
            print_dir(splitted_line[1], sftp)
        if command == ' ':
            log_console("INFO", "ignoring blank line in control file")


if __name__ == "__main__":
    print_header_info()
    try:
        validate_arguments()
        control_lines = read_control_file(sys.argv[1])
        execute_control_file_commands(control_lines)
    except AttributeError as e:
        print(e)
    except FileNotFoundError as e:
        print('File not found. {0}'.format(e))