extract-emails.py
· 2.5 KiB · Python
Raw
import os
import csv
import re
import logging
# Configure the root logger once for the whole script: INFO and above,
# each record prefixed with its timestamp and level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def is_valid_email(email):
    """Return True if *email* looks like a plausible ``local@domain.tld`` address.

    This is a pragmatic filter, not a full RFC 5322 validator. An address is
    accepted only when all of the following hold:

    * it contains exactly one ``@``, a non-empty local part, a domain with at
      least one dot, and no whitespace anywhere;
    * it does not start with ``.``, ``-`` or ``+`` and does not end with ``.``;
    * it contains none of the sequences ``..``, ``@.`` or ``.@``;
    * the local part is 2 to 64 characters long and the domain at most 255.

    Args:
        email: Candidate address string.

    Returns:
        bool: True if every check passes, False otherwise.
    """
    # fullmatch instead of match + "$": "$" also matches just before a final
    # newline, which let "user@example.com\n" through. Whitespace is excluded
    # from every part because it cannot occur in a bare address.
    if not re.fullmatch(r'[^@\s]+@[^@\s]+\.[^@\s]+', email):
        return False

    # Leading/trailing punctuation that typically comes from sloppy extraction.
    if email.startswith(('.', '-', '+')) or email.endswith('.'):
        return False
    if any(sequence in email for sequence in ('..', '@.', '.@')):
        return False

    # The pattern above guarantees exactly one "@", so partition is safe.
    local, _, domain = email.partition('@')
    # The 2-character minimum on the local part is a deliberate policy choice
    # of this script (single-letter local parts are treated as noise).
    if not 2 <= len(local) <= 64 or len(domain) > 255:
        return False

    # No separate total-length check is needed: a local part of >= 2, the "@"
    # and a dotted domain of >= 3 characters already imply a length of >= 6.
    return True
def extract_emails(text):
    """Extract valid email addresses from a given text.

    Candidates are located with a regular expression and then filtered
    through is_valid_email.

    Args:
        text: String to scan.

    Returns:
        list[str]: Accepted addresses in order of appearance; duplicates are
        kept, so callers that need uniqueness should de-duplicate.
    """
    # The top-level-domain class is [A-Za-z]; the former [A-Z|a-z] also
    # matched a literal "|", so "a@b.com|net" was captured as one address.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return [
        candidate
        for candidate in re.findall(email_pattern, text)
        if is_valid_email(candidate)
    ]
def process_directory(directory):
    """Collect the unique email addresses found in a directory's CSV files.

    Every entry directly inside *directory* whose name ends in ``.csv``
    (compared case-insensitively) is read as CSV, and each cell is scanned
    with extract_emails. The de-duplicated addresses are written in sorted
    order, one per line, to ``<directory name>.txt`` in the current working
    directory, and a summary is logged.

    A file that cannot be opened or parsed is logged as an error and skipped;
    addresses collected before the failure are kept.

    Args:
        directory: Path of the directory to scan (subdirectories are not
            descended into).

    Raises:
        OSError: If *directory* cannot be listed or the output file cannot
            be written.
    """
    emails = set()
    csv_count = 0
    for filename in os.listdir(directory):
        # Case-insensitive so that "EXPORT.CSV" is not silently skipped.
        if not filename.lower().endswith('.csv'):
            continue
        csv_count += 1
        filepath = os.path.join(directory, filename)
        try:
            # newline='' is required by the csv module so that line breaks
            # embedded in quoted fields are parsed correctly.
            with open(filepath, 'r', encoding='utf-8', errors='ignore', newline='') as csvfile:
                for row in csv.reader(csvfile):
                    for cell in row:
                        emails.update(extract_emails(cell))
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole run.
            logging.error(f"Error processing {filepath}: {str(e)}")

    # abspath normalises the path first, so "data/" yields "data.txt" instead
    # of ".txt" (basename of a path with a trailing separator is empty).
    output_filename = f"{os.path.basename(os.path.abspath(directory))}.txt"
    with open(output_filename, 'w', encoding='utf-8') as outfile:
        outfile.writelines(email + '\n' for email in sorted(emails))

    logging.info(f"Processed {csv_count} CSV files in {directory}")
    logging.info(f"Extracted {len(emails)} unique valid emails to {output_filename}")
def main():
    """Run process_directory on every subdirectory of the current working directory."""
    # Entry names are relative; they resolve correctly because the directory
    # being listed is the current working directory itself.
    subdirectories = (
        entry for entry in os.listdir(os.getcwd()) if os.path.isdir(entry)
    )
    for entry in subdirectories:
        logging.info(f"Processing directory: {entry}")
        process_directory(entry)


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
| 1 | import os |
| 2 | import csv |
| 3 | import re |
| 4 | import logging |
| 5 | |
# Configure the root logger once for the whole script: INFO and above,
# each record prefixed with its timestamp and level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
| 8 | |
def is_valid_email(email):
    """Return True if *email* looks like a plausible ``local@domain.tld`` address.

    This is a pragmatic filter, not a full RFC 5322 validator. An address is
    accepted only when all of the following hold:

    * it contains exactly one ``@``, a non-empty local part, a domain with at
      least one dot, and no whitespace anywhere;
    * it does not start with ``.``, ``-`` or ``+`` and does not end with ``.``;
    * it contains none of the sequences ``..``, ``@.`` or ``.@``;
    * the local part is 2 to 64 characters long and the domain at most 255.

    Args:
        email: Candidate address string.

    Returns:
        bool: True if every check passes, False otherwise.
    """
    # fullmatch instead of match + "$": "$" also matches just before a final
    # newline, which let "user@example.com\n" through. Whitespace is excluded
    # from every part because it cannot occur in a bare address.
    if not re.fullmatch(r'[^@\s]+@[^@\s]+\.[^@\s]+', email):
        return False

    # Leading/trailing punctuation that typically comes from sloppy extraction.
    if email.startswith(('.', '-', '+')) or email.endswith('.'):
        return False
    if any(sequence in email for sequence in ('..', '@.', '.@')):
        return False

    # The pattern above guarantees exactly one "@", so partition is safe.
    local, _, domain = email.partition('@')
    # The 2-character minimum on the local part is a deliberate policy choice
    # of this script (single-letter local parts are treated as noise).
    if not 2 <= len(local) <= 64 or len(domain) > 255:
        return False

    # No separate total-length check is needed: a local part of >= 2, the "@"
    # and a dotted domain of >= 3 characters already imply a length of >= 6.
    return True
| 35 | |
def extract_emails(text):
    """Extract valid email addresses from a given text.

    Candidates are located with a regular expression and then filtered
    through is_valid_email.

    Args:
        text: String to scan.

    Returns:
        list[str]: Accepted addresses in order of appearance; duplicates are
        kept, so callers that need uniqueness should de-duplicate.
    """
    # The top-level-domain class is [A-Za-z]; the former [A-Z|a-z] also
    # matched a literal "|", so "a@b.com|net" was captured as one address.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return [
        candidate
        for candidate in re.findall(email_pattern, text)
        if is_valid_email(candidate)
    ]
| 41 | |
def process_directory(directory):
    """Collect the unique email addresses found in a directory's CSV files.

    Every entry directly inside *directory* whose name ends in ``.csv``
    (compared case-insensitively) is read as CSV, and each cell is scanned
    with extract_emails. The de-duplicated addresses are written in sorted
    order, one per line, to ``<directory name>.txt`` in the current working
    directory, and a summary is logged.

    A file that cannot be opened or parsed is logged as an error and skipped;
    addresses collected before the failure are kept.

    Args:
        directory: Path of the directory to scan (subdirectories are not
            descended into).

    Raises:
        OSError: If *directory* cannot be listed or the output file cannot
            be written.
    """
    emails = set()
    csv_count = 0
    for filename in os.listdir(directory):
        # Case-insensitive so that "EXPORT.CSV" is not silently skipped.
        if not filename.lower().endswith('.csv'):
            continue
        csv_count += 1
        filepath = os.path.join(directory, filename)
        try:
            # newline='' is required by the csv module so that line breaks
            # embedded in quoted fields are parsed correctly.
            with open(filepath, 'r', encoding='utf-8', errors='ignore', newline='') as csvfile:
                for row in csv.reader(csvfile):
                    for cell in row:
                        emails.update(extract_emails(cell))
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole run.
            logging.error(f"Error processing {filepath}: {str(e)}")

    # abspath normalises the path first, so "data/" yields "data.txt" instead
    # of ".txt" (basename of a path with a trailing separator is empty).
    output_filename = f"{os.path.basename(os.path.abspath(directory))}.txt"
    with open(output_filename, 'w', encoding='utf-8') as outfile:
        outfile.writelines(email + '\n' for email in sorted(emails))

    logging.info(f"Processed {csv_count} CSV files in {directory}")
    logging.info(f"Extracted {len(emails)} unique valid emails to {output_filename}")
| 67 | |
def main():
    """Run process_directory on every subdirectory of the current working directory."""
    # Entry names are relative; they resolve correctly because the directory
    # being listed is the current working directory itself.
    subdirectories = (
        entry for entry in os.listdir(os.getcwd()) if os.path.isdir(entry)
    )
    for entry in subdirectories:
        logging.info(f"Processing directory: {entry}")
        process_directory(entry)


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()