Malin hat die Gist bearbeitet. Zur Änderung gehen
1 file changed, 76 insertions
extract-emails.py(Datei erstellt)
@@ -0,0 +1,76 @@ | |||
1 | + | import os | |
2 | + | import csv | |
3 | + | import re | |
4 | + | import logging | |
5 | + | ||
# Configure the root logger once at import time: INFO and above,
# each record prefixed with a timestamp and its level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
8 | + | ||
def is_valid_email(email):
    """Return True if *email* passes this script's address sanity checks.

    This is a pragmatic filter, not a full RFC 5322 parser. An address is
    accepted only when all of the following hold:

    * it has the shape ``local@domain.tld`` with exactly one ``@`` and
      contains no whitespace anywhere;
    * it does not start with ``.``, ``-`` or ``+`` and does not end with ``.``;
    * it contains none of ``..``, ``@.`` or ``.@``;
    * the local part is 2 to 64 characters long and the domain is at most
      255 characters long.

    Args:
        email: Candidate address as a string.

    Returns:
        bool: True when every check passes, False otherwise.
    """
    # Shape check. Whitespace is excluded from every part explicitly: a
    # plain [^@] class would let spaces and newlines through, and fullmatch
    # (unlike match with a trailing '$') does not tolerate a final newline.
    if re.fullmatch(r'[^@\s]+@[^@\s]+\.[^@\s]+', email) is None:
        return False

    # Leading/trailing punctuation that never belongs at the edge of an
    # address. Rejecting a trailing '.' also covers a domain ending in '.'.
    if email.startswith(('.', '-', '+')) or email.endswith('.'):
        return False

    # Empty labels on either side of the '@' or inside either part.
    if any(marker in email for marker in ('..', '@.', '.@')):
        return False

    # The shape check guarantees exactly one '@', so partition is safe.
    local, _, domain = email.partition('@')

    # Length limits. A local part of at least 2 characters plus '@' plus a
    # domain of at least 3 characters ('x.y') already implies a total
    # length of 6 or more, so no separate minimum-length test is needed.
    if len(local) < 2 or len(local) > 64 or len(domain) > 255:
        return False

    return True
35 | + | ||
# Candidate matcher: local part, '@', dotted domain, and an alphabetic
# top-level domain of two or more letters. The TLD class is [A-Za-z]; a '|'
# inside a character class is a literal pipe, not alternation, and would
# let strings such as "ab@example.c|d" through.
# Compiled once at import time because it is applied to every CSV cell.
_EMAIL_PATTERN = re.compile(
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
)


def extract_emails(text):
    """Extract valid e-mail addresses from *text*.

    Candidates are located with ``_EMAIL_PATTERN`` and then filtered through
    ``is_valid_email``.

    Args:
        text: Arbitrary string to scan.

    Returns:
        list: Matching addresses in order of appearance. Duplicates are
        kept; callers that need uniqueness should collect into a set.
    """
    return [
        candidate
        for candidate in _EMAIL_PATTERN.findall(text)
        if is_valid_email(candidate)
    ]
41 | + | ||
def process_directory(directory):
    """Collect e-mail addresses from the CSV files directly inside *directory*.

    Every file whose name ends in ``.csv`` (case-insensitive) is read with
    the ``csv`` module and each cell is scanned with ``extract_emails``.
    The unique addresses are written, sorted and one per line, to
    ``<directory name>.txt`` in the current working directory. The output
    file is written even when no address was found.

    A file that cannot be opened or parsed is logged and skipped; the
    remaining files are still processed.

    Args:
        directory: Path of the directory to scan. Subdirectories are not
            descended into.

    Raises:
        OSError: If *directory* cannot be listed or the output file cannot
            be written.
    """
    emails = set()
    csv_count = 0

    for filename in os.listdir(directory):
        # Compare case-insensitively so "DATA.CSV" is not silently skipped.
        if not filename.lower().endswith('.csv'):
            continue
        csv_count += 1
        filepath = os.path.join(directory, filename)
        try:
            # newline='' is what the csv module expects: it lets the reader
            # handle line endings itself so quoted fields that contain
            # embedded newlines are parsed as a single cell.
            with open(filepath, 'r', encoding='utf-8', errors='ignore',
                      newline='') as csvfile:
                for row in csv.reader(csvfile):
                    for cell in row:
                        emails.update(extract_emails(cell))
        except Exception as e:
            # Deliberately broad: one unreadable or malformed file must not
            # stop the remaining files from being processed.
            logging.error("Error processing %s: %s", filepath, e)

    # Resolve the path first so a trailing separator ("data/") still yields
    # "data" rather than an empty basename and an output file named ".txt".
    directory_name = os.path.basename(os.path.abspath(directory))
    output_filename = f"{directory_name}.txt"
    with open(output_filename, 'w', encoding='utf-8') as outfile:
        outfile.writelines(f"{email}\n" for email in sorted(emails))

    logging.info("Processed %s CSV files in %s", csv_count, directory)
    logging.info(
        "Extracted %s unique valid emails to %s", len(emails), output_filename
    )
67 | + | ||
def main():
    """Run the extraction for each subdirectory of the working directory.

    Every immediate subdirectory of the current working directory is passed
    to ``process_directory`` in sorted name order. Plain files are ignored.
    A subdirectory that raises ``OSError`` (for example because it is not
    readable or was removed in the meantime) is logged and skipped so the
    remaining directories are still processed.
    """
    current_dir = os.getcwd()
    for item in sorted(os.listdir(current_dir)):
        if not os.path.isdir(os.path.join(current_dir, item)):
            continue
        logging.info("Processing directory: %s", item)
        try:
            # Pass the bare name (relative to the working directory) so the
            # output file is named after the directory itself.
            process_directory(item)
        except OSError as e:
            logging.error("Skipping directory %s: %s", item, e)
74 | + | ||
# Script entry point: run the extraction only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
Neuer
Älter