"""Plain FTP transport (stdlib ftplib).""" from __future__ import annotations import ftplib import io import logging import posixpath from datetime import datetime from typing import Iterable, Optional from .base import BaseTransport, RemoteFile, TransportError logger = logging.getLogger(__name__) class FTPTransport(BaseTransport): TRANSPORT_SLUG = "ftp" _FTP_CLASS = ftplib.FTP def _connect(self): conn = self._FTP_CLASS(timeout=self.timeout) conn.connect(self.host, self.port or 21) conn.login(self.username or "", self.password or "") # Some FTP servers need binary mode explicitly conn.sendcmd("TYPE I") return conn def _remote_dir(self) -> str: if "/" in self.remote_glob: return posixpath.dirname(self.remote_glob) or "/" return self.extra.get("remote_dir", ".") def _glob_pattern(self) -> str: if "/" in self.remote_glob: return posixpath.basename(self.remote_glob) return self.remote_glob def validate(self) -> tuple[bool, str]: try: conn = self._connect() try: conn.cwd(self._remote_dir()) conn.nlst() return True, "connected + listed remote directory" finally: conn.quit() except Exception as exc: return False, f"FTP validate failed: {exc}" def list_since(self, since: Optional[datetime]) -> Iterable[RemoteFile]: import fnmatch conn = self._connect() try: conn.cwd(self._remote_dir()) pattern = self._glob_pattern() # MLSD is preferred; fall back to LIST parsing if unsupported try: entries = list(conn.mlsd()) except (ftplib.error_perm, AttributeError): entries = _parse_list_fallback(conn) for name, facts in entries: if facts.get("type") not in (None, "file"): continue if pattern and not fnmatch.fnmatch(name, pattern): continue modify = facts.get("modify") if modify: try: mtime = datetime.strptime(modify, "%Y%m%d%H%M%S") except ValueError: mtime = datetime.utcnow() else: mtime = datetime.utcnow() if since and mtime <= since: continue size = int(facts.get("size", 0) or 0) yield RemoteFile( path=posixpath.join(self._remote_dir(), name), mtime=mtime, size_bytes=size, ) finally: conn.quit() def fetch(self, remote_path: str) -> bytes: conn = self._connect() try: conn.cwd(posixpath.dirname(remote_path) or "/") buf = io.BytesIO() conn.retrbinary(f"RETR {posixpath.basename(remote_path)}", buf.write) return buf.getvalue() finally: conn.quit() def _parse_list_fallback(conn): """For servers without MLSD, parse LIST output (best-effort).""" lines: list[str] = [] conn.retrlines("LIST", lines.append) for line in lines: parts = line.split(maxsplit=8) if len(parts) < 9: continue name = parts[-1] try: size = int(parts[4]) except ValueError: size = 0 yield name, {"type": "file", "size": str(size)} class FTPSPlain(ftplib.FTP_TLS): """Trivial subclass enabling data-channel protection by default.""" def __init__(self, *a, **kw): super().__init__(*a, **kw) __all__ = ["FTPTransport"]