import os
import csv
import re
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Coarse shape check: exactly one '@' and at least one '.' somewhere after it.
_EMAIL_SHAPE_RE = re.compile(r'^[^@]+@[^@]+\.[^@]+$')

# Scanner used to pull candidate addresses out of free text.  The final
# character class is letters only; the previous form ``[A-Z|a-z]`` also
# accepted a literal '|', so "a@b.com|next" was captured as one address.
_EMAIL_SCAN_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')


def is_valid_email(email: str) -> bool:
    """Return True if *email* passes the structural checks used by this script.

    The checks are, in order:

    * exactly one ``@`` with a dot somewhere in the part after it;
    * does not start with ``.``, ``-`` or ``+`` and does not end with ``.``;
    * contains no ``..``, ``@.`` or ``.@`` sequence;
    * local part is 2-64 characters long and the domain at most 255;
    * total length is at least 6 characters.

    Args:
        email: Candidate address to check.

    Returns:
        True when every check passes, otherwise False.
    """
    # Basic structure check
    if not _EMAIL_SHAPE_RE.match(email):
        return False

    # Leading/trailing punctuation that the checks here reject outright
    if email.startswith(('.', '-', '+')) or email.endswith('.'):
        return False

    # Doubled dots, or a dot touching the '@' on either side
    if '..' in email or '@.' in email or '.@' in email:
        return False

    # The shape regex guarantees exactly one '@', so this unpacking is safe.
    local, domain = email.split('@')
    if len(local) < 2 or len(local) > 64 or len(domain) > 255:
        return False

    # The two checks below are already implied by the ones above (a trailing
    # dot was rejected earlier, and the minimum part lengths add up to 6);
    # they are kept as explicit guards in case the earlier rules are relaxed.
    if domain.endswith('.'):
        return False
    if len(email) < 6:
        return False

    return True


def extract_emails(text: str) -> list:
    """Extract valid email addresses from *text*.

    Candidates are found with a regular expression and then filtered through
    :func:`is_valid_email`.

    Args:
        text: Arbitrary text to scan.

    Returns:
        The valid addresses in the order they appear in *text*; duplicates
        are kept.
    """
    return [
        candidate
        for candidate in _EMAIL_SCAN_RE.findall(text)
        if is_valid_email(candidate)
    ]


def process_directory(directory):
    """Collect emails from every ``.csv`` file directly inside *directory*.

    Each cell of each CSV file is scanned with :func:`extract_emails`.  The
    unique addresses are written, sorted and one per line, to
    ``<directory name>.txt`` in the current working directory.  A file that
    cannot be processed is logged as an error and skipped; it is still
    included in the logged CSV count.

    Args:
        directory: Path of the directory to scan (not recursive).
    """
    emails = set()
    csv_count = 0

    for filename in os.listdir(directory):
        if not filename.endswith('.csv'):
            continue
        csv_count += 1
        filepath = os.path.join(directory, filename)
        try:
            # newline='' is what the csv module requires so that newlines
            # embedded in quoted fields are parsed correctly.
            with open(filepath, 'r', encoding='utf-8', errors='ignore', newline='') as csvfile:
                for row in csv.reader(csvfile):
                    for cell in row:
                        emails.update(extract_emails(cell))
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole run.
            logging.error("Error processing %s: %s", filepath, e)

    # Write emails to a text file.  abspath() normalises the path first so a
    # trailing separator (e.g. "data/") does not yield an empty base name and
    # an output file called ".txt".
    output_filename = f"{os.path.basename(os.path.abspath(directory))}.txt"
    with open(output_filename, 'w', encoding='utf-8') as outfile:
        outfile.writelines(email + '\n' for email in sorted(emails))

    logging.info("Processed %s CSV files in %s", csv_count, directory)
    logging.info("Extracted %s unique valid emails to %s", len(emails), output_filename)


def main():
    """Run :func:`process_directory` on each sub-directory of the working directory."""
    current_dir = os.getcwd()
    # Sorted so that runs process directories in a deterministic order.
    for item in sorted(os.listdir(current_dir)):
        # Entries are relative names, resolved against the working directory.
        if os.path.isdir(item):
            logging.info("Processing directory: %s", item)
            process_directory(item)


if __name__ == "__main__":
    main()