Структура Incoming
#![allow(unused)] fn main() { pub struct Incoming<'a> { /* приватные поля */ } }
Итератор, который бесконечно accept подключения на TcpListener.
Этот итератор создается методом TcpListener::incoming и никогда не возвращает None. Он также не пропускает ошибки ввода-вывода. Если требуется обработать ошибки, используйте next метод итератора.
Примеры
#![allow(unused)] fn main() { use std::net::TcpListener; let listener = TcpListener::bind("127.0.0.1:80").unwrap(); for stream in listener.incoming() { match stream { Ok(stream) => { println!("новое подключение: {}", stream.peer_addr().unwrap()); } Err(e) => { println!("ошибка подключения: {}", e); } } } }
Методы
next
#![allow(unused)] fn main() { impl<'a> Iterator for Incoming<'a> { type Item = Result<TcpStream>; fn next(&mut self) -> Option<Result<TcpStream>> } }
Возвращает следующий элемент итератора.
Поскольку Incoming никогда не возвращает None, этот метод всегда возвращает Some.
Примеры
#![allow(unused)] fn main() { use std::net::TcpListener; use std::io; let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); let mut incoming = listener.incoming(); // Использование next() в цикле while let Some(result) = incoming.next() { match result { Ok(stream) => { println!("Принято новое подключение: {}", stream.peer_addr().unwrap()); // Обработка потока } Err(e) => { eprintln!("Ошибка при принятии подключения: {}", e); } } } }
Трайт-реализации
Iterator
#![allow(unused)] fn main() { impl<'a> Iterator for Incoming<'a> }
Реализует трейт Iterator для Incoming.
Итератор возвращает Result<TcpStream> и никогда не возвращает None.
Примеры
#![allow(unused)] fn main() { use std::net::TcpListener; let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); // Использование в for цикле for stream_result in listener.incoming() { match stream_result { Ok(stream) => { println!("Новое подключение от: {}", stream.peer_addr().unwrap()); } Err(e) => { eprintln!("Ошибка: {}", e); } } } }
Debug
#![allow(unused)] fn main() { impl<'a> Debug for Incoming<'a> }
Реализует отладочное отображение для Incoming.
Примеры
#![allow(unused)] fn main() { use std::net::TcpListener; use std::fmt; let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); let incoming = listener.incoming(); println!("Отладочная информация: {:?}", incoming); }
Примеры использования
Базовое использование с обработкой ошибок
use std::net::{TcpListener, TcpStream}; use std::io::{self, Read, Write}; use std::thread; fn handle_client(mut stream: TcpStream) -> io::Result<()> { let mut buffer = [0; 1024]; // Чтение данных от клиента let bytes_read = stream.read(&mut buffer)?; if bytes_read > 0 { // Эхо-ответ stream.write_all(&buffer[..bytes_read])?; println!("Обработано {} байт от {}", bytes_read, stream.peer_addr()?); } Ok(()) } fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080")?; println!("Сервер запущен на {}", listener.local_addr()?); for stream_result in listener.incoming() { match stream_result { Ok(stream) => { println!("Новое подключение от: {}", stream.peer_addr().unwrap()); // Обработка каждого клиента в отдельном потоке thread::spawn(move || { if let Err(e) = handle_client(stream) { eprintln!("Ошибка обработки клиента: {}", e); } }); } Err(e) => { eprintln!("Ошибка при принятии подключения: {}", e); } } } Ok(()) }
Использование с неблокирующим I/O
use std::net::TcpListener; use std::io; use std::thread; use std::time::Duration; fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080")?; // Установка неблокирующего режима listener.set_nonblocking(true)?; println!("Неблокирующий сервер запущен на {}", listener.local_addr()?); let mut incoming = listener.incoming(); loop { match incoming.next() { Some(Ok(stream)) => { println!("Принято подключение: {}", stream.peer_addr().unwrap()); // Обработка потока } Some(Err(ref e)) if e.kind() == io::ErrorKind::WouldBlock => { // Нет новых подключений - ждем thread::sleep(Duration::from_millis(100)); continue; } Some(Err(e)) => { eprintln!("Ошибка: {}", e); } None => { // Incoming никогда не возвращает None, но компилятор требует обработки unreachable!(); } } } }
Ограничение количества одновременных подключений
use std::net::{TcpListener, TcpStream}; use std::sync::{Arc, Mutex}; use std::collections::HashMap; use std::thread; use std::time::{Duration, Instant}; struct ConnectionManager { connections: Mutex<HashMap<thread::ThreadId, Instant>>, max_connections: usize, } impl ConnectionManager { fn new(max_connections: usize) -> Self { Self { connections: Mutex::new(HashMap::new()), max_connections, } } fn can_accept(&self) -> bool { let connections = self.connections.lock().unwrap(); connections.len() < self.max_connections } fn add_connection(&self, id: thread::ThreadId) { let mut connections = self.connections.lock().unwrap(); connections.insert(id, Instant::now()); } fn remove_connection(&self, id: thread::ThreadId) { let mut connections = self.connections.lock().unwrap(); connections.remove(&id); } fn cleanup_old_connections(&self, max_age: Duration) { let mut connections = self.connections.lock().unwrap(); let now = Instant::now(); connections.retain(|_, &mut timestamp| now.duration_since(timestamp) < max_age); } } fn handle_client(stream: TcpStream, manager: Arc<ConnectionManager>) { let thread_id = thread::current().id(); // Добавляем подключение в менеджер manager.add_connection(thread_id); // Имитация обработки клиента println!("Обработка клиента {} в потоке {:?}", stream.peer_addr().unwrap(), thread_id); thread::sleep(Duration::from_secs(5)); // Удаляем подключение из менеджера manager.remove_connection(thread_id); println!("Завершена обработка клиента в потоке {:?}", thread_id); } fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080")?; let manager = Arc::new(ConnectionManager::new(3)); // Максимум 3 одновременных подключения println!("Сервер с ограничением подключений запущен на {}", listener.local_addr()?); for stream_result in listener.incoming() { // Очистка старых подключений каждую итерацию manager.cleanup_old_connections(Duration::from_secs(10)); if !manager.can_accept() { println!("Достигнут лимит подключений, ожидаем..."); thread::sleep(Duration::from_secs(1)); continue; } match stream_result { Ok(stream) => { let manager_clone = Arc::clone(&manager); thread::spawn(move || { handle_client(stream, manager_clone); }); } Err(e) => { eprintln!("Ошибка при принятии подключения: {}", e); } } } Ok(()) }
Использование с таймаутами
use std::net::TcpListener; use std::io; use std::thread; use std::time::Duration; fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080")?; // Установка таймаута для операций с слушателем listener.set_nonblocking(false)?; println!("Сервер с таймаутами запущен на {}", listener.local_addr()?); let start_time = std::time::Instant::now(); let timeout = Duration::from_secs(30); // 30 секунд работы сервера for stream_result in listener.incoming() { // Проверка таймаута сервера if start_time.elapsed() > timeout { println!("Время работы сервера истекло"); break; } match stream_result { Ok(stream) => { // Установка таймаутов для клиентского потока stream.set_read_timeout(Some(Duration::from_secs(10)))?; stream.set_write_timeout(Some(Duration::from_secs(10)))?; println!("Принято подключение с таймаутами: {}", stream.peer_addr().unwrap()); // Обработка в отдельном потоке thread::spawn(move || { // Обработка клиента println!("Обработка клиента {}", stream.peer_addr().unwrap()); }); } Err(e) => { eprintln!("Ошибка при принятии подключения: {}", e); } } } println!("Сервер завершил работу"); Ok(()) }
Статистика подключений
use std::net::TcpListener; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Instant; #[derive(Debug, Default)] struct ServerStats { total_connections: usize, successful_connections: usize, failed_connections: usize, start_time: Instant, } impl ServerStats { fn new() -> Self { Self { start_time: Instant::now(), ..Default::default() } } fn record_connection(&mut self, success: bool) { self.total_connections += 1; if success { self.successful_connections += 1; } else { self.failed_connections += 1; } } fn print_stats(&self) { let uptime = self.start_time.elapsed(); println!( "Статистика сервера (uptime: {:?}):\n\ Всего подключений: {}\n\ Успешных: {}\n\ Неудачных: {}", uptime, self.total_connections, self.successful_connections, self.failed_connections ); } } fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080")?; let stats = Arc::new(Mutex::new(ServerStats::new())); println!("Сервер со статистикой запущен на {}", listener.local_addr()?); // Поток для вывода статистики let stats_clone = Arc::clone(&stats); thread::spawn(move || { loop { thread::sleep(std::time::Duration::from_secs(10)); let stats = stats_clone.lock().unwrap(); stats.print_stats(); } }); for stream_result in listener.incoming() { let stats = Arc::clone(&stats); match stream_result { Ok(stream) => { { let mut stats = stats.lock().unwrap(); stats.record_connection(true); } println!("Успешное подключение от: {}", stream.peer_addr().unwrap()); thread::spawn(move || { // Обработка клиента println!("Обработка клиента {}", stream.peer_addr().unwrap()); }); } Err(e) => { { let mut stats = stats.lock().unwrap(); stats.record_connection(false); } eprintln!("Неудачное подключение: {}", e); } } } Ok(()) }
Особенности использования
Бесконечный итератор
Incoming никогда не возвращает None, поэтому циклы с ним будут выполняться бесконечно, пока программа не будет прервана.
Обработка ошибок
В отличие от некоторых итераторов, Incoming не пропускает ошибки ввода-вывода. Каждая ошибка должна быть обработана явно.
Блокирующие операции
По умолчанию операции accept блокирующие. Для неблокирующего поведения используйте set_nonblocking.
Владение
Incoming заимствует TcpListener, поэтому слушатель не может быть использован для других операций пока итератор существует.