import itertools import time import os from socket import PF_PACKET, SOCK_RAW, ntohs, socket from typing import Iterator import netprotocols class Decoder: def __init__(self, interface: str): self.interface = interface self.data = None self.protocol_queue = ["Ethernet"] self.packet_num: int = 0 self.frame_length: int = 0 self.epoch_time: float = 0 def _bind_interface(self, sock: socket): if self.interface is not None: sock.bind((self.interface, 0)) def _attach_protocols(self, frame: bytes): start = end = 0 for proto in self.protocol_queue: try: proto_class = getattr(netprotocols, proto) except AttributeError: continue end: int = start + proto_class.header_len protocol = proto_class.decode(frame[start:end]) setattr(self, proto.lower(), protocol) if protocol.encapsulated_proto in (None, "undefined"): break self.protocol_queue.append(protocol.encapsulated_proto) start = end self.data = frame[end:] def execute(self) -> Iterator: with socket(PF_PACKET, SOCK_RAW, ntohs(0x0003)) as sock: self._bind_interface(sock) while True: # Sonsuz döngü self.frame_length = len(frame := sock.recv(9000)) self.epoch_time = time.time_ns() / (10 ** 9) self._attach_protocols(frame) yield self del self.protocol_queue[1:] class PacketSniffer: def __init__(self): self._observers = list() self.packets = [] # Yakalanan paketleri saklamak için bir liste def register(self, observer) -> None: self._observers.append(observer) def _notify_all(self, *args, **kwargs) -> None: [observer.update(*args, **kwargs) for observer in self._observers] def listen(self, interface: str) -> None: try: print("Paket yakalama başlatıldı...") for frame in Decoder(interface).execute(): # Her paket alındığında terminale çıktı print(f"Paket Yakalandı: Paket No: {frame.packet_num}, Boyut: {frame.frame_length} bytes, Zaman: {frame.epoch_time:.2f} saniye") # Yakalanan paketi listeye ekle self.packets.append(frame) self._notify_all(frame) except KeyboardInterrupt: print("\nPaket yakalama durduruldu.") self.save_packets() def save_packets(self): """Yakalanan paketleri bir dosyaya kaydet.""" desktop_path = os.path.join(os.path.expanduser("~"), 'Desktop') # Dosya yolu file_path = os.path.join(desktop_path, 'yakalanan_paketler.txt') with open(file_path, 'w') as f: for packet in self.packets: f.write(f"Paket No: {packet.packet_num}, Boyut: {packet.frame_length} bytes, Zaman: {packet.epoch_time:.2f} saniye\n") print(f"Yakalanan paketler {file_path} konumuna kaydedildi.") if __name__ == "__main__": sniffer = PacketSniffer() sniffer.listen("eth0") # Kendi arayüzünüze göre değiştirin