Структура Incoming

#![allow(unused)]
fn main() {
pub struct Incoming<'a> { /* приватные поля */ }
}

Итератор, который бесконечно accept подключения на TcpListener.

Этот итератор создается методом TcpListener::incoming и никогда не возвращает None. Он также не пропускает ошибки ввода-вывода. Если требуется обработать ошибки, используйте next метод итератора.

Примеры

#![allow(unused)]
fn main() {
use std::net::TcpListener;

let listener = TcpListener::bind("127.0.0.1:80").unwrap();

for stream in listener.incoming() {
    match stream {
        Ok(stream) => {
            println!("новое подключение: {}", stream.peer_addr().unwrap());
        }
        Err(e) => {
            println!("ошибка подключения: {}", e);
        }
    }
}
}

Методы

next

#![allow(unused)]
fn main() {
impl<'a> Iterator for Incoming<'a> {
    type Item = Result<TcpStream>;
    
    fn next(&mut self) -> Option<Result<TcpStream>>
}
}

Возвращает следующий элемент итератора.

Поскольку Incoming никогда не возвращает None, этот метод всегда возвращает Some.

Примеры

#![allow(unused)]
fn main() {
use std::net::TcpListener;
use std::io;

let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
let mut incoming = listener.incoming();

// Использование next() в цикле
while let Some(result) = incoming.next() {
    match result {
        Ok(stream) => {
            println!("Принято новое подключение: {}", stream.peer_addr().unwrap());
            // Обработка потока
        }
        Err(e) => {
            eprintln!("Ошибка при принятии подключения: {}", e);
        }
    }
}
}

Трайт-реализации

Iterator

#![allow(unused)]
fn main() {
impl<'a> Iterator for Incoming<'a>
}

Реализует трейт Iterator для Incoming.

Итератор возвращает Result<TcpStream> и никогда не возвращает None.

Примеры

#![allow(unused)]
fn main() {
use std::net::TcpListener;

let listener = TcpListener::bind("127.0.0.1:8080").unwrap();

// Использование в for цикле
for stream_result in listener.incoming() {
    match stream_result {
        Ok(stream) => {
            println!("Новое подключение от: {}", stream.peer_addr().unwrap());
        }
        Err(e) => {
            eprintln!("Ошибка: {}", e);
        }
    }
}
}

Debug

#![allow(unused)]
fn main() {
impl<'a> Debug for Incoming<'a>
}

Реализует отладочное отображение для Incoming.

Примеры

#![allow(unused)]
fn main() {
use std::net::TcpListener;
use std::fmt;

let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
let incoming = listener.incoming();

println!("Отладочная информация: {:?}", incoming);
}

Примеры использования

Базовое использование с обработкой ошибок

use std::net::{TcpListener, TcpStream};
use std::io::{self, Read, Write};
use std::thread;

fn handle_client(mut stream: TcpStream) -> io::Result<()> {
    let mut buffer = [0; 1024];
    
    // Чтение данных от клиента
    let bytes_read = stream.read(&mut buffer)?;
    
    if bytes_read > 0 {
        // Эхо-ответ
        stream.write_all(&buffer[..bytes_read])?;
        println!("Обработано {} байт от {}", bytes_read, stream.peer_addr()?);
    }
    
    Ok(())
}

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    println!("Сервер запущен на {}", listener.local_addr()?);
    
    for stream_result in listener.incoming() {
        match stream_result {
            Ok(stream) => {
                println!("Новое подключение от: {}", stream.peer_addr().unwrap());
                
                // Обработка каждого клиента в отдельном потоке
                thread::spawn(move || {
                    if let Err(e) = handle_client(stream) {
                        eprintln!("Ошибка обработки клиента: {}", e);
                    }
                });
            }
            Err(e) => {
                eprintln!("Ошибка при принятии подключения: {}", e);
            }
        }
    }
    
    Ok(())
}

Использование с неблокирующим I/O

use std::net::TcpListener;
use std::io;
use std::thread;
use std::time::Duration;

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    
    // Установка неблокирующего режима
    listener.set_nonblocking(true)?;
    
    println!("Неблокирующий сервер запущен на {}", listener.local_addr()?);
    
    let mut incoming = listener.incoming();
    
    loop {
        match incoming.next() {
            Some(Ok(stream)) => {
                println!("Принято подключение: {}", stream.peer_addr().unwrap());
                // Обработка потока
            }
            Some(Err(ref e)) if e.kind() == io::ErrorKind::WouldBlock => {
                // Нет новых подключений - ждем
                thread::sleep(Duration::from_millis(100));
                continue;
            }
            Some(Err(e)) => {
                eprintln!("Ошибка: {}", e);
            }
            None => {
                // Incoming никогда не возвращает None, но компилятор требует обработки
                unreachable!();
            }
        }
    }
}

Ограничение количества одновременных подключений

use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};

struct ConnectionManager {
    connections: Mutex<HashMap<thread::ThreadId, Instant>>,
    max_connections: usize,
}

impl ConnectionManager {
    fn new(max_connections: usize) -> Self {
        Self {
            connections: Mutex::new(HashMap::new()),
            max_connections,
        }
    }
    
    fn can_accept(&self) -> bool {
        let connections = self.connections.lock().unwrap();
        connections.len() < self.max_connections
    }
    
    fn add_connection(&self, id: thread::ThreadId) {
        let mut connections = self.connections.lock().unwrap();
        connections.insert(id, Instant::now());
    }
    
    fn remove_connection(&self, id: thread::ThreadId) {
        let mut connections = self.connections.lock().unwrap();
        connections.remove(&id);
    }
    
    fn cleanup_old_connections(&self, max_age: Duration) {
        let mut connections = self.connections.lock().unwrap();
        let now = Instant::now();
        connections.retain(|_, &mut timestamp| now.duration_since(timestamp) < max_age);
    }
}

fn handle_client(stream: TcpStream, manager: Arc<ConnectionManager>) {
    let thread_id = thread::current().id();
    
    // Добавляем подключение в менеджер
    manager.add_connection(thread_id);
    
    // Имитация обработки клиента
    println!("Обработка клиента {} в потоке {:?}", 
             stream.peer_addr().unwrap(), thread_id);
    thread::sleep(Duration::from_secs(5));
    
    // Удаляем подключение из менеджера
    manager.remove_connection(thread_id);
    println!("Завершена обработка клиента в потоке {:?}", thread_id);
}

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    let manager = Arc::new(ConnectionManager::new(3)); // Максимум 3 одновременных подключения
    
    println!("Сервер с ограничением подключений запущен на {}", listener.local_addr()?);
    
    for stream_result in listener.incoming() {
        // Очистка старых подключений каждую итерацию
        manager.cleanup_old_connections(Duration::from_secs(10));
        
        if !manager.can_accept() {
            println!("Достигнут лимит подключений, ожидаем...");
            thread::sleep(Duration::from_secs(1));
            continue;
        }
        
        match stream_result {
            Ok(stream) => {
                let manager_clone = Arc::clone(&manager);
                
                thread::spawn(move || {
                    handle_client(stream, manager_clone);
                });
            }
            Err(e) => {
                eprintln!("Ошибка при принятии подключения: {}", e);
            }
        }
    }
    
    Ok(())
}

Использование с таймаутами

use std::net::TcpListener;
use std::io;
use std::thread;
use std::time::Duration;

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    
    // Установка таймаута для операций с слушателем
    listener.set_nonblocking(false)?;
    
    println!("Сервер с таймаутами запущен на {}", listener.local_addr()?);
    
    let start_time = std::time::Instant::now();
    let timeout = Duration::from_secs(30); // 30 секунд работы сервера
    
    for stream_result in listener.incoming() {
        // Проверка таймаута сервера
        if start_time.elapsed() > timeout {
            println!("Время работы сервера истекло");
            break;
        }
        
        match stream_result {
            Ok(stream) => {
                // Установка таймаутов для клиентского потока
                stream.set_read_timeout(Some(Duration::from_secs(10)))?;
                stream.set_write_timeout(Some(Duration::from_secs(10)))?;
                
                println!("Принято подключение с таймаутами: {}", stream.peer_addr().unwrap());
                
                // Обработка в отдельном потоке
                thread::spawn(move || {
                    // Обработка клиента
                    println!("Обработка клиента {}", stream.peer_addr().unwrap());
                });
            }
            Err(e) => {
                eprintln!("Ошибка при принятии подключения: {}", e);
            }
        }
    }
    
    println!("Сервер завершил работу");
    Ok(())
}

Статистика подключений

use std::net::TcpListener;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;

#[derive(Debug, Default)]
struct ServerStats {
    total_connections: usize,
    successful_connections: usize,
    failed_connections: usize,
    start_time: Instant,
}

impl ServerStats {
    fn new() -> Self {
        Self {
            start_time: Instant::now(),
            ..Default::default()
        }
    }
    
    fn record_connection(&mut self, success: bool) {
        self.total_connections += 1;
        if success {
            self.successful_connections += 1;
        } else {
            self.failed_connections += 1;
        }
    }
    
    fn print_stats(&self) {
        let uptime = self.start_time.elapsed();
        println!(
            "Статистика сервера (uptime: {:?}):\n\
             Всего подключений: {}\n\
             Успешных: {}\n\
             Неудачных: {}",
            uptime, self.total_connections, self.successful_connections, self.failed_connections
        );
    }
}

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    let stats = Arc::new(Mutex::new(ServerStats::new()));
    
    println!("Сервер со статистикой запущен на {}", listener.local_addr()?);
    
    // Поток для вывода статистики
    let stats_clone = Arc::clone(&stats);
    thread::spawn(move || {
        loop {
            thread::sleep(std::time::Duration::from_secs(10));
            let stats = stats_clone.lock().unwrap();
            stats.print_stats();
        }
    });
    
    for stream_result in listener.incoming() {
        let stats = Arc::clone(&stats);
        
        match stream_result {
            Ok(stream) => {
                {
                    let mut stats = stats.lock().unwrap();
                    stats.record_connection(true);
                }
                
                println!("Успешное подключение от: {}", stream.peer_addr().unwrap());
                
                thread::spawn(move || {
                    // Обработка клиента
                    println!("Обработка клиента {}", stream.peer_addr().unwrap());
                });
            }
            Err(e) => {
                {
                    let mut stats = stats.lock().unwrap();
                    stats.record_connection(false);
                }
                
                eprintln!("Неудачное подключение: {}", e);
            }
        }
    }
    
    Ok(())
}

Особенности использования

Бесконечный итератор

Incoming никогда не возвращает None, поэтому циклы с ним будут выполняться бесконечно, пока программа не будет прервана.

Обработка ошибок

В отличие от некоторых итераторов, Incoming не пропускает ошибки ввода-вывода. Каждая ошибка должна быть обработана явно.

Блокирующие операции

По умолчанию операции accept блокирующие. Для неблокирующего поведения используйте set_nonblocking.

Владение

Incoming заимствует TcpListener, поэтому слушатель не может быть использован для других операций пока итератор существует.