2026/1/20 23:38:58
This article describes how to build an agent that parses natural-language input, understands its intent, and searches the local file system for matching files. It also provides a web page for querying the local file system interactively.

1. Features

The main feature is searching based on a natural-language description of the criteria, for example:

find the pdf file name contains resume in disk D:\\ updated since Nov 1st 2025

An LLM parses each condition in the request, and the agent then searches the local file system accordingly. Files can be searched by the following criteria:

- File name
- File type
- File modification date
- File size
- File content

2. Environment and dependencies

See https://blog.csdn.net/jimmyleeee/article/details/155646865. The required libraries differ somewhat from that article and can be installed as follows:

pip install langchain-community langchain-ollama streamlit langchain_core sounddevice scipy SpeechRecognition torch torchvision torchaudio

3. Building the agent that parses user input and runs the query

All functionality is implemented in the class NaturalLanguageFileSearchAgent. It wraps everything, including calling the LLM to parse the parameters in the input and searching the local file system with the parsed parameters. The code is as follows:

import os
import fnmatch
import re
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


class NaturalLanguageFileSearchAgent:
    """
    An intelligent agent that understands natural language queries for file searching
    using Ollama/Qwen2.5
    """

    def __init__(self, model="qwen2.5", base_url="http://localhost:11434"):
        self.search_history = []

        # Define common file types and their extensions
        self.file_types = {
            "document": [".doc", ".docx", ".txt", ".rtf", ".odt", ".wpd"],
            "word": [".doc", ".docx"],
            "excel": [".xls", ".xlsx", ".csv", ".ods"],
            "spreadsheet": [".xls", ".xlsx", ".csv", ".ods"],
            "powerpoint": [".ppt", ".pptx", ".odp"],
            "presentation": [".ppt", ".pptx", ".odp"],
            "pdf": [".pdf"],
            "image": [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".svg"],
            "photo": [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff"],
            "video": [".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv"],
            "audio": [".mp3", ".wav", ".flac", ".aac", ".ogg"],
            "music": [".mp3", ".wav", ".flac", ".aac", ".ogg"],
            "code": [".py", ".java", ".cpp", ".js", ".html", ".css", ".php"],
            "archive": [".zip", ".rar", ".7z", ".tar", ".gz"],
            "compressed": [".zip", ".rar", ".7z", ".tar", ".gz"]
        }

        # Define time expressions
        self.time_expressions = {
            "today": 1,
            "yesterday": 2,
            "week": 7,
            "month": 30,
            "year": 365
        }

        # Define common drive letters for Windows
        self.common_drives = ["C:", "D:", "E:", "F:", "G:", "H:"]

        # Initialize Ollama model
        self.llm = ChatOllama(model=model, base_url=base_url)

        # Define prompt for parsing natural language queries.
        # Literal JSON braces are doubled so that ChatPromptTemplate does not
        # treat them as template variables; only {query} is a variable.
        self.parse_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an intelligent file search assistant. Your task is to parse natural language queries
and extract structured search parameters. Always respond in valid JSON format with the following keys:
- name_pattern: file name pattern to search for (string or null)
- extensions: list of file extensions to include (array or null)
- days_old: number of days relative to now (integer or null)
  * For "updated since [past date]": positive number of days ago
  * For "updated since [future date]": negative number
  * Example: If today is Dec 11, 2025 and query says "since Nov 1, 2025", then days_old = 40
- min_size: minimum file size in bytes (integer or null)
- max_size: maximum file size in bytes (integer or null)
- keyword: content keyword to search for (string or null)
- search_path: directory path to search in (string or "SYSTEM_WIDE" for system-wide search)

Examples:
Query: "Find recent Word documents from last 10 days"
Response: {{"name_pattern": null, "extensions": [".doc", ".docx"], "days_old": 10, "min_size": null, "max_size": null, "keyword": null, "search_path": "SYSTEM_WIDE"}}

Query: "Show me PDF files on my desktop"
Response: {{"name_pattern": null, "extensions": [".pdf"], "days_old": null, "min_size": null, "max_size": null, "keyword": null, "search_path": "DESKTOP"}}

Query: "Look for resume.pdf in my downloads"
Response: {{"name_pattern": "resume.pdf", "extensions": null, "days_old": null, "min_size": null, "max_size": null, "keyword": null, "search_path": "DOWNLOADS"}}

Query: "Find PDF files containing resume updated since Nov 1st 2025" (assuming today is Dec 11, 2025)
Response: {{"name_pattern": null, "extensions": [".pdf"], "days_old": 40, "min_size": null, "max_size": null, "keyword": "resume", "search_path": "SYSTEM_WIDE"}}

IMPORTANT: Always respond ONLY with valid JSON, no extra text."""),
            ("human", "Query: {query}")
        ])

        self.parser_chain = self.parse_prompt | self.llm | StrOutputParser()

    def understand_query_with_llm(self, query: str) -> Dict[str, Any]:
        """
        Parse natural language query using LLM and extract search parameters

        Args:
            query: Natural language query string

        Returns:
            Dictionary with extracted search parameters
        """
        try:
            # Get response from LLM
            response = self.parser_chain.invoke({"query": query})
            print(f"LLM Response: {response}")

            # Extract JSON from response if needed
            json_str = self._extract_json_from_response(response)

            # Parse JSON
            import json
            params = json.loads(json_str)

            # Process search_path
            if params["search_path"] == "SYSTEM_WIDE":
                params["search_path"] = "SYSTEM_WIDE"
            elif params["search_path"] == "DESKTOP":
                desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")
                params["search_path"] = desktop_path if os.path.exists(desktop_path) else "."
            elif params["search_path"] == "DOWNLOADS":
                downloads_path = os.path.join(os.path.expanduser("~"), "Downloads")
                params["search_path"] = downloads_path if os.path.exists(downloads_path) else "."
            elif params["search_path"] == "DOCUMENTS":
                documents_path = os.path.join(os.path.expanduser("~"), "Documents")
                params["search_path"] = documents_path if os.path.exists(documents_path) else "."
            elif not os.path.exists(params["search_path"]):
                # Fallback to system-wide search if path doesn't exist
                params["search_path"] = "SYSTEM_WIDE"

            return params
        except Exception as e:
            print(f"Error parsing with LLM: {e}")
            # Fallback to rule-based parsing
            return self._understand_query_rule_based(query)

    def _extract_json_from_response(self, response: str) -> str:
        """
        Extract JSON from LLM response

        Args:
            response: Raw LLM response

        Returns:
            Clean JSON string
        """
        # Look for JSON object in response
        import json
        try:
            # Try to parse entire response as JSON
            json.loads(response)
            return response
        except json.JSONDecodeError:
            # Look for JSON object in curly braces
            match = re.search(r"\{.*\}", response, re.DOTALL)
            if match:
                return match.group(0)
            else:
                # Return default JSON if parsing fails
                return '{"name_pattern": null, "extensions": null, "days_old": null, "min_size": null, "max_size": null, "keyword": null, "search_path": "SYSTEM_WIDE"}'

    def _understand_query_rule_based(self, query: str) -> Dict[str, Any]:
        """
        Fallback rule-based query understanding

        Args:
            query: Natural language query string

        Returns:
            Dictionary with extracted search parameters
        """
        query = query.lower().strip()
        params = {
            "name_pattern": None,
            "extensions": None,
            "days_old": None,
            "min_size": None,
            "max_size": None,
            "keyword": None,
            "search_path": None,
            "recursive": True
        }

        # Extract search path from query
        params["search_path"] = self._extract_search_path(query)

        # Extract file type
        for file_type, extensions in self.file_types.items():
            if file_type in query:
                params["extensions"] = extensions
                break

        # Extract time expressions
        time_patterns = [
            r"last\s+(\d+)\s+days?",
            r"recent.*?(\d+)\s+days?",
            r"past\s+(\d+)\s+days?",
            r"(\d+)\s+days?\s+ago",
            r"last\s+(\d+)\s+weeks?",
            r"recent.*?(\d+)\s+weeks?",
            r"(\d+)\s+weeks?\s+ago"
        ]
        for pattern in time_patterns:
            match = re.search(pattern, query)
            if match:
                number = int(match.group(1))
                if "week" in pattern:
                    number *= 7
                params["days_old"] = number
                break

        # Check for common time expressions, but only when no explicit number
        # was found, so that "last 2 weeks" is not overwritten by "week" = 7
        if params["days_old"] is None:
            for expr, days in self.time_expressions.items():
                if expr in query:
                    params["days_old"] = days
                    break

        # Handle "updated since [date]" patterns
        date_patterns = [
            r"updated\s+(?:since|after)\s+(\w+\s+\d+(?:st|nd|rd|th)?\s+\d{4})",
            r"modified\s+(?:since|after)\s+(\w+\s+\d+(?:st|nd|rd|th)?\s+\d{4})"
        ]
        for pattern in date_patterns:
            match = re.search(pattern, query)
            if match:
                date_str = match.group(1)
                days_old = self._calculate_days_from_date_string(date_str)
                if days_old is not None:
                    params["days_old"] = days_old
                break

        # Extract file size expressions
        size_patterns = [
            r"larger than\s*(\d+)\s*(mb|gb|kb)",
            r"bigger than\s*(\d+)\s*(mb|gb|kb)",
            r"smaller than\s*(\d+)\s*(mb|gb|kb)",
            r"(\d+)\s*(mb|gb|kb)\s*or (larger|bigger|smaller)"
        ]
        for pattern in size_patterns:
            match = re.search(pattern, query)
            if match:
                number = int(match.group(1))
                unit = match.group(2).lower()
                if len(match.groups()) > 2:
                    comparison = match.group(3)
                else:
                    # Infer the comparison from the matched phrase itself
                    comparison = "smaller" if "smaller" in match.group(0) else "larger"

                # Convert to bytes
                multiplier = 1
                if unit == "kb":
                    multiplier = 1024
                elif unit == "mb":
                    multiplier = 1024 * 1024
                elif unit == "gb":
                    multiplier = 1024 * 1024 * 1024
                size_bytes = number * multiplier

                if "smaller" in comparison:
                    params["max_size"] = size_bytes
                else:
                    params["min_size"] = size_bytes
                break

        # Extract keywords for content search
        keyword_patterns = [
            r"containing\s+['\"]?(.+?)['\"]?$",
            r"with\s+['\"]?(.+?)['\"]?$",
            r"has\s+['\"]?(.+?)['\"]?$",
            r"contains\s+['\"]?(.+?)['\"]?$"
        ]
        for pattern in keyword_patterns:
            match = re.search(pattern, query)
            if match:
                keyword = match.group(1).strip()
                keyword = re.sub(r"\s+(file|files|document|documents)?\s*$", "", keyword)
                params["keyword"] = keyword
                break

        # Extract specific filenames or patterns
        # Handle "file name contains [pattern]" cases more robustly
        if "file name contains" in query:
            # Match everything after "file name contains" until we hit a boundary word or end of string
            match = re.search(
                r"file name contains\s+(.*?)(?:\s+(?:in|on|at|from|to|of|with|by|updated|modified|and)\b|$)",
                query)
            if match:
                filename_part = match.group(1).strip()
                # Clean up trailing punctuation
                filename_part = re.sub(r"[.,;:]+$", "", filename_part).strip()
                if filename_part:
                    print(f"DEBUG: Found filename pattern: '{filename_part}'")
                    # If it looks like a full filename with extension, use as-is
                    # Otherwise add wildcards
                    if "." in filename_part and len(filename_part.split(".")[-1]) <= 4:
                        params["name_pattern"] = filename_part
                    else:
                        params["name_pattern"] = f"*{filename_part}*"
            else:
                # Fallback pattern
                match = re.search(r"file name contains\s+(.+)", query)
                if match:
                    filename_part = match.group(1).strip()
                    filename_part = re.sub(r"[.,;:]+$", "", filename_part).strip()
                    if filename_part:
                        print(f"DEBUG: Found filename pattern (fallback): '{filename_part}'")
                        if "." in filename_part and len(filename_part.split(".")[-1]) <= 4:
                            params["name_pattern"] = filename_part
                        else:
                            params["name_pattern"] = f"*{filename_part}*"

        # General filename extraction if not already set
        if params["name_pattern"] is None:
            filename_indicators = [
                r"name.*?contains\s+['\"]?([^.'\"]+\.[^.'\"]+)",
                r"filename.*?contains\s+['\"]?([^.'\"]+\.[^.'\"]+)",
                r"search.*?for\s+['\"]?([^.'\"]+\.[^.'\"]+)",
                r"find.*?['\"]?([^.'\"]+\.[^.'\"]+)",
                r"contains\s+['\"]?([^.'\"]+\.[^.'\"]+)"
            ]
            for pattern in filename_indicators:
                match = re.search(pattern, query)
                if match:
                    filename_part = match.group(1).strip()
                    if "." in filename_part:
                        params["name_pattern"] = filename_part
                        break

        # If we still don't have a name pattern but have a keyword that looks like a filename
        if params["name_pattern"] is None and params["keyword"]:
            if "." in params["keyword"] and len(params["keyword"].split(".")[-1]) <= 4:
                params["name_pattern"] = params["keyword"]
                params["keyword"] = None

        print(f"DEBUG: Final parsed parameters: {params}")
        return params

    def _extract_search_path(self, query: str) -> str:
        """
        Extract search path from query or determine system-wide search

        Args:
            query: Natural language query string

        Returns:
            Search path string
        """
        query_lower = query.lower()

        # Look for explicit path mentions
        path_patterns = [
            r"in\s+disk\s+([a-zA-Z]:\\)",  # Handle paths written as "in disk D:\ ..."
            r"in\s+([a-zA-Z]:\\\S+)",
            r"under\s+([a-zA-Z]:\\\S+)",
            r"from\s+([a-zA-Z]:\\\S+)",
            r"in\s+([a-zA-Z]:)",
            r"under\s+([a-zA-Z]:)",
            r"from\s+([a-zA-Z]:)"
        ]
        for pattern in path_patterns:
            match = re.search(pattern, query_lower)
            if match:
                path = match.group(1).strip()
                if os.path.exists(path):
                    return path
                if ":" in path and not path.endswith("\\"):
                    fixed_path = path + "\\"
                    if os.path.exists(fixed_path):
                        return fixed_path

        # Look for common directory references
        if "desktop" in query_lower:
            desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")
            if os.path.exists(desktop_path):
                return desktop_path
        if "downloads" in query_lower or "download" in query_lower:
            downloads_path = os.path.join(os.path.expanduser("~"), "Downloads")
            if os.path.exists(downloads_path):
                return downloads_path
        if "documents" in query_lower or "document" in query_lower:
            documents_path = os.path.join(os.path.expanduser("~"), "Documents")
            if os.path.exists(documents_path):
                return documents_path

        # Default to system-wide search
        if os.name == "nt":  # Windows
            return "SYSTEM_WIDE"
        else:
            return "/"

    def search(self, query: str) -> List[Dict[str, Any]]:
        """
        Perform search based on natural language query

        Args:
            query: Natural language query

        Returns:
            List of matching files
        """
        print(f"Understanding query: '{query}'")
        params = self.understand_query_with_llm(query)
        print(f"Parsed parameters: {params}")

        # Debug the parameters
        if params.get("days_old") is not None:
            target_date = datetime.now() - timedelta(days=params["days_old"])
            print(f"DEBUG: Target date for filtering: {target_date}")

        # Handle system-wide search
        if params["search_path"] == "SYSTEM_WIDE":
            print("Performing system-wide search...")
            all_results = []
            print("SYSTEM_WIDE search")
            # Search common drives on Windows
            for drive in self.common_drives:
                drive_path = drive + "\\"
                if os.path.exists(drive_path):
                    print(f"Searching in {drive_path}...")
                    params_copy = params.copy()
                    params_copy["search_path"] = drive_path
                    results = self.advanced_search(query, **params_copy)
                    all_results.extend(results)
            return all_results
        else:
            print(f" search in path {params['search_path']}")
            # Validate path exists
            if not os.path.exists(params["search_path"]):
                print(f"Path {params['search_path']} does not exist, searching in current directory")
                params["search_path"] = "."

            # Use the advanced search with parsed parameters
            return self.advanced_search(query, **params)

    def advanced_search(self, original_query: str,
                        name_pattern: Optional[str] = None,
                        extensions: Optional[List[str]] = None,
                        min_size: Optional[int] = None,
                        max_size: Optional[int] = None,
                        days_old: Optional[int] = None,
                        keyword: Optional[str] = None,
                        search_path: str = ".",
                        recursive: bool = True) -> List[Dict[str, Any]]:
        """
        Perform advanced search with multiple criteria
        """
        print("Enter in advanced_search")
        results = []

        # Normalize search path
        if search_path == ".":
            search_path = os.getcwd()
        else:
            search_path = os.path.abspath(search_path)

        print(f"Advanced search searching in {search_path}")
        print("Searching for files matching criteria:")
        print(f"  name_pattern: {name_pattern if name_pattern else 'all'}")
        print(f"  extensions: {extensions if extensions else 'all'}")
        print(f"  min_size: {min_size if min_size else 'all'}")
        print(f"  max_size: {max_size if max_size else 'all'}")
        print(f"  days_old: {days_old if days_old else 'all'}")
        print(f"  keyword: {keyword if keyword else 'all'}")

        files_processed = 0
        files_matched = 0

        try:
            if recursive:
                print("Starting recursive search...")
                for root, dirs, files in os.walk(search_path):
                    # Skip system directories that often cause permission issues
                    dirs[:] = [d for d in dirs if not d.startswith(("$", "System Volume Information"))]

                    for filename in files:
                        files_processed += 1
                        file_path = os.path.join(root, filename)

                        # Limit debug output to avoid overwhelming logs
                        if files_processed <= 20 or files_matched < 5:
                            match_result = self._matches_criteria(
                                file_path, name_pattern, extensions,
                                min_size, max_size, days_old, keyword)
                        else:
                            # After initial files, suppress debug output but still check
                            # Temporarily disable print for performance
                            import sys, io
                            old_stdout = sys.stdout
                            sys.stdout = io.StringIO()
                            try:
                                match_result = self._matches_criteria(
                                    file_path, name_pattern, extensions,
                                    min_size, max_size, days_old, keyword)
                            finally:
                                # Always restore stdout, even if the check raises
                                sys.stdout = old_stdout

                        if match_result:
                            files_matched += 1
                            results.append(self._get_file_info(file_path))
                            # Show first few matches
                            if files_matched <= 10:
                                print(f"MATCH #{files_matched}: {file_path}")

                        # Progress indicator for large directories
                        if files_processed % 1000 == 0:
                            print(f"Processed {files_processed} files, found {files_matched} matches so far...")
            else:
                print("Starting non-recursive search...")
                with os.scandir(search_path) as entries:
                    for entry in entries:
                        if entry.is_file():
                            files_processed += 1
                            match_result = self._matches_criteria(
                                entry.path, name_pattern, extensions,
                                min_size, max_size, days_old, keyword)
                            if match_result:
                                files_matched += 1
                                results.append(self._get_file_info(entry.path))
                                # Show first few matches
                                if files_matched <= 10:
                                    print(f"MATCH #{files_matched}: {entry.path}")

            print(f"Search complete. Files processed: {files_processed}, Files matched: {files_matched}")
            print(f"Records found: {len(results)}")
        except PermissionError as e:
            print(f"Permission denied accessing some directories: {e}")
        except Exception as e:
            print(f"Error during search: {e}")
            import traceback
            traceback.print_exc()

        self.search_history.append({
            "type": "natural_language",
            "query": original_query,
            "criteria": {
                "name_pattern": name_pattern,
                "extensions": extensions,
                "min_size": min_size,
                "max_size": max_size,
                "days_old": days_old,
                "keyword": keyword,
                "path": search_path
            },
            "path": search_path,
            "results": len(results),
            "timestamp": datetime.now()
        })
        print("Leave in advanced_search")
        return results

    def _matches_criteria(self, file_path: str,
                          name_pattern: Optional[str],
                          extensions: Optional[List[str]],
                          min_size: Optional[int],
                          max_size: Optional[int],
                          days_old: Optional[int],
                          keyword: Optional[str]) -> bool:
        """
        Check if a file matches all specified criteria
        """
        #print(f"\n=== CHECKING FILE: {file_path} ===")

        # Name pattern check
        if name_pattern:
            filename = os.path.basename(file_path)
            pattern = name_pattern
            #print(f"Name pattern check: looking for '{pattern}' in '{filename}'")

            # Handle case-insensitive matching
            filename_lower = filename.lower()
            pattern_lower = pattern.lower()

            # If pattern contains wildcards, use fnmatch
            if "*" in pattern_lower or "?" in pattern_lower:
                match_result = fnmatch.fnmatch(filename_lower, pattern_lower)
                #print(f"Wildcard match '{pattern_lower}' with '{filename_lower}': {match_result}")
                if not match_result:
                    #print("REJECTED: Name pattern wildcard mismatch")
                    return False
            else:
                # Exact substring matching (case insensitive)
                substring_match = pattern_lower in filename_lower
                print(f"Substring match '{pattern_lower}' in '{filename_lower}': {substring_match}")
                if not substring_match:
                    #print("REJECTED: Name pattern substring mismatch")
                    return False
        else:
            print("No name pattern specified")

        # Extension check
        if extensions:
            _, ext = os.path.splitext(file_path)
            # Normalize extensions for comparison
            normalized_extensions = []
            for e in extensions:
                if e.startswith("."):
                    normalized_extensions.append(e.lower())
                else:
                    normalized_extensions.append("." + e.lower())

            ext_check = ext.lower() in normalized_extensions
            print(f"Extension check: file has '{ext.lower()}', looking for {normalized_extensions}, match: {ext_check}")
            if not ext_check:
                print("REJECTED: Extension mismatch")
                return False
        else:
            print("No extension filter specified")

        # Size check
        if min_size is not None or max_size is not None:
            try:
                size = os.path.getsize(file_path)
                min_check = min_size is None or size >= min_size
                max_check = max_size is None or size <= max_size
                print(f"Size check: file size {size}, min {min_size}, max {max_size}")
                print(f"Min check: {min_check}, Max check: {max_check}")
                if not (min_check and max_check):
                    print("REJECTED: Size mismatch")
                    return False
            except (OSError, PermissionError) as e:
                print(f"WARNING: Cannot access file size: {e}")
                # Don't reject based on size if we can't read it
                pass

        # Date check
        if days_old is not None:
            try:
                mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))
                target_date = datetime.now() - timedelta(days=days_old)
                print("Date check:")
                print(f"  File modification time: {mod_time}")
                print(f"  Target date (since): {target_date}")
                print(f"  Days old parameter: {days_old}")
                print(f"  Comparison: {mod_time} >= {target_date} = {mod_time >= target_date}")

                # For "updated since [date]", we want files NEWER than or equal to that date
                if mod_time < target_date:
                    print("REJECTED: File is older than target date")
                    return False
                else:
                    print("ACCEPTED: File is newer than or equal to target date")
            except (OSError, PermissionError) as e:
                print(f"WARNING: Cannot access file modification time: {e}")
                # Don't reject based on date if we can't read it
                pass
        else:
            print("No date filter specified")

        # Content check (currently disabled because scanning file content is slow)
        # if keyword:
        #     keyword_found = self._file_contains_keyword(file_path, keyword)
        #     print(f"Keyword check: looking for '{keyword}', found: {keyword_found}")
        #     if not keyword_found:
        #         print(f"REJECTED: Keyword '{keyword}' not found in file content")
        #         return False
        #     else:
        #         print(f"ACCEPTED: Keyword '{keyword}' found in file content")
        # else:
        #     print("No keyword filter specified")

        print("FINAL RESULT: File ACCEPTED")
        return True

    def _test_pattern_matching(self):
        """Test function to verify pattern matching works correctly"""
        test_cases = [
            ("*resume*", "my_resume.pdf", True),
            ("*resume*", "resume_final.docx", True),
            ("*resume*", "Resume.pdf", True),  # Case insensitive
            ("*resume*", "application.txt", False),
            ("resume*", "resume_draft.pdf", True),
            ("*resume", "final_resume.pdf", True),
        ]

        print("\n=== PATTERN MATCHING TESTS ===")
        for pattern, filename, expected in test_cases:
            result = False
            if "*" in pattern or "?" in pattern:
                result = fnmatch.fnmatch(filename.lower(), pattern.lower())
            else:
                result = pattern.lower() in filename.lower()
            status = "PASS" if result == expected else "FAIL"
            print(f"{status}: Pattern '{pattern}' with '{filename}' -> {result} (expected {expected})")
        print("=== END TESTS ===\n")

    def _get_file_info(self, file_path: str) -> Dict[str, Any]:
        """Get detailed information about a file"""
        try:
            stat = os.stat(file_path)
            return {
                "path": file_path,
                "name": os.path.basename(file_path),
                "size": stat.st_size,
                "modified": datetime.fromtimestamp(stat.st_mtime),
                "created": datetime.fromtimestamp(stat.st_ctime),
                "extension": os.path.splitext(file_path)[1],
                "directory": os.path.dirname(file_path)
            }
        except (OSError, PermissionError):
            return {
                "path": file_path,
                "name": os.path.basename(file_path),
                "error": "Unable to access file information"
            }

    def _file_contains_keyword(self, file_path: str, keyword: str) -> bool:
        """Check if a file contains a keyword in its content"""
        try:
            # Only check text-based files
            text_extensions = [".txt", ".py", ".js", ".html", ".css", ".csv", ".md", ".json", ".xml"]
            _, ext = os.path.splitext(file_path)
            if ext.lower() not in text_extensions:
                return False

            # Skip very large files
            if os.path.getsize(file_path) > 10 * 1024 * 1024:  # 10MB limit
                return False

            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            return keyword.lower() in content.lower()
        except (OSError, PermissionError, UnicodeDecodeError):
            return False

    def _calculate_days_from_date_string(self, date_str: str) -> Optional[int]:
        """
        Calculate days old from a date string like "Nov 1st 2025"

        Args:
            date_str: Date string in format like "Nov 1st 2025"

        Returns:
            Number of days between now and the given date (positive if past, negative if future)
        """
        try:
            # Clean up the date string
            # Remove ordinal suffixes (st, nd, rd, th)
            date_str = re.sub(r"(\d+)(st|nd|rd|th)", r"\1", date_str)
            print(f"DEBUG: Parsing date string: '{date_str}'")

            # Parse the date
            date_obj = datetime.strptime(date_str, "%b %d %Y")
            print(f"DEBUG: Parsed date object: {date_obj}")

            # Calculate difference in days
            delta = datetime.now() - date_obj
            days = delta.days
            print(f"DEBUG: Days difference: {days}")
            return days
        except Exception as e:
            print(f"Error parsing date string '{date_str}': {e}")
            return None

    def get_search_history(self) -> List[Dict]:
        """Get history of all searches performed"""
        return self.search_history

    def clear_search_history(self):
        """Clear search history"""
        self.search_history.clear()

    def format_file_info(self, file_info: Dict[str, Any]) -> str:
        """
        Format file information for display, including modification time

        Args:
            file_info: Dictionary containing file information

        Returns:
            Formatted string with file details
        """
        try:
            # Format file size
            size = file_info["size"]
            if size < 1024:
                size_str = f"{size} B"
            elif size < 1024 * 1024:
                size_str = f"{size // 1024} KB"
            elif size < 1024 * 1024 * 1024:
                size_str = f"{size // (1024 * 1024)} MB"
            else:
                size_str = f"{size // (1024 * 1024 * 1024)} GB"

            # Format modification time
            mod_time = file_info["modified"]
            mod_time_str = mod_time.strftime("%Y-%m-%d %H:%M:%S")

            # Return formatted string
            return f"{file_info['name']} ({size_str}, modified: {mod_time_str})"
        except Exception as e:
            # Fallback if there's an error in formatting
            return f"{file_info.get('name', 'N/A')} ({file_info.get('size', 'unknown')} bytes)"


# Example usage
if __name__ == "__main__":
    # Create agent instance
    agent = NaturalLanguageFileSearchAgent()

    # Run pattern matching tests
    agent._test_pattern_matching()
    #exit()

    # Test queries
    test_queries = [
        "Please help search the file name contains resume.pdf",
        "Find recent Word documents from last 10 days",
        "Show me PDF files on my desktop",
        "Look for images in my downloads folder from last week"
    ]

    print("Testing natural language file search agent with Ollama/Qwen2.5:")
    print("=" * 60)

    for query in test_queries:
        print(f"\nQuery: '{query}'")
        try:
            result = agent.search(query)
            print(f"Found {len(result)} files")
            for file in result[:3]:  # Show first 3 results
                print(f"  - {agent.format_file_info(file)}")
        except Exception as e:
            print(f"Error: {e}")
        print("-" * 40)

    try:
        result = agent.search("find the pdf file name contains resume in disk D:\\")
        #result = agent.search("find the pdf file name contains resume in disk D:\\ updated since Nov 1st 2025")
        print(f"Found {len(result)} files")
        for file in result[:10]:  # Show first 10 results
            print(f"  - {agent.format_file_info(file)}")
    except Exception as e:
        print(f"Error: {e}")
    print("-" * 40)

Adjust the test queries in the main block to match files that actually exist on your machine, then run python filename.py in a console to check that the agent works. The web page in the next section imports the class from a module named file_search_nlp_agent, so saving this file as file_search_nlp_agent.py is the convenient choice.

4. Building the web page

The web page consists mainly of an input box and a search button. Because a search can take a long time, a progress bar is added as well. After a successful search, the files found are displayed as a table below the input box. The code is as follows:

import streamlit as st
import os
import sys
import traceback
import numpy as np
import sounddevice as sd
import scipy.io.wavfile as wav
import speech_recognition as sr
from scipy.io.wavfile import write
import tempfile

# Set page configuration
st.set_page_config(
    page_title="File Search Agent",
    page_icon=None,
    layout="wide"
)

# Custom CSS for better appearance
st.markdown("""
<style>
.stProgress > div > div > div { background-color: #4CAF50; }
.file-card { border: 1px solid #ddd; border-radius: 5px; padding: 10px; margin: 5px 0; background-color: #f9f9f9; }
.file-name { font-weight: bold; color: #2c3e50; }
.file-details { font-size: 0.9em; color: #7f8c8d; }
.search-history { background-color: #ecf0f1; padding: 10px; border-radius: 5px; margin-top: 20px; }
.status-message { padding: 10px; border-radius: 5px; margin: 10px 0; }
.recording { background-color: #f44336 !important; animation: pulse 1s infinite; }
@keyframes pulse { 0% { opacity: 1; } 50% { opacity: 0.5; } 100% { opacity: 1; } }
</style>
""", unsafe_allow_html=True)


def format_file_size(size_bytes):
    """Format file size in human readable format"""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    elif size_bytes < 1024 * 1024:
        return f"{size_bytes // 1024} KB"
    elif size_bytes < 1024 * 1024 * 1024:
        return f"{size_bytes // (1024 * 1024)} MB"
    else:
        return f"{size_bytes // (1024 * 1024 * 1024)} GB"


@st.cache_resource
def get_search_agent():
    """Initialize and cache the search agent"""
    try:
        # Import here to isolate potential issues
        from file_search_nlp_agent import NaturalLanguageFileSearchAgent
        return NaturalLanguageFileSearchAgent()
    except Exception as e:
        st.error(f"Failed to initialize search agent: {str(e)}")
        st.error(f"Traceback: {traceback.format_exc()}")
        return None


def initialize_session_state():
    """Initialize session state variables"""
    if "search_results" not in st.session_state:
        st.session_state.search_results = []
    if "search_history" not in st.session_state:
        st.session_state.search_history = []
    if "is_searching" not in st.session_state:
        st.session_state.is_searching = False
    if "current_query" not in st.session_state:
        st.session_state.current_query = ""
    if "is_recording" not in st.session_state:
        st.session_state.is_recording = False
    if "voice_query" not in st.session_state:
        st.session_state.voice_query = ""
    return True


def add_to_search_history(query):
    """Add query to search history"""
    if query not in [h["query"] for h in st.session_state.search_history]:
        import pandas as pd
        st.session_state.search_history.append({
            "query": query,
            "timestamp": pd.Timestamp.now()
        })


def record_audio(duration=5, sample_rate=44100):
    """Record audio using sounddevice and convert to text"""
    try:
        st.info("Recording... Please speak now.")

        # Record audio
        audio_data = sd.rec(int(duration * sample_rate), samplerate=sample_rate,
                            channels=1, dtype=np.int16)
        sd.wait()  # Wait until recording is finished

        # Save to temporary WAV file; the handle is closed before writing so
        # that the file can be reopened by name on Windows
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            wav_file = tmp_file.name
        write(wav_file, sample_rate, audio_data)

        # Use speech recognition
        recognizer = sr.Recognizer()
        with sr.AudioFile(wav_file) as source:
            audio = recognizer.record(source)

        # Convert to text
        st.info("Converting speech to text...")
        text = recognizer.recognize_google(audio)
        st.success(f"✅ Recognized: {text}")

        # Clean up temporary file
        os.unlink(wav_file)
        return text
    except sr.UnknownValueError:
        st.error("❓ Could not understand audio. Please try again.")
        return None
    except sr.RequestError as e:
        st.error(f"Speech recognition service error: {e}")
        return None
    except Exception as e:
        st.error(f"❌ Error recording audio: {str(e)}")
        return None


def display_search_results():
    """Display search results in a formatted way"""
    if st.session_state.search_results is not None:
        st.divider()
        st.header(f"Search Results ({len(st.session_state.search_results)} files found)")

        # Show results count
        st.markdown(f"Showing results for: **{st.session_state.current_query}**")

        # Display results in a table
        if st.session_state.search_results:
            # Prepare data for dataframe
            display_data = []
            for file_info in st.session_state.search_results:
                try:
                    directory = file_info.get("directory", "N/A")
                    display_data.append({
                        "File Name": file_info.get("name", "N/A"),
                        "Size": format_file_size(file_info.get("size", 0)),
                        "Modified": file_info.get("modified").strftime("%Y-%m-%d %H:%M:%S") if file_info.get("modified") else "N/A",
                        "Directory": directory[:50] + "..." if len(directory) > 50 else directory
                    })
                except Exception:
                    continue

            # Display as dataframe
            import pandas as pd
            df = pd.DataFrame(display_data)
            st.dataframe(df, use_container_width=True, height=400)

            # Option to show detailed view
            st.divider()
            st.subheader("Detailed View")
            # The slider needs min < max and a default inside the range,
            # so small result sets skip the slider
            max_show = min(50, len(st.session_state.search_results))
            if max_show > 1:
                num_to_show = st.slider("Number of files to display", 1, max_show, min(10, max_show))
            else:
                num_to_show = max_show

            for file_info in st.session_state.search_results[:num_to_show]:
                modified = file_info.get("modified")
                created = file_info.get("created")
                modified_str = modified.strftime("%Y-%m-%d %H:%M:%S") if modified else "N/A"
                created_str = created.strftime("%Y-%m-%d %H:%M:%S") if created else "N/A"
                with st.container():
                    # Built without leading indentation so Markdown does not
                    # render the HTML as a code block
                    st.markdown(
                        '<div class="file-card">'
                        f'<div class="file-name">{file_info.get("name", "N/A")}</div>'
                        '<div class="file-details">'
                        f'<strong>Path:</strong> {file_info.get("path", "N/A")}<br>'
                        f'<strong>Size:</strong> {format_file_size(file_info.get("size", 0))}<br>'
                        f'<strong>Modified:</strong> {modified_str}<br>'
                        f'<strong>Created:</strong> {created_str}<br>'
                        f'<strong>Extension:</strong> {file_info.get("extension", "N/A")}'
                        '</div>'
                        '</div>',
                        unsafe_allow_html=True)
                    st.markdown("---")
        else:
            # Display message when no results found
            st.info("No files found matching your query. Try adjusting your search terms or checking the search path.")

            # Add some helpful suggestions
            st.markdown(
                "**Tips for better search results:**\n"
                "- Check if the file path exists\n"
                "- Try using broader search terms\n"
                "- Verify file extensions (e.g., .pdf, .docx)\n"
                "- Make sure you have permissions to access the location\n"
                "- Try searching in a different directory\n"
            )


def main():
    st.title("File Search Agent")
    st.markdown("Search for files on your computer using natural language queries")

    # Initialize session state
    if not initialize_session_state():
        st.stop()

    # Get search agent
    search_agent = get_search_agent()
    if search_agent is None:
        st.warning("Search agent is not available. Some features may not work.")
        return

    # Sidebar
    with st.sidebar:
        st.header("⚙️ Settings")
        model = st.selectbox("Select Model", ["qwen2.5"], index=0,
                             disabled=st.session_state.is_searching)

        st.divider()
        st.header("ℹ️ About")
        st.markdown(
            "This agent can search for files using natural language queries such as:\n"
            "- Find PDF files on my desktop\n"
            "- Look for resume.docx in D:\\\n"
            "- Show me images from last week\n"
            "- Find large video files (>100MB)\n"
        )

        # Display search history
        if st.session_state.search_history:
            st.divider()
            st.header("Recent Searches")
            # Create a copy to avoid issues with reversed iterator
            history_items = list(reversed(st.session_state.search_history[-5:]))
            for i, history_item in enumerate(history_items):
                button_key = f"history_{i}_{hash(history_item['query'])}"  # Unique key
                label = f"{history_item['query'][:30]}{'...' if len(history_item['query']) > 30 else ''}"
                if st.button(label, key=button_key, help=history_item["query"],
                             disabled=st.session_state.is_searching):
                    # Load the query back into the page; is_searching is left
                    # unset here because nothing would reset it and every
                    # widget would stay disabled
                    st.session_state.current_query = history_item["query"]
                    st.rerun()

    # Main content
    st.subheader("Enter your search query")

    # Voice input option
    record_duration = 5  # default, used when the duration slider is not shown
    col_voice1, col_voice2, col_voice3 = st.columns([1, 2, 2])
    with col_voice1:
        voice_input = st.checkbox("Enable Voice Input", key="voice_input_checkbox",
                                  disabled=st.session_state.is_searching)
    with col_voice2:
        if voice_input:
            record_duration = st.slider("Recording Duration (seconds)", 3, 10, 5)
    with col_voice3:
        if voice_input:
            if st.button("Record Query", key="record_button",
                         disabled=st.session_state.is_searching, type="primary"):
                st.session_state.is_recording = True
                st.rerun()

    # Handle voice recording
    if st.session_state.is_recording:
        with st.spinner("Recording... Please speak now"):
            voice_query = record_audio(duration=record_duration)
        if voice_query:
            st.session_state.voice_query = voice_query
            st.session_state.current_query = voice_query
        st.session_state.is_recording = False
        st.rerun()

    # Display recognized voice query
    if st.session_state.voice_query and voice_input:
        st.info(f"Recognized voice query: **{st.session_state.voice_query}**")

    col1, col2 = st.columns([3, 1])
    with col1:
        query = st.text_input("Enter your search query:",
                              placeholder="e.g., Find PDF files on my desktop",
                              key="query_input",
                              value=st.session_state.current_query,
                              disabled=st.session_state.is_searching)
    with col2:
        st.write("")  # Empty space for alignment
        st.write("")  # Empty space for alignment
        search_button = st.button("Search", type="primary", use_container_width=True,
                                  disabled=st.session_state.is_searching)

    # Handle search
    if search_button and (query or st.session_state.voice_query):
        search_query = query if query else st.session_state.voice_query
        st.session_state.current_query = search_query
        st.session_state.is_searching = True
        add_to_search_history(search_query)

        # Show progress
        status_placeholder = st.empty()
        progress_bar = st.progress(0)
        status_placeholder.markdown(
            '<div class="status-message" style="background-color: #e3f2fd;">Searching for files...</div>',
            unsafe_allow_html=True)
        progress_bar.progress(25)

        try:
            # Perform search
            progress_bar.progress(50)
            status_placeholder.markdown(
                '<div class="status-message" style="background-color: #e3f2fd;">Analyzing query...</div>',
                unsafe_allow_html=True)
            results = search_agent.search(search_query)
            st.session_state.search_results = results

            progress_bar.progress(75)
            status_placeholder.markdown(
                '<div class="status-message" style="background-color: #e3f2fd;">Formatting results...</div>',
                unsafe_allow_html=True)

            # Update UI
            progress_bar.progress(100)
            if len(results) > 0:
                status_placeholder.markdown(
                    f'<div class="status-message" style="background-color: #c8e6c9;">✅ Search completed! Found {len(results)} files.</div>',
                    unsafe_allow_html=True)
            else:
                status_placeholder.markdown(
                    '<div class="status-message" style="background-color: #fff3cd; color: #856404;">Search completed. No files found matching your query.</div>',
                    unsafe_allow_html=True)

            # Reset searching state
            st.session_state.is_searching = False
            st.session_state.voice_query = ""  # Clear voice query after search

            # Rerun to update the UI
            st.rerun()
        except Exception as e:
            st.session_state.is_searching = False
            status_placeholder.markdown(
                f'<div class="status-message" style="background-color: #ffcdd2;">❌ Error: {str(e)}</div>',
                unsafe_allow_html=True)
            progress_bar.empty()
            st.error(f"An error occurred during search: {str(e)}")
            st.error(f"Details: {traceback.format_exc()}")

    # Display results
    display_search_results()

    # Welcome message for first-time users
    if not st.session_state.search_results and not query and not st.session_state.voice_query:
        st.info("Tip: Enter a natural language query above to search for files. Examples:\n\n"
                " - Find PDF files on my desktop\n"
                " - Look for resume.docx in D:\\\n"
                " - Show me images from last week\n"
                " - Find large video files (>100MB)\n\n"
                "Enable voice input to speak your query instead of typing!")
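
if __name__ == "__main__":
    # Check if required libraries are available
    try:
        import sounddevice as sd
        import scipy
        import speech_recognition as sr
    except ImportError as e:
        st.error(f"Required libraries not found: {str(e)}")
        st.error("Please install required libraries:")
        st.code("pip install sounddevice scipy SpeechRecognition")
        st.stop()
    main()

Run the web page program:

streamlit run file_search_app.py

In the web page that opens, enter a description of the files you want to find; the page then lists the details of each matching file.

5. Summary

The default Windows file search often cannot cover every kind of search requirement. With this Agent, you can describe the conditions in natural language and search on any combination of them. Keyword search over file contents is time-consuming and rarely needed, so that part of the code is currently commented out. The voice input feature has not been fully debugged yet; interested readers are welcome to improve it.

This Agent was implemented with the help of an AI tool, Tongyi Qianwen (通义千问), and I ran into some hallucinations along the way. To see the problems clearly, I had to add logging to the original code that prints the parameter-parsing steps and results, as well as the results of the file system query. Once a problem was located, I prompted the tool to refine the code further, and it was still up to the task.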
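The logging described above can be sketched roughly as follows. This is only a minimal illustration, not the article's actual code: `log_step`, `parse_query`, and `find_files` are hypothetical names, and `parse_query` returns fixed parameters instead of calling the LLM. The idea is simply to log the input and output of each stage so that a wrongly parsed parameter can be told apart from a wrong file system query.

```python
import functools
import logging

logger = logging.getLogger("file_search_agent")


def log_step(step_name):
    """Decorator that logs the arguments and the return value of one pipeline step."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger.info("%s input: args=%r kwargs=%r", step_name, args, kwargs)
            result = func(*args, **kwargs)
            logger.info("%s output: %r", step_name, result)
            return result
        return wrapper
    return decorator


# Hypothetical stand-in for the LLM parsing stage (assumption, not the article's code).
@log_step("parse")
def parse_query(query):
    # A real agent would ask the LLM here; this stub returns fixed parameters.
    return {"file_type": "pdf", "name_contains": "resume", "path": "D:\\"}


# Hypothetical stand-in for the file system search stage.
@log_step("search")
def find_files(params, candidates):
    # Keep only the candidate names that match the parsed parameters.
    return [
        name for name in candidates
        if name.lower().endswith("." + params["file_type"])
        and params["name_contains"] in name.lower()
    ]


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    params = parse_query("find the pdf file name contains resume in disk D:\\")
    print(find_files(params, ["Resume_2025.pdf", "notes.txt", "resume.docx"]))
```

With both stages logged, a hallucinated parameter shows up in the "parse output" line, while a correct parse followed by wrong results points at the search logic instead.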