What Is a Data Pipeline? Processing Scraped Data at Scale
A data pipeline in web scraping is a series of automated steps that process raw scraped data into clean, structured, and usable output. It typically includes extraction, cleaning, validation, transformation, and storage stages.
The Scraping Pipeline
Fetch → Parse → Clean → Validate → Transform → Store
Each stage handles a specific responsibility:
1. Fetch: Download the web page (requests, Playwright)
2. Parse: Extract raw data from HTML (BeautifulSoup, selectors)
3. Clean: Remove noise (whitespace, HTML tags, special characters)
4. Validate: Check data quality (required fields, correct formats)
5. Transform: Convert to the desired format (dates, prices, normalization)
6. Store: Save to a destination (CSV, database, API)
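The stages above can be sketched end to end as plain functions. This is a toy illustration, not a real scraper: the fetch stage is stubbed with static HTML, and the `name`/`price` fields and pipe-delimited markup are assumptions made for the example.

```python
def fetch(url: str) -> str:
    # Stub for illustration; a real scraper would use requests.get(url).text
    return "<li>Widget|$9.99</li><li>|$0</li>"

def parse(html: str) -> list[dict]:
    # Extract raw name/price pairs from the toy markup
    items = []
    for chunk in html.split("</li>"):
        if "|" in chunk:
            name, price = chunk.replace("<li>", "").split("|")
            items.append({"name": name, "price": price})
    return items

def clean(item: dict) -> dict:
    # Trim whitespace and strip the currency symbol
    return {"name": item["name"].strip(), "price": item["price"].lstrip("$")}

def validate(item: dict) -> bool:
    # Require a non-empty name and a positive price
    return bool(item["name"]) and float(item["price"] or "0") > 0

def transform(item: dict) -> dict:
    # Convert the price string to a float
    return {"name": item["name"], "price": float(item["price"])}

# Run the stages in order: Fetch → Parse → Clean → Validate → Transform
raw = parse(fetch("https://example.com/products"))
cleaned = [clean(i) for i in raw]
valid = [i for i in cleaned if validate(i)]
records = [transform(i) for i in valid]
```

Keeping each stage a separate function makes the flow easy to test and lets you swap one stage (say, a different parser) without touching the rest.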
Example Pipeline in Python
import re
import csv
from datetime import datetime

def clean_price(raw_price: str) -> float:
    """Extract the numeric price from text like '$1,299.99'."""
    cleaned = re.sub(r'[^\d.]', '', raw_price)
    return float(cleaned) if cleaned else 0.0

def clean_text(text: str) -> str:
    """Collapse runs of whitespace and trim the ends."""
    return ' '.join(text.split())

def validate_product(product: dict) -> bool:
    """Check that required fields exist and are valid."""
    return bool(product.get("name")) and product.get("price", 0) > 0

# Pipeline
raw_products = scrape_products()  # your scraping function
cleaned = [
    {
        "name": clean_text(p["name"]),
        "price": clean_price(p["price"]),
        "scraped_at": datetime.now().isoformat(),
    }
    for p in raw_products
]
valid = [p for p in cleaned if validate_product(p)]

# Store
with open("products.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["name", "price", "scraped_at"])
    writer.writeheader()
    writer.writerows(valid)
Scrapy Pipelines
Scrapy has a built-in item pipeline system: each pipeline class receives every scraped item in turn, and can modify it, pass it on, or drop it.

import re

from scrapy.exceptions import DropItem

class CleanPricePipeline:
    def process_item(self, item, spider):
        item["price"] = float(re.sub(r'[^\d.]', '', item["price"]))
        return item

class DuplicatesPipeline:
    def __init__(self):
        self.seen = set()

    def process_item(self, item, spider):
        if item["url"] in self.seen:
            raise DropItem(f"Duplicate: {item['url']}")
        self.seen.add(item["url"])
        return item
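Pipelines only run once they are registered in the project's `settings.py` via the `ITEM_PIPELINES` setting; the integer (0–1000) sets the order, with lower numbers running first. The module path `myproject.pipelines` here is a placeholder for your own project layout.

```python
# settings.py — register the pipelines; lower numbers run earlier,
# so prices are cleaned before duplicates are checked.
ITEM_PIPELINES = {
    "myproject.pipelines.CleanPricePipeline": 100,
    "myproject.pipelines.DuplicatesPipeline": 200,
}
```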
Best Practices
- Always store raw data alongside cleaned data (you can re-process later)
- Log rejected/invalid items for debugging
- Add timestamps to every record
- Deduplicate early in the pipeline
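These practices can be combined in a single processing step. A minimal sketch, assuming records with hypothetical `url` and `name` fields:

```python
import json
import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")

def process(raw_items: list[dict]) -> list[dict]:
    seen: set[str] = set()
    accepted = []
    for raw in raw_items:
        # Deduplicate early, before any expensive cleaning
        key = raw.get("url", "")
        if key in seen:
            log.info("Dropped duplicate: %s", key)
            continue
        seen.add(key)
        name = " ".join(raw.get("name", "").split())
        if not name:
            # Log rejected items with their raw payload for debugging
            log.warning("Rejected (missing name): %s", json.dumps(raw))
            continue
        accepted.append({
            "raw": raw,  # keep the raw record so it can be re-processed later
            "name": name,
            "scraped_at": datetime.now(timezone.utc).isoformat(),
        })
    return accepted
```

Storing the untouched `raw` dict next to the cleaned fields costs a little disk space but means a bug in `clean` or `validate` never forces a re-scrape.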