# Fullmo MovingCap CODE - micropython example. # original file name: mcnet.pyi """Modulo mcnet stub/interfaccia per i servoazionamenti fullmo MovingCap Il modulo `mcnet` fornisce un'API in stile pyserial per la comunicazione socket di rete per MovingCap CODE / Micropython su MovingCap. Consente la creazione di socket server TCP, client TCP, server UDP e client/peer UDP per una facile implementazione di livelli di protocollo applicativo personalizzati, come MODBUS TCP. L'utilizzo di un'API simile a un'API di comunicazione seriale (invece di una libreria Python socket/usocket) fornisce funzionalità di comunicazione estremamente stabile, sicura e conveniente che scarica lo script effettivo da preoccupazioni come binding del socket, accettazione o stabilimento di connessioni, comportamento di riconnessione, ecc. Il modulo fornisce un'interfaccia basata su classi dove si creano oggetti McNet che rappresentano connessioni di rete. Ogni istanza McNet gestisce una connessione socket e fornisce metodi per leggere e scrivere dati. Formato Impostazioni di Connessione: La stringa di impostazioni passata a McNet() definisce il tipo di connessione e i parametri: - Server TCP: "SERVER:porta" o "SERVER:ip:porta" Esempio: McNet("SERVER:10001") - Server TCP sulla porta 10001, qualsiasi interfaccia Esempio: McNet("SERVER:192.168.1.100:502") - Server TCP su IP specifico - Client TCP: "ip:porta" o "hostname:porta" Esempio: McNet("192.168.1.100:502") - Connessione a server TCP su IP:porta Esempio: McNet("plc.local:502") - Connessione usando hostname - Server UDP: "UDP:porta" o "UDP:ip:porta" Esempio: McNet("UDP:5000") - Server UDP sulla porta 5000 - Client/Peer UDP: "UDP:ip:porta" Esempio: McNet("UDP:192.168.1.100:5000") - Comunicazione UDP con host remoto Esempio di Utilizzo: # Esempio server TCP import mcnet server = mcnet.McNet("SERVER:10001") server.set_line_mode(1, ord("<"), ord(">"), 0, 0, 5000) while True: line = server.readline() if line: print("Ricevuto:", line) server.write("OK\\n") # Esempio client TCP client = mcnet.McNet("192.168.1.100:502") if client.is_connected(): client.write(b"\\x00\\x01\\x00\\x00\\x00\\x06") # Richiesta MODBUS response = client.read(256) print("Risposta:", response) client.close() """ __author__ = "Oliver Heggelbacher" __email__ = "oliver.heggelbacher@fullmo.de" __version__ = "50.00.08.xx" __date__ = "2025-02-03" class McNet: """Classe di comunicazione socket di rete con API in stile pyserial. Questa classe rappresenta una connessione socket di rete (TCP o UDP, client o server). Fornisce un'interfaccia semplice, simile a una porta seriale per la comunicazione di rete. """ def __init__(self, settings: str): """Crea e apre una nuova connessione di rete. :param settings: Stringa di impostazioni di connessione che specifica il tipo di socket e i parametri. Il formato dipende dal tipo di connessione (vedere la documentazione del modulo). :type settings: str :raises OSError: Se il socket non può essere aperto (es. tutti i socket in uso, impostazioni non valide) Esempio: server = McNet("SERVER:10001") # Server TCP client = McNet("192.168.1.100:502") # Client TCP """ pass def open(self, settings: str) -> bool: """Apre una connessione di rete (o riapre dopo la chiusura). :param settings: Stringa di impostazioni di connessione (vedere __init__) :type settings: str :return: True se riuscito, False se fallito. :rtype: bool """ pass def is_open(self) -> bool: """Controlla se il socket è aperto. Restituisce True se il socket è stato aperto (anche se non ancora connesso). Per i client TCP, questo indica che il socket esiste ma potrebbe ancora essere in connessione. Per i server TCP, questo indica che il socket del server è in ascolto. :return: True se il socket è aperto, False altrimenti. :rtype: bool """ pass def is_connected(self) -> bool: """Controlla se la connessione è stabilita e pronta per il trasferimento dati. Per i client TCP: Restituisce True quando la connessione al server è stabilita. Per i server TCP: Restituisce True quando un client è connesso. Per UDP: Di solito restituisce True se il socket è aperto (UDP è senza connessione). :return: True se connesso, False altrimenti. :rtype: bool """ pass def close(self): """Chiude la connessione di rete e rilascia il socket. Questo arresta in modo pulito la connessione e libera la risorsa socket per il riutilizzo da parte di altre istanze McNet. """ pass def write(self, data) -> int: """Scrive dati alla connessione di rete. Accetta sia oggetti stringa che oggetti simili a bytes. Le stringhe sono automaticamente codificate come UTF-8. I dati vengono inviati sulla connessione di rete. :param data: Dati da inviare. Può essere str, bytes, bytearray o memoryview. :type data: str or bytes-like :return: Numero di bytes scritti, o valore negativo in caso di errore. :rtype: int Esempio: s.write("Ciao\\n") # Invia stringa s.write(b"\\x01\\x02\\x03") # Invia bytes """ pass def read(self, size: int = 0): """Legge i dati disponibili dalla connessione di rete. Legge fino a 'size' bytes dal buffer di ricezione. Se size è 0 o omesso, legge tutti i dati disponibili. Restituisce None se non sono disponibili dati. Il tipo di ritorno dipende dall'impostazione text_mode: - Se text_mode è True: restituisce str - Se text_mode è False: restituisce bytes :param size: Numero massimo di bytes da leggere. 0 = leggi tutti i disponibili. :type size: int :return: Dati letti come str o bytes, o None se non sono disponibili dati. :rtype: str or bytes or None Esempio: data = s.read() # Leggi tutti i disponibili data = s.read(100) # Leggi fino a 100 bytes """ pass def readline(self): """Legge una riga di dati basata sulla configurazione della modalità riga. Questo metodo utilizza i parametri di analisi della riga configurati con set_line_mode(). Attende ed estrae una riga completa basata sui marcatori di inizio/fine e le impostazioni di timeout. La risposta include i marcatori di inizio e fine, se specificati. Restituisce None se non è disponibile una riga completa o il timeout scade. :return: Dati riga come str o bytes (a seconda di text_mode), o None. :rtype: str or bytes or None Esempio: s.set_line_mode(1, ord('<'), ord('>'), 0, 0, 5000) line = s.readline() # Legge i dati tra '<' e '>' """ pass def read_until(self, expected: str = "\\n", size: int = 0): """Legge i dati fino a quando non si trova una sequenza specifica. Legge dalla connessione fino a quando non si incontra la sequenza di bytes attesa, o fino a quando sono stati letti 'size' bytes (se size > 0), o fino a quando non sono disponibili altri dati. La sequenza attesa è inclusa nei dati restituiti. :param expected: Sequenza di bytes da leggere fino a. Può essere str o bytes. Il valore predefinito è newline. :type expected: str or bytes :param size: Bytes massimi da leggere (0 = nessun limite). :type size: int :return: Dati letti come str o bytes (a seconda di text_mode), o None. :rtype: str or bytes or None Esempio: data = s.read_until("\\r\\n") # Leggi fino a CRLF data = s.read_until(b"\\x00", 1024) # Leggi fino a byte null, max 1024 bytes """ pass def set_line_mode(self, text_mode: int, start_marker: int, end_marker: int, min_len: int, max_len: int, timeout: int) -> int: """Configura la modalità di analisi della riga per il metodo readline(). Questo configura come readline() estrae le righe dal flusso di dati. I marcatori di inizio e fine definiscono i confini della riga, min_len e max_len specificano le lunghezze di frame attese, e il timeout specifica quanto tempo attendere per una riga completa. :param text_mode: Se > 0, restituisce i dati come str (modalità testo). Se 0, restituisce come bytes. :type text_mode: int :param start_marker: Codice ASCII del carattere marcatore di inizio riga (es., ord('<') = 60). :type start_marker: int :param end_marker: Codice ASCII del carattere marcatore di fine riga (es., ord('>') = 62). :type end_marker: int :param min_len: Lunghezza minima del frame in byte (inclusi i marcatori di inizio/fine). Impostare a 0 per nessun vincolo di lunghezza minima. Per i protocolli binari, questo previene interpretazioni errate quando i dati utili contengono byte di marcatore (ambiguità dei confini del frame). :type min_len: int :param max_len: Lunghezza massima del frame in byte (inclusi i marcatori di inizio/fine). Impostare a 0 per nessun vincolo di lunghezza massima. Limita l'ambito di ricerca per i marcatori finali per evitare overflow del buffer e imporre i confini del frame. :type max_len: int :param timeout: Timeout in millisecondi per attendere la riga completa. :type timeout: int :return: 0 in caso di successo, negativo in caso di errore. :rtype: int Esempio: # Protocollo testuale (lunghezza variabile) s.set_line_mode(1, ord('<'), ord('>'), 0, 0, 5000) # Protocollo binario a lunghezza fissa (STX + 6 byte di dati + ETX = 8 byte) s.set_line_mode(0, 0x02, 0x03, 8, 8, 1000) # Protocollo binario a lunghezza variabile (min 5 byte, max 256 byte) s.set_line_mode(0, 0x02, 0x03, 5, 256, 1000) """ pass def in_waiting(self) -> int: """Ottiene il numero di bytes disponibili nel buffer di ricezione. Restituisce il conteggio dei bytes che possono essere letti immediatamente senza bloccare. Utile per controllare se i dati sono disponibili prima di chiamare read(). :return: Numero di bytes disponibili da leggere, o valore negativo in caso di errore. :rtype: int Esempio: if s.in_waiting() > 0: data = s.read() """ pass def reset_input_buffer(self): """Cancella il buffer di ricezione in ingresso. Scarta tutti i dati attualmente nel buffer di ricezione. Utile per risincronizzare la comunicazione o cancellare dati obsoleti. Esempio: s.reset_input_buffer() # Cancella tutti i dati in sospeso s.write("NUOVA_RICHIESTA") """ pass