
Ein Peer-to-Peer-Netzwerk, kurz P2P-Netzwerk, ist ein Netzwerk, in dem alle Benutzer gleichberechtigt sind. Die Besonderheit solcher Netzwerke gegenüber herkömmlichen Netzwerken besteht darin, dass es keinen zentralen Server gibt, mit dem sich die Benutzer verbinden; stattdessen verbinden sich die Benutzer direkt miteinander. Es gibt auch hybride Varianten solcher Netzwerke, in denen ein Server lediglich koordinierende Aufgaben übernimmt.
Heute möchte ich eine einfache Implementierung eines P2P-Servers für ein solches Netzwerk in Python anbieten.
Hintergrund
1- . . , python, . .
. , 2 , . , P2P .
.
import socket
import rsa
from threading import Thread
from time import sleep
import datetime# P2P
class P2P:
    """Minimal peer-to-peer node that exchanges RSA-encrypted text messages.

    Every peer occupies one *slot* (an index shared by ``clients_ip``,
    ``clients_logs``, ``client_sockets``, ``keys``, ``my_keys`` and
    ``socket_busy``).  A session uses two TCP connections: an outgoing one
    (``client_sockets[slot]``) for sending and an accepted incoming one for
    receiving.

    NOTE(review): ``create_session`` accepts whatever connection arrives next
    on the listening socket; it does not verify that it comes from the
    requested address.
    NOTE(review): TCP is a byte stream, so one ``recv`` may deliver part of a
    ciphertext or several at once; no message framing is implemented.
    """

    # Timeout (seconds) put on every socket so blocking calls return
    # regularly and loops can observe ``self.running``.
    SOCKET_TIMEOUT = 0.2
    # Bit length of the per-session RSA key pair.
    RSA_KEY_BITS = 512
    # Number of bytes read for the peer's public key; presumably the PEM
    # length of a 512-bit PKCS#1 public key -- verify before changing
    # RSA_KEY_BITS.
    PUBLIC_KEY_READ_SIZE = 162
    # Maximum number of bytes read per incoming encrypted message.
    RECV_BUFFER_SIZE = 2048

    def __init__(self, _port: int, _max_clients: int = 1, _host: str = 'localhost'):
        """Create the bookkeeping tables and start listening.

        :param _port: port used for listening and for outgoing connections.
        :param _max_clients: number of peer slots (simultaneous sessions).
        :param _host: interface to bind the listening socket to.  The default
            ``'localhost'`` keeps the original behaviour, which makes the
            node reachable from the local machine only; pass ``''`` or an
            interface address to accept remote peers.
        :raises OSError: if the listening socket cannot be bound.
        """
        # Cleared by kill_server(); receive loops stop when it is False.
        self.running = True
        self.port = _port
        self.max_clients = _max_clients
        # Peer address per slot; "" marks an unused slot.
        self.clients_ip = [""] * self.max_clients
        # Decoded incoming messages, keyed by peer address (FIFO lists).
        self.incoming_requests = {}
        # Per-slot Log instance, None while the slot is unused.
        self.clients_logs = [None] * self.max_clients
        # One outgoing socket per slot.
        self.client_sockets = [self._new_client_socket() for _ in range(self.max_clients)]
        # Peer public key / own private key per slot, None until exchanged.
        self.keys = [None] * self.max_clients
        self.my_keys = [None] * self.max_clients
        # True while the slot's outgoing socket is connected.
        self.socket_busy = [False] * self.max_clients
        # Addresses create_session() refuses: loopback plus blacklist.txt.
        self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
        self.server_socket = socket.socket()
        self.server_socket.settimeout(self.SOCKET_TIMEOUT)
        self.server_socket.bind((_host, _port))
        self.server_socket.listen(self.max_clients)
        self.log = Log("server.log")
        self.log.save_data("Server initialized")

    def _new_client_socket(self):
        """Return a fresh outgoing socket with the standard timeout applied."""
        sock = socket.socket()
        sock.settimeout(self.SOCKET_TIMEOUT)
        return sock

    # ------------------------------------------------------------------
    # server control
    # ------------------------------------------------------------------
    def create_session(self, _address: str):
        """Open a session with ``_address`` and receive until it ends.

        Connects out to the peer, accepts one incoming connection, exchanges
        RSA public keys and then blocks, queuing every decrypted message in
        ``incoming_requests[_address]``.  Returns ``None`` when the address is
        blacklisted, no slot is free, the session cannot be established, the
        peer disconnects, or the server is stopped.  All session resources
        are released before returning.
        """
        self.log.save_data("Creating session with {}".format(_address))
        if _address in self.blacklist:
            self.log.save_data("{} in blacklist".format(_address))
            return
        ind = self.__get_free_socket()
        if ind is None:
            self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
            return

        connector = None
        try:
            self.__add_user(_address)
            # Connect out in the background while we wait for the peer's
            # incoming connection.
            connector = Thread(target=self.__connect, args=(_address, 1))
            connector.start()
            connection, _ = self.server_socket.accept()
        except OSError:
            self.log.save_data("Failed to create session with {}".format(_address))
            if connector is not None:
                # Bounded by the socket timeout; ensures the outgoing socket
                # is no longer in use before it is replaced.
                connector.join()
            self.close_connection(_address)
            return

        try:
            connection.settimeout(self.SOCKET_TIMEOUT)
            # The outgoing socket must be connected (or have failed) before
            # our public key is sent over it.
            connector.join()
            if self.__exchange_keys(_address, ind, connection):
                self.__receive_messages(_address, ind, connection)
        finally:
            # Always release the accepted socket and the slot, even when an
            # unexpected error propagates.
            connection.close()
            self.close_connection(_address)

    def __exchange_keys(self, _address: str, _ind: int, _connection) -> bool:
        """Send our public key, read the peer's; return True on success."""
        public_key, private_key = rsa.newkeys(self.RSA_KEY_BITS)
        self.raw_send(_address, public_key.save_pkcs1())
        try:
            peer_pem = _connection.recv(self.PUBLIC_KEY_READ_SIZE).decode()
            self.clients_logs[_ind].save_data("from {}: {}".format(_address, peer_pem))
            peer_key = rsa.PublicKey.load_pkcs1(peer_pem)
        except (OSError, ValueError):
            # Timeout / connection error, undecodable bytes or missing PEM
            # markers: the session cannot be established.
            self.log.save_data("Failed to create session with {}".format(_address))
            return False
        self.__add_keys(_address, peer_key, private_key)
        return True

    def __receive_messages(self, _address: str, _ind: int, _connection):
        """Queue decrypted messages until the session or the server stops."""
        while self.running and self.socket_busy[_ind]:
            try:
                data = _connection.recv(self.RECV_BUFFER_SIZE)
            except socket.timeout:
                continue
            except OSError:
                return
            if not data:
                # recv() returning b"" means the peer closed the connection;
                # looping on would spin without ever receiving anything.
                return
            try:
                message = rsa.decrypt(data, self.my_keys[_ind])
            except rsa.DecryptionError:
                self.log.save_data("Can`t decrypt message from {}".format(_address))
                continue
            self.__add_request(_address, message)

    def __connect(self, _address: str, *args):
        """Connect the peer's outgoing socket; return True on success."""
        ind = self.__get_ind_by_address(_address)
        if ind is None:
            return False
        try:
            self.client_sockets[ind].connect((_address, self.port))
        except OSError:
            return False
        self.socket_busy[ind] = True
        return True

    def __reload_socket(self, _ind: int):
        """Close the slot's outgoing socket and replace it with a fresh one."""
        self.client_sockets[_ind].close()
        # The replacement gets the same timeout as the initial sockets.
        self.client_sockets[_ind] = self._new_client_socket()
        self.socket_busy[_ind] = False

    def close_connection(self, _address: str):
        """Release everything held for ``_address``; no-op if it is unknown."""
        ind = self.__get_ind_by_address(_address)
        if ind is None:
            return
        self.__del_key(_address)
        self.__reload_socket(ind)
        self.__del_user(_address)

    def kill_server(self):
        """Stop all receive loops and close every socket and log."""
        self.running = False
        # Give running create_session() loops time to notice the flag.
        sleep(1)
        self.server_socket.close()
        self.log.kill_log()
        for sock in self.client_sockets:
            sock.close()
        for peer_log in self.clients_logs:
            if peer_log is not None:
                peer_log.kill_log()

    def send(self, _address: str, _message: str):
        """Encrypt ``_message`` with the peer's public key and send it.

        Failures (unknown peer, keys not exchanged yet, socket error, message
        too long for the RSA key) are written to the server log instead of
        being raised.
        """
        ind = self.__get_ind_by_address(_address)
        if ind is None or self.keys[ind] is None:
            self.log.save_data("Can`t send message to {}".format(_address))
            return
        try:
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            # sendall() retries until the whole ciphertext is written.
            self.client_sockets[ind].sendall(rsa.encrypt(_message.encode(), self.keys[ind]))
            self.log.save_data("Send message to {}".format(_address))
        except (OSError, OverflowError):
            # rsa.encrypt raises OverflowError when the message does not fit
            # into a single RSA block.
            self.log.save_data("Can`t send message to {}".format(_address))

    def raw_send(self, _address: str, _message: bytes):
        """Send ``_message`` unencrypted (used for the public-key exchange)."""
        ind = self.__get_ind_by_address(_address)
        if ind is None:
            self.log.save_data("Raw send to {} Failed".format(_address))
            return
        try:
            self.client_sockets[ind].sendall(_message)
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            self.log.save_data("Raw send message to {}".format(_address))
        except OSError:
            self.log.save_data("Raw send to {} Failed".format(_address))

    # ------------------------------------------------------------------
    # add
    # ------------------------------------------------------------------
    def __add_user(self, _address: str):
        """Register ``_address`` in the first free slot."""
        ind = self.__get_free_socket()
        self.clients_logs[ind] = Log("{}.log".format(_address))
        self.clients_ip[ind] = _address
        self.incoming_requests[_address] = []
        self.log.save_data("Added user {}".format(_address))

    # Annotations are quoted so they are not evaluated when the class is
    # defined.
    def __add_keys(self, _address: str, _key: "rsa.key.PublicKey", _my_key: "rsa.key.PrivateKey"):
        """Store the session keys for ``_address``; ignored if it is unknown."""
        ind = self.__get_ind_by_address(_address)
        if ind is None:
            return
        self.keys[ind] = _key
        self.my_keys[ind] = _my_key

    def __add_request(self, _address: str, _message: bytes):
        """Queue a decrypted message and record it in both logs."""
        self.incoming_requests[_address].append(_message.decode())
        self.clients_logs[self.__get_ind_by_address(_address)].save_data(
            "from {}: {}".format(_address, str(_message)))
        self.log.save_data("Get incoming message from {}".format(_address))

    # ------------------------------------------------------------------
    # get
    # ------------------------------------------------------------------
    def __get_free_socket(self):
        """Return the index of the first non-busy slot, or None."""
        for ind, busy in enumerate(self.socket_busy):
            if not busy:
                return ind
        return None

    def __get_ind_by_address(self, _address: str):
        """Return the slot index registered for ``_address``, or None."""
        # Every slot is inspected before giving up.
        for ind, address in enumerate(self.clients_ip):
            if address == _address:
                return ind
        return None

    def get_request(self, _address: str):
        """Remove and return the oldest queued message from ``_address``.

        :raises KeyError: if there is no session with ``_address``.
        :raises IndexError: if the queue is empty (see ``check_request``).
        """
        return self.incoming_requests[_address].pop(0)

    # ------------------------------------------------------------------
    # check
    # ------------------------------------------------------------------
    def check_request(self, _address: str):
        """Return True if at least one message from ``_address`` is queued."""
        return bool(self.incoming_requests.get(_address))

    def check_address(self, _address: str):
        """Return True if a session with ``_address`` is registered."""
        return _address in self.clients_ip

    # ------------------------------------------------------------------
    # del
    # ------------------------------------------------------------------
    def __del_user(self, _address: str):
        """Close the peer's log and free its slot and message queue."""
        ind = self.__get_ind_by_address(_address)
        self.clients_logs[ind].kill_log()
        self.clients_logs[ind] = None
        self.clients_ip[ind] = ""
        self.incoming_requests.pop(_address)
        self.log.save_data("Deleted user {}".format(_address))

    def __del_key(self, _address: str):
        """Forget the session keys stored for ``_address``."""
        ind = self.__get_ind_by_address(_address)
        self.keys[ind] = None
        self.my_keys[ind] = None

    # ------------------------------------------------------------------
    # others
    # ------------------------------------------------------------------
    def __len__(self):
        """Number of registered peers."""
        return sum(1 for address in self.clients_ip if address != "")

    def __bool__(self):
        """True if at least one peer is registered."""
        return any(address != "" for address in self.clients_ip)


class Log:
    """Append-only text log; the file is opened and closed on every write."""

    def __init__(self, _name: str):
        """Remember the file name and write the start-up line.

        :param _name: path of the log file (created if it does not exist).
        """
        self.name = _name
        # save_data() opens the file itself; opening it here as well would
        # leave an extra handle that is never closed.
        self.file = None
        self.save_data("Log started at " + str(datetime.datetime.now()))

    def save_data(self, _data: str):
        """Append ``_data`` followed by a newline to the log file."""
        # ``self.file`` is kept as an attribute (closed after the write).
        with open(self.name, "a") as self.file:
            self.file.write("{}\n".format(_data))

    @staticmethod
    def read_and_return_list(_name: str):
        """Return the content of ``_name`` split on newlines.

        A missing file yields ``[]``.  A trailing newline in the file yields
        a trailing empty string, exactly as ``str.split("\\n")`` does.
        """
        try:
            with open(_name, "r") as file:
                data = file.read()
        except FileNotFoundError:
            return []
        return data.split("\n")

    def kill_log(self):
        """Write the stop time (followed by a blank line) to the log."""
        self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
- add
- del
- check
- get
- server control
# Constructor: builds the per-slot bookkeeping lists, loads the blacklist,
# opens the listening socket and starts the server log.
def __init__(self, _port: int, _max_clients: int = 1):
# Flag checked by the receive loop in create_session; kill_server clears it
self.running = True
# Port used for listening (and by __connect for outgoing connections)
self.port = _port
# Number of peer slots, i.e. simultaneous sessions
self.max_clients = _max_clients
# Peer address per slot; "" marks an unused slot
self.clients_ip = ["" for i in range(self.max_clients)]
# Queued incoming messages, keyed by peer address
self.incoming_requests = {}
# Per-slot log; the Log class itself serves as the "empty" placeholder
self.clients_logs = [Log for i in range(self.max_clients)]
# One outgoing socket per slot
self.client_sockets = [socket.socket() for i in range(self.max_clients)]
# Short timeout so blocking calls on the outgoing sockets return quickly
for i in self.client_sockets:
i.settimeout(0.2)
# Peer public key per slot (the class object is the placeholder)
self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
# Own private key per slot (the class object is the placeholder)
self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
# Slot-in-use flags; __connect sets one to True after a successful connect
self.socket_busy = [False for i in range(self.max_clients)]
# Addresses create_session refuses: loopback plus the lines of blacklist.txt
self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
# Listening socket for incoming peer connections
self.server_socket = socket.socket()
# Same short timeout, so accept() does not block indefinitely
self.server_socket.settimeout(0.2)
# NOTE(review): bound to 'localhost' -- confirm this is intended, since it
# presumably limits incoming connections to the local machine
self.server_socket.bind(('localhost', _port))
self.server_socket.listen(self.max_clients)
self.log = Log("server.log")
self.log.save_data("Server initialized"), - , 1. :
- -
:
- Ip
, "127.0.0.1" " " ( localhost ), , . - listen().
add
. .
add_user , .
# Registers a peer in the first free slot: opens "<address>.log", stores the
# address in that slot and creates an empty queue for its incoming messages.
# NOTE(review): assumes a free slot exists; with ind == None the list
# assignment raises TypeError.
def __add_user(self, _address: str):
ind = self.__get_free_socket()
self.clients_logs[ind] = Log("{}.log".format(_address))
self.clients_ip[ind] = _address
self.incoming_requests[_address] = []
self.log.save_data("Added user {}".format(_address))add_keys .
# Stores the peer's public key and our own private key in the peer's slot.
# The TypeError handler covers an unknown address (ind is None).
def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
ind = self.__get_ind_by_address(_address)
try:
self.keys[ind] = _key
self.my_keys[ind] = _my_key
except TypeError:
returnadd_request .
# Appends the decoded message to the peer's queue and records it in the
# peer's log and in the server log.
def __add_request(self, _address: str, _message: bytes):
self.incoming_requests[_address].append(_message.decode())
self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
self.log.save_data("Get incoming message from {}".format(_address))del
. , , .
del_user add_user. , , .
# Counterpart of __add_user: closes the peer's log, puts the Log class back as
# placeholder, frees the address slot and drops the peer's message queue.
# NOTE(review): an unknown address gives ind == None, so the list access
# raises TypeError.
def __del_user(self, _address: str):
ind = self.__get_ind_by_address(_address)
self.clients_logs[ind].kill_log()
self.clients_logs[ind] = Log
self.clients_ip[ind] = ""
self.incoming_requests.pop(_address)
self.log.save_data("Deleted user {}".format(_address))del_key .
# Resets both key entries of the peer's slot to their class placeholders.
def __del_key(self, _address: str):
ind = self.__get_ind_by_address(_address)
self.keys[ind] = rsa.key.PublicKey
self.my_keys[ind] = rsa.key.PrivateKeycheck
.
check_request True False .
# Returns True if at least one message from _address is queued; False for an
# empty queue or an unknown address.
def check_request(self, _address: str):
return bool(self.incoming_requests.get(_address))check_address True, False, .
# Returns True if _address currently occupies a slot, otherwise False.
def check_address(self, _address: str):
return True if _address in self.clients_ip else Falseget
get_free_socket , , .
# Returns the index of the first slot whose socket_busy flag is False, or
# None when every slot is busy.
def __get_free_socket(self):
for i in range(len(self.socket_busy)):
if not self.socket_busy[i]:
return i
return Noneget_ind_by_address , , , .
# Returns the slot index that holds _address, or None if it is not registered.
# NOTE(review): indentation was lost here. The `else` has to belong to the
# `for` loop (for/else); attached to the `if`, the function would return None
# as soon as the first slot does not match.
def __get_ind_by_address(self, _address: str):
for i in range(len(self.clients_ip)):
if self.clients_ip[i] == _address:
return i
else:
return Noneget_request . , .
# Returns the oldest queued message from _address and removes it from the
# queue (the queue is rebuilt without its first element).
# Raises KeyError for an unknown address and IndexError for an empty queue.
def get_request(self, _address: str):
data = self.incoming_requests[_address][0]
self.incoming_requests[_address] = [self.incoming_requests[_address][i]
for i in range(1, len(self.incoming_requests[_address]))]
return dataserver control
, .
— create_session — . , , , .
# Opens a session with _address: connects out, accepts one incoming
# connection, exchanges RSA public keys and then loops, queuing every
# decrypted incoming message until the server stops or the slot is released.
def create_session(self, _address: str):
self.log.save_data("Creating session with {}".format(_address))
ind = self.__get_free_socket()
# Refuse blacklisted addresses
if _address in self.blacklist:
self.log.save_data("{} in blacklist".format(_address))
return
# No free slot left
if ind is None:
self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
return
try:
self.__add_user(_address)
# Outgoing connect runs in a thread; join(0) returns immediately
thread = Thread(target=self.__connect, args=(_address, 1))
thread.start()
thread.join(0)
# NOTE(review): accepts the next incoming connection without checking that
# it comes from _address
connection, address = self.server_socket.accept()
connection.settimeout(0.2)
except OSError:
self.log.save_data("Failed to create session with {}".format(_address))
self.__del_user(_address)
return
# Key exchange: send our public key in the clear, read the peer's
my_key = rsa.newkeys(512)
self.raw_send(_address, my_key[0].save_pkcs1())
# NOTE(review): a timeout or malformed key here raises out of the method
# without releasing the slot
key = connection.recv(162).decode()
self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
key = rsa.PublicKey.load_pkcs1(key)
self.__add_keys(_address, key, my_key[1])
# Receive loop
while self.running and self.socket_busy[ind]:
try:
data = connection.recv(2048)
except socket.timeout:
continue
except OSError:
self.close_connection(_address)
return
# NOTE(review): recv() returns b"" once the peer has closed; the loop then
# repeats without waiting. `connection` itself is never closed.
if data:
data = rsa.decrypt(data, self.my_keys[ind])
self.__add_request(_address, data)
try:
self.close_connection(_address)
# NOTE(review): `TypeError or KeyError` evaluates to TypeError, so KeyError
# is not caught; a tuple (TypeError, KeyError) would catch both
except TypeError or KeyError:
passconnect True False . .
# Connects the peer's outgoing socket to (_address, self.port). On success
# marks the slot busy and returns True; on OSError returns False.
# Extra positional arguments are accepted and ignored.
def __connect(self, _address: str, *args):
ind = self.__get_ind_by_address(_address)
try:
self.client_sockets[ind].connect((_address, self.port))
self.socket_busy[ind] = True
return True
except OSError:
return Falseclose_connection .
# Ends the session with _address: clears its keys, replaces its outgoing
# socket and removes the user entry.
# NOTE(review): for an unknown address ind is None and the helpers raise
# TypeError.
def close_connection(self, _address: str):
ind = self.__get_ind_by_address(_address)
self.__del_key(_address)
self.__reload_socket(ind)
self.__del_user(_address)kill_server .
# Stops the server: clears the running flag, waits one second, then closes the
# listening socket, the server log, all outgoing sockets and all peer logs.
def kill_server(self):
self.running = False
# presumably gives running receive loops time to see the flag -- confirm
sleep(1)
self.server_socket.close()
self.log.kill_log()
for i in self.client_sockets:
i.close()
for i in self.clients_logs:
try:
i.kill_log()
# Unused slots hold the Log class itself; calling kill_log on it lacks
# `self` and raises TypeError
except TypeError:
passreload_socket, , .
# Closes the slot's outgoing socket, puts a new socket in its place and marks
# the slot as free.
# NOTE(review): unlike the sockets created in __init__, the new socket gets
# no settimeout(0.2).
def __reload_socket(self, _ind: int):
self.client_sockets[_ind].close()
self.client_sockets[_ind] = socket.socket()
self.socket_busy[_ind] = Falsebool True, - , False, .
# Truth value of the server: True if at least one slot holds a peer address,
# otherwise False.
def __bool__(self):
for i in self.clients_ip:
if i != "":
return True
return Falselen .
# Length of the server: the number of slots that hold a peer address.
def __len__(self):
num = 0
for i in self.clients_ip:
if i != "":
num += 1
return num, . , . , .
, .
. , , . .
# Log constructor: remembers the file name and writes the "Log started" line.
# NOTE(review): the handle opened here is replaced by save_data before it is
# closed; the final close() acts on the handle save_data already closed.
def __init__(self, _name: str):
self.name = _name
try:
self.file = open(_name, "a")
except FileNotFoundError:
self.file = open(_name, "w")
self.save_data("Log started at " + str(datetime.datetime.now()))
self.file.close()save_data .
# Appends _data plus a newline to the log file; the file is opened and closed
# on every call.
def save_data(self, _data: str):
self.file = open(self.name, "a")
self.file.write("{}\n".format(_data))
self.file.close()Für die statische Funktion read_and_return_list ist kein Objekt der Klasse erforderlich, für den Betrieb ist jedoch der Name der Datei erforderlich, aus der alle Informationen entnommen und als Blatt zurückgegeben werden.
# Reads the whole file and returns its content split on "\n" as a list;
# returns [] when the file does not exist.
# NOTE(review): the file object is never closed.
@staticmethod
def read_and_return_list(_name: str):
try:
file = open(_name, "r")
except FileNotFoundError:
return []
data = file.read()
return data.split("\n")Und die letzte Funktion kill_log schreibt die Protokollstoppzeit in die Datei und schließt die Datei.
# Writes the "Log stopped" line (followed by an empty line) to the log file.
# NOTE(review): the handle opened on the first line is replaced by save_data
# before it is closed.
def kill_log(self):
self.file = open(self.name, "a")
self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
self.file.close()Fazit
Einen Peer-to-Peer-Server zu schreiben ist nicht schwierig, und es bleibt viel Raum für Erweiterungen. Man kann zum Beispiel das Senden von Dateien und Bildern implementieren und diese in Teilen von mehreren Benutzern gleichzeitig herunterladen. Außerdem lässt sich die Protokollierung verbessern oder der Austausch der Schlüssel für die Ver- und Entschlüsselung zusätzlich absichern.
Wenn es diesbezüglich Vorschläge oder Optionen zur Verbesserung des Codes gibt, lese ich diese gerne in den Kommentaren.