Сравнение TcpListener, TcpStream и UdpSocket
Когда что использовать
TcpListener
Используйте для создания TCP-серверов:
- Веб-серверы
- Серверы баз данных
- Игровые серверы
- Серверы реального времени
- API-серверы
use std::net::{TcpListener, TcpStream}; use std::thread; fn handle_client(stream: TcpStream) { // Обработка клиентского подключения } fn main() -> std::io::Result<()> { let listener = TcpListener::bind("0.0.0.0:8080")?; for stream in listener.incoming() { match stream { Ok(stream) => { thread::spawn(|| { handle_client(stream); }); } Err(e) => eprintln!("Ошибка подключения: {}", e), } } Ok(()) }
TcpStream
Используйте для TCP-клиентов и надежных соединений:
- Клиенты к серверам
- Передача файлов
- Веб-браузеры
- SSH/RDP клиенты
- Базы данных клиенты
use std::net::TcpStream; use std::io::{Read, Write}; fn main() -> std::io::Result<()> { let mut stream = TcpStream::connect("example.com:80")?; // Отправка HTTP запроса stream.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")?; // Чтение ответа let mut response = String::new(); stream.read_to_string(&mut response)?; println!("Ответ: {}", response); Ok(()) }
UdpSocket
Используйте для ненадежной, но быстрой передачи:
- VoIP (голосовая связь)
- Видеостриминг
- DNS запросы
- Игровая синхронизация
- Мониторинг и метрики
use std::net::UdpSocket; fn main() -> std::io::Result<()> { let socket = UdpSocket::bind("0.0.0.0:0")?; // Отправка дейтаграммы socket.send_to(b"Hello UDP!", "example.com:8080")?; // Получение ответа let mut buf = [0; 1024]; let (amt, src) = socket.recv_from(&mut buf)?; println!("Получено {} байт от {}", amt, src); Ok(()) }
Организация устойчивых TCP-соединений
1. Обработка ошибок и повторное подключение
#![allow(unused)] fn main() { use std::net::TcpStream; use std::io::{Read, Write}; use std::time::Duration; use std::thread; fn create_connection() -> Result<TcpStream, std::io::Error> { let mut attempts = 0; let max_attempts = 5; while attempts < max_attempts { match TcpStream::connect("127.0.0.1:8080") { Ok(stream) => { // Настройка параметров для надежности stream.set_read_timeout(Some(Duration::from_secs(30)))?; stream.set_write_timeout(Some(Duration::from_secs(30)))?; stream.set_nodelay(true)?; // Отключение Nagle для низкой задержки return Ok(stream); } Err(e) => { attempts += 1; eprintln!("Попытка {} не удалась: {}", attempts, e); thread::sleep(Duration::from_secs(2 * attempts as u64)); // Экспоненциальная задержка } } } Err(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "Не удалось подключиться после нескольких попыток")) } }
2. Сервер с пулом потоков
use std::net::{TcpListener, TcpStream}; use std::io::{Read, Write}; use std::sync::{Arc, Mutex}; use std::collections::VecDeque; use std::thread; struct ThreadPool { workers: Vec<Worker>, sender: std::sync::mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { fn new(size: usize) -> ThreadPool { let (sender, receiver) = std::sync::mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); job(); }); Worker { id, thread } } } fn handle_client(mut stream: TcpStream) { let mut buffer = [0; 1024]; loop { match stream.read(&mut buffer) { Ok(0) => { // Соединение закрыто println!("Клиент отключился"); break; } Ok(n) => { // Эхо-ответ if let Err(e) = stream.write_all(&buffer[0..n]) { eprintln!("Ошибка записи: {}", e); break; } } Err(e) => { eprintln!("Ошибка чтения: {}", e); break; } } } } fn main() -> std::io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080")?; let pool = ThreadPool::new(4); println!("Сервер запущен на 127.0.0.1:8080"); for stream in listener.incoming() { match stream { Ok(stream) => { pool.execute(|| { handle_client(stream); }); } Err(e) => eprintln!("Ошибка подключения: {}", e), } } Ok(()) }
3. Протокол с подтверждениями для UDP
#![allow(unused)] fn main() { use std::net::UdpSocket; use std::time::{Duration, Instant}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::thread; #[derive(Debug)] struct ReliableMessage { sequence: u32, data: Vec<u8>, timestamp: Instant, retries: u32, } struct ReliableUdpClient { socket: UdpSocket, sequence: u32, pending_acks: Arc<Mutex<HashMap<u32, ReliableMessage>>>, } impl ReliableUdpClient { fn new(bind_addr: &str) -> Result<Self, std::io::Error> { let socket = UdpSocket::bind(bind_addr)?; socket.set_read_timeout(Some(Duration::from_millis(100)))?; Ok(ReliableUdpClient { socket, sequence: 0, pending_acks: Arc::new(Mutex::new(HashMap::new())), }) } fn send_reliable(&mut self, data: &[u8], dest: &str) -> Result<(), std::io::Error> { let sequence = self.sequence; self.sequence += 1; // Добавляем номер последовательности к данным let mut packet = sequence.to_be_bytes().to_vec(); packet.extend_from_slice(data); // Сохраняем сообщение для повторной отправки let message = ReliableMessage { sequence, data: data.to_vec(), timestamp: Instant::now(), retries: 0, }; { let mut pending = self.pending_acks.lock().unwrap(); pending.insert(sequence, message); } // Отправка self.socket.send_to(&packet, dest)?; Ok(()) } fn start_ack_handler(&self) { let socket = self.socket.try_clone().unwrap(); let pending_acks = Arc::clone(&self.pending_acks); thread::spawn(move || { let mut buf = [0; 1024]; loop { match socket.recv_from(&mut buf) { Ok((amt, src)) => { if amt >= 4 { let sequence = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); // Отправляем подтверждение let ack_packet = sequence.to_be_bytes(); let _ = socket.send_to(&ack_packet, src); // Удаляем из ожидающих подтверждения let mut pending = pending_acks.lock().unwrap(); pending.remove(&sequence); } } Err(_) => { // Таймаут - нормальная ситуация } } } }); } fn start_retry_handler(&self) { let socket = self.socket.try_clone().unwrap(); let pending_acks = Arc::clone(&self.pending_acks); thread::spawn(move || { loop { thread::sleep(Duration::from_millis(100)); let now = Instant::now(); let mut to_retry = Vec::new(); // Находим сообщения для повторной отправки { let mut pending = pending_acks.lock().unwrap(); for (seq, message) in pending.iter_mut() { if now.duration_since(message.timestamp) > Duration::from_secs(1) { if message.retries < 3 { message.retries += 1; message.timestamp = now; to_retry.push((*seq, message.data.clone())); } else { // Слишком много попыток - удаляем to_retry.push((*seq, Vec::new())); // Помечаем для удаления } } } // Удаляем сообщения с превышением лимита попыток for (seq, _) in to_retry.iter().filter(|(_, data)| data.is_empty()) { pending.remove(seq); } } // Повторно отправляем сообщения for (seq, data) in to_retry.into_iter().filter(|(_, data)| !data.is_empty()) { let mut packet = seq.to_be_bytes().to_vec(); packet.extend_from_slice(&data); let _ = socket.send_to(&packet, "127.0.0.1:8080"); } } }); } } }
Лучшие практики для надежных соединений
Для TCP:
- Таймауты - устанавливайте разумные таймауты чтения/записи
- Keep-alive - используйте TCP keep-alive для обнаружения разорванных соединений
- Обработка ошибок - всегда обрабатывайте возможные ошибки ввода-вывода
- Буферизация - используйте буферизированное чтение/запись
- Пул соединений - для серверов используйте пул потоков
Для UDP:
- Нумерация пакетов - добавляйте последовательные номера к пакетам
- Подтверждения - реализуйте механизм подтверждения получения
- Повторная отправка - реализуйте повторную отправку неподтвержденных пакетов
- Дубликаты - обрабатывайте возможные дубликаты пакетов
- Таймауты - устанавливайте таймауты для ожидания подтверждений
Универсальные рекомендации:
- Экспоненциальная задержка - при повторных попытках увеличивайте задержку
- Логирование - ведите логи важных событий и ошибок
- Мониторинг - отслеживайте количество активных соединений и ошибок
- Грейсфул shutdown - корректно закрывайте соединения при завершении
- Ограничение ресурсов - устанавливайте лимиты на количество одновременных соединений
Выбор между TCP и UDP зависит от требований приложения: TCP для надежности, UDP для скорости и низкой задержки.