From 2bd6df4df4e021961994cca4b0a2f5350cc738ed Mon Sep 17 00:00:00 2001 From: Ahmet Kaya Date: Wed, 20 Aug 2025 19:14:32 +0300 Subject: [PATCH 1/6] install httpx library --- poetry.lock | 111 ++++++++++++++++++++++++++++++++++++++++++++++++- pyproject.toml | 1 + 2 files changed, 110 insertions(+), 2 deletions(-) diff --git a/poetry.lock b/poetry.lock index 9e7646c6..1bbb34aa 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,28 @@ # This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +[[package]] +name = "anyio" +version = "4.5.2" +description = "High level compatibility layer for multiple asynchronous event loop implementations" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "anyio-4.5.2-py3-none-any.whl", hash = "sha256:c011ee36bc1e8ba40e5a81cb9df91925c218fe9b778554e0b56a21e1b5d4716f"}, + {file = "anyio-4.5.2.tar.gz", hash = "sha256:23009af4ed04ce05991845451e11ef02fc7c5ed29179ac9a420e5ad0ac7ddc5b"}, +] + +[package.dependencies] +exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""} +idna = ">=2.8" +sniffio = ">=1.1" +typing-extensions = {version = ">=4.1", markers = "python_version < \"3.11\""} + +[package.extras] +doc = ["Sphinx (>=7.4,<8.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "truststore (>=0.9.1) ; python_version >= \"3.10\"", "uvloop (>=0.21.0b1) ; platform_python_implementation == \"CPython\" and platform_system != \"Windows\""] +trio = ["trio (>=0.26.1)"] + [[package]] name = "certifi" version = "2025.1.31" @@ -230,7 +253,7 @@ version = "1.2.2" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" -groups = ["test"] +groups = ["main", "test"] markers = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, @@ -240,6 +263,40 @@ files = [ [package.extras] test = ["pytest (>=6)"] +[[package]] +name = "h11" +version = "0.16.0" +description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"}, + {file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"}, +] + +[[package]] +name = "httpcore" +version = "1.0.9" +description = "A minimal low-level HTTP client." +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"}, + {file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"}, +] + +[package.dependencies] +certifi = "*" +h11 = ">=0.16" + +[package.extras] +asyncio = ["anyio (>=4.0,<5.0)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +trio = ["trio (>=0.22.0,<1.0)"] + [[package]] name = "httpretty" version = "1.0.5" @@ -251,6 +308,31 @@ files = [ {file = "httpretty-1.0.5.tar.gz", hash = "sha256:e53c927c4d3d781a0761727f1edfad64abef94e828718e12b672a678a8b3e0b5"}, ] +[[package]] +name = "httpx" +version = "0.28.1" +description = "The next generation HTTP client." 
+optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, + {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, +] + +[package.dependencies] +anyio = "*" +certifi = "*" +httpcore = "==1.*" +idna = "*" + +[package.extras] +brotli = ["brotli ; platform_python_implementation == \"CPython\"", "brotlicffi ; platform_python_implementation != \"CPython\""] +cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +zstd = ["zstandard (>=0.18.0)"] + [[package]] name = "idna" version = "3.10" @@ -379,6 +461,18 @@ files = [ {file = "ruff-0.6.9.tar.gz", hash = "sha256:b076ef717a8e5bc819514ee1d602bbdca5b4420ae13a9cf61a0c0a4f53a2baa2"}, ] +[[package]] +name = "sniffio" +version = "1.3.1" +description = "Sniff out which async library your code is running under" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, + {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, +] + [[package]] name = "tomli" version = "2.2.1" @@ -422,6 +516,19 @@ files = [ {file = "tomli-2.2.1.tar.gz", hash = "sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff"}, ] +[[package]] +name = "typing-extensions" +version = "4.13.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "python_version < \"3.11\"" +files = [ + {file = "typing_extensions-4.13.2-py3-none-any.whl", hash = "sha256:a439e7c04b49fec3e5d3e2beaa21755cadbbdc391694e28ccdd36ca4a1408f8c"}, + {file = "typing_extensions-4.13.2.tar.gz", hash = "sha256:e6c81219bd689f51865d9e372991c540bda33a0379d5573cddb9a3a23f7caaef"}, +] + [[package]] name = "urllib3" version = "2.2.3" @@ -443,4 +550,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.1" python-versions = ">=3.8,<3.14" -content-hash = "90a050a0b068935ce6452cab0e0fa30c93c1af7ed745896403524110ad47c69b" +content-hash = "79f4e64adc63cef19b42cfe57cb9f22c0079dc41867551eaf80e1aeb379e7363" diff --git a/pyproject.toml b/pyproject.toml index d236365c..b2b4214e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ precommit.shell = "poe format && poe lint && poe coverage" python = ">=3.8,<3.14" requests = "*" defusedxml = "^0.7.1" +httpx = "^0.28.1" [tool.poetry.group.test] optional = true From 9a9d7feed1e0868ac7201a0f3254679ace0971cc Mon Sep 17 00:00:00 2001 From: Ahmet Kaya Date: Wed, 20 Aug 2025 19:14:50 +0300 Subject: [PATCH 2/6] feat: Add asynchronous transcript handling and fetching functionality --- youtube_transcript_api/_transcripts_async.py | 535 +++++++++++++++++++ 1 file changed, 535 insertions(+) create mode 100644 youtube_transcript_api/_transcripts_async.py diff --git a/youtube_transcript_api/_transcripts_async.py b/youtube_transcript_api/_transcripts_async.py new file mode 100644 index 00000000..9d2d2083 --- /dev/null +++ b/youtube_transcript_api/_transcripts_async.py @@ -0,0 +1,535 @@ +from httpx import AsyncClient, Response, HTTPError +from typing import List, Dict, Iterator, Iterable, Pattern, Optional, Union, Any +from ._settings import WATCH_URL, INNERTUBE_CONTEXT, INNERTUBE_API_URL +from dataclasses import 
dataclass, asdict +from .proxies import ProxyConfig +from ._transcripts import ( + FetchedTranscript, + FetchedTranscriptSnippet, + _TranslationLanguage, + _PlayabilityStatus, + _PlayabilityFailedReason, +) +from ._errors import ( + VideoUnavailable, + YouTubeRequestFailed, + NoTranscriptFound, + TranscriptsDisabled, + NotTranslatable, + TranslationLanguageNotAvailable, + FailedToCreateConsentCookie, + InvalidVideoId, + IpBlocked, + RequestBlocked, + AgeRestricted, + VideoUnplayable, + YouTubeDataUnparsable, + PoTokenRequired, +) + +from html import unescape + +from defusedxml import ElementTree + +import re + +import asyncio + +from itertools import chain + + +def _raise_http_errors(response: Response, video_id: str) -> Response: + try: + if response.status_code == 429: + raise IpBlocked(video_id) + response.raise_for_status() + return response + except HTTPError as error: + raise YouTubeRequestFailed(video_id, error) + + +class TranscriptAsync: + def __init__( + self, + http_client: AsyncClient, + video_id: str, + url: str, + language: str, + language_code: str, + is_generated: bool, + translation_languages: List[_TranslationLanguage], + ): + """ + You probably don't want to initialize this directly. Usually you'll access Transcript objects using a + TranscriptList. + """ + self._http_client = http_client + self.video_id = video_id + self._url = url + self.language = language + self.language_code = language_code + self.is_generated = is_generated + self.translation_languages = translation_languages + self._translation_languages_dict = { + translation_language.language_code: translation_language.language + for translation_language in translation_languages + } + + async def fetch(self, preserve_formatting: bool = False) -> FetchedTranscript: + """ + Loads the actual transcript data. + :param preserve_formatting: whether to keep select HTML text formatting + """ + if "&exp=xpe" in self._url: + raise PoTokenRequired(self.video_id) + response = await self._http_client.get(self._url) + transcript_parser = _TranscriptParserAsync( + preserve_formatting=preserve_formatting + ) + snippets = transcript_parser.parse( + _raise_http_errors(response, self.video_id).text, + ) + + return FetchedTranscript( + snippets=snippets, + video_id=self.video_id, + language=self.language, + language_code=self.language_code, + is_generated=self.is_generated, + ) + + def __str__(self) -> str: + return '{language_code} ("{language}"){translation_description}'.format( + language=self.language, + language_code=self.language_code, + translation_description="[TRANSLATABLE]" if self.is_translatable else "", + ) + + @property + def is_translatable(self) -> bool: + return len(self.translation_languages) > 0 + + def translate(self, language_code: str) -> "TranscriptAsync": + if not self.is_translatable: + raise NotTranslatable(self.video_id) + + if language_code not in self._translation_languages_dict: + raise TranslationLanguageNotAvailable(self.video_id) + + return TranscriptAsync( + self._http_client, + self.video_id, + "{url}&tlang={language_code}".format( + url=self._url, language_code=language_code + ), + self._translation_languages_dict[language_code], + language_code, + True, + [], + ) + + +class TranscriptListAsync: + """ + This object represents a list of transcripts. It can be iterated over to list all transcripts which are available + for a given YouTube video. Also, it provides functionality to search for a transcript in a given language. 
+ """ + + def __init__( + self, + video_id: str, + manually_created_transcripts: Dict[str, TranscriptAsync], + generated_transcripts: Dict[str, TranscriptAsync], + translation_languages: List[_TranslationLanguage], + ): + """ + The constructor is only for internal use. Use the static build method instead. + + :param video_id: the id of the video this TranscriptList is for + :param manually_created_transcripts: dict mapping language codes to the manually created transcripts + :param generated_transcripts: dict mapping language codes to the generated transcripts + :param translation_languages: list of languages which can be used for translatable languages + """ + self.video_id = video_id + self._manually_created_transcripts = manually_created_transcripts + self._generated_transcripts = generated_transcripts + self._translation_languages = translation_languages + + @staticmethod + def build( + http_client: AsyncClient, video_id: str, captions_json: Dict + ) -> "TranscriptListAsync": + """ + Factory method for TranscriptList. + + :param http_client: http client which is used to make the transcript retrieving http calls + :param video_id: the id of the video this TranscriptList is for + :param captions_json: the JSON parsed from the YouTube pages static HTML + :return: the created TranscriptList + """ + translation_languages = [ + _TranslationLanguage( + language=translation_language["languageName"]["runs"][0]["text"], + language_code=translation_language["languageCode"], + ) + for translation_language in captions_json.get("translationLanguages", []) + ] + + manually_created_transcripts = {} + generated_transcripts = {} + + for caption in captions_json["captionTracks"]: + if caption.get("kind", "") == "asr": + transcript_dict = generated_transcripts + else: + transcript_dict = manually_created_transcripts + + transcript_dict[caption["languageCode"]] = TranscriptAsync( + http_client, + video_id, + caption["baseUrl"].replace("&fmt=srv3", ""), + caption["name"]["runs"][0]["text"], + caption["languageCode"], + caption.get("kind", "") == "asr", + translation_languages if caption.get("isTranslatable", False) else [], + ) + + return TranscriptListAsync( + video_id, + manually_created_transcripts, + generated_transcripts, + translation_languages, + ) + + def __iter__(self) -> Iterator[TranscriptAsync]: + return chain( + self._manually_created_transcripts.values(), + self._generated_transcripts.values(), + ) + + def find_transcript(self, language_codes: Iterable[str]) -> TranscriptAsync: + """ + Finds a transcript for a given language code. Manually created transcripts are returned first and only if none + are found, generated transcripts are used. If you only want generated transcripts use + `find_manually_created_transcript` instead. + + :param language_codes: A list of language codes in a descending priority. For example, if this is set to + ['de', 'en'] it will first try to fetch the german transcript (de) and then fetch the english transcript (en) if + it fails to do so. + :return: the found Transcript + """ + return self._find_transcript( + language_codes, + [self._manually_created_transcripts, self._generated_transcripts], + ) + + def find_generated_transcript( + self, language_codes: Iterable[str] + ) -> TranscriptAsync: + """ + Finds an automatically generated transcript for a given language code. + + :param language_codes: A list of language codes in a descending priority. 
For example, if this is set to + ['de', 'en'] it will first try to fetch the german transcript (de) and then fetch the english transcript (en) if + it fails to do so. + :return: the found Transcript + """ + return self._find_transcript(language_codes, [self._generated_transcripts]) + + def find_manually_created_transcript( + self, language_codes: Iterable[str] + ) -> TranscriptAsync: + """ + Finds a manually created transcript for a given language code. + + :param language_codes: A list of language codes in a descending priority. For example, if this is set to + ['de', 'en'] it will first try to fetch the german transcript (de) and then fetch the english transcript (en) if + it fails to do so. + :return: the found Transcript + """ + return self._find_transcript( + language_codes, [self._manually_created_transcripts] + ) + + def _find_transcript( + self, + language_codes: Iterable[str], + transcript_dicts: List[Dict[str, TranscriptAsync]], + ) -> TranscriptAsync: + for language_code in language_codes: + for transcript_dict in transcript_dicts: + if language_code in transcript_dict: + return transcript_dict[language_code] + + raise NoTranscriptFound(self.video_id, language_codes, self) + + def __str__(self) -> str: + return ( + "For this video ({video_id}) transcripts are available in the following languages:\n\n" + "(MANUALLY CREATED)\n" + "{available_manually_created_transcript_languages}\n\n" + "(GENERATED)\n" + "{available_generated_transcripts}\n\n" + "(TRANSLATION LANGUAGES)\n" + "{available_translation_languages}" + ).format( + video_id=self.video_id, + available_manually_created_transcript_languages=self._get_language_description( + str(transcript) + for transcript in self._manually_created_transcripts.values() + ), + available_generated_transcripts=self._get_language_description( + str(transcript) for transcript in self._generated_transcripts.values() + ), + available_translation_languages=self._get_language_description( + '{language_code} ("{language}")'.format( + language=translation_language.language, + language_code=translation_language.language_code, + ) + for translation_language in self._translation_languages + ), + ) + + def _get_language_description(self, transcript_strings: Iterable[str]) -> str: + description = "\n".join( + " - {transcript}".format(transcript=transcript) + for transcript in transcript_strings + ) + return description if description else "None" + + +class TranscriptListFetcherAsync: + def __init__(self, http_client: AsyncClient, proxy_config: Optional[ProxyConfig]): + self._http_client = http_client + self._proxy_config = proxy_config + + async def fetch(self, video_id: str) -> TranscriptListAsync: + return TranscriptListAsync.build( + self._http_client, + video_id, + await self._fetch_captions_json(video_id), + ) + + async def _fetch_captions_json(self, video_id: str, try_number: int = 0) -> Dict: + try: + html = await self._fetch_video_html(video_id) + api_key = self._extract_innertube_api_key(html, video_id) + innertube_data = await self._fetch_innertube_data(video_id, api_key) + return self._extract_captions_json(innertube_data, video_id) + except RequestBlocked as exception: + retries = ( + 0 + if self._proxy_config is None + else self._proxy_config.retries_when_blocked + ) + if try_number + 1 < retries: + return await self._fetch_captions_json( + video_id, try_number=try_number + 1 + ) + raise exception.with_proxy_config(self._proxy_config) + + def _extract_innertube_api_key(self, html: str, video_id: str) -> str: + pattern = 
r'"INNERTUBE_API_KEY":\s*"([a-zA-Z0-9_-]+)"' + match = re.search(pattern, html) + if match and len(match.groups()) == 1: + return match.group(1) + if 'class="g-recaptcha"' in html: + raise IpBlocked(video_id) + raise YouTubeDataUnparsable(video_id) # pragma: no cover + + def _extract_captions_json(self, innertube_data: Dict, video_id: str) -> Dict: + self._assert_playability(innertube_data.get("playabilityStatus"), video_id) + + captions_json = innertube_data.get("captions", {}).get( + "playerCaptionsTracklistRenderer" + ) + if captions_json is None or "captionTracks" not in captions_json: + raise TranscriptsDisabled(video_id) + + return captions_json + + def _assert_playability(self, playability_status_data: Dict, video_id: str) -> None: + playability_status = playability_status_data.get("status") + if ( + playability_status != _PlayabilityStatus.OK.value + and playability_status is not None + ): + reason = playability_status_data.get("reason") + if playability_status == _PlayabilityStatus.LOGIN_REQUIRED.value: + if reason == _PlayabilityFailedReason.BOT_DETECTED.value: + raise RequestBlocked(video_id) + if reason == _PlayabilityFailedReason.AGE_RESTRICTED.value: + raise AgeRestricted(video_id) + if ( + playability_status == _PlayabilityStatus.ERROR.value + and reason == _PlayabilityFailedReason.VIDEO_UNAVAILABLE.value + ): + if video_id.startswith("http://") or video_id.startswith("https://"): + raise InvalidVideoId(video_id) + raise VideoUnavailable(video_id) + subreasons = ( + playability_status_data.get("errorScreen", {}) + .get("playerErrorMessageRenderer", {}) + .get("subreason", {}) + .get("runs", []) + ) + raise VideoUnplayable( + video_id, reason, [run.get("text", "") for run in subreasons] + ) + + def _create_consent_cookie(self, html: str, video_id: str) -> None: + match = re.search('name="v" value="(.*?)"', html) + if match is None: + raise FailedToCreateConsentCookie(video_id) + self._http_client.cookies.set( + "CONSENT", "YES+" + match.group(1), domain=".youtube.com" + ) + + async def _fetch_video_html(self, video_id: str) -> str: + html = await self._fetch_html(video_id) + if 'action="https://consent.youtube.com/s"' in html: + self._create_consent_cookie(html, video_id) + html = await self._fetch_html(video_id) + if 'action="https://consent.youtube.com/s"' in html: + raise FailedToCreateConsentCookie(video_id) + return html + + async def _fetch_html(self, video_id: str) -> str: + response = await self._http_client.get(WATCH_URL.format(video_id=video_id)) + return unescape(_raise_http_errors(response, video_id).text) + + async def _fetch_innertube_data(self, video_id: str, api_key: str) -> Dict: + response = await self._http_client.post( + INNERTUBE_API_URL.format(api_key=api_key), + json={ + "context": INNERTUBE_CONTEXT, + "videoId": video_id, + }, + ) + data = _raise_http_errors(response, video_id).json() + return data + + +class _TranscriptParserAsync: + _FORMATTING_TAGS = [ + "strong", # important + "em", # emphasized + "b", # bold + "i", # italic + "mark", # marked + "small", # smaller + "del", # deleted + "ins", # inserted + "sub", # subscript + "sup", # superscript + ] + + def __init__(self, preserve_formatting: bool = False): + self._html_regex = self._get_html_regex(preserve_formatting) + + def _get_html_regex(self, preserve_formatting: bool) -> Pattern[str]: + if preserve_formatting: + formats_regex = "|".join(self._FORMATTING_TAGS) + formats_regex = r"<\/?(?!\/?(" + formats_regex + r")\b).*?\b>" + html_regex = re.compile(formats_regex, re.IGNORECASE) + else: + 
html_regex = re.compile(r"<[^>]*>", re.IGNORECASE) + return html_regex + + def parse(self, raw_data: str) -> List[FetchedTranscriptSnippet]: + return [ + FetchedTranscriptSnippet( + text=re.sub(self._html_regex, "", unescape(xml_element.text)), + start=float(xml_element.attrib["start"]), + duration=float(xml_element.attrib.get("dur", "0.0")), + ) + for xml_element in ElementTree.fromstring(raw_data) + if xml_element.text is not None + ] + + +@dataclass +class BulkFetchResults: + video_id: str + result: Union[FetchedTranscript, Dict[str, Any]] # Either transcript or error dict + + def to_raw_data(self): + return asdict(self) + + +class AsyncTranscriptHandler: + def __init__( + self, + fetcher: TranscriptListFetcherAsync, + proxy_config: Optional[ProxyConfig] = None, + max_concurrent: int = 10, + ): + self._fetcher = fetcher + self._proxy_config = proxy_config + self._semaphore = asyncio.Semaphore(max_concurrent) + + async def fetch_single( + self, + video_id: str, + languages: Iterable[str] = ("en",), + preserve_formatting: bool = False, + ) -> FetchedTranscript: + """Fetch transcript for a single video""" + async with self._semaphore: # Add rate limiting + transcript_list = await self._fetcher.fetch(video_id) + transcript = transcript_list.find_transcript(languages) + return await transcript.fetch(preserve_formatting=preserve_formatting) + + async def fetch_bulk( + self, + video_ids: List[str], + languages: Iterable[str] = ("en",), + preserve_formatting: bool = False, + ) -> List[BulkFetchResults]: + """Fetch transcripts for multiple videos concurrently with error handling""" + + async def _safe_fetch(video_id: str) -> Union[FetchedTranscript, Exception]: + try: + return await self.fetch_single( + video_id, + languages=languages, + preserve_formatting=preserve_formatting, + ) + except Exception as e: + return e + + # Create tasks with proper error handling + tasks = [_safe_fetch(video_id) for video_id in video_ids] + results = await asyncio.gather(*tasks, return_exceptions=True) + + return self._process_bulk_results(video_ids, results) + + def _serialize_exception(self, exc: BaseException) -> Dict[str, Any]: + """Convert exception to serializable dict""" + return { + "type": exc.__class__.__name__, + "message": str(exc), + **getattr(exc, "__dict__", {}), + } + + def _process_bulk_results( + self, + video_ids: List[str], + results: List[Union[FetchedTranscript, Exception]], + ) -> List[BulkFetchResults]: + """Process bulk fetch results with error handling""" + processed_results = [] + + for video_id, result in zip(video_ids, results): + if isinstance(result, Exception): + processed_results.append( + BulkFetchResults( + video_id=video_id, result=self._serialize_exception(result) + ) + ) + else: + processed_results.append( + BulkFetchResults(video_id=video_id, result=result) + ) + + return processed_results From bb7150fc47d86a20d83b87700999a6135d121c4a Mon Sep 17 00:00:00 2001 From: Ahmet Kaya Date: Wed, 20 Aug 2025 19:16:04 +0300 Subject: [PATCH 3/6] feat: Add YoutubeTranscriptAsyncApi --- youtube_transcript_api/__init__.py | 3 +- youtube_transcript_api/_api.py | 134 ++++++++++++++++++++++++++++- 2 files changed, 134 insertions(+), 3 deletions(-) diff --git a/youtube_transcript_api/__init__.py b/youtube_transcript_api/__init__.py index 8e8d7268..cb6e24d5 100644 --- a/youtube_transcript_api/__init__.py +++ b/youtube_transcript_api/__init__.py @@ -1,5 +1,5 @@ # ruff: noqa: F401 -from ._api import YouTubeTranscriptApi +from ._api import YouTubeTranscriptApi, YoutubeTranscriptAsyncApi from 
._transcripts import ( TranscriptList, Transcript, @@ -30,6 +30,7 @@ __all__ = [ "YouTubeTranscriptApi", + "YoutubeTranscriptAsyncApi", "TranscriptList", "Transcript", "FetchedTranscript", diff --git a/youtube_transcript_api/_api.py b/youtube_transcript_api/_api.py index b2944934..bca971d2 100644 --- a/youtube_transcript_api/_api.py +++ b/youtube_transcript_api/_api.py @@ -1,5 +1,4 @@ -from typing import Optional, Iterable - +from typing import Optional, Iterable, List from requests import Session from requests.adapters import HTTPAdapter from urllib3 import Retry @@ -7,6 +6,13 @@ from .proxies import ProxyConfig from ._transcripts import TranscriptListFetcher, FetchedTranscript, TranscriptList +from ._transcripts_async import ( + TranscriptListFetcherAsync, + AsyncTranscriptHandler, + BulkFetchResults, +) + +from httpx import AsyncClient, AsyncHTTPTransport class YouTubeTranscriptApi: @@ -125,3 +131,127 @@ def list( Make sure that this is the actual ID, NOT the full URL to the video! """ return self._fetcher.fetch(video_id) + + +class YoutubeTranscriptAsyncApi: + def __init__( + self, + proxy_config: Optional[ProxyConfig] = None, + async_client: Optional[AsyncClient] = None, + ): + async_client = AsyncClient(timeout=20) if async_client is None else async_client + async_client.headers.update({"Accept-Language": "en-US"}) + + if proxy_config is not None: + async_client.proxies = proxy_config.to_requests_dict() + if proxy_config.prevent_keeping_connections_alive: + async_client.headers.update({"Connection": "close"}) + if proxy_config.retries_when_blocked > 0: + transport = AsyncHTTPTransport( + retries=proxy_config.retries_when_blocked + ) + async_client.mount("http://", transport) + async_client.mount("https://", transport) + + self._fetcher = TranscriptListFetcherAsync( + async_client, proxy_config=proxy_config + ) + self._handler = AsyncTranscriptHandler(self._fetcher, proxy_config) + + async def fetch_single( + self, + video_id: str, + languages: Iterable[str] = ("en",), + preserve_formatting: bool = False, + ) -> FetchedTranscript: + """ + Retrieves the transcript for a single video. This is just a shortcut for + calling: + `YouTubeTranscriptApi().list(video_id).find_transcript(languages).fetch(preserve_formatting=preserve_formatting)` + + :param video_id: the ID of the video you want to retrieve the transcript for. + Make sure that this is the actual ID, NOT the full URL to the video! + :param languages: A list of language codes in a descending priority. For + example, if this is set to ["de", "en"] it will first try to fetch the + german transcript (de) and then fetch the english transcript (en) if + it fails to do so. This defaults to ["en"]. + :param preserve_formatting: whether to keep select HTML text formatting + """ + + return await self._handler.fetch_single( + video_id, languages, preserve_formatting + ) + + async def fetch_all( + self, + video_ids: List[str], + languages: Iterable[str] = ("en",), + preserve_formatting: bool = False, + ) -> list[BulkFetchResults]: + """ + Asynchronously retrieves transcripts for a list of video IDs concurrently. + + :param video_ids: List of video IDs. + :param languages: List of language codes in descending priority (default: ["en"]). + :param preserve_formatting: Whether to keep HTML formatting. + :param continue_after_error: If True, skip failed video IDs and return partial results; else raise the first error. + :param log_errors: If True, collected errors will logged in console for more information. 
+ :return: Dict of {video_id: FetchedTranscript}. + """ + transcripts = await self._handler.fetch_bulk( + video_ids, languages, preserve_formatting + ) + return transcripts + + async def list( + self, + video_id: str, + ) -> TranscriptList: + """ + Retrieves the list of transcripts which are available for a given video. It + returns a `TranscriptList` object which is iterable and provides methods to + filter the list of transcripts for specific languages. While iterating over + the `TranscriptList` the individual transcripts are represented by + `Transcript` objects, which provide metadata and can either be fetched by + calling `transcript.fetch()` or translated by calling `transcript.translate( + 'en')`. Example: + + ``` + ytt_api = YouTubeTranscriptApi() + + # retrieve the available transcripts + transcript_list = ytt_api.list('video_id') + + # iterate over all available transcripts + for transcript in transcript_list: + # the Transcript object provides metadata properties + print( + transcript.video_id, + transcript.language, + transcript.language_code, + # whether it has been manually created or generated by YouTube + transcript.is_generated, + # a list of languages the transcript can be translated to + transcript.translation_languages, + ) + + # fetch the actual transcript data + print(transcript.fetch()) + + # translating the transcript will return another transcript object + print(transcript.translate('en').fetch()) + + # you can also directly filter for the language you are looking for, using the transcript list + transcript = transcript_list.find_transcript(['de', 'en']) + + # or just filter for manually created transcripts + transcript = transcript_list.find_manually_created_transcript(['de', 'en']) + + # or automatically generated ones + transcript = transcript_list.find_generated_transcript(['de', 'en']) + ``` + + :param video_id: the ID of the video you want to retrieve the transcript for. + Make sure that this is the actual ID, NOT the full URL to the video! 
+ """ + return await self._fetcher.fetch(video_id) From a3845085d358561d4db05d9571f9ece2b5694e47 Mon Sep 17 00:00:00 2001 From: Ahmet Kaya Date: Thu, 9 Oct 2025 20:08:31 +0300 Subject: [PATCH 4/6] remove duplicated methods from _transcripts_async --- youtube_transcript_api/_transcripts_async.py | 504 ++----------------- 1 file changed, 55 insertions(+), 449 deletions(-) diff --git a/youtube_transcript_api/_transcripts_async.py b/youtube_transcript_api/_transcripts_async.py index 9d2d2083..7774c462 100644 --- a/youtube_transcript_api/_transcripts_async.py +++ b/youtube_transcript_api/_transcripts_async.py @@ -1,466 +1,61 @@ -from httpx import AsyncClient, Response, HTTPError -from typing import List, Dict, Iterator, Iterable, Pattern, Optional, Union, Any -from ._settings import WATCH_URL, INNERTUBE_CONTEXT, INNERTUBE_API_URL +from typing import List, Dict, Iterable, Optional, Union, Any from dataclasses import dataclass, asdict from .proxies import ProxyConfig from ._transcripts import ( FetchedTranscript, - FetchedTranscriptSnippet, - _TranslationLanguage, - _PlayabilityStatus, - _PlayabilityFailedReason, + TranscriptListFetcher, ) -from ._errors import ( - VideoUnavailable, - YouTubeRequestFailed, - NoTranscriptFound, - TranscriptsDisabled, - NotTranslatable, - TranslationLanguageNotAvailable, - FailedToCreateConsentCookie, - InvalidVideoId, - IpBlocked, - RequestBlocked, - AgeRestricted, - VideoUnplayable, - YouTubeDataUnparsable, - PoTokenRequired, -) - -from html import unescape - -from defusedxml import ElementTree - -import re import asyncio -from itertools import chain - - -def _raise_http_errors(response: Response, video_id: str) -> Response: - try: - if response.status_code == 429: - raise IpBlocked(video_id) - response.raise_for_status() - return response - except HTTPError as error: - raise YouTubeRequestFailed(video_id, error) - - -class TranscriptAsync: - def __init__( - self, - http_client: AsyncClient, - video_id: str, - url: str, - language: str, - language_code: str, - is_generated: bool, - translation_languages: List[_TranslationLanguage], - ): - """ - You probably don't want to initialize this directly. Usually you'll access Transcript objects using a - TranscriptList. - """ - self._http_client = http_client - self.video_id = video_id - self._url = url - self.language = language - self.language_code = language_code - self.is_generated = is_generated - self.translation_languages = translation_languages - self._translation_languages_dict = { - translation_language.language_code: translation_language.language - for translation_language in translation_languages - } - - async def fetch(self, preserve_formatting: bool = False) -> FetchedTranscript: - """ - Loads the actual transcript data. 
- :param preserve_formatting: whether to keep select HTML text formatting - """ - if "&exp=xpe" in self._url: - raise PoTokenRequired(self.video_id) - response = await self._http_client.get(self._url) - transcript_parser = _TranscriptParserAsync( - preserve_formatting=preserve_formatting - ) - snippets = transcript_parser.parse( - _raise_http_errors(response, self.video_id).text, - ) - - return FetchedTranscript( - snippets=snippets, - video_id=self.video_id, - language=self.language, - language_code=self.language_code, - is_generated=self.is_generated, - ) - - def __str__(self) -> str: - return '{language_code} ("{language}"){translation_description}'.format( - language=self.language, - language_code=self.language_code, - translation_description="[TRANSLATABLE]" if self.is_translatable else "", - ) - - @property - def is_translatable(self) -> bool: - return len(self.translation_languages) > 0 - - def translate(self, language_code: str) -> "TranscriptAsync": - if not self.is_translatable: - raise NotTranslatable(self.video_id) - - if language_code not in self._translation_languages_dict: - raise TranslationLanguageNotAvailable(self.video_id) - - return TranscriptAsync( - self._http_client, - self.video_id, - "{url}&tlang={language_code}".format( - url=self._url, language_code=language_code - ), - self._translation_languages_dict[language_code], - language_code, - True, - [], - ) - - -class TranscriptListAsync: - """ - This object represents a list of transcripts. It can be iterated over to list all transcripts which are available - for a given YouTube video. Also, it provides functionality to search for a transcript in a given language. - """ - - def __init__( - self, - video_id: str, - manually_created_transcripts: Dict[str, TranscriptAsync], - generated_transcripts: Dict[str, TranscriptAsync], - translation_languages: List[_TranslationLanguage], - ): - """ - The constructor is only for internal use. Use the static build method instead. - - :param video_id: the id of the video this TranscriptList is for - :param manually_created_transcripts: dict mapping language codes to the manually created transcripts - :param generated_transcripts: dict mapping language codes to the generated transcripts - :param translation_languages: list of languages which can be used for translatable languages - """ - self.video_id = video_id - self._manually_created_transcripts = manually_created_transcripts - self._generated_transcripts = generated_transcripts - self._translation_languages = translation_languages - - @staticmethod - def build( - http_client: AsyncClient, video_id: str, captions_json: Dict - ) -> "TranscriptListAsync": - """ - Factory method for TranscriptList. 
- - :param http_client: http client which is used to make the transcript retrieving http calls - :param video_id: the id of the video this TranscriptList is for - :param captions_json: the JSON parsed from the YouTube pages static HTML - :return: the created TranscriptList - """ - translation_languages = [ - _TranslationLanguage( - language=translation_language["languageName"]["runs"][0]["text"], - language_code=translation_language["languageCode"], - ) - for translation_language in captions_json.get("translationLanguages", []) - ] - - manually_created_transcripts = {} - generated_transcripts = {} - - for caption in captions_json["captionTracks"]: - if caption.get("kind", "") == "asr": - transcript_dict = generated_transcripts - else: - transcript_dict = manually_created_transcripts - - transcript_dict[caption["languageCode"]] = TranscriptAsync( - http_client, - video_id, - caption["baseUrl"].replace("&fmt=srv3", ""), - caption["name"]["runs"][0]["text"], - caption["languageCode"], - caption.get("kind", "") == "asr", - translation_languages if caption.get("isTranslatable", False) else [], - ) - - return TranscriptListAsync( - video_id, - manually_created_transcripts, - generated_transcripts, - translation_languages, - ) - - def __iter__(self) -> Iterator[TranscriptAsync]: - return chain( - self._manually_created_transcripts.values(), - self._generated_transcripts.values(), - ) - - def find_transcript(self, language_codes: Iterable[str]) -> TranscriptAsync: - """ - Finds a transcript for a given language code. Manually created transcripts are returned first and only if none - are found, generated transcripts are used. If you only want generated transcripts use - `find_manually_created_transcript` instead. - - :param language_codes: A list of language codes in a descending priority. For example, if this is set to - ['de', 'en'] it will first try to fetch the german transcript (de) and then fetch the english transcript (en) if - it fails to do so. - :return: the found Transcript - """ - return self._find_transcript( - language_codes, - [self._manually_created_transcripts, self._generated_transcripts], - ) - - def find_generated_transcript( - self, language_codes: Iterable[str] - ) -> TranscriptAsync: - """ - Finds an automatically generated transcript for a given language code. - - :param language_codes: A list of language codes in a descending priority. For example, if this is set to - ['de', 'en'] it will first try to fetch the german transcript (de) and then fetch the english transcript (en) if - it fails to do so. - :return: the found Transcript - """ - return self._find_transcript(language_codes, [self._generated_transcripts]) - - def find_manually_created_transcript( - self, language_codes: Iterable[str] - ) -> TranscriptAsync: - """ - Finds a manually created transcript for a given language code. - - :param language_codes: A list of language codes in a descending priority. For example, if this is set to - ['de', 'en'] it will first try to fetch the german transcript (de) and then fetch the english transcript (en) if - it fails to do so. 
- :return: the found Transcript - """ - return self._find_transcript( - language_codes, [self._manually_created_transcripts] - ) - - def _find_transcript( - self, - language_codes: Iterable[str], - transcript_dicts: List[Dict[str, TranscriptAsync]], - ) -> TranscriptAsync: - for language_code in language_codes: - for transcript_dict in transcript_dicts: - if language_code in transcript_dict: - return transcript_dict[language_code] - - raise NoTranscriptFound(self.video_id, language_codes, self) - - def __str__(self) -> str: - return ( - "For this video ({video_id}) transcripts are available in the following languages:\n\n" - "(MANUALLY CREATED)\n" - "{available_manually_created_transcript_languages}\n\n" - "(GENERATED)\n" - "{available_generated_transcripts}\n\n" - "(TRANSLATION LANGUAGES)\n" - "{available_translation_languages}" - ).format( - video_id=self.video_id, - available_manually_created_transcript_languages=self._get_language_description( - str(transcript) - for transcript in self._manually_created_transcripts.values() - ), - available_generated_transcripts=self._get_language_description( - str(transcript) for transcript in self._generated_transcripts.values() - ), - available_translation_languages=self._get_language_description( - '{language_code} ("{language}")'.format( - language=translation_language.language, - language_code=translation_language.language_code, - ) - for translation_language in self._translation_languages - ), - ) - - def _get_language_description(self, transcript_strings: Iterable[str]) -> str: - description = "\n".join( - " - {transcript}".format(transcript=transcript) - for transcript in transcript_strings - ) - return description if description else "None" - - -class TranscriptListFetcherAsync: - def __init__(self, http_client: AsyncClient, proxy_config: Optional[ProxyConfig]): - self._http_client = http_client - self._proxy_config = proxy_config - - async def fetch(self, video_id: str) -> TranscriptListAsync: - return TranscriptListAsync.build( - self._http_client, - video_id, - await self._fetch_captions_json(video_id), - ) - - async def _fetch_captions_json(self, video_id: str, try_number: int = 0) -> Dict: - try: - html = await self._fetch_video_html(video_id) - api_key = self._extract_innertube_api_key(html, video_id) - innertube_data = await self._fetch_innertube_data(video_id, api_key) - return self._extract_captions_json(innertube_data, video_id) - except RequestBlocked as exception: - retries = ( - 0 - if self._proxy_config is None - else self._proxy_config.retries_when_blocked - ) - if try_number + 1 < retries: - return await self._fetch_captions_json( - video_id, try_number=try_number + 1 - ) - raise exception.with_proxy_config(self._proxy_config) - - def _extract_innertube_api_key(self, html: str, video_id: str) -> str: - pattern = r'"INNERTUBE_API_KEY":\s*"([a-zA-Z0-9_-]+)"' - match = re.search(pattern, html) - if match and len(match.groups()) == 1: - return match.group(1) - if 'class="g-recaptcha"' in html: - raise IpBlocked(video_id) - raise YouTubeDataUnparsable(video_id) # pragma: no cover - - def _extract_captions_json(self, innertube_data: Dict, video_id: str) -> Dict: - self._assert_playability(innertube_data.get("playabilityStatus"), video_id) - - captions_json = innertube_data.get("captions", {}).get( - "playerCaptionsTracklistRenderer" - ) - if captions_json is None or "captionTracks" not in captions_json: - raise TranscriptsDisabled(video_id) - - return captions_json - - def _assert_playability(self, playability_status_data: Dict, 
video_id: str) -> None: - playability_status = playability_status_data.get("status") - if ( - playability_status != _PlayabilityStatus.OK.value - and playability_status is not None - ): - reason = playability_status_data.get("reason") - if playability_status == _PlayabilityStatus.LOGIN_REQUIRED.value: - if reason == _PlayabilityFailedReason.BOT_DETECTED.value: - raise RequestBlocked(video_id) - if reason == _PlayabilityFailedReason.AGE_RESTRICTED.value: - raise AgeRestricted(video_id) - if ( - playability_status == _PlayabilityStatus.ERROR.value - and reason == _PlayabilityFailedReason.VIDEO_UNAVAILABLE.value - ): - if video_id.startswith("http://") or video_id.startswith("https://"): - raise InvalidVideoId(video_id) - raise VideoUnavailable(video_id) - subreasons = ( - playability_status_data.get("errorScreen", {}) - .get("playerErrorMessageRenderer", {}) - .get("subreason", {}) - .get("runs", []) - ) - raise VideoUnplayable( - video_id, reason, [run.get("text", "") for run in subreasons] - ) - - def _create_consent_cookie(self, html: str, video_id: str) -> None: - match = re.search('name="v" value="(.*?)"', html) - if match is None: - raise FailedToCreateConsentCookie(video_id) - self._http_client.cookies.set( - "CONSENT", "YES+" + match.group(1), domain=".youtube.com" - ) - - async def _fetch_video_html(self, video_id: str) -> str: - html = await self._fetch_html(video_id) - if 'action="https://consent.youtube.com/s"' in html: - self._create_consent_cookie(html, video_id) - html = await self._fetch_html(video_id) - if 'action="https://consent.youtube.com/s"' in html: - raise FailedToCreateConsentCookie(video_id) - return html - - async def _fetch_html(self, video_id: str) -> str: - response = await self._http_client.get(WATCH_URL.format(video_id=video_id)) - return unescape(_raise_http_errors(response, video_id).text) - - async def _fetch_innertube_data(self, video_id: str, api_key: str) -> Dict: - response = await self._http_client.post( - INNERTUBE_API_URL.format(api_key=api_key), - json={ - "context": INNERTUBE_CONTEXT, - "videoId": video_id, - }, - ) - data = _raise_http_errors(response, video_id).json() - return data - - -class _TranscriptParserAsync: - _FORMATTING_TAGS = [ - "strong", # important - "em", # emphasized - "b", # bold - "i", # italic - "mark", # marked - "small", # smaller - "del", # deleted - "ins", # inserted - "sub", # subscript - "sup", # superscript - ] - - def __init__(self, preserve_formatting: bool = False): - self._html_regex = self._get_html_regex(preserve_formatting) - - def _get_html_regex(self, preserve_formatting: bool) -> Pattern[str]: - if preserve_formatting: - formats_regex = "|".join(self._FORMATTING_TAGS) - formats_regex = r"<\/?(?!\/?(" + formats_regex + r")\b).*?\b>" - html_regex = re.compile(formats_regex, re.IGNORECASE) - else: - html_regex = re.compile(r"<[^>]*>", re.IGNORECASE) - return html_regex - - def parse(self, raw_data: str) -> List[FetchedTranscriptSnippet]: - return [ - FetchedTranscriptSnippet( - text=re.sub(self._html_regex, "", unescape(xml_element.text)), - start=float(xml_element.attrib["start"]), - duration=float(xml_element.attrib.get("dur", "0.0")), - ) - for xml_element in ElementTree.fromstring(raw_data) - if xml_element.text is not None - ] - - @dataclass class BulkFetchResults: video_id: str - result: Union[FetchedTranscript, Dict[str, Any]] # Either transcript or error dict + result: Union[FetchedTranscript, Dict[str, Any]] def to_raw_data(self): return asdict(self) class AsyncTranscriptHandler: + """ + An asynchronous 
handler for fetching YouTube transcripts concurrently. + + This class provides high-level methods for fetching transcripts for one + or more YouTube videos while handling concurrency limits, exceptions, + and optional proxy configuration. + + Features: + - Concurrency limiting with an asyncio.Semaphore. + - Fetching single or multiple transcripts concurrently. + - Built-in error handling with structured exception serialization. + - Proxy configuration support (optional). + + Attributes: + _fetcher (TranscriptListFetcherAsync): + The transcript fetcher responsible for retrieving transcript lists. + _proxy_config (Optional[ProxyConfig]): + Proxy configuration used when making requests. + _semaphore (asyncio.Semaphore): + Semaphore to limit the number of concurrent requests. + + Example: + >>> handler = AsyncTranscriptHandler(fetcher, max_concurrent=5) + >>> results = await handler.fetch_bulk( + ... ["video_id_1", "video_id_2"], + ... ) + >>> for r in results: + ... print(r.video_id, r.result) + + Notes: + - `fetch_bulk` will always return a list of results in the same order + as the provided `video_ids`. + - If an exception occurs during fetching, the exception is captured + and serialized into a dictionary with `type` and `message`. + """ def __init__( self, - fetcher: TranscriptListFetcherAsync, + fetcher: TranscriptListFetcher, proxy_config: Optional[ProxyConfig] = None, max_concurrent: int = 10, ): @@ -475,7 +70,7 @@ async def fetch_single( preserve_formatting: bool = False, ) -> FetchedTranscript: """Fetch transcript for a single video""" - async with self._semaphore: # Add rate limiting + async with self._semaphore: transcript_list = await self._fetcher.fetch(video_id) transcript = transcript_list.find_transcript(languages) return await transcript.fetch(preserve_formatting=preserve_formatting) @@ -486,7 +81,15 @@ async def fetch_bulk( languages: Iterable[str] = ("en",), preserve_formatting: bool = False, ) -> List[BulkFetchResults]: - """Fetch transcripts for multiple videos concurrently with error handling""" + """Fetch transcripts for multiple videos concurrently with error handling. + Args: + video_ids: List of YouTube video IDs. + languages: Languages to try in order. + preserve_formatting: Whether to preserve original transcript formatting. + + Returns: + A list of FetchResult objects, one per video_id. 
+ """ async def _safe_fetch(video_id: str) -> Union[FetchedTranscript, Exception]: try: @@ -498,7 +101,6 @@ async def _safe_fetch(video_id: str) -> Union[FetchedTranscript, Exception]: except Exception as e: return e - # Create tasks with proper error handling tasks = [_safe_fetch(video_id) for video_id in video_ids] results = await asyncio.gather(*tasks, return_exceptions=True) @@ -524,12 +126,16 @@ def _process_bulk_results( if isinstance(result, Exception): processed_results.append( BulkFetchResults( - video_id=video_id, result=self._serialize_exception(result) + video_id=video_id, + result=self._serialize_exception(result) ) ) else: processed_results.append( - BulkFetchResults(video_id=video_id, result=result) + BulkFetchResults( + video_id=video_id, + result=result + ) ) return processed_results From 5d98d7aaf6b4c0991699087dfd21d2a622d96d13 Mon Sep 17 00:00:00 2001 From: Ahmet Kaya Date: Thu, 9 Oct 2025 20:09:16 +0300 Subject: [PATCH 5/6] refactor Transcript and TranscriptListFetcher class to be async --- youtube_transcript_api/_transcripts.py | 43 ++++++++++++++++---------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/youtube_transcript_api/_transcripts.py b/youtube_transcript_api/_transcripts.py index 55baa426..3e02e7a5 100644 --- a/youtube_transcript_api/_transcripts.py +++ b/youtube_transcript_api/_transcripts.py @@ -8,8 +8,10 @@ from defusedxml import ElementTree import re +import asyncio from requests import HTTPError, Session, Response +from httpx import AsyncClient from .proxies import ProxyConfig from ._settings import WATCH_URL, INNERTUBE_CONTEXT, INNERTUBE_API_URL @@ -103,7 +105,7 @@ def _raise_http_errors(response: Response, video_id: str) -> Response: class Transcript: def __init__( self, - http_client: Session, + http_client: AsyncClient, video_id: str, url: str, language: str, @@ -127,14 +129,14 @@ def __init__( for translation_language in translation_languages } - def fetch(self, preserve_formatting: bool = False) -> FetchedTranscript: + async def fetch(self, preserve_formatting: bool = False) -> FetchedTranscript: """ Loads the actual transcript data. 
:param preserve_formatting: whether to keep select HTML text formatting """ if "&exp=xpe" in self._url: raise PoTokenRequired(self.video_id) - response = self._http_client.get(self._url) + response = await self._http_client.get(self._url) snippets = _TranscriptParser(preserve_formatting=preserve_formatting).parse( _raise_http_errors(response, self.video_id).text, ) @@ -145,6 +147,10 @@ def fetch(self, preserve_formatting: bool = False) -> FetchedTranscript: language_code=self.language_code, is_generated=self.is_generated, ) + + def fetch_sync(self, preserve_formatting: bool = False) -> FetchedTranscript: + loop = asyncio.get_event_loop() + return loop.run_until_complete(self.fetch(preserve_formatting=preserve_formatting)) def __str__(self) -> str: return '{language_code} ("{language}"){translation_description}'.format( @@ -345,22 +351,26 @@ def _get_language_description(self, transcript_strings: Iterable[str]) -> str: class TranscriptListFetcher: - def __init__(self, http_client: Session, proxy_config: Optional[ProxyConfig]): + def __init__(self, http_client: AsyncClient, proxy_config: Optional[ProxyConfig]): self._http_client = http_client self._proxy_config = proxy_config - def fetch(self, video_id: str) -> TranscriptList: + async def fetch(self, video_id: str) -> TranscriptList: return TranscriptList.build( self._http_client, video_id, - self._fetch_captions_json(video_id), + await self._fetch_captions_json(video_id), ) + + def fetch_sync(self, video_id: str) -> TranscriptList: + loop = asyncio.get_event_loop() + return loop.run_until_complete(self.fetch(video_id)) - def _fetch_captions_json(self, video_id: str, try_number: int = 0) -> Dict: + async def _fetch_captions_json(self, video_id: str, try_number: int = 0) -> Dict: try: - html = self._fetch_video_html(video_id) + html = await self._fetch_video_html(video_id) api_key = self._extract_innertube_api_key(html, video_id) - innertube_data = self._fetch_innertube_data(video_id, api_key) + innertube_data = await self._fetch_innertube_data(video_id, api_key) return self._extract_captions_json(innertube_data, video_id) except RequestBlocked as exception: retries = ( @@ -429,21 +439,22 @@ def _create_consent_cookie(self, html: str, video_id: str) -> None: "CONSENT", "YES+" + match.group(1), domain=".youtube.com" ) - def _fetch_video_html(self, video_id: str) -> str: - html = self._fetch_html(video_id) + async def _fetch_video_html(self, video_id: str) -> str: + html = await self._fetch_html(video_id) if 'action="https://consent.youtube.com/s"' in html: self._create_consent_cookie(html, video_id) - html = self._fetch_html(video_id) + html = await self._fetch_html(video_id) if 'action="https://consent.youtube.com/s"' in html: raise FailedToCreateConsentCookie(video_id) return html - def _fetch_html(self, video_id: str) -> str: - response = self._http_client.get(WATCH_URL.format(video_id=video_id)) + async def _fetch_html(self, video_id: str) -> str: + + response = await self._http_client.get(WATCH_URL.format(video_id=video_id)) return unescape(_raise_http_errors(response, video_id).text) - def _fetch_innertube_data(self, video_id: str, api_key: str) -> Dict: - response = self._http_client.post( + async def _fetch_innertube_data(self, video_id: str, api_key: str) -> Dict: + response = await self._http_client.post( INNERTUBE_API_URL.format(api_key=api_key), json={ "context": INNERTUBE_CONTEXT, From efe3fd92c225e5fa507a3ffaf21293365d3e7517 Mon Sep 17 00:00:00 2001 From: Ahmet Kaya Date: Thu, 9 Oct 2025 20:09:48 +0300 Subject: [PATCH 6/6] 
refactor: Update YouTubeTranscriptApi to use AsyncClient and streamline HTTP transport --- youtube_transcript_api/_api.py | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/youtube_transcript_api/_api.py b/youtube_transcript_api/_api.py index bca971d2..17ee7758 100644 --- a/youtube_transcript_api/_api.py +++ b/youtube_transcript_api/_api.py @@ -1,13 +1,8 @@ from typing import Optional, Iterable, List -from requests import Session -from requests.adapters import HTTPAdapter -from urllib3 import Retry - from .proxies import ProxyConfig from ._transcripts import TranscriptListFetcher, FetchedTranscript, TranscriptList from ._transcripts_async import ( - TranscriptListFetcherAsync, AsyncTranscriptHandler, BulkFetchResults, ) @@ -19,7 +14,7 @@ class YouTubeTranscriptApi: def __init__( self, proxy_config: Optional[ProxyConfig] = None, - http_client: Optional[Session] = None, + http_client: Optional[AsyncClient] = None, ): """ Note on thread-safety: As this class will initialize a `requests.Session` @@ -35,7 +30,7 @@ def __init__( manually want to share cookies between different instances of `YouTubeTranscriptApi`, overwrite defaults, specify SSL certificates, etc. """ - http_client = Session() if http_client is None else http_client + http_client = AsyncClient(timeout=20) if http_client is None else http_client http_client.headers.update({"Accept-Language": "en-US"}) # Cookie auth has been temporarily disabled, as it is not working properly with # YouTube's most recent changes. @@ -46,12 +41,10 @@ def __init__( if proxy_config.prevent_keeping_connections_alive: http_client.headers.update({"Connection": "close"}) if proxy_config.retries_when_blocked > 0: - retry_config = Retry( - total=proxy_config.retries_when_blocked, - status_forcelist=[429], + transport = AsyncHTTPTransport( + retries=proxy_config.retries_when_blocked ) - http_client.mount("http://", HTTPAdapter(max_retries=retry_config)) - http_client.mount("https://", HTTPAdapter(max_retries=retry_config)) + http_client._transport = transport self._fetcher = TranscriptListFetcher(http_client, proxy_config=proxy_config) def fetch( @@ -76,7 +69,7 @@ def fetch( return ( self.list(video_id) .find_transcript(languages) - .fetch(preserve_formatting=preserve_formatting) + .fetch_sync(preserve_formatting=preserve_formatting) ) def list( @@ -130,7 +123,7 @@ def list( :param video_id: the ID of the video you want to retrieve the transcript for. Make sure that this is the actual ID, NOT the full URL to the video! """ - return self._fetcher.fetch(video_id) + return self._fetcher.fetch_sync(video_id) class YoutubeTranscriptAsyncApi: @@ -150,10 +143,9 @@ def __init__( transport = AsyncHTTPTransport( retries=proxy_config.retries_when_blocked ) - async_client.mount("http://", transport) - async_client.mount("https://", transport) + async_client._transport = transport - self._fetcher = TranscriptListFetcherAsync( + self._fetcher = TranscriptListFetcher( async_client, proxy_config=proxy_config ) self._handler = AsyncTranscriptHandler(self._fetcher, proxy_config)
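
Usage sketch: assuming the six patches above apply cleanly, the async entry point introduced in patch 3 (`YoutubeTranscriptAsyncApi`, exported from `youtube_transcript_api/__init__.py`) can be driven roughly as below. The class and method names (`fetch_single`, `fetch_all`, `BulkFetchResults`) come from the patches themselves; the video IDs, the `main()` coroutine, and the `asyncio.run` wrapper are placeholders for illustration only.

    import asyncio

    from youtube_transcript_api import YoutubeTranscriptAsyncApi


    async def main():
        ytt_api = YoutubeTranscriptAsyncApi()

        # Single video: try German first, fall back to English.
        transcript = await ytt_api.fetch_single("VIDEO_ID_1", languages=("de", "en"))
        print(transcript.language_code, len(transcript.snippets))

        # Several videos fetched concurrently (bounded by the handler's
        # semaphore); per-video failures are returned as serialized error
        # dicts inside BulkFetchResults instead of being raised.
        results = await ytt_api.fetch_all(["VIDEO_ID_1", "VIDEO_ID_2"])
        for item in results:
            print(item.video_id, item.result)


    asyncio.run(main())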