Сравнение TcpListener, TcpStream и UdpSocket

Когда что использовать

TcpListener

Используйте для создания TCP-серверов:

  • Веб-серверы
  • Серверы баз данных
  • Игровые серверы
  • Серверы реального времени
  • API-серверы
use std::net::{TcpListener, TcpStream};
use std::thread;

fn handle_client(stream: TcpStream) {
    // Обработка клиентского подключения
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:8080")?;
    
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                thread::spawn(|| {
                    handle_client(stream);
                });
            }
            Err(e) => eprintln!("Ошибка подключения: {}", e),
        }
    }
    Ok(())
}

TcpStream

Используйте для TCP-клиентов и надежных соединений:

  • Клиенты к серверам
  • Передача файлов
  • Веб-браузеры
  • SSH/RDP клиенты
  • Базы данных клиенты
use std::net::TcpStream;
use std::io::{Read, Write};

fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("example.com:80")?;
    
    // Отправка HTTP запроса
    stream.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")?;
    
    // Чтение ответа
    let mut response = String::new();
    stream.read_to_string(&mut response)?;
    
    println!("Ответ: {}", response);
    Ok(())
}

UdpSocket

Используйте для ненадежной, но быстрой передачи:

  • VoIP (голосовая связь)
  • Видеостриминг
  • DNS запросы
  • Игровая синхронизация
  • Мониторинг и метрики
use std::net::UdpSocket;

fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("0.0.0.0:0")?;
    
    // Отправка дейтаграммы
    socket.send_to(b"Hello UDP!", "example.com:8080")?;
    
    // Получение ответа
    let mut buf = [0; 1024];
    let (amt, src) = socket.recv_from(&mut buf)?;
    
    println!("Получено {} байт от {}", amt, src);
    Ok(())
}

Организация устойчивых TCP-соединений

1. Обработка ошибок и повторное подключение

#![allow(unused)]
fn main() {
use std::net::TcpStream;
use std::io::{Read, Write};
use std::time::Duration;
use std::thread;

fn create_connection() -> Result<TcpStream, std::io::Error> {
    let mut attempts = 0;
    let max_attempts = 5;
    
    while attempts < max_attempts {
        match TcpStream::connect("127.0.0.1:8080") {
            Ok(stream) => {
                // Настройка параметров для надежности
                stream.set_read_timeout(Some(Duration::from_secs(30)))?;
                stream.set_write_timeout(Some(Duration::from_secs(30)))?;
                stream.set_nodelay(true)?; // Отключение Nagle для низкой задержки
                return Ok(stream);
            }
            Err(e) => {
                attempts += 1;
                eprintln!("Попытка {} не удалась: {}", attempts, e);
                thread::sleep(Duration::from_secs(2 * attempts as u64)); // Экспоненциальная задержка
            }
        }
    }
    Err(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "Не удалось подключиться после нескольких попыток"))
}
}

2. Сервер с пулом потоков

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use std::thread;

struct ThreadPool {
    workers: Vec<Worker>,
    sender: std::sync::mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        let (sender, receiver) = std::sync::mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        
        ThreadPool { workers, sender }
    }
    
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
            job();
        });
        
        Worker { id, thread }
    }
}

fn handle_client(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    
    loop {
        match stream.read(&mut buffer) {
            Ok(0) => {
                // Соединение закрыто
                println!("Клиент отключился");
                break;
            }
            Ok(n) => {
                // Эхо-ответ
                if let Err(e) = stream.write_all(&buffer[0..n]) {
                    eprintln!("Ошибка записи: {}", e);
                    break;
                }
            }
            Err(e) => {
                eprintln!("Ошибка чтения: {}", e);
                break;
            }
        }
    }
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    let pool = ThreadPool::new(4);
    
    println!("Сервер запущен на 127.0.0.1:8080");
    
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                pool.execute(|| {
                    handle_client(stream);
                });
            }
            Err(e) => eprintln!("Ошибка подключения: {}", e),
        }
    }
    Ok(())
}

3. Протокол с подтверждениями для UDP

#![allow(unused)]
fn main() {
use std::net::UdpSocket;
use std::time::{Duration, Instant};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug)]
struct ReliableMessage {
    sequence: u32,
    data: Vec<u8>,
    timestamp: Instant,
    retries: u32,
}

struct ReliableUdpClient {
    socket: UdpSocket,
    sequence: u32,
    pending_acks: Arc<Mutex<HashMap<u32, ReliableMessage>>>,
}

impl ReliableUdpClient {
    fn new(bind_addr: &str) -> Result<Self, std::io::Error> {
        let socket = UdpSocket::bind(bind_addr)?;
        socket.set_read_timeout(Some(Duration::from_millis(100)))?;
        
        Ok(ReliableUdpClient {
            socket,
            sequence: 0,
            pending_acks: Arc::new(Mutex::new(HashMap::new())),
        })
    }
    
    fn send_reliable(&mut self, data: &[u8], dest: &str) -> Result<(), std::io::Error> {
        let sequence = self.sequence;
        self.sequence += 1;
        
        // Добавляем номер последовательности к данным
        let mut packet = sequence.to_be_bytes().to_vec();
        packet.extend_from_slice(data);
        
        // Сохраняем сообщение для повторной отправки
        let message = ReliableMessage {
            sequence,
            data: data.to_vec(),
            timestamp: Instant::now(),
            retries: 0,
        };
        
        {
            let mut pending = self.pending_acks.lock().unwrap();
            pending.insert(sequence, message);
        }
        
        // Отправка
        self.socket.send_to(&packet, dest)?;
        Ok(())
    }
    
    fn start_ack_handler(&self) {
        let socket = self.socket.try_clone().unwrap();
        let pending_acks = Arc::clone(&self.pending_acks);
        
        thread::spawn(move || {
            let mut buf = [0; 1024];
            loop {
                match socket.recv_from(&mut buf) {
                    Ok((amt, src)) => {
                        if amt >= 4 {
                            let sequence = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
                            
                            // Отправляем подтверждение
                            let ack_packet = sequence.to_be_bytes();
                            let _ = socket.send_to(&ack_packet, src);
                            
                            // Удаляем из ожидающих подтверждения
                            let mut pending = pending_acks.lock().unwrap();
                            pending.remove(&sequence);
                        }
                    }
                    Err(_) => {
                        // Таймаут - нормальная ситуация
                    }
                }
            }
        });
    }
    
    fn start_retry_handler(&self) {
        let socket = self.socket.try_clone().unwrap();
        let pending_acks = Arc::clone(&self.pending_acks);
        
        thread::spawn(move || {
            loop {
                thread::sleep(Duration::from_millis(100));
                
                let now = Instant::now();
                let mut to_retry = Vec::new();
                
                // Находим сообщения для повторной отправки
                {
                    let mut pending = pending_acks.lock().unwrap();
                    for (seq, message) in pending.iter_mut() {
                        if now.duration_since(message.timestamp) > Duration::from_secs(1) {
                            if message.retries < 3 {
                                message.retries += 1;
                                message.timestamp = now;
                                to_retry.push((*seq, message.data.clone()));
                            } else {
                                // Слишком много попыток - удаляем
                                to_retry.push((*seq, Vec::new())); // Помечаем для удаления
                            }
                        }
                    }
                    
                    // Удаляем сообщения с превышением лимита попыток
                    for (seq, _) in to_retry.iter().filter(|(_, data)| data.is_empty()) {
                        pending.remove(seq);
                    }
                }
                
                // Повторно отправляем сообщения
                for (seq, data) in to_retry.into_iter().filter(|(_, data)| !data.is_empty()) {
                    let mut packet = seq.to_be_bytes().to_vec();
                    packet.extend_from_slice(&data);
                    let _ = socket.send_to(&packet, "127.0.0.1:8080");
                }
            }
        });
    }
}
}

Лучшие практики для надежных соединений

Для TCP:

  1. Таймауты - устанавливайте разумные таймауты чтения/записи
  2. Keep-alive - используйте TCP keep-alive для обнаружения разорванных соединений
  3. Обработка ошибок - всегда обрабатывайте возможные ошибки ввода-вывода
  4. Буферизация - используйте буферизированное чтение/запись
  5. Пул соединений - для серверов используйте пул потоков

Для UDP:

  1. Нумерация пакетов - добавляйте последовательные номера к пакетам
  2. Подтверждения - реализуйте механизм подтверждения получения
  3. Повторная отправка - реализуйте повторную отправку неподтвержденных пакетов
  4. Дубликаты - обрабатывайте возможные дубликаты пакетов
  5. Таймауты - устанавливайте таймауты для ожидания подтверждений

Универсальные рекомендации:

  1. Экспоненциальная задержка - при повторных попытках увеличивайте задержку
  2. Логирование - ведите логи важных событий и ошибок
  3. Мониторинг - отслеживайте количество активных соединений и ошибок
  4. Грейсфул shutdown - корректно закрывайте соединения при завершении
  5. Ограничение ресурсов - устанавливайте лимиты на количество одновременных соединений

Выбор между TCP и UDP зависит от требований приложения: TCP для надежности, UDP для скорости и низкой задержки.