extract-emails.py
· 2.5 KiB · Python
Brut
import os
import csv
import re
import logging
# Configure the root logger once at import time: INFO level and above, with
# each record prefixed by a timestamp and its level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def is_valid_email(email):
    """Return True if *email* passes the structural and length checks below.

    This is a pragmatic filter, not a full RFC 5322 validator. An address is
    accepted only when all of the following hold:

    * it is a string with exactly one ``@``, a non-empty local part, and a
      domain that contains a dot followed by at least one character;
    * it contains no whitespace (including a trailing newline);
    * it does not start with ``.``, ``-`` or ``+`` and does not end with ``.``;
    * it has no consecutive dots and no dot directly next to the ``@``;
    * the local part is 2-64 characters and the domain at most 255.

    Together these imply a minimum total length of 6 characters
    (2-character local part, ``@``, and a 3-character domain such as ``a.b``).

    Args:
        email: Candidate address.

    Returns:
        bool: True when every check passes, False otherwise (including when
        *email* is not a string).
    """
    if not isinstance(email, str):
        return False

    # fullmatch instead of match(...'$'): "$" also matches just before a
    # trailing "\n", which would let "user@host.tld\n" through. Whitespace is
    # excluded from every part for the same reason.
    if not re.fullmatch(r'[^@\s]+@[^@\s]+\.[^@\s]+', email):
        return False

    # Punctuation in positions where it is never legitimate.
    if email.startswith(('.', '-', '+')) or email.endswith('.'):
        return False
    if '..' in email or '@.' in email or '.@' in email:
        return False

    # Exactly one "@" is guaranteed by the pattern above, so this unpacks safely.
    local, domain = email.split('@')
    if not 2 <= len(local) <= 64:
        return False
    if len(domain) > 255:
        return False

    return True
def extract_emails(text):
    """Extract valid email addresses from *text*.

    Candidates are located with a regular expression and then filtered
    through ``is_valid_email``. Results are returned in order of appearance;
    duplicates are not removed.

    Args:
        text: String to scan.

    Returns:
        list[str]: Every candidate in *text* that ``is_valid_email`` accepts.
    """
    # Inside a character class "|" is a literal pipe, not alternation, so the
    # TLD class is spelled [A-Za-z]; otherwise "user@example.com|next" would
    # be captured as a single address.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return [
        candidate
        for candidate in re.findall(email_pattern, text)
        if is_valid_email(candidate)
    ]
def process_directory(directory):
    """Collect emails from the CSV files in *directory* and write them to disk.

    Every cell of every ``.csv`` file directly inside *directory* (extension
    matched case-insensitively, no recursion) is scanned with
    ``extract_emails``. The unique addresses are written, sorted and one per
    line, to ``<name>.txt`` in the current working directory, where ``<name>``
    is the final component of *directory*.

    A file that cannot be opened or parsed is logged and skipped; emails
    gathered from it before the failure are kept.

    Args:
        directory: Path of the directory to scan.
    """
    emails = set()
    csv_count = 0
    for filename in os.listdir(directory):
        # Accept ".CSV" / ".Csv" as well as ".csv".
        if not filename.lower().endswith('.csv'):
            continue
        csv_count += 1
        filepath = os.path.join(directory, filename)
        try:
            # newline='' is what the csv module expects so that newlines
            # embedded in quoted fields are parsed correctly.
            with open(filepath, 'r', encoding='utf-8', errors='ignore', newline='') as csvfile:
                for row in csv.reader(csvfile):
                    for cell in row:
                        emails.update(extract_emails(cell))
        except Exception as e:
            # Deliberately broad: one bad file must not abort the whole run.
            logging.error(f"Error processing {filepath}: {e}")

    # abspath normalizes the path first, so "data/" yields "data.txt" rather
    # than an output file named just ".txt".
    output_filename = f"{os.path.basename(os.path.abspath(directory))}.txt"
    with open(output_filename, 'w', encoding='utf-8') as outfile:
        outfile.writelines(email + '\n' for email in sorted(emails))

    logging.info(f"Processed {csv_count} CSV files in {directory}")
    logging.info(f"Extracted {len(emails)} unique valid emails to {output_filename}")
def main():
    """Run ``process_directory`` on each directory in the working directory.

    Entries are tested with ``os.path.isdir`` by bare name, which resolves
    against the same working directory that was listed.
    """
    here = os.getcwd()
    subdirs = (entry for entry in os.listdir(here) if os.path.isdir(entry))
    for entry in subdirs:
        logging.info(f"Processing directory: {entry}")
        process_directory(entry)
# Run the extraction only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
1 | import os |
2 | import csv |
3 | import re |
4 | import logging |
5 | |
6 | # Configure the root logger at import time: INFO and above, timestamp and level name on each record. |
7 | logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
8 | |
def is_valid_email(email):
    """Return True if *email* passes the structural and length checks below.

    This is a pragmatic filter, not a full RFC 5322 validator. An address is
    accepted only when all of the following hold:

    * it is a string with exactly one ``@``, a non-empty local part, and a
      domain that contains a dot followed by at least one character;
    * it contains no whitespace (including a trailing newline);
    * it does not start with ``.``, ``-`` or ``+`` and does not end with ``.``;
    * it has no consecutive dots and no dot directly next to the ``@``;
    * the local part is 2-64 characters and the domain at most 255.

    Together these imply a minimum total length of 6 characters
    (2-character local part, ``@``, and a 3-character domain such as ``a.b``).

    Args:
        email: Candidate address.

    Returns:
        bool: True when every check passes, False otherwise (including when
        *email* is not a string).
    """
    if not isinstance(email, str):
        return False

    # fullmatch instead of match(...'$'): "$" also matches just before a
    # trailing "\n", which would let "user@host.tld\n" through. Whitespace is
    # excluded from every part for the same reason.
    if not re.fullmatch(r'[^@\s]+@[^@\s]+\.[^@\s]+', email):
        return False

    # Punctuation in positions where it is never legitimate.
    if email.startswith(('.', '-', '+')) or email.endswith('.'):
        return False
    if '..' in email or '@.' in email or '.@' in email:
        return False

    # Exactly one "@" is guaranteed by the pattern above, so this unpacks safely.
    local, domain = email.split('@')
    if not 2 <= len(local) <= 64:
        return False
    if len(domain) > 255:
        return False

    return True
35 | |
def extract_emails(text):
    """Extract valid email addresses from *text*.

    Candidates are located with a regular expression and then filtered
    through ``is_valid_email``. Results are returned in order of appearance;
    duplicates are not removed.

    Args:
        text: String to scan.

    Returns:
        list[str]: Every candidate in *text* that ``is_valid_email`` accepts.
    """
    # Inside a character class "|" is a literal pipe, not alternation, so the
    # TLD class is spelled [A-Za-z]; otherwise "user@example.com|next" would
    # be captured as a single address.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return [
        candidate
        for candidate in re.findall(email_pattern, text)
        if is_valid_email(candidate)
    ]
41 | |
def process_directory(directory):
    """Collect emails from the CSV files in *directory* and write them to disk.

    Every cell of every ``.csv`` file directly inside *directory* (extension
    matched case-insensitively, no recursion) is scanned with
    ``extract_emails``. The unique addresses are written, sorted and one per
    line, to ``<name>.txt`` in the current working directory, where ``<name>``
    is the final component of *directory*.

    A file that cannot be opened or parsed is logged and skipped; emails
    gathered from it before the failure are kept.

    Args:
        directory: Path of the directory to scan.
    """
    emails = set()
    csv_count = 0
    for filename in os.listdir(directory):
        # Accept ".CSV" / ".Csv" as well as ".csv".
        if not filename.lower().endswith('.csv'):
            continue
        csv_count += 1
        filepath = os.path.join(directory, filename)
        try:
            # newline='' is what the csv module expects so that newlines
            # embedded in quoted fields are parsed correctly.
            with open(filepath, 'r', encoding='utf-8', errors='ignore', newline='') as csvfile:
                for row in csv.reader(csvfile):
                    for cell in row:
                        emails.update(extract_emails(cell))
        except Exception as e:
            # Deliberately broad: one bad file must not abort the whole run.
            logging.error(f"Error processing {filepath}: {e}")

    # abspath normalizes the path first, so "data/" yields "data.txt" rather
    # than an output file named just ".txt".
    output_filename = f"{os.path.basename(os.path.abspath(directory))}.txt"
    with open(output_filename, 'w', encoding='utf-8') as outfile:
        outfile.writelines(email + '\n' for email in sorted(emails))

    logging.info(f"Processed {csv_count} CSV files in {directory}")
    logging.info(f"Extracted {len(emails)} unique valid emails to {output_filename}")
67 | |
def main():
    """Run ``process_directory`` on each directory in the working directory.

    Entries are tested with ``os.path.isdir`` by bare name, which resolves
    against the same working directory that was listed.
    """
    here = os.getcwd()
    subdirs = (entry for entry in os.listdir(here) if os.path.isdir(entry))
    for entry in subdirs:
        logging.info(f"Processing directory: {entry}")
        process_directory(entry)
74 | |
# Run the extraction only when executed as a script, not when imported.
if __name__ == "__main__":
    main()