How do I avoid scraping duplicate content with Python?

Avoiding duplicate content is crucial for efficient web scraping as it reduces server load, saves bandwidth, prevents data redundancy, and ensures your scraped dataset remains clean and manageable.

Here are seven proven strategies to avoid scraping duplicate content when using Python:

1. Track Unique URLs with Sets

The simplest approach is to use Python's built-in set to track already-processed URLs. Set membership checks run in O(1) time on average, so duplicate URLs can be detected and skipped cheaply.

import requests
from bs4 import BeautifulSoup

class SimpleScraper:
    def __init__(self):
        self.visited_urls = set()

    def scrape_page(self, url):
        if url in self.visited_urls:
            print(f"Skipping duplicate URL: {url}")
            return None

        self.visited_urls.add(url)

        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            print(f"Scraped: {url}")
            return soup.get_text()
        except requests.RequestException as e:
            print(f"Error scraping {url}: {e}")
            return None

# Usage
scraper = SimpleScraper()
scraper.scrape_page('https://example.com/page1')
scraper.scrape_page('https://example.com/page2')
scraper.scrape_page('https://example.com/page1')  # Will be skipped

2. Content Hashing for Duplicate Detection

When URLs differ but content is identical, use content hashing to detect duplicates. This is particularly useful for sites with dynamic URLs or multiple URLs serving the same content.

import hashlib
import requests
from bs4 import BeautifulSoup

class ContentHashScraper:
    def __init__(self):
        self.content_hashes = set()

    def get_content_hash(self, content):
        """Generate SHA-256 hash of content"""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def is_duplicate_content(self, content):
        content_hash = self.get_content_hash(content)
        if content_hash in self.content_hashes:
            return True
        self.content_hashes.add(content_hash)
        return False

    def scrape_with_dedup(self, url):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()

            # Extract main content (remove navigation, ads, etc.)
            soup = BeautifulSoup(response.content, 'html.parser')
            main_content = soup.find('main') or soup.find('article') or soup.find('body')
            content = main_content.get_text().strip() if main_content else ""

            if self.is_duplicate_content(content):
                print(f"Duplicate content detected for: {url}")
                return None

            print(f"New content scraped from: {url}")
            return content

        except requests.RequestException as e:
            print(f"Error scraping {url}: {e}")
            return None

# Usage
scraper = ContentHashScraper()
scraper.scrape_with_dedup('https://example.com/article1')
scraper.scrape_with_dedup('https://example.com/article1?ref=social')  # Same content, different URL

3. Persistent Storage with SQLite

For large-scale or long-running scrapers, use SQLite to persist visited URLs and content hashes across multiple runs.

import sqlite3
import hashlib
import requests
from contextlib import contextmanager

class PersistentScraper:
    def __init__(self, db_path='scraper_cache.db'):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        with self.get_db_connection() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS visited_urls (
                    url TEXT PRIMARY KEY,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS content_hashes (
                    hash TEXT PRIMARY KEY,
                    url TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

    @contextmanager
    def get_db_connection(self):
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
        finally:
            conn.close()

    def is_url_visited(self, url):
        with self.get_db_connection() as conn:
            cursor = conn.execute('SELECT 1 FROM visited_urls WHERE url = ?', (url,))
            return cursor.fetchone() is not None

    def mark_url_visited(self, url):
        with self.get_db_connection() as conn:
            conn.execute('INSERT OR IGNORE INTO visited_urls (url) VALUES (?)', (url,))
            conn.commit()

    def is_content_duplicate(self, content, url):
        content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()

        with self.get_db_connection() as conn:
            cursor = conn.execute('SELECT url FROM content_hashes WHERE hash = ?', (content_hash,))
            existing = cursor.fetchone()

            if existing:
                print(f"Duplicate content found. Original URL: {existing[0]}, Current URL: {url}")
                return True

            conn.execute('INSERT INTO content_hashes (hash, url) VALUES (?, ?)', (content_hash, url))
            conn.commit()
            return False

    def scrape_url(self, url):
        if self.is_url_visited(url):
            print(f"URL already visited: {url}")
            return None

        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            content = response.text

            if self.is_content_duplicate(content, url):
                self.mark_url_visited(url)  # Mark as visited even if duplicate
                return None

            self.mark_url_visited(url)
            print(f"Successfully scraped: {url}")
            return content

        except requests.RequestException as e:
            print(f"Error scraping {url}: {e}")
            return None

# Usage
scraper = PersistentScraper()
scraper.scrape_url('https://example.com/page1')
scraper.scrape_url('https://example.com/page1')  # Will be skipped

4. Advanced URL Canonicalization

Proper URL canonicalization keeps trivially different URLs (trailing slashes, a www. prefix, tracking parameters) from being treated as distinct pages, so the same page is not scraped more than once.

from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
import requests

class URLCanonicalizer:
    @staticmethod
    def canonicalize_url(url):
        """Normalize URL to standard form"""
        parsed = urlparse(url)

        # Normalize scheme (case-insensitive)
        scheme = (parsed.scheme or 'https').lower()

        # Normalize domain: lowercase and strip a leading "www."
        netloc = parsed.netloc.lower()
        if netloc.startswith('www.'):
            netloc = netloc[4:]

        # Normalize path (do not lowercase it; paths are case-sensitive)
        path = parsed.path.rstrip('/')
        if not path:
            path = '/'

        # Sort query parameters
        query_params = parse_qs(parsed.query)
        # Remove tracking parameters
        tracking_params = {'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content', 'ref', 'source'}
        filtered_params = {k: v for k, v in query_params.items() if k not in tracking_params}

        # Reconstruct query string
        sorted_query = urlencode(sorted(filtered_params.items()), doseq=True)

        # Ignore fragment
        return urlunparse((scheme, netloc, path, '', sorted_query, ''))

class CanonicalScraper:
    def __init__(self):
        self.visited_canonical_urls = set()

    def scrape_url(self, url):
        canonical_url = URLCanonicalizer.canonicalize_url(url)

        if canonical_url in self.visited_canonical_urls:
            print(f"Canonical URL already visited: {canonical_url} (original: {url})")
            return None

        self.visited_canonical_urls.add(canonical_url)

        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            print(f"Scraped: {url} (canonical: {canonical_url})")
            return response.text
        except requests.RequestException as e:
            print(f"Error scraping {url}: {e}")
            return None

# Usage
scraper = CanonicalScraper()
scraper.scrape_url('https://example.com/page')
scraper.scrape_url('https://www.example.com/page/')  # Same canonical URL
scraper.scrape_url('https://example.com/page?utm_source=google')  # Same canonical URL

5. Scrapy with Built-in Duplicate Filtering

Scrapy filters duplicate requests out of the box via its RFPDupeFilter, and you can subclass it to customize how requests are fingerprinted.

import scrapy
from scrapy.dupefilters import RFPDupeFilter

class CustomDupeFilter(RFPDupeFilter):
    def request_fingerprint(self, request):
        """Custom fingerprinting hook, e.g. to ignore certain URL parameters.
        See the sketch after this example for one way to strip tracking
        parameters before fingerprinting."""
        return super().request_fingerprint(request)

class DeduplicationSpider(scrapy.Spider):
    name = 'dedup_spider'
    custom_settings = {
        'DUPEFILTER_CLASS': CustomDupeFilter,
        'DUPEFILTER_DEBUG': True,
    }

    def start_requests(self):
        urls = [
            'https://example.com/page1',
            'https://example.com/page2',
            'https://example.com/page1',  # Duplicate
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def parse(self, response):
        # Scrapy automatically filters duplicates
        self.logger.info(f'Scraped: {response.url}')

        # Extract data
        title = response.css('title::text').get()

        yield {
            'url': response.url,
            'title': title,
            'content': response.text[:1000]  # First 1000 chars
        }

        # Follow links
        for link in response.css('a::attr(href)').getall():
            yield response.follow(link, self.parse)
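
To make the fingerprint customization concrete, here is a minimal sketch of one approach (not the only one): strip tracking parameters with url_query_cleaner from w3lib, a library Scrapy already depends on, then run the spider from a plain script with CrawlerProcess. The tracking-parameter list is an illustrative assumption; adjust it for your target sites.

from scrapy.crawler import CrawlerProcess
from scrapy.dupefilters import RFPDupeFilter
from w3lib.url import url_query_cleaner

TRACKING_PARAMS = ['utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content', 'ref']

class TrackingAwareDupeFilter(RFPDupeFilter):
    def request_fingerprint(self, request):
        # Strip tracking parameters so URLs that differ only in them
        # produce the same fingerprint and get filtered as duplicates
        cleaned = url_query_cleaner(request.url, TRACKING_PARAMS, remove=True)
        return super().request_fingerprint(request.replace(url=cleaned))

# To use it, point DUPEFILTER_CLASS at this class (e.g. in the spider's
# custom_settings shown above, or in your project's settings.py).

# Run the spider from a plain script instead of the `scrapy crawl` CLI
process = CrawlerProcess(settings={'LOG_LEVEL': 'INFO'})
process.crawl(DeduplicationSpider)
process.start()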

6. Bloom Filters for Memory-Efficient Deduplication

For very large-scale scraping, Bloom filters provide memory-efficient duplicate detection at the cost of a small, configurable probability of false positives (a false positive means a genuinely new URL is occasionally skipped as a duplicate).

from pybloom_live import BloomFilter
import requests

class BloomFilterScraper:
    def __init__(self, capacity=1000000, error_rate=0.001):
        # error_rate is the acceptable false-positive probability
        self.bloom_filter = BloomFilter(capacity=capacity, error_rate=error_rate)

    def is_probably_duplicate(self, url):
        """Returns True if URL is probably a duplicate"""
        return url in self.bloom_filter

    def add_url(self, url):
        """Add URL to the filter"""
        self.bloom_filter.add(url)

    def scrape_url(self, url):
        if self.is_probably_duplicate(url):
            print(f"Probably duplicate URL: {url}")
            return None

        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()

            self.add_url(url)
            print(f"Scraped: {url}")
            return response.text

        except requests.RequestException as e:
            print(f"Error scraping {url}: {e}")
            return None

# Note: Install pybloom_live with: pip install pybloom_live
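
A quick usage sketch, following the pattern of the earlier examples (the URLs are placeholders):

# Usage
scraper = BloomFilterScraper(capacity=1_000_000, error_rate=0.001)
scraper.scrape_url('https://example.com/page1')
scraper.scrape_url('https://example.com/page2')
scraper.scrape_url('https://example.com/page1')  # Flagged as a probable duplicate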

7. Complete Scraper with Multiple Deduplication Strategies

Here's a comprehensive example combining multiple strategies:

import requests
from bs4 import BeautifulSoup
import sqlite3
import hashlib
from urllib.parse import urljoin, urlparse
import time
from contextlib import contextmanager

class ComprehensiveScraper:
    def __init__(self, db_path='comprehensive_scraper.db', delay=1):
        self.db_path = db_path
        self.delay = delay
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.init_database()

    def init_database(self):
        with self.get_db_connection() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS scraped_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT UNIQUE,
                    canonical_url TEXT,
                    content_hash TEXT,
                    title TEXT,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    content_length INTEGER
                )
            ''')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_url ON scraped_data(url)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_canonical_url ON scraped_data(canonical_url)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_content_hash ON scraped_data(content_hash)')

    @contextmanager
    def get_db_connection(self):
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
        finally:
            conn.close()

    def canonicalize_url(self, url):
        """Normalize URL to standard form (the query string is dropped here;
        keep it if query parameters select genuinely different content)"""
        parsed = urlparse(url)
        scheme = (parsed.scheme or 'https').lower()
        netloc = parsed.netloc.lower()
        if netloc.startswith('www.'):
            netloc = netloc[4:]
        path = parsed.path.rstrip('/') or '/'
        return f"{scheme}://{netloc}{path}"

    def get_content_hash(self, content):
        """Generate SHA-256 hash of cleaned content"""
        # Clean content by removing extra whitespace
        cleaned = ' '.join(content.split())
        return hashlib.sha256(cleaned.encode('utf-8')).hexdigest()

    def is_duplicate(self, url, content_hash):
        """Check if URL or content is duplicate"""
        canonical_url = self.canonicalize_url(url)

        with self.get_db_connection() as conn:
            # Check URL duplicate
            cursor = conn.execute('SELECT 1 FROM scraped_data WHERE url = ? OR canonical_url = ?', 
                                (url, canonical_url))
            if cursor.fetchone():
                return True, "URL duplicate"

            # Check content duplicate
            cursor = conn.execute('SELECT url FROM scraped_data WHERE content_hash = ?', 
                                (content_hash,))
            existing = cursor.fetchone()
            if existing:
                return True, f"Content duplicate (original: {existing[0]})"

        return False, None

    def save_scraped_data(self, url, content, title):
        """Save scraped data to database"""
        canonical_url = self.canonicalize_url(url)
        content_hash = self.get_content_hash(content)

        with self.get_db_connection() as conn:
            conn.execute('''
                INSERT OR IGNORE INTO scraped_data 
                (url, canonical_url, content_hash, title, content_length)
                VALUES (?, ?, ?, ?, ?)
            ''', (url, canonical_url, content_hash, title, len(content)))
            conn.commit()

    def scrape_url(self, url):
        """Scrape URL with comprehensive duplicate detection"""
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract main content
            title = soup.find('title')
            title_text = title.get_text().strip() if title else ""

            # Get main content (try different selectors)
            content_selectors = ['main', 'article', '.content', '#content', 'body']
            content = ""
            for selector in content_selectors:
                element = soup.select_one(selector)
                if element:
                    content = element.get_text().strip()
                    break

            if not content:
                content = soup.get_text().strip()

            # Check for duplicates
            content_hash = self.get_content_hash(content)
            is_dup, reason = self.is_duplicate(url, content_hash)

            if is_dup:
                print(f"Skipping {url}: {reason}")
                return None

            # Save and return data
            self.save_scraped_data(url, content, title_text)

            print(f"Successfully scraped: {url}")

            # Rate limiting
            time.sleep(self.delay)

            return {
                'url': url,
                'title': title_text,
                'content': content,
                'content_length': len(content)
            }

        except requests.RequestException as e:
            print(f"Error scraping {url}: {e}")
            return None

    def get_stats(self):
        """Get scraping statistics"""
        with self.get_db_connection() as conn:
            cursor = conn.execute('SELECT COUNT(*) FROM scraped_data')
            total_scraped = cursor.fetchone()[0]

            cursor = conn.execute('SELECT AVG(content_length) FROM scraped_data')
            avg_content_length = cursor.fetchone()[0] or 0

            return {
                'total_scraped': total_scraped,
                'avg_content_length': round(avg_content_length, 2)
            }

# Usage example
def main():
    scraper = ComprehensiveScraper(delay=1)

    urls_to_scrape = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://www.example.com/page1/',  # Duplicate canonical URL
        'https://example.com/page3',
    ]

    for url in urls_to_scrape:
        result = scraper.scrape_url(url)
        if result:
            print(f"Scraped {len(result['content'])} characters from {url}")

    # Print statistics
    stats = scraper.get_stats()
    print(f"\nScraping completed. Total pages: {stats['total_scraped']}")
    print(f"Average content length: {stats['avg_content_length']} characters")

if __name__ == "__main__":
    main()

Best Practices for Duplicate Avoidance

  1. Combine Multiple Strategies: Use URL canonicalization with content hashing for maximum effectiveness
  2. Memory Management: For large-scale scraping, use persistent storage or Bloom filters
  3. Rate Limiting: Always implement delays to be respectful to target servers
  4. Error Handling: Robust error handling prevents crashes and data loss
  5. Monitoring: Track duplicate detection rates to optimize your strategies (see the sketch after this list)
  6. Database Indexing: Proper indexing speeds up duplicate lookups significantly
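
To make point 5 concrete, here is a minimal monitoring sketch: keep counters next to whichever deduplication strategy you use and report the duplicate rate. The class and names are illustrative, not part of any library.

class DedupStats:
    """Tracks how often duplicates are detected during a scraping run"""

    def __init__(self):
        self.attempted = 0
        self.duplicates = 0

    def record(self, was_duplicate):
        self.attempted += 1
        if was_duplicate:
            self.duplicates += 1

    @property
    def duplicate_rate(self):
        return self.duplicates / self.attempted if self.attempted else 0.0

# Usage: call record() wherever your scraper decides if a URL or content is new
stats = DedupStats()
stats.record(False)  # new page scraped
stats.record(True)   # duplicate skipped
print(f"Duplicate rate: {stats.duplicate_rate:.1%}")  # 50.0%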

Remember to always respect robots.txt files, implement proper rate limiting, and follow the website's terms of service when scraping.
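
As a minimal sketch of the robots.txt check, Python's standard-library urllib.robotparser can be consulted before each request (the user agent string is an illustrative assumption):

from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url('https://example.com/robots.txt')
rp.read()  # Download and parse the site's robots.txt

user_agent = 'MyScraperBot'  # Hypothetical user agent for illustration
url = 'https://example.com/page1'

if rp.can_fetch(user_agent, url):
    print(f"robots.txt allows fetching: {url}")
else:
    print(f"robots.txt disallows fetching: {url}")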
