This commit is contained in:
Bradley 2026-01-22 12:50:59 +08:00 committed by GitHub
commit cd4488c07d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,3 +1,4 @@
import contextlib
import functools
import itertools
import json
@ -37,6 +38,7 @@ from ..utils import (
class TikTokBaseIE(InfoExtractor):
_UPLOADER_URL_FORMAT = 'https://www.tiktok.com/@%s'
_WEBPAGE_HOST = 'https://www.tiktok.com/'
_OEMBED_API = 'https://www.tiktok.com/oembed'
QUALITIES = ('360p', '540p', '720p', '1080p')
_APP_INFO_DEFAULTS = {
@ -47,16 +49,17 @@ class TikTokBaseIE(InfoExtractor):
'app_version': '35.1.3',
'manifest_app_version': '2023501030',
# "app id": aweme = 1128, trill = 1180, musical_ly = 1233, universal = 0
'aid': '0',
'aid': '1233',
}
_APP_INFO_POOL = None
_APP_INFO = None
_APP_USER_AGENT = None
_cookies_initialized = False
@functools.cached_property
def _KNOWN_APP_INFO(self):
# If we have a genuine device ID, we may not need any IID
default = [''] if self._KNOWN_DEVICE_ID else []
default = ['']
return self._configuration_arg('app_info', default, ie_key=TikTokIE)
@functools.cached_property
@ -68,9 +71,22 @@ class TikTokBaseIE(InfoExtractor):
return self._KNOWN_DEVICE_ID or str(random.randint(7250000000000000000, 7325099899999994577))
@functools.cached_property
def _API_HOSTNAME(self):
def _IID(self):
return str(random.randint(10 ** 18, 10 ** 19 - 1))
@functools.cached_property
def _API_HOSTNAMES(self):
return self._configuration_arg(
'api_hostname', ['api16-normal-c-useast1a.tiktokv.com'], ie_key=TikTokIE)[0]
'api_hostname', [
'api16-normal-c-useast1a.tiktokv.com',
'api22-normal-c-useast1a.tiktokv.com',
'api19-normal-c-useast1a.tiktokv.com',
'api-h2.tiktokv.com',
], ie_key=TikTokIE)
@functools.cached_property
def _API_HOSTNAME(self):
return self._API_HOSTNAMES[0]
def _get_next_app_info(self):
if self._APP_INFO_POOL is None:
@ -89,6 +105,7 @@ class TikTokBaseIE(InfoExtractor):
return False
self._APP_INFO = self._APP_INFO_POOL.pop(0)
self._APP_INFO.setdefault('iid', self._IID)
app_name = self._APP_INFO['app_name']
version = self._APP_INFO['manifest_app_version']
@ -115,14 +132,55 @@ class TikTokBaseIE(InfoExtractor):
'universal data', display_id, end_pattern=r'</script>', default={}),
('__DEFAULT_SCOPE__', {dict})) or {}
def _initialize_cookies(self, video_id):
"""Pre-initialize cookies by making a request to establish a session."""
if self._cookies_initialized:
return
# Make a lightweight request to get session cookies
with contextlib.suppress(Exception):
self._request_webpage(
'https://www.tiktok.com/', video_id,
note='Initializing session', errnote=False,
headers={'Accept': 'text/html'}, fatal=False)
self._cookies_initialized = True
def _get_oembed_data(self, url, video_id):
"""Fetch video metadata from TikTok's oEmbed API."""
try:
return self._download_json(
self._OEMBED_API, video_id,
note='Downloading oEmbed data',
errnote='Unable to download oEmbed data',
query={'url': url}, fatal=False)
except ExtractorError:
return None
def _is_blocked_response(self, webpage, urlh=None):
"""Check if the response indicates a blocked/error page from TikTok."""
if not webpage:
return True
if len(webpage) < 1000:
return True
if 'x-tt-system-error' in webpage.lower() or '__NEXT_DATA__' not in webpage:
if not any(marker in webpage for marker in [
'__UNIVERSAL_DATA_FOR_REHYDRATION__',
'SIGI_STATE',
'sigi-persisted-data',
'__NEXT_DATA__',
]):
return True
return False
def _call_api_impl(self, ep, video_id, query=None, data=None, headers=None, fatal=True,
note='Downloading API JSON', errnote='Unable to download API page'):
self._set_cookie(self._API_HOSTNAME, 'odin_tt', ''.join(random.choices('0123456789abcdef', k=160)))
note='Downloading API JSON', errnote='Unable to download API page', api_hostname=None):
api_hostname = api_hostname or self._API_HOSTNAME
self._set_cookie(api_hostname, 'odin_tt', ''.join(random.choices('0123456789abcdef', k=160)))
webpage_cookies = self._get_cookies(self._WEBPAGE_HOST)
if webpage_cookies.get('sid_tt'):
self._set_cookie(self._API_HOSTNAME, 'sid_tt', webpage_cookies['sid_tt'].value)
self._set_cookie(api_hostname, 'sid_tt', webpage_cookies['sid_tt'].value)
return self._download_json(
f'https://{self._API_HOSTNAME}/aweme/v1/{ep}/', video_id=video_id,
f'https://{api_hostname}/aweme/v1/{ep}/', video_id=video_id,
fatal=fatal, note=note, errnote=errnote, headers={
'User-Agent': self._APP_USER_AGENT,
'Accept': 'application/json',
@ -171,7 +229,7 @@ class TikTokBaseIE(InfoExtractor):
'build_number': self._APP_INFO['app_version'],
'region': 'US',
'ts': int(time.time()),
'iid': self._APP_INFO.get('iid'),
'iid': self._APP_INFO.get('iid') or self._IID,
'device_id': self._DEVICE_ID,
'openudid': ''.join(random.choices('0123456789abcdef', k=16)),
})
@ -186,14 +244,18 @@ class TikTokBaseIE(InfoExtractor):
self.report_warning(message)
return
api_hostnames = self._API_HOSTNAMES or [self._API_HOSTNAME]
max_tries = len(self._APP_INFO_POOL) + 1 # _APP_INFO_POOL + _APP_INFO
for count in itertools.count(1):
api_hostname = api_hostnames[(count - 1) % len(api_hostnames)]
self.write_debug(f'Using API hostname: {api_hostname}')
self.write_debug(str(self._APP_INFO))
real_query = self._build_api_query(query or {})
try:
return self._call_api_impl(
ep, video_id, query=real_query, data=data, headers=headers,
fatal=fatal, note=note, errnote=errnote)
fatal=fatal, note=note, errnote=errnote, api_hostname=api_hostname)
except ExtractorError as e:
if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
message = str(e.cause or e.msg)
@ -220,34 +282,69 @@ class TikTokBaseIE(InfoExtractor):
def _extract_web_data_and_status(self, url, video_id, fatal=True):
video_data, status = {}, -1
res = self._download_webpage_handle(url, video_id, fatal=fatal, impersonate=True)
if res is False:
return video_data, status
# Initialize cookies first for better success rate
self._initialize_cookies(video_id)
webpage, urlh = res
if urllib.parse.urlparse(urlh.url).path == '/login':
message = 'TikTok is requiring login for access to this content'
if fatal:
self.raise_login_required(message)
self.report_warning(f'{message}. {self._login_hint()}')
return video_data, status
# First try with impersonation, then fall back to other methods if no joy
max_retries = 3
for attempt in range(max_retries):
res = self._download_webpage_handle(
url, video_id, fatal=False, impersonate=True,
note=f'Downloading webpage{f" (attempt {attempt + 1})" if attempt else ""}')
if res is False:
if attempt < max_retries - 1:
self.write_debug(f'Webpage download failed, retrying ({attempt + 1}/{max_retries})')
time.sleep(1 + random.random())
continue
break
if universal_data := self._get_universal_data(webpage, video_id):
self.write_debug('Found universal data for rehydration')
status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0
video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict}))
webpage, urlh = res
elif sigi_data := self._get_sigi_state(webpage, video_id):
self.write_debug('Found sigi state data')
status = traverse_obj(sigi_data, ('VideoPage', 'statusCode', {int})) or 0
video_data = traverse_obj(sigi_data, ('ItemModule', video_id, {dict}))
if urllib.parse.urlparse(urlh.url).path == '/login':
message = 'TikTok is requiring login for access to this content'
if fatal:
self.raise_login_required(message)
self.report_warning(f'{message}. {self._login_hint()}')
return video_data, status
elif next_data := self._search_nextjs_data(webpage, video_id, default={}):
self.write_debug('Found next.js data')
status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode', {int})) or 0
video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct', {dict}))
if self._is_blocked_response(webpage, urlh):
self.write_debug(f'Received blocked/minimal response (attempt {attempt + 1})')
if attempt < max_retries - 1:
time.sleep(1.5 + random.random())
continue
self.write_debug('All attempts returned blocked responses, trying to parse anyway')
elif fatal:
if universal_data := self._get_universal_data(webpage, video_id):
self.write_debug('Found universal data for rehydration')
status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0
video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict}))
if video_data:
break
if sigi_data := self._get_sigi_state(webpage, video_id):
self.write_debug('Found sigi state data')
status = traverse_obj(sigi_data, ('VideoPage', 'statusCode', {int})) or 0
video_data = traverse_obj(sigi_data, ('ItemModule', video_id, {dict}))
if video_data:
break
if next_data := self._search_nextjs_data(webpage, video_id, default={}):
self.write_debug('Found next.js data')
status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode', {int})) or 0
video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct', {dict}))
if video_data:
break
if attempt < max_retries - 1:
self.write_debug('No video data found in response, retrying')
time.sleep(1 + random.random())
continue
# If still no data, try the embed page as a last resort
if not video_data:
video_data, status = self._try_extract_from_embed(url, video_id)
if not video_data and fatal:
raise ExtractorError('Unable to extract webpage video data')
if not traverse_obj(video_data, ('video', {dict})) and traverse_obj(video_data, ('isContentClassified', {bool})):
@ -258,6 +355,39 @@ class TikTokBaseIE(InfoExtractor):
return video_data, status
def _try_extract_from_embed(self, url, video_id):
"""Try to extract video data from the embed page."""
video_data, status = {}, -1
try:
embed_url = f'https://www.tiktok.com/embed/v2/{video_id}'
embed_page = self._download_webpage(
embed_url, video_id, note='Downloading embed page',
errnote='Unable to download embed page', fatal=False)
if embed_page:
if frontity_data := self._search_json(
r'<script[^>]+\bid=[\'"]__FRONTITY_CONNECT_STATE__[\'"][^>]*>',
embed_page, 'frontity data', video_id, default={}):
self.write_debug('Found frontity data in embed page')
video_data = traverse_obj(frontity_data, (
'source', 'data', ..., 'itemInfo', 'itemStruct', {dict}), get_all=False)
if video_data:
status = 0
if not video_data:
if embed_data := self._search_json(
r'<script[^>]+\bdata-testid=[\'"]__UNIVERSAL_DATA__[\'"][^>]*>',
embed_page, 'embed data', video_id, default={}, end_pattern=r'</script>'):
video_data = traverse_obj(embed_data, ('itemInfo', 'itemStruct', {dict}))
if video_data:
status = 0
except Exception as e:
self.write_debug(f'Embed extraction failed: {e}')
return video_data, status
def _get_subtitles(self, aweme_detail, aweme_id, user_name):
# TODO: Extract text positioning info
@ -910,7 +1040,7 @@ class TikTokIE(TikTokBaseIE):
self.report_warning(f'{e}; trying with webpage')
url = self._create_url(user_id, video_id)
video_data, status = self._extract_web_data_and_status(url, video_id)
video_data, status = self._extract_web_data_and_status(url, video_id, fatal=False)
if video_data and status == 0:
return self._parse_aweme_video_web(video_data, url, video_id)
@ -920,8 +1050,179 @@ class TikTokIE(TikTokBaseIE):
'You do not have permission to view this post. Log into an account that has access')
elif status == 10204:
raise ExtractorError('Your IP address is blocked from accessing this post', expected=True)
self.write_debug('Trying oEmbed API fallback')
oembed_data = self._get_oembed_data(url, video_id)
if oembed_data:
self.write_debug('Got oEmbed data, attempting video extraction')
result = self._extract_from_oembed(oembed_data, url, video_id)
if result and result.get('formats'):
return result
if result:
self.report_warning('Could not extract video formats, but metadata was retrieved')
result['formats'] = []
return result
if status == -1:
raise ExtractorError(
'Unable to extract video data. TikTok may be blocking automated access. '
'Try using --cookies-from-browser to pass your browser cookies.', expected=True)
raise ExtractorError(f'Video not available, status code {status}', video_id=video_id)
def _extract_from_oembed(self, oembed_data, url, video_id):
"""Extract video info from oEmbed data."""
if not oembed_data:
return None
thumbnail_url = oembed_data.get('thumbnail_url')
formats = []
if thumbnail_url:
video_patterns = self._try_video_urls_from_thumbnail(thumbnail_url, video_id)
for video_url in video_patterns:
formats.append({
'url': video_url,
'ext': 'mp4',
'format_id': 'oembed',
'format_note': 'From oEmbed thumbnail pattern',
})
if not formats:
embed_formats = self._try_extract_formats_from_embed(video_id)
formats.extend(embed_formats)
author_url = oembed_data.get('author_url', '')
uploader = None
if author_url:
uploader_match = re.search(r'@([\w.-]+)', author_url)
if uploader_match:
uploader = uploader_match.group(1)
return {
'id': video_id,
'title': oembed_data.get('title') or f'TikTok video #{video_id}',
'description': oembed_data.get('title'),
'uploader': uploader or oembed_data.get('author_name'),
'uploader_url': oembed_data.get('author_url'),
'thumbnail': thumbnail_url,
'thumbnails': [{'url': thumbnail_url}] if thumbnail_url else None,
'formats': formats,
'http_headers': {'Referer': url},
}
def _try_video_urls_from_thumbnail(self, thumbnail_url, video_id):
"""Try to derive video URLs from thumbnail URL patterns."""
return [] # Don't generate potentially broken URLs
def _try_extract_formats_from_embed(self, video_id):
"""Try to extract video formats from the embed page."""
formats = []
try:
embed_url = f'https://www.tiktok.com/embed/v2/{video_id}'
embed_page = self._download_webpage(
embed_url, video_id, note='Downloading embed page for formats',
errnote=False, fatal=False)
if not embed_page:
return formats
frontity_data = self._search_json(
r'<script[^>]+\bid=[\'"]__FRONTITY_CONNECT_STATE__[\'"][^>]*>',
embed_page, 'frontity data', video_id, default={}, end_pattern=r'</script>')
if frontity_data:
item_struct = traverse_obj(frontity_data, (
'source', 'data', ..., 'itemInfo', 'itemStruct', {dict}), get_all=False)
if item_struct:
formats = self._extract_web_formats(item_struct)
if formats:
return formats
# Try to extract URLs manually from the video structure (fallback)
video_info = traverse_obj(item_struct, ('video', {dict})) or {}
play_width = int_or_none(video_info.get('width'))
play_height = int_or_none(video_info.get('height'))
for play_url in traverse_obj(video_info, ('playAddr', ((..., 'src'), None), {url_or_none})):
formats.append({
'url': self._proto_relative_url(play_url),
'ext': 'mp4',
'format_id': 'play',
'format_note': 'From embed page',
'vcodec': 'h264',
'acodec': 'aac',
'width': play_width,
'height': play_height,
})
for dl_url in traverse_obj(video_info, (('downloadAddr', ('download', 'url')), {url_or_none})):
formats.append({
'url': self._proto_relative_url(dl_url),
'ext': 'mp4',
'format_id': 'download',
'format_note': 'From embed page, watermarked',
'vcodec': 'h264',
'acodec': 'aac',
'preference': -2,
})
if not formats:
video_url_candidates = []
video_url_patterns = [
# TikTok CDN video URLs (exclude audio patterns)
r'(https?://v\d+[a-z]?\.tiktokcdn\.com/[^"\'<>\s\\]+)',
r'(https?://v\d+[a-z]?-[a-z]+\.tiktokcdn\.com/[^"\'<>\s\\]+)',
r'(https?://v\d+m\.tiktokcdn\.com/[^"\'<>\s\\]+)',
# Escaped JSON URLs in embed page
r'"(?:playAddr|src)"["\']?\s*:\s*"(https?:[^"]+)"',
]
for pattern in video_url_patterns:
for video_url in re.findall(pattern, embed_page, re.IGNORECASE):
video_url = video_url.replace('\\u002F', '/').replace('\\/', '/').replace('\\u0026', '&')
if any(audio_marker in video_url for audio_marker in (
'audio_mpeg', 'mime_type=audio', '/music/', '-music-',
)):
continue
if video_url and ('tiktokcdn' in video_url or 'bytedance' in video_url):
if '/video/' in video_url or 'mime_type=video' in video_url or 'video_mp4' in video_url:
# Prioritize by app ID: a=1233/a=0 > no app ID > a=1180
app_id_match = re.search(r'[?&]a=(\d+)', video_url)
app_id = app_id_match.group(1) if app_id_match else None
if app_id in ('1233', '0'):
priority = 2 # Preferred
elif app_id == '1180':
priority = 0 # Lower priority (trill app)
else:
priority = 1 # Neutral
video_url_candidates.append((video_url, priority))
# Sort by priority and theb by URL
seen_urls = set()
sorted_candidates = sorted(video_url_candidates, key=lambda x: (-x[1], x[0]))
for i, (video_url, priority) in enumerate(sorted_candidates):
if video_url in seen_urls:
continue
seen_urls.add(video_url)
formats.append({
'url': video_url,
'ext': 'mp4',
'format_id': f'embed_{i}',
'format_note': 'From embed page (CDN)',
'vcodec': 'h264',
'acodec': 'aac',
'preference': priority - 1,
})
self._remove_duplicate_formats(formats)
except Exception as e:
self.write_debug(f'Embed format extraction failed: {e}')
return formats
class TikTokUserIE(TikTokBaseIE):
IE_NAME = 'tiktok:user'