extract-emails.py(檔案已創建)
| @@ -0,0 +1,76 @@ | |||
| 1 | + | import os | |
| 2 | + | import csv | |
| 3 | + | import re | |
| 4 | + | import logging | |
| 5 | + | ||
# Configure the root logger once at import time: INFO and above,
# each record prefixed with a timestamp and its level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
| 8 | + | ||
def is_valid_email(email):
    """Return True if *email* passes this script's heuristic address checks.

    The rules are intentionally stricter than the RFCs in some places and
    looser in others; they are meant to filter obvious junk, not to prove
    deliverability. An address is accepted only when all of these hold:

    * it is a ``str`` of at least 6 characters containing no whitespace;
    * it has exactly one ``@`` and at least one ``.`` inside the domain;
    * it does not start with ``.``, ``-`` or ``+`` and does not end with ``.``;
    * it contains none of ``..``, ``@.`` or ``.@``;
    * the local part is 2-64 characters and the domain at most 255.

    Args:
        email: Candidate address.

    Returns:
        bool: True when every check passes, False otherwise (including for
        non-string input).
    """
    # Non-strings (e.g. None from an empty cell) are simply "not an email".
    if not isinstance(email, str):
        return False

    # Cheap guard: 2-char local part + '@' + shortest domain 'x.y' == 6.
    if len(email) < 6:
        return False

    # The structural regex below only excludes '@', so whitespace (including
    # a trailing newline) has to be rejected explicitly.
    if any(ch.isspace() for ch in email):
        return False

    # Basic structure: exactly one '@', and a dot with text on both sides
    # somewhere after it.
    if not re.fullmatch(r'[^@]+@[^@]+\.[^@]+', email):
        return False

    # Common malformations at the edges of the address.
    if email.startswith(('.', '-', '+')) or email.endswith('.'):
        return False

    # Misplaced dots: doubled, or adjacent to the '@'.
    if any(bad in email for bad in ('..', '@.', '.@')):
        return False

    # The regex guarantees exactly one '@', so partition splits cleanly.
    local, _, domain = email.partition('@')
    if len(local) < 2 or len(local) > 64 or len(domain) > 255:
        return False

    return True
| 35 | + | ||
def extract_emails(text):
    """Extract valid email addresses from a given text.

    Candidates are found with a regular expression and then filtered through
    ``is_valid_email``. Order of appearance is preserved and duplicates are
    not removed.

    Args:
        text: String to scan.

    Returns:
        list[str]: Every candidate in *text* that ``is_valid_email`` accepts.
    """
    # Inside a character class '|' is a literal, not alternation, so the TLD
    # class must be [A-Za-z]; writing [A-Z|a-z] would let 'co|uk' match.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    candidates = re.findall(email_pattern, text)
    return [candidate for candidate in candidates if is_valid_email(candidate)]
| 41 | + | ||
def process_directory(directory):
    """Process all CSV files in a directory and extract emails.

    Every file directly inside *directory* whose name ends in ``.csv``
    (case-insensitive) is read cell by cell; addresses found by
    ``extract_emails`` are collected into a set. The sorted, de-duplicated
    result is written one address per line to ``<directory name>.txt`` in the
    current working directory, and two summary lines are logged.

    A file that cannot be read or parsed is logged and skipped; it still
    counts towards the number of CSV files reported.

    Args:
        directory: Path of the directory to scan (not recursive).

    Raises:
        OSError: If *directory* cannot be listed or the output file cannot
            be written.
    """
    emails = set()
    csv_count = 0
    for filename in os.listdir(directory):
        # Compare case-insensitively so 'EXPORT.CSV' is not silently skipped.
        if not filename.lower().endswith('.csv'):
            continue
        csv_count += 1
        filepath = os.path.join(directory, filename)
        try:
            # newline='' is required by the csv module so that newlines
            # embedded in quoted fields are parsed correctly.
            with open(filepath, 'r', encoding='utf-8', errors='ignore',
                      newline='') as csvfile:
                for row in csv.reader(csvfile):
                    for cell in row:
                        emails.update(extract_emails(cell))
        except Exception as e:
            # Deliberately broad: one bad file must not abort the whole run.
            logging.error(f"Error processing {filepath}: {str(e)}")

    # normpath drops a trailing separator; without it basename('data/') is
    # '' and the output file would be named just '.txt'.
    dir_name = os.path.basename(os.path.normpath(directory))
    output_filename = f"{dir_name}.txt"
    with open(output_filename, 'w', encoding='utf-8') as outfile:
        outfile.writelines(f"{email}\n" for email in sorted(emails))

    logging.info(f"Processed {csv_count} CSV files in {directory}")
    logging.info(f"Extracted {len(emails)} unique valid emails to {output_filename}")
| 67 | + | ||
def main():
    """Run ``process_directory`` on each sub-directory of the working directory.

    Entries are visited in the order ``os.listdir`` returns them; anything
    that is not a directory is ignored.
    """
    working_dir = os.getcwd()
    # Lazy filter: each entry is tested right before it is processed.
    subdirs = (entry for entry in os.listdir(working_dir) if os.path.isdir(entry))
    for subdir in subdirs:
        logging.info(f"Processing directory: {subdir}")
        process_directory(subdir)
| 74 | + | ||
# Run the extraction only when this file is executed as a script, so that
# importing the module does not trigger any directory processing.
if __name__ == "__main__":
    main()
上一頁
下一頁