diff --git a/yt_dlp/extractor/tiktok.py b/yt_dlp/extractor/tiktok.py index 02ec2b2f45..a04e90800e 100644 --- a/yt_dlp/extractor/tiktok.py +++ b/yt_dlp/extractor/tiktok.py @@ -1,3 +1,4 @@ +import contextlib import functools import itertools import json @@ -37,6 +38,7 @@ from ..utils import ( class TikTokBaseIE(InfoExtractor): _UPLOADER_URL_FORMAT = 'https://www.tiktok.com/@%s' _WEBPAGE_HOST = 'https://www.tiktok.com/' + _OEMBED_API = 'https://www.tiktok.com/oembed' QUALITIES = ('360p', '540p', '720p', '1080p') _APP_INFO_DEFAULTS = { @@ -47,16 +49,17 @@ class TikTokBaseIE(InfoExtractor): 'app_version': '35.1.3', 'manifest_app_version': '2023501030', # "app id": aweme = 1128, trill = 1180, musical_ly = 1233, universal = 0 - 'aid': '0', + 'aid': '1233', } _APP_INFO_POOL = None _APP_INFO = None _APP_USER_AGENT = None + _cookies_initialized = False @functools.cached_property def _KNOWN_APP_INFO(self): # If we have a genuine device ID, we may not need any IID - default = [''] if self._KNOWN_DEVICE_ID else [] + default = [''] return self._configuration_arg('app_info', default, ie_key=TikTokIE) @functools.cached_property @@ -68,9 +71,22 @@ class TikTokBaseIE(InfoExtractor): return self._KNOWN_DEVICE_ID or str(random.randint(7250000000000000000, 7325099899999994577)) @functools.cached_property - def _API_HOSTNAME(self): + def _IID(self): + return str(random.randint(10 ** 18, 10 ** 19 - 1)) + + @functools.cached_property + def _API_HOSTNAMES(self): return self._configuration_arg( - 'api_hostname', ['api16-normal-c-useast1a.tiktokv.com'], ie_key=TikTokIE)[0] + 'api_hostname', [ + 'api16-normal-c-useast1a.tiktokv.com', + 'api22-normal-c-useast1a.tiktokv.com', + 'api19-normal-c-useast1a.tiktokv.com', + 'api-h2.tiktokv.com', + ], ie_key=TikTokIE) + + @functools.cached_property + def _API_HOSTNAME(self): + return self._API_HOSTNAMES[0] def _get_next_app_info(self): if self._APP_INFO_POOL is None: @@ -89,6 +105,7 @@ class TikTokBaseIE(InfoExtractor): return False self._APP_INFO = self._APP_INFO_POOL.pop(0) + self._APP_INFO.setdefault('iid', self._IID) app_name = self._APP_INFO['app_name'] version = self._APP_INFO['manifest_app_version'] @@ -115,14 +132,55 @@ class TikTokBaseIE(InfoExtractor): 'universal data', display_id, end_pattern=r'', default={}), ('__DEFAULT_SCOPE__', {dict})) or {} + def _initialize_cookies(self, video_id): + """Pre-initialize cookies by making a request to establish a session.""" + if self._cookies_initialized: + return + + # Make a lightweight request to get session cookies + with contextlib.suppress(Exception): + self._request_webpage( + 'https://www.tiktok.com/', video_id, + note='Initializing session', errnote=False, + headers={'Accept': 'text/html'}, fatal=False) + self._cookies_initialized = True + + def _get_oembed_data(self, url, video_id): + """Fetch video metadata from TikTok's oEmbed API.""" + try: + return self._download_json( + self._OEMBED_API, video_id, + note='Downloading oEmbed data', + errnote='Unable to download oEmbed data', + query={'url': url}, fatal=False) + except ExtractorError: + return None + + def _is_blocked_response(self, webpage, urlh=None): + """Check if the response indicates a blocked/error page from TikTok.""" + if not webpage: + return True + if len(webpage) < 1000: + return True + if 'x-tt-system-error' in webpage.lower() or '__NEXT_DATA__' not in webpage: + if not any(marker in webpage for marker in [ + '__UNIVERSAL_DATA_FOR_REHYDRATION__', + 'SIGI_STATE', + 'sigi-persisted-data', + '__NEXT_DATA__', + ]): + return True + return False + def _call_api_impl(self, ep, video_id, query=None, data=None, headers=None, fatal=True, - note='Downloading API JSON', errnote='Unable to download API page'): - self._set_cookie(self._API_HOSTNAME, 'odin_tt', ''.join(random.choices('0123456789abcdef', k=160))) + note='Downloading API JSON', errnote='Unable to download API page', api_hostname=None): + api_hostname = api_hostname or self._API_HOSTNAME + self._set_cookie(api_hostname, 'odin_tt', ''.join(random.choices('0123456789abcdef', k=160))) webpage_cookies = self._get_cookies(self._WEBPAGE_HOST) if webpage_cookies.get('sid_tt'): - self._set_cookie(self._API_HOSTNAME, 'sid_tt', webpage_cookies['sid_tt'].value) + self._set_cookie(api_hostname, 'sid_tt', webpage_cookies['sid_tt'].value) return self._download_json( - f'https://{self._API_HOSTNAME}/aweme/v1/{ep}/', video_id=video_id, + f'https://{api_hostname}/aweme/v1/{ep}/', video_id=video_id, fatal=fatal, note=note, errnote=errnote, headers={ 'User-Agent': self._APP_USER_AGENT, 'Accept': 'application/json', @@ -171,7 +229,7 @@ class TikTokBaseIE(InfoExtractor): 'build_number': self._APP_INFO['app_version'], 'region': 'US', 'ts': int(time.time()), - 'iid': self._APP_INFO.get('iid'), + 'iid': self._APP_INFO.get('iid') or self._IID, 'device_id': self._DEVICE_ID, 'openudid': ''.join(random.choices('0123456789abcdef', k=16)), }) @@ -186,14 +244,18 @@ class TikTokBaseIE(InfoExtractor): self.report_warning(message) return + api_hostnames = self._API_HOSTNAMES or [self._API_HOSTNAME] + max_tries = len(self._APP_INFO_POOL) + 1 # _APP_INFO_POOL + _APP_INFO for count in itertools.count(1): + api_hostname = api_hostnames[(count - 1) % len(api_hostnames)] + self.write_debug(f'Using API hostname: {api_hostname}') self.write_debug(str(self._APP_INFO)) real_query = self._build_api_query(query or {}) try: return self._call_api_impl( ep, video_id, query=real_query, data=data, headers=headers, - fatal=fatal, note=note, errnote=errnote) + fatal=fatal, note=note, errnote=errnote, api_hostname=api_hostname) except ExtractorError as e: if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0: message = str(e.cause or e.msg) @@ -220,34 +282,69 @@ class TikTokBaseIE(InfoExtractor): def _extract_web_data_and_status(self, url, video_id, fatal=True): video_data, status = {}, -1 - res = self._download_webpage_handle(url, video_id, fatal=fatal, impersonate=True) - if res is False: - return video_data, status + # Initialize cookies first for better success rate + self._initialize_cookies(video_id) - webpage, urlh = res - if urllib.parse.urlparse(urlh.url).path == '/login': - message = 'TikTok is requiring login for access to this content' - if fatal: - self.raise_login_required(message) - self.report_warning(f'{message}. {self._login_hint()}') - return video_data, status + # First try with impersonation, then fall back to other methods if no joy + max_retries = 3 + for attempt in range(max_retries): + res = self._download_webpage_handle( + url, video_id, fatal=False, impersonate=True, + note=f'Downloading webpage{f" (attempt {attempt + 1})" if attempt else ""}') + if res is False: + if attempt < max_retries - 1: + self.write_debug(f'Webpage download failed, retrying ({attempt + 1}/{max_retries})') + time.sleep(1 + random.random()) + continue + break - if universal_data := self._get_universal_data(webpage, video_id): - self.write_debug('Found universal data for rehydration') - status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0 - video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict})) + webpage, urlh = res - elif sigi_data := self._get_sigi_state(webpage, video_id): - self.write_debug('Found sigi state data') - status = traverse_obj(sigi_data, ('VideoPage', 'statusCode', {int})) or 0 - video_data = traverse_obj(sigi_data, ('ItemModule', video_id, {dict})) + if urllib.parse.urlparse(urlh.url).path == '/login': + message = 'TikTok is requiring login for access to this content' + if fatal: + self.raise_login_required(message) + self.report_warning(f'{message}. {self._login_hint()}') + return video_data, status - elif next_data := self._search_nextjs_data(webpage, video_id, default={}): - self.write_debug('Found next.js data') - status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode', {int})) or 0 - video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct', {dict})) + if self._is_blocked_response(webpage, urlh): + self.write_debug(f'Received blocked/minimal response (attempt {attempt + 1})') + if attempt < max_retries - 1: + time.sleep(1.5 + random.random()) + continue + self.write_debug('All attempts returned blocked responses, trying to parse anyway') - elif fatal: + if universal_data := self._get_universal_data(webpage, video_id): + self.write_debug('Found universal data for rehydration') + status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0 + video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict})) + if video_data: + break + + if sigi_data := self._get_sigi_state(webpage, video_id): + self.write_debug('Found sigi state data') + status = traverse_obj(sigi_data, ('VideoPage', 'statusCode', {int})) or 0 + video_data = traverse_obj(sigi_data, ('ItemModule', video_id, {dict})) + if video_data: + break + + if next_data := self._search_nextjs_data(webpage, video_id, default={}): + self.write_debug('Found next.js data') + status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode', {int})) or 0 + video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct', {dict})) + if video_data: + break + + if attempt < max_retries - 1: + self.write_debug('No video data found in response, retrying') + time.sleep(1 + random.random()) + continue + + # If still no data, try the embed page as a last resort + if not video_data: + video_data, status = self._try_extract_from_embed(url, video_id) + + if not video_data and fatal: raise ExtractorError('Unable to extract webpage video data') if not traverse_obj(video_data, ('video', {dict})) and traverse_obj(video_data, ('isContentClassified', {bool})): @@ -258,6 +355,39 @@ class TikTokBaseIE(InfoExtractor): return video_data, status + def _try_extract_from_embed(self, url, video_id): + """Try to extract video data from the embed page.""" + video_data, status = {}, -1 + + try: + embed_url = f'https://www.tiktok.com/embed/v2/{video_id}' + embed_page = self._download_webpage( + embed_url, video_id, note='Downloading embed page', + errnote='Unable to download embed page', fatal=False) + + if embed_page: + if frontity_data := self._search_json( + r']+\bid=[\'"]__FRONTITY_CONNECT_STATE__[\'"][^>]*>', + embed_page, 'frontity data', video_id, default={}): + self.write_debug('Found frontity data in embed page') + video_data = traverse_obj(frontity_data, ( + 'source', 'data', ..., 'itemInfo', 'itemStruct', {dict}), get_all=False) + if video_data: + status = 0 + + if not video_data: + if embed_data := self._search_json( + r']+\bdata-testid=[\'"]__UNIVERSAL_DATA__[\'"][^>]*>', + embed_page, 'embed data', video_id, default={}, end_pattern=r''): + video_data = traverse_obj(embed_data, ('itemInfo', 'itemStruct', {dict})) + if video_data: + status = 0 + + except Exception as e: + self.write_debug(f'Embed extraction failed: {e}') + + return video_data, status + def _get_subtitles(self, aweme_detail, aweme_id, user_name): # TODO: Extract text positioning info @@ -910,7 +1040,7 @@ class TikTokIE(TikTokBaseIE): self.report_warning(f'{e}; trying with webpage') url = self._create_url(user_id, video_id) - video_data, status = self._extract_web_data_and_status(url, video_id) + video_data, status = self._extract_web_data_and_status(url, video_id, fatal=False) if video_data and status == 0: return self._parse_aweme_video_web(video_data, url, video_id) @@ -920,8 +1050,179 @@ class TikTokIE(TikTokBaseIE): 'You do not have permission to view this post. Log into an account that has access') elif status == 10204: raise ExtractorError('Your IP address is blocked from accessing this post', expected=True) + + self.write_debug('Trying oEmbed API fallback') + oembed_data = self._get_oembed_data(url, video_id) + + if oembed_data: + self.write_debug('Got oEmbed data, attempting video extraction') + result = self._extract_from_oembed(oembed_data, url, video_id) + if result and result.get('formats'): + return result + if result: + self.report_warning('Could not extract video formats, but metadata was retrieved') + result['formats'] = [] + return result + + if status == -1: + raise ExtractorError( + 'Unable to extract video data. TikTok may be blocking automated access. ' + 'Try using --cookies-from-browser to pass your browser cookies.', expected=True) raise ExtractorError(f'Video not available, status code {status}', video_id=video_id) + def _extract_from_oembed(self, oembed_data, url, video_id): + """Extract video info from oEmbed data.""" + if not oembed_data: + return None + + thumbnail_url = oembed_data.get('thumbnail_url') + formats = [] + + if thumbnail_url: + video_patterns = self._try_video_urls_from_thumbnail(thumbnail_url, video_id) + for video_url in video_patterns: + formats.append({ + 'url': video_url, + 'ext': 'mp4', + 'format_id': 'oembed', + 'format_note': 'From oEmbed thumbnail pattern', + }) + + if not formats: + embed_formats = self._try_extract_formats_from_embed(video_id) + formats.extend(embed_formats) + + author_url = oembed_data.get('author_url', '') + uploader = None + if author_url: + uploader_match = re.search(r'@([\w.-]+)', author_url) + if uploader_match: + uploader = uploader_match.group(1) + + return { + 'id': video_id, + 'title': oembed_data.get('title') or f'TikTok video #{video_id}', + 'description': oembed_data.get('title'), + 'uploader': uploader or oembed_data.get('author_name'), + 'uploader_url': oembed_data.get('author_url'), + 'thumbnail': thumbnail_url, + 'thumbnails': [{'url': thumbnail_url}] if thumbnail_url else None, + 'formats': formats, + 'http_headers': {'Referer': url}, + } + + def _try_video_urls_from_thumbnail(self, thumbnail_url, video_id): + """Try to derive video URLs from thumbnail URL patterns.""" + return [] # Don't generate potentially broken URLs + + def _try_extract_formats_from_embed(self, video_id): + """Try to extract video formats from the embed page.""" + formats = [] + try: + embed_url = f'https://www.tiktok.com/embed/v2/{video_id}' + embed_page = self._download_webpage( + embed_url, video_id, note='Downloading embed page for formats', + errnote=False, fatal=False) + + if not embed_page: + return formats + + frontity_data = self._search_json( + r']+\bid=[\'"]__FRONTITY_CONNECT_STATE__[\'"][^>]*>', + embed_page, 'frontity data', video_id, default={}, end_pattern=r'') + + if frontity_data: + item_struct = traverse_obj(frontity_data, ( + 'source', 'data', ..., 'itemInfo', 'itemStruct', {dict}), get_all=False) + + if item_struct: + formats = self._extract_web_formats(item_struct) + if formats: + return formats + + # Try to extract URLs manually from the video structure (fallback) + video_info = traverse_obj(item_struct, ('video', {dict})) or {} + play_width = int_or_none(video_info.get('width')) + play_height = int_or_none(video_info.get('height')) + + for play_url in traverse_obj(video_info, ('playAddr', ((..., 'src'), None), {url_or_none})): + formats.append({ + 'url': self._proto_relative_url(play_url), + 'ext': 'mp4', + 'format_id': 'play', + 'format_note': 'From embed page', + 'vcodec': 'h264', + 'acodec': 'aac', + 'width': play_width, + 'height': play_height, + }) + + for dl_url in traverse_obj(video_info, (('downloadAddr', ('download', 'url')), {url_or_none})): + formats.append({ + 'url': self._proto_relative_url(dl_url), + 'ext': 'mp4', + 'format_id': 'download', + 'format_note': 'From embed page, watermarked', + 'vcodec': 'h264', + 'acodec': 'aac', + 'preference': -2, + }) + + if not formats: + video_url_candidates = [] + video_url_patterns = [ + # TikTok CDN video URLs (exclude audio patterns) + r'(https?://v\d+[a-z]?\.tiktokcdn\.com/[^"\'<>\s\\]+)', + r'(https?://v\d+[a-z]?-[a-z]+\.tiktokcdn\.com/[^"\'<>\s\\]+)', + r'(https?://v\d+m\.tiktokcdn\.com/[^"\'<>\s\\]+)', + # Escaped JSON URLs in embed page + r'"(?:playAddr|src)"["\']?\s*:\s*"(https?:[^"]+)"', + ] + for pattern in video_url_patterns: + for video_url in re.findall(pattern, embed_page, re.IGNORECASE): + video_url = video_url.replace('\\u002F', '/').replace('\\/', '/').replace('\\u0026', '&') + + if any(audio_marker in video_url for audio_marker in ( + 'audio_mpeg', 'mime_type=audio', '/music/', '-music-', + )): + continue + if video_url and ('tiktokcdn' in video_url or 'bytedance' in video_url): + if '/video/' in video_url or 'mime_type=video' in video_url or 'video_mp4' in video_url: + # Prioritize by app ID: a=1233/a=0 > no app ID > a=1180 + app_id_match = re.search(r'[?&]a=(\d+)', video_url) + app_id = app_id_match.group(1) if app_id_match else None + if app_id in ('1233', '0'): + priority = 2 # Preferred + elif app_id == '1180': + priority = 0 # Lower priority (trill app) + else: + priority = 1 # Neutral + video_url_candidates.append((video_url, priority)) + + # Sort by priority and theb by URL + seen_urls = set() + sorted_candidates = sorted(video_url_candidates, key=lambda x: (-x[1], x[0])) + for i, (video_url, priority) in enumerate(sorted_candidates): + if video_url in seen_urls: + continue + seen_urls.add(video_url) + formats.append({ + 'url': video_url, + 'ext': 'mp4', + 'format_id': f'embed_{i}', + 'format_note': 'From embed page (CDN)', + 'vcodec': 'h264', + 'acodec': 'aac', + 'preference': priority - 1, + }) + + self._remove_duplicate_formats(formats) + + except Exception as e: + self.write_debug(f'Embed format extraction failed: {e}') + + return formats + class TikTokUserIE(TikTokBaseIE): IE_NAME = 'tiktok:user'