#!/usr/bin/env python3 """ Build a multilingual keyword portfolio from Google Ads exports. Usage: python scripts/build_keyword_portfolio.py python scripts/build_keyword_portfolio.py --output-dir docs/keyword-research/2026-04-05 """ from __future__ import annotations import argparse import csv import math import re import unicodedata from collections import Counter from dataclasses import dataclass, field from pathlib import Path ROOT = Path(__file__).resolve().parents[1] BASE_INPUTS = [ ROOT / "docs" / "KeywordStats_4_5_2026.csv", ROOT / "docs" / "Keyword Stats 2026-04-05 at 10_02_37.csv", ] DEFAULT_OUTPUT_DIR = ROOT / "docs" / "keyword-research" / "2026-04-05" SUPPLEMENTAL_INPUT_DIR = DEFAULT_OUTPUT_DIR / "Keywords" SUPPORTED_LANGUAGES = {"en", "ar", "fr"} GROWTH_LANGUAGES = {"es"} WATCHLIST_LANGUAGES = {"zh", "it", "pt", "other"} HOW_TO_MARKERS = {"how to", "comment ", "كيفية", "كيف ", "how do i"} FREE_MARKERS = {"free", "gratis", "gratuit"} ONLINE_MARKERS = {"online", "en ligne"} PAGE_MARKERS = {"page", "pages", "pagina", "paginas", "página", "páginas"} FILE_MARKERS = {"file", "files", "document", "documents", "archivo", "archivos", "fichier", "fichiers"} SPLIT_MARKERS = { "split", "splitter", "splitpdf", "pdfsplit", "separate", "separator", "divide", "divider", "cut", "cutter", "slicer", "trimmer", "breaker", "unmerge", "dividir", "separar", "separa", "separador", "cortar", "diviser", "séparer", "separer", "fractionner", "decouper", "découper", "couper", "dividi", "تقسيم", "فصل", "拆分", "分割", } EXTRACT_MARKERS = {"extract", "extractor", "extraction", "extract pages", "استخراج"} MERGE_MARKERS = {"merge", "merger", "combine", "join", "fusionner", "fusion", "دمج"} COMPRESS_MARKERS = {"compress", "compressor", "compression", "reduce size", "reduce pdf", "ضغط"} CONVERT_MARKERS = {"convert", "converter", "conversion", "to pdf", "pdf to", "تحويل"} EDIT_MARKERS = {"edit", "editor", "editing", "software"} IMAGE_TO_PDF_MARKERS = {"image pdf", "images to pdf", "image to pdf", "add image to pdf", "photo to pdf", "jpg to pdf", "png to pdf"} PDF_TOOL_MARKERS = {"pdf tools", "tool pdf", "pdf tool"} PDF_TO_WORD_MARKERS = {"pdf to word", "pdf to doc", "pdf to docx", "convert pdf to word"} WORD_TO_PDF_MARKERS = {"word to pdf", "doc to pdf", "docx to pdf", "convert word to pdf"} OCR_MARKERS = { "ocr", "text recognition", "extract text from image", "extract text from pdf", "image to text", "pdf to text", "scan to text", "optical character recognition", "استخراج النص", } SPANISH_MARKERS = {"dividir", "separar", "separa", "separador", "gratis", "cortar"} FRENCH_MARKERS = {"diviser", "séparer", "separer", "fractionner", "decouper", "découper", "couper", "gratuit"} ITALIAN_MARKERS = {"dividi"} PORTUGUESE_MARKERS = {"separador"} SPLIT_VALID_PATTERNS = [ re.compile(r"^(?:online )?split pdf(?: free| online| free online| pages| pages free| file| files| document)?$"), re.compile(r"^pdf split(?: online| free)?$"), re.compile(r"^pdf splitter(?: online| free| free online)?$"), re.compile(r"^splitter pdf$"), re.compile(r"^separate pdf(?: pages| files| free| pages free)?$"), re.compile(r"^pdf separate(?: pages)?$"), re.compile(r"^pdf separator$"), re.compile(r"^pdf page separator$"), re.compile(r"^cut pdf(?: pages)?$"), re.compile(r"^pdf cutter(?: online)?$"), re.compile(r"^pdf divider$"), re.compile(r"^unmerge pdf(?: free| online)?$"), re.compile(r"^dividir pdf(?: gratis| online)?$"), re.compile(r"^separar pdf$"), re.compile(r"^separa pdf$"), re.compile(r"^separador de pdf$"), re.compile(r"^cortar pdf$"), re.compile(r"^diviser pdf$"), re.compile(r"^séparer pdf$"), re.compile(r"^separer pdf$"), re.compile(r"^fractionner pdf$"), re.compile(r"^decouper pdf$"), re.compile(r"^découper pdf$"), re.compile(r"^couper pdf$"), re.compile(r"^pdfsplit$"), re.compile(r"^splitpdf$"), re.compile(r"^(?:拆分pdf|pdf拆分|分割pdf|pdf分割)$"), ] EXTRACT_VALID_PATTERNS = [ re.compile(r"^extract pages? from pdf$"), re.compile(r"^pdf extractor$"), re.compile(r"^extract pdf$"), re.compile(r"^extract pdf pages$"), re.compile(r"^pdf extract(?:or)?$"), ] MERGE_VALID_PATTERNS = [ re.compile(r"^merge pdf(?: files| documents| free| online)?$"), re.compile(r"^pdf merge$"), re.compile(r"^pdf merger$"), re.compile(r"^دمج pdf$"), ] COMPRESS_VALID_PATTERNS = [ re.compile(r"^compress pdf(?: file| document| online| free| online free)?$"), re.compile(r"^pdf compressor(?: free| online)?$"), re.compile(r"^pdf compression$"), re.compile(r"^ضغط pdf$"), ] CONVERSION_VALID_PATTERNS = [ re.compile(r"^pdf converter$"), re.compile(r"^convert (?:file|file type|document|documents|image|images|photo|photos|word|doc|docx|excel|xls|xlsx|ppt|pptx|powerpoint|html|text|txt) to pdf$"), re.compile(r"^(?:word|doc|docx|excel|xls|xlsx|ppt|pptx|powerpoint|html|image|images|photo|photos|jpg|jpeg|png) to pdf$"), re.compile(r"^pdf to (?:word|excel|ppt|pptx|powerpoint|images?|jpg|jpeg|png)$"), ] EDITOR_VALID_PATTERNS = [ re.compile(r"^pdf editor$"), re.compile(r"^edit pdf$"), re.compile(r"^pdf editing software$"), re.compile(r"^online pdf editor$"), ] IMAGE_TO_PDF_VALID_PATTERNS = [ re.compile(r"^image pdf$"), re.compile(r"^image to pdf$"), re.compile(r"^images to pdf$"), re.compile(r"^add image to pdf(?: document)?$"), re.compile(r"^photo to pdf$"), re.compile(r"^jpg to pdf$"), re.compile(r"^png to pdf$"), ] PDF_TO_WORD_VALID_PATTERNS = [ re.compile(r"^pdf to (?:word|doc|docx)$"), re.compile(r"^convert pdf to (?:word|doc|docx)$"), re.compile(r"^تحويل pdf (?:الى|إلى) (?:word|وورد)$"), re.compile(r"^تحويل من pdf (?:الى|إلى) (?:word|وورد)$"), re.compile(r"^(?:pdf|بي دي اف) (?:الى|إلى) (?:word|وورد)$"), ] WORD_TO_PDF_VALID_PATTERNS = [ re.compile(r"^(?:word|doc|docx) to pdf$"), re.compile(r"^convert (?:word|doc|docx) to pdf$"), re.compile(r"^تحويل (?:word|وورد|doc|docx) (?:الى|إلى) pdf$"), re.compile(r"^تحويل من (?:word|وورد|doc|docx) (?:الى|إلى) pdf$"), ] OCR_VALID_PATTERNS = [ re.compile(r"^ocr(?: pdf| image| scanner)?$"), re.compile(r"^text recognition$"), re.compile(r"^extract text from (?:image|pdf|scan|scanned pdf)$"), re.compile(r"^image to text$"), re.compile(r"^pdf to text$"), re.compile(r"^scan to text$"), re.compile(r"^optical character recognition$"), re.compile(r"^استخراج النص من (?:pdf|صورة)$"), re.compile(r"^تحويل (?:pdf|صورة) (?:الى|إلى) نص$"), ] BRAND_PATTERNS = { "ilovepdf": re.compile(r"\bi\s*love\s*pdf\b|\bilovepdf\b", re.IGNORECASE), "smallpdf": re.compile(r"\bsmall\s*pdf\b|\bsmallpdf\b", re.IGNORECASE), "sejda": re.compile(r"\bsejda\b", re.IGNORECASE), "adobe": re.compile(r"\badobe\b|\bacrobat\b", re.IGNORECASE), "cutepdf": re.compile(r"\bcute\s*pdf\b|\bcutepdf\b", re.IGNORECASE), "pdf24": re.compile(r"\bpdf\s*24\b|\bpdf24\b", re.IGNORECASE), } AMBIGUOUS_EXACT = { "split", "pdf", "pd f", "pdf file", "pdf format", "pdf online", "split pages", "split online", "page separator", "pdf to split", "pdf smart", } CLUSTER_METADATA = { "split-pdf": { "label": "Split PDF", "recommended_target": "/tools/split-pdf", "target_type": "live_tool", "implementation_note": "Prioritize this existing landing page with unbranded transactional terms and page-focused variants.", }, "extract-pages": { "label": "Extract Pages", "recommended_target": "/tools/extract-pages", "target_type": "live_tool", "implementation_note": "Use as a secondary page cluster for extraction-specific and page-removal intent.", }, "merge-pdf": { "label": "Merge PDF", "recommended_target": "/tools/merge-pdf", "target_type": "live_tool", "implementation_note": "Target merge-specific queries separately from split keywords to avoid mixed intent pages.", }, "compress-pdf": { "label": "Compress PDF", "recommended_target": "/tools/compress-pdf", "target_type": "live_tool", "implementation_note": "This cluster broadens reach beyond split and should be treated as a parallel priority pillar.", }, "pdf-to-word": { "label": "PDF to Word", "recommended_target": "/tools/pdf-to-word", "target_type": "live_tool", "implementation_note": "Map direct PDF-to-Word conversion intent to the existing converter page rather than a generic conversion hub.", }, "word-to-pdf": { "label": "Word to PDF", "recommended_target": "/tools/word-to-pdf", "target_type": "live_tool", "implementation_note": "Route Word-to-PDF terms to the dedicated converter page because the intent is specific and high value.", }, "ocr": { "label": "OCR / Text Extraction", "recommended_target": "/tools/ocr", "target_type": "live_tool", "implementation_note": "Send OCR and text-extraction intent to the OCR tool page instead of mixing it into broad AI copy.", }, "pdf-conversion": { "label": "PDF Conversion Hub", "recommended_target": "homepage-or-future-conversion-hub", "target_type": "hub_or_future_page", "implementation_note": "Use these keywords to justify a collection page for generic converter intent.", }, "pdf-editor": { "label": "PDF Editor", "recommended_target": "/tools/pdf-editor", "target_type": "live_tool", "implementation_note": "Position editor and editing-software terms on the live PDF editor page.", }, "images-to-pdf": { "label": "Images to PDF", "recommended_target": "/tools/images-to-pdf", "target_type": "live_tool", "implementation_note": "Capture image-to-PDF phrasing and upload intent on the existing converter tool.", }, "mixed-pdf-operations": { "label": "Mixed PDF Operations", "recommended_target": "homepage-or-future-pdf-tools-hub", "target_type": "hub_or_future_page", "implementation_note": "Mixed split-and-merge intent should point to a tools hub, not a single-action landing page.", }, "pdf-tools-hub": { "label": "PDF Tools Hub", "recommended_target": "homepage-or-future-pdf-tools-hub", "target_type": "hub_or_future_page", "implementation_note": "Reserve this cluster for clear hub-style terms such as pdf tools.", }, "unclear": { "label": "Manual Review", "recommended_target": "manual-review", "target_type": "manual_review", "implementation_note": "Keep unclear terms out of the primary portfolio until manually validated.", }, } RECOMMENDATION_ORDER = { "target_now": 0, "target_after_localization": 1, "supporting_content": 2, "watchlist": 3, "exclude": 4, } @dataclass class SourceRow: keyword: str normalized: str source_name: str source_path: str volume: int raw_metric_name: str competition: str = "" competition_index: int = 0 raw_trends: str = "" @dataclass class KeywordAggregate: keyword: str normalized: str source_names: set[str] = field(default_factory=set) source_paths: set[str] = field(default_factory=set) file1_impressions: int = 0 file2_avg_monthly_searches: int = 0 competitions: set[str] = field(default_factory=set) competition_index_max: int = 0 raw_trends: list[str] = field(default_factory=list) def clean_int(value: str | None) -> int: if not value: return 0 digits = re.sub(r"[^0-9]", "", str(value)) return int(digits) if digits else 0 def normalize_keyword(value: str) -> str: text = unicodedata.normalize("NFKC", value or "") text = re.sub(r"[\u200e\u200f\u202a-\u202e\u2066-\u2069]", "", text) text = text.lower().replace("_", " ") text = text.replace("&", " and ") text = re.sub(r"[|/+]+", " ", text) text = re.sub(r"[^\w\s\u0600-\u06FF\u4E00-\u9FFF-]", " ", text, flags=re.UNICODE) text = re.sub(r"\s+", " ", text) return text.strip() def contains_any(text: str, markers: set[str]) -> bool: tokens = set(text.split()) for marker in markers: if re.search(r"[\u0600-\u06FF\u4E00-\u9FFF]", marker): if marker in text: return True continue if " " in marker and marker in text: return True if marker in tokens: return True return False def has_token_or_phrase(keyword: str, markers: set[str]) -> bool: return contains_any(keyword, markers) def matches_any_pattern(keyword: str, patterns: list[re.Pattern[str]]) -> bool: return any(pattern.search(keyword) for pattern in patterns) def discover_default_inputs() -> list[Path]: input_paths = [path for path in BASE_INPUTS if path.exists()] seen_names = {path.name for path in input_paths} if SUPPLEMENTAL_INPUT_DIR.exists(): for path in sorted(SUPPLEMENTAL_INPUT_DIR.glob("*.csv")): if path.name in seen_names: continue input_paths.append(path) seen_names.add(path.name) return input_paths DEFAULT_INPUTS = discover_default_inputs() def strip_informational_prefix(keyword: str) -> str: for prefix in ("how to ", "comment ", "كيفية ", "كيف ", "how do i "): if keyword.startswith(prefix): return keyword[len(prefix):].strip() return keyword def detect_language(keyword: str) -> str: if re.search(r"[\u0600-\u06FF]", keyword): return "ar" if re.search(r"[\u4E00-\u9FFF]", keyword): return "zh" if has_token_or_phrase(keyword, FRENCH_MARKERS): return "fr" if has_token_or_phrase(keyword, SPANISH_MARKERS): return "es" if has_token_or_phrase(keyword, ITALIAN_MARKERS): return "it" if has_token_or_phrase(keyword, PORTUGUESE_MARKERS): return "pt" return "en" def detect_brands(keyword: str) -> list[str]: hits = [] for brand, pattern in BRAND_PATTERNS.items(): if pattern.search(keyword): hits.append(brand) return sorted(hits) def extract_modifiers(keyword: str, brand_hits: list[str]) -> list[str]: modifiers = [] if contains_any(keyword, HOW_TO_MARKERS): modifiers.append("how_to") if contains_any(keyword, FREE_MARKERS): modifiers.append("free") if contains_any(keyword, ONLINE_MARKERS): modifiers.append("online") if contains_any(keyword, PAGE_MARKERS): modifiers.append("pages") if contains_any(keyword, FILE_MARKERS): modifiers.append("files") if brand_hits: modifiers.append("brand") return modifiers def classify_cluster(keyword: str) -> str: has_pdf_to_word = contains_any(keyword, PDF_TO_WORD_MARKERS) or matches_any_pattern(keyword, PDF_TO_WORD_VALID_PATTERNS) has_word_to_pdf = contains_any(keyword, WORD_TO_PDF_MARKERS) or matches_any_pattern(keyword, WORD_TO_PDF_VALID_PATTERNS) has_ocr = contains_any(keyword, OCR_MARKERS) or matches_any_pattern(keyword, OCR_VALID_PATTERNS) has_split = contains_any(keyword, SPLIT_MARKERS) has_extract = contains_any(keyword, EXTRACT_MARKERS) has_merge = contains_any(keyword, MERGE_MARKERS) has_compress = contains_any(keyword, COMPRESS_MARKERS) has_convert = contains_any(keyword, CONVERT_MARKERS) has_edit = contains_any(keyword, EDIT_MARKERS) has_image_to_pdf = contains_any(keyword, IMAGE_TO_PDF_MARKERS) has_pdf_tool = contains_any(keyword, PDF_TOOL_MARKERS) if has_pdf_to_word: return "pdf-to-word" if has_word_to_pdf: return "word-to-pdf" if has_ocr: return "ocr" if has_split and has_merge: return "mixed-pdf-operations" if has_extract: return "extract-pages" if has_split or "to pages" in keyword: return "split-pdf" if has_merge: return "merge-pdf" if has_compress: return "compress-pdf" if has_image_to_pdf: return "images-to-pdf" if has_edit: return "pdf-editor" if has_convert or keyword.startswith("pdf to ") or keyword.endswith(" to pdf"): return "pdf-conversion" if has_pdf_tool: return "pdf-tools-hub" return "unclear" def repeated_phrase(tokens: list[str]) -> bool: if len(tokens) < 4: return False for size in range(1, len(tokens) // 2 + 1): if len(tokens) % size: continue chunk = tokens[:size] repeats = len(tokens) // size if repeats > 1 and chunk * repeats == tokens: return True return False def detect_noise_reason(keyword: str, cluster: str, brand_hits: list[str], file1_impressions: int, file2_searches: int) -> str: tokens = keyword.split() if keyword in AMBIGUOUS_EXACT: return "too_broad_or_ambiguous" if keyword == "page separator": return "not_pdf_specific" if repeated_phrase(tokens): return "repeated_phrase_spam" if tokens and max(Counter(tokens).values()) >= 3 and len(set(tokens)) <= 3: return "repeated_tokens_spam" if brand_hits: return "" if "pdf" not in keyword and cluster not in {"pdf-tools-hub", "pdf-editor", "images-to-pdf", "ocr"}: return "not_pdf_specific" if cluster == "unclear" and max(file1_impressions, file2_searches) < 500: return "unclear_low_value" if keyword.startswith("pd f") or keyword.endswith("pd f"): return "malformed_keyword" if cluster == "unclear": return "manual_review_required" cluster_phrase_issue = detect_cluster_phrase_issue(keyword, cluster) if cluster_phrase_issue: return cluster_phrase_issue return "" def detect_cluster_phrase_issue(keyword: str, cluster: str) -> str: candidate = strip_informational_prefix(keyword) if cluster == "split-pdf": if candidate.count("pdf") > 1 and candidate not in {"pdf split", "pdf splitter", "pdf separator", "pdf page separator", "pdf separate", "pdf divider", "pdf cutter"}: return "unnatural_cluster_phrase" if any(pattern.search(candidate) for pattern in SPLIT_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "extract-pages": if any(pattern.search(candidate) for pattern in EXTRACT_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "merge-pdf": if candidate.count("pdf") > 1: return "unnatural_cluster_phrase" if any(pattern.search(candidate) for pattern in MERGE_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "compress-pdf": if candidate.count("pdf") > 1 or candidate.count("compress") > 1 or candidate.count("compressor") > 1: return "unnatural_cluster_phrase" if any(pattern.search(candidate) for pattern in COMPRESS_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "pdf-to-word": if any(pattern.search(candidate) for pattern in PDF_TO_WORD_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "word-to-pdf": if any(pattern.search(candidate) for pattern in WORD_TO_PDF_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "ocr": if any(pattern.search(candidate) for pattern in OCR_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "pdf-conversion": if candidate == "pdf converter": return "" if candidate.count("pdf") > 1: return "unnatural_cluster_phrase" if any(pattern.search(candidate) for pattern in CONVERSION_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "pdf-editor": if candidate.count("pdf") > 1: return "unnatural_cluster_phrase" if any(pattern.search(candidate) for pattern in EDITOR_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "images-to-pdf": if candidate.count("pdf") > 1: return "unnatural_cluster_phrase" if any(pattern.search(candidate) for pattern in IMAGE_TO_PDF_VALID_PATTERNS): return "" return "unnatural_cluster_phrase" if cluster == "mixed-pdf-operations": if candidate in {"pdf split and merge", "split and merge pdf"}: return "" return "unnatural_cluster_phrase" if cluster == "pdf-tools-hub": if "pdf tools" in keyword: return "" return "unnatural_cluster_phrase" return "" def detect_intent(keyword: str, brand_hits: list[str]) -> str: if brand_hits: return "competitor" if contains_any(keyword, HOW_TO_MARKERS): return "informational" if "pdf tools" in keyword: return "commercial_investigation" return "transactional" def market_bucket(language: str) -> str: if language == "en": return "core_en" if language == "es": return "growth_es" if language == "ar": return "expansion_ar" if language == "fr": return "expansion_fr" return "watchlist_other" def recommendation_for(language: str, intent: str, cluster: str, brand_hits: list[str], noise_reason: str) -> tuple[str, str]: if noise_reason: return "exclude", noise_reason if brand_hits: return "watchlist", "competitor_branded" if language in WATCHLIST_LANGUAGES: return "watchlist", "unsupported_language_market" if language in GROWTH_LANGUAGES: if intent == "informational": return "supporting_content", "spanish_content_after_localization" return "target_after_localization", "spanish_localization_required" if cluster == "pdf-tools-hub": return "supporting_content", "homepage_or_tools_hub" if intent == "informational": return "supporting_content", "blog_or_faq_support" return "target_now", "mapped_to_live_page_or_current_i18n" def score_keyword(file1_impressions: int, file2_searches: int, max_file1: int, max_file2: int) -> float: file1_score = 0.0 file2_score = 0.0 if max_file1: file1_score = math.log10(file1_impressions + 1) / math.log10(max_file1 + 1) if max_file2: file2_score = math.log10(file2_searches + 1) / math.log10(max_file2 + 1) return round(file1_score * 45 + file2_score * 55, 2) def load_keyword_stats(path: Path) -> list[SourceRow]: rows: list[SourceRow] = [] with path.open("r", encoding="utf-8-sig", newline="") as handle: reader = csv.DictReader(handle) for row in reader: keyword = (row.get("Keyword") or "").strip() if not keyword: continue rows.append( SourceRow( keyword=keyword, normalized=normalize_keyword(keyword), source_name="keyword_trends_export", source_path=str(path.relative_to(ROOT)).replace("\\", "/"), volume=clean_int(row.get("Impressions")), raw_metric_name="impressions", raw_trends=(row.get("Trends") or "").strip(), ) ) return rows def load_keyword_planner(path: Path) -> list[SourceRow]: rows: list[SourceRow] = [] with path.open("r", encoding="utf-16") as handle: lines = handle.read().splitlines() reader = csv.DictReader(lines[2:], delimiter="\t") for row in reader: keyword = (row.get("Keyword") or "").strip() if not keyword: continue rows.append( SourceRow( keyword=keyword, normalized=normalize_keyword(keyword), source_name="keyword_planner_export", source_path=str(path.relative_to(ROOT)).replace("\\", "/"), volume=clean_int(row.get("Avg. monthly searches")), raw_metric_name="avg_monthly_searches", competition=(row.get("Competition") or "").strip(), competition_index=clean_int(row.get("Competition (indexed value)")), ) ) return rows def aggregate_rows(rows: list[SourceRow]) -> list[KeywordAggregate]: aggregates: dict[str, KeywordAggregate] = {} for row in rows: if not row.normalized: continue aggregate = aggregates.get(row.normalized) if aggregate is None: aggregate = KeywordAggregate(keyword=row.keyword, normalized=row.normalized) aggregates[row.normalized] = aggregate current_best = max(aggregate.file1_impressions, aggregate.file2_avg_monthly_searches) incoming_best = row.volume if incoming_best > current_best: aggregate.keyword = row.keyword aggregate.source_names.add(row.source_name) aggregate.source_paths.add(row.source_path) if row.source_name == "keyword_trends_export": aggregate.file1_impressions += row.volume if row.raw_trends: aggregate.raw_trends.append(row.raw_trends) else: aggregate.file2_avg_monthly_searches += row.volume if row.competition: aggregate.competitions.add(row.competition) aggregate.competition_index_max = max(aggregate.competition_index_max, row.competition_index) return list(aggregates.values()) def build_keyword_rows(aggregates: list[KeywordAggregate]) -> tuple[list[dict[str, str]], list[dict[str, str]]]: max_file1 = max((item.file1_impressions for item in aggregates), default=0) max_file2 = max((item.file2_avg_monthly_searches for item in aggregates), default=0) rows: list[dict[str, str]] = [] excluded: list[dict[str, str]] = [] for aggregate in aggregates: normalized = aggregate.normalized brand_hits = detect_brands(normalized) language = detect_language(normalized) cluster = classify_cluster(normalized) metadata = CLUSTER_METADATA[cluster] modifiers = extract_modifiers(normalized, brand_hits) noise_reason = detect_noise_reason( normalized, cluster, brand_hits, aggregate.file1_impressions, aggregate.file2_avg_monthly_searches, ) intent = detect_intent(normalized, brand_hits) recommendation, rationale = recommendation_for(language, intent, cluster, brand_hits, noise_reason) priority_score = score_keyword( aggregate.file1_impressions, aggregate.file2_avg_monthly_searches, max_file1, max_file2, ) row = { "keyword": aggregate.keyword, "normalized_keyword": normalized, "language": language, "market_bucket": market_bucket(language), "intent": intent, "cluster": cluster, "cluster_label": metadata["label"], "recommended_target": metadata["recommended_target"], "target_type": metadata["target_type"], "recommendation": recommendation, "recommendation_reason": rationale, "priority_score": f"{priority_score:.2f}", "file1_impressions": str(aggregate.file1_impressions), "file2_avg_monthly_searches": str(aggregate.file2_avg_monthly_searches), "competition_levels": ", ".join(sorted(aggregate.competitions)), "competition_index_max": str(aggregate.competition_index_max), "brands": ", ".join(brand_hits), "modifiers": ", ".join(modifiers), "source_count": str(len(aggregate.source_names)), "sources": ", ".join(sorted(aggregate.source_names)), "source_paths": ", ".join(sorted(aggregate.source_paths)), "notes": metadata["implementation_note"], } if recommendation == "exclude": excluded.append(row) else: rows.append(row) rows.sort( key=lambda item: ( RECOMMENDATION_ORDER[item["recommendation"]], -float(item["priority_score"]), item["normalized_keyword"], ) ) excluded.sort(key=lambda item: (-float(item["priority_score"]), item["normalized_keyword"])) for index, row in enumerate(rows, start=1): row["priority_rank"] = str(index) return rows, excluded def build_cluster_rows(rows: list[dict[str, str]], excluded: list[dict[str, str]]) -> list[dict[str, str]]: grouped: dict[str, list[dict[str, str]]] = {} for row in rows + excluded: grouped.setdefault(row["cluster"], []).append(row) cluster_rows = [] for cluster, items in grouped.items(): metadata = CLUSTER_METADATA[cluster] targetable = [item for item in items if item["recommendation"] != "exclude"] sorted_items = sorted(items, key=lambda item: -float(item["priority_score"])) top_candidates = sorted(targetable, key=lambda item: -float(item["priority_score"])) top_item = top_candidates[0] if top_candidates else sorted_items[0] cluster_rows.append( { "cluster": cluster, "cluster_label": metadata["label"], "recommended_target": metadata["recommended_target"], "target_type": metadata["target_type"], "cluster_score": f"{sum(float(item['priority_score']) for item in targetable):.2f}", "keywords_total": str(len(items)), "target_now_keywords": str(sum(item["recommendation"] == "target_now" for item in items)), "target_after_localization_keywords": str(sum(item["recommendation"] == "target_after_localization" for item in items)), "supporting_content_keywords": str(sum(item["recommendation"] == "supporting_content" for item in items)), "watchlist_keywords": str(sum(item["recommendation"] == "watchlist" for item in items)), "excluded_keywords": str(sum(item["recommendation"] == "exclude" for item in items)), "top_keyword": top_item["keyword"], "top_language": top_item["language"], "file1_impressions": str(sum(int(item["file1_impressions"]) for item in items)), "file2_avg_monthly_searches": str(sum(int(item["file2_avg_monthly_searches"]) for item in items)), "implementation_note": metadata["implementation_note"], } ) cluster_rows.sort(key=lambda item: -float(item["cluster_score"])) return cluster_rows def to_markdown_table(rows: list[dict[str, str]], headers: list[tuple[str, str]]) -> str: if not rows: return "_No rows._" header_row = "| " + " | ".join(label for _, label in headers) + " |" separator = "| " + " | ".join("---" for _ in headers) + " |" body = [ "| " + " | ".join(str(row.get(key, "")) for key, _ in headers) + " |" for row in rows ] return "\n".join([header_row, separator, *body]) def write_csv(path: Path, rows: list[dict[str, str]], fieldnames: list[str]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8", newline="") as handle: writer = csv.DictWriter(handle, fieldnames=fieldnames, extrasaction="ignore") writer.writeheader() writer.writerows(rows) def write_summary( output_path: Path, input_paths: list[Path], raw_rows: list[SourceRow], aggregates: list[KeywordAggregate], rows: list[dict[str, str]], excluded: list[dict[str, str]], clusters: list[dict[str, str]], ) -> None: counts_by_recommendation = Counter(row["recommendation"] for row in rows) counts_by_recommendation.update(row["recommendation"] for row in excluded) visible_rows = [row for row in rows if row["recommendation"] != "watchlist"] language_counts = Counter(row["language"] for row in visible_rows) market_counts = Counter(row["market_bucket"] for row in visible_rows) top_target_now = [row for row in rows if row["recommendation"] == "target_now"][:15] top_localization = [row for row in rows if row["recommendation"] == "target_after_localization"][:15] top_supporting = [row for row in rows if row["recommendation"] == "supporting_content"][:12] top_watchlist = [row for row in rows if row["recommendation"] == "watchlist"][:10] top_clusters = clusters[:10] top_excluded = excluded[:10] input_paths_display = [str(path.relative_to(ROOT)).replace("\\", "/") for path in input_paths] input_list = "\n".join(f"- {path}" for path in input_paths_display) market_table_rows = [ { "bucket": "core_en", "execution": "Target now", "notes": "Current product and current data both support this market immediately.", "count": str(market_counts.get("core_en", 0)), }, { "bucket": "growth_es", "execution": "Target after localization", "notes": "Highest upside after English, but the site needs Spanish landing-page coverage first.", "count": str(market_counts.get("growth_es", 0)), }, { "bucket": "expansion_fr", "execution": "Target now where demand exists", "notes": "Supported in product and ready for selective rollout where the uploads show clear intent.", "count": str(market_counts.get("expansion_fr", 0)), }, { "bucket": "expansion_ar", "execution": "Target supported terms, expand with native research", "notes": "Product support exists and the latest uploads surface Arabic conversion intent, but category coverage still needs broader native-language research.", "count": str(market_counts.get("expansion_ar", 0)), }, ] content = f"""# Keyword Portfolio - 2026-04-05 Generated with `scripts/build_keyword_portfolio.py` from the latest Google Ads exports. ## Source Files {input_list} ## Source Overview - Raw rows processed: {len(raw_rows)} - Unique normalized keywords: {len(aggregates)} - Included or watchlist keywords: {len(rows)} - Excluded keywords: {len(excluded)} - `target_now`: {counts_by_recommendation.get('target_now', 0)} - `target_after_localization`: {counts_by_recommendation.get('target_after_localization', 0)} - `supporting_content`: {counts_by_recommendation.get('supporting_content', 0)} - `watchlist`: {counts_by_recommendation.get('watchlist', 0)} - `exclude`: {counts_by_recommendation.get('exclude', 0)} ## Recommended Market Mix {to_markdown_table(market_table_rows, [('bucket', 'Market Bucket'), ('execution', 'Execution'), ('count', 'Keywords'), ('notes', 'Notes')])} ## Language Distribution (Non-Watchlist) {to_markdown_table([ {'language': language, 'count': str(count)} for language, count in sorted(language_counts.items(), key=lambda item: (-item[1], item[0])) ], [('language', 'Language'), ('count', 'Keywords')])} ## Priority Clusters {to_markdown_table(top_clusters, [ ('cluster_label', 'Cluster'), ('recommended_target', 'Recommended Target'), ('cluster_score', 'Cluster Score'), ('target_now_keywords', 'Target Now'), ('target_after_localization_keywords', 'Target After Localization'), ('watchlist_keywords', 'Watchlist'), ('top_keyword', 'Top Keyword'), ])} ## Top Keywords to Target Now {to_markdown_table(top_target_now, [ ('priority_rank', 'Rank'), ('keyword', 'Keyword'), ('language', 'Language'), ('cluster_label', 'Cluster'), ('file2_avg_monthly_searches', 'Avg Monthly Searches'), ('file1_impressions', 'Impressions'), ('priority_score', 'Score'), ('recommended_target', 'Target'), ])} ## Spanish Growth Keywords {to_markdown_table(top_localization, [ ('priority_rank', 'Rank'), ('keyword', 'Keyword'), ('cluster_label', 'Cluster'), ('file2_avg_monthly_searches', 'Avg Monthly Searches'), ('file1_impressions', 'Impressions'), ('priority_score', 'Score'), ('recommendation_reason', 'Why'), ])} ## Supporting Content Keywords {to_markdown_table(top_supporting, [ ('priority_rank', 'Rank'), ('keyword', 'Keyword'), ('language', 'Language'), ('cluster_label', 'Cluster'), ('priority_score', 'Score'), ('recommendation_reason', 'Why'), ])} ## Watchlist {to_markdown_table(top_watchlist, [ ('priority_rank', 'Rank'), ('keyword', 'Keyword'), ('language', 'Language'), ('brands', 'Brands'), ('priority_score', 'Score'), ('recommendation_reason', 'Why'), ])} ## Excluded Samples {to_markdown_table(top_excluded, [ ('keyword', 'Keyword'), ('language', 'Language'), ('cluster_label', 'Cluster'), ('priority_score', 'Score'), ('recommendation_reason', 'Exclusion Reason'), ])} ## Implementation Notes - The combined exports now show immediate live-page opportunities across `split pdf`, `compress pdf`, `merge pdf`, `pdf to word`, `word to pdf`, and adjacent OCR/conversion intent. - Spanish is the strongest growth market in the uploaded data, but those keywords are intentionally separated into `target_after_localization` until the site ships Spanish landing pages. - Arabic and French remain strategically valid because the product already supports both languages. Use the current dataset for targeted pages now, then supplement with native-language research before scaling site-wide coverage. - Competitor-branded phrases are kept in the watchlist only. They should not be mixed into the core unbranded landing-page portfolio. - Generic or malformed terms are excluded when they are too broad, not PDF-specific, or obviously generated noise from Keyword Planner suggestions. ## Output Files - `prioritized_keywords.csv` - master portfolio with recommendation status, market bucket, cluster mapping, and source metrics. - `keyword_clusters.csv` - cluster-level rollup for page planning. - `excluded_keywords.csv` - excluded or noisy terms with reasons. """ output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(content, encoding="utf-8") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Build a keyword portfolio from Google Ads exports.") parser.add_argument( "--output-dir", default=str(DEFAULT_OUTPUT_DIR), help="Directory where the generated deliverables will be written.", ) parser.add_argument( "--inputs", nargs="*", default=[str(path) for path in DEFAULT_INPUTS], help="Input export files. Supports the repository's CSV and UTF-16 TSV Google Ads formats.", ) return parser.parse_args() def main() -> None: args = parse_args() input_paths = [Path(path) if Path(path).is_absolute() else ROOT / path for path in args.inputs] output_dir = Path(args.output_dir) if Path(args.output_dir).is_absolute() else ROOT / args.output_dir if not input_paths: raise FileNotFoundError("No keyword input files were found. Add exports under docs/keyword-research/2026-04-05/Keywords or pass --inputs explicitly.") raw_rows: list[SourceRow] = [] for path in input_paths: if path.name == "KeywordStats_4_5_2026.csv": raw_rows.extend(load_keyword_stats(path)) else: raw_rows.extend(load_keyword_planner(path)) aggregates = aggregate_rows(raw_rows) rows, excluded = build_keyword_rows(aggregates) clusters = build_cluster_rows(rows, excluded) prioritized_fields = [ "priority_rank", "recommendation", "recommendation_reason", "market_bucket", "keyword", "normalized_keyword", "language", "intent", "cluster", "cluster_label", "recommended_target", "target_type", "priority_score", "file2_avg_monthly_searches", "file1_impressions", "competition_levels", "competition_index_max", "brands", "modifiers", "source_count", "sources", "source_paths", "notes", ] excluded_fields = [ "keyword", "normalized_keyword", "language", "intent", "cluster", "cluster_label", "priority_score", "file2_avg_monthly_searches", "file1_impressions", "brands", "modifiers", "recommendation_reason", "sources", "source_paths", ] cluster_fields = [ "cluster", "cluster_label", "recommended_target", "target_type", "cluster_score", "keywords_total", "target_now_keywords", "target_after_localization_keywords", "supporting_content_keywords", "watchlist_keywords", "excluded_keywords", "top_keyword", "top_language", "file1_impressions", "file2_avg_monthly_searches", "implementation_note", ] write_csv(output_dir / "prioritized_keywords.csv", rows, prioritized_fields) write_csv(output_dir / "excluded_keywords.csv", excluded, excluded_fields) write_csv(output_dir / "keyword_clusters.csv", clusters, cluster_fields) write_summary(output_dir / "keyword_strategy.md", input_paths, raw_rows, aggregates, rows, excluded, clusters) print(f"Generated keyword portfolio in {output_dir}") print(f"Included rows: {len(rows)}") print(f"Excluded rows: {len(excluded)}") if __name__ == "__main__": main()