This commit is contained in:
0x∅ 2025-12-04 23:04:24 +05:30 committed by GitHub
commit 3f120f9fe8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,10 +1,8 @@
import hashlib
import itertools import itertools
import json import json
import re import re
from .common import InfoExtractor from .common import InfoExtractor
from ..networking.exceptions import HTTPError
from ..utils import ( from ..utils import (
ExtractorError, ExtractorError,
bug_reports_message, bug_reports_message,
@ -64,36 +62,33 @@ class InstagramBaseIE(InfoExtractor):
or int_or_none(self._html_search_meta( or int_or_none(self._html_search_meta(
(f'og:video:{name}', f'video:{name}'), webpage or '', default=None))) (f'og:video:{name}', f'video:{name}'), webpage or '', default=None)))
def _extract_nodes(self, nodes, is_direct=False): def _extract_nodes(self, nodes, **other):
for idx, node in enumerate(nodes, start=1): for idx, node in enumerate(nodes, start=1):
if node.get('__typename') != 'GraphVideo' and node.get('is_video') is not True: typename = node.get('__typename')
if typename not in ('XDTMediaDict', 'XDTGraphVideo'):
continue continue
video_id = node.get('shortcode') formats = []
if is_direct: video_url = node.get('video_url')
info = { if video_url:
'id': video_id or node['id'], formats.append({'url': video_url})
'url': node.get('video_url'), if node.get('video_versions'):
'width': self._get_dimension('width', node), media = self._extract_product_media(node)
'height': self._get_dimension('height', node), formats = (media or {}).get('formats', [])
'http_headers': { elif not formats:
'Referer': 'https://www.instagram.com/', self.raise_no_formats()
},
}
elif not video_id:
continue continue
else: else:
info = { dash = traverse_obj(node, ('dash_info', 'video_dash_manifest'))
'_type': 'url', if dash:
'ie_key': 'Instagram', mpd = self._parse_mpd_formats(self._parse_xml(dash, node.get('code')), mpd_id='dash')
'id': video_id, formats.extend(mpd)
'url': f'https://instagram.com/p/{video_id}',
}
yield { yield {
**info, 'id': node.get('code') or node.get('shortcode'),
'title': node.get('title') or (f'Video {idx}' if is_direct else None), 'formats': formats,
'title': node.get('title') or f'Video {idx}',
'description': traverse_obj( 'description': traverse_obj(
node, ('edge_media_to_caption', 'edges', 0, 'node', 'text'), expected_type=str), node, ('edge_media_to_caption', 'edges', 0, 'node', 'text'), expected_type=str),
'thumbnail': traverse_obj( 'thumbnail': traverse_obj(
@ -103,6 +98,7 @@ class InstagramBaseIE(InfoExtractor):
'view_count': int_or_none(node.get('video_view_count')), 'view_count': int_or_none(node.get('video_view_count')),
'comment_count': self._get_count(node, 'comments', 'preview_comment', 'to_comment', 'to_parent_comment'), 'comment_count': self._get_count(node, 'comments', 'preview_comment', 'to_comment', 'to_parent_comment'),
'like_count': self._get_count(node, 'likes', 'preview_like'), 'like_count': self._get_count(node, 'likes', 'preview_like'),
'user': other['other'] or None,
} }
def _extract_product_media(self, product_media): def _extract_product_media(self, product_media):
@ -469,7 +465,7 @@ class InstagramIE(InstagramBaseIE):
nodes = traverse_obj(media, ('edge_sidecar_to_children', 'edges', ..., 'node'), expected_type=dict) or [] nodes = traverse_obj(media, ('edge_sidecar_to_children', 'edges', ..., 'node'), expected_type=dict) or []
if nodes: if nodes:
return self.playlist_result( return self.playlist_result(
self._extract_nodes(nodes, True), video_id, self._extract_nodes(nodes), video_id,
format_field(username, None, 'Post by %s'), description) format_field(username, None, 'Post by %s'), description)
raise ExtractorError('There is no video in this post', expected=True) raise ExtractorError('There is no video in this post', expected=True)
@ -523,69 +519,73 @@ class InstagramIE(InstagramBaseIE):
class InstagramPlaylistBaseIE(InstagramBaseIE): class InstagramPlaylistBaseIE(InstagramBaseIE):
_gis_tmpl = None # used to cache GIS request type
def _parse_graphql(self, webpage, item_id): def _get_user_data(self, webpage):
# Reads a webpage and returns its GraphQL data. user_id = self._search_regex(
return self._parse_json( r'"user_id"\s*:\s*"([^"]+)"', webpage, 'user id',
self._search_regex( )
r'sharedData\s*=\s*({.+?})\s*;\s*[<\n]', webpage, 'data'), variables = {
item_id) 'enable_integrity_filters': True,
'id': user_id,
'render_surface': 'PROFILE',
'__relay_internal__pv__PolarisProjectCannesEnabledrelayprovider': True,
'__relay_internal__pv__PolarisProjectCannesLoggedInEnabledrelayprovider': True,
'__relay_internal__pv__PolarisCannesGuardianExperienceEnabledrelayprovider': True,
'__relay_internal__pv__PolarisCASB976ProfileEnabledrelayprovider': False,
'__relay_internal__pv__PolarisRepostsConsumptionEnabledrelayprovider': False,
}
data = self._download_json(f'https://www.instagram.com/graphql/query/?doc_id=25585291164389315&variables={json.dumps(variables)}', user_id, 'Download User data')
user_data = data['data']['user']
if not data:
return None
return {
'user_id': user_data['pk'],
'is_private': user_data['is_private'],
'bio_links': user_data['bio_links'],
'username': user_data['username'],
'profile_pic_url': user_data['profile_pic_url'],
'hd_profile_pic_url': user_data['hd_profile_pic_url_info']['url'],
'biography': user_data['biography'],
'full_name': user_data['full_name'],
'is_verified': user_data['is_verified'],
'follower_count': user_data['follower_count'],
'following_count': user_data['following_count'],
'media_count': user_data['media_count'],
}
def _extract_graphql(self, data, url): def _extract_graphql(self, url, **data):
# Parses GraphQL queries containing videos and generates a playlist. # Parses GraphQL queries containing videos and generates a playlist.
uploader_id = self._match_id(url) uploader_id = self._match_id(url)
csrf_token = data['config']['csrf_token']
rhx_gis = data.get('rhx_gis') or '3c7ca9dcefcf966d11dacf1f151335e8'
cursor = '' cursor = ''
for page_num in itertools.count(1): for page_num in itertools.count(1):
variables = {
'first': 12,
'after': cursor,
}
variables.update(self._query_vars_for(data))
variables = json.dumps(variables)
if self._gis_tmpl: try:
gis_tmpls = [self._gis_tmpl] variables = self._make_variables(uploader_id, cursor)
else:
gis_tmpls = [
f'{rhx_gis}',
'',
f'{rhx_gis}:{csrf_token}',
'{}:{}:{}'.format(rhx_gis, csrf_token, self.get_param('http_headers')['User-Agent']),
]
# try all of the ways to generate a GIS query, and not only use the json_data = self._download_json(
# first one that works, but cache it for future requests 'https://www.instagram.com/graphql/query', uploader_id,
for gis_tmpl in gis_tmpls: f'Downloading JSON page {page_num}', headers={
try: **self._api_headers,
json_data = self._download_json( 'X-Requested-With': 'XMLHttpRequest',
'https://www.instagram.com/graphql/query/', uploader_id, 'Accept': 'application/json',
f'Downloading JSON page {page_num}', headers={ 'Referer': 'https://www.instagram.com/',
'X-Requested-With': 'XMLHttpRequest', 'Origin': 'https://www.instagram.com',
'X-Instagram-GIS': hashlib.md5( }, query={
(f'{gis_tmpl}:{variables}').encode()).hexdigest(), 'doc_id': self._DOC_ID,
}, query={ 'variables': variables,
'query_hash': self._QUERY_HASH, })
'variables': variables,
}) media = self._parse_timeline_from(json_data)
media = self._parse_timeline_from(json_data) except ExtractorError:
self._gis_tmpl = gis_tmpl self.raise_login_required(
break 'This content is only available for registered users who follow this account')
except ExtractorError as e: raise
# if it's an error caused by a bad query, and there are
# more GIS templates to try, ignore it and keep trying
if isinstance(e.cause, HTTPError) and e.cause.status == 403:
if gis_tmpl != gis_tmpls[-1]:
continue
raise
nodes = traverse_obj(media, ('edges', ..., 'node'), expected_type=dict) or [] nodes = traverse_obj(media, ('edges', ..., 'node'), expected_type=dict) or []
if not nodes: if not nodes:
break break
yield from self._extract_nodes(nodes) yield from self._extract_nodes(nodes, other=data['data'])
has_next_page = traverse_obj(media, ('page_info', 'has_next_page')) has_next_page = traverse_obj(media, ('page_info', 'has_next_page'))
cursor = traverse_obj(media, ('page_info', 'end_cursor'), expected_type=str) cursor = traverse_obj(media, ('page_info', 'end_cursor'), expected_type=str)
@ -595,16 +595,16 @@ class InstagramPlaylistBaseIE(InstagramBaseIE):
def _real_extract(self, url): def _real_extract(self, url):
user_or_tag = self._match_id(url) user_or_tag = self._match_id(url)
webpage = self._download_webpage(url, user_or_tag) webpage = self._download_webpage(url, user_or_tag)
data = self._parse_graphql(webpage, user_or_tag)
self._set_cookie('instagram.com', 'ig_pr', '1') self._set_cookie('instagram.com', 'ig_pr', '1')
return self.playlist_result( return self.playlist_result(
self._extract_graphql(data, url), user_or_tag, user_or_tag) self._extract_graphql(url, data=self._get_user_data(webpage)),
playlist_id=user_or_tag,
title=user_or_tag,
)
class InstagramUserIE(InstagramPlaylistBaseIE): class InstagramUserIE(InstagramPlaylistBaseIE):
_WORKING = False _WORKING = True
_VALID_URL = r'https?://(?:www\.)?instagram\.com/(?P<id>[^/]{2,})/?(?:$|[?#])' _VALID_URL = r'https?://(?:www\.)?instagram\.com/(?P<id>[^/]{2,})/?(?:$|[?#])'
IE_DESC = 'Instagram user profile' IE_DESC = 'Instagram user profile'
IE_NAME = 'instagram:user' IE_NAME = 'instagram:user'
@ -622,20 +622,27 @@ class InstagramUserIE(InstagramPlaylistBaseIE):
}, },
}] }]
_QUERY_HASH = ('42323d64886122307be10013ad2dcc44',) _DOC_ID = 32787567760834226
@staticmethod
def _make_variables(username, cursor):
return json.dumps({
'after': cursor,
'data': {
'count': 12,
'include_reel_media_seen_timestamp': True,
'include_relationship_info': True,
'latest_besties_reel_media': True,
'latest_reel_media': True,
},
'username': username,
'__relay_internal__pv__PolarisIsLoggedInrelayprovider': True,
}, separators=(',', ':'))
@staticmethod @staticmethod
def _parse_timeline_from(data): def _parse_timeline_from(data):
# extracts the media timeline data from a GraphQL result # extracts the media timeline data from a GraphQL result
return data['data']['user']['edge_owner_to_timeline_media'] return data['data']['xdt_api__v1__feed__user_timeline_graphql_connection']
@staticmethod
def _query_vars_for(data):
# returns a dictionary of variables to add to the timeline query based
# on the GraphQL of the original page
return {
'id': data['entry_data']['ProfilePage'][0]['graphql']['user']['id'],
}
class InstagramTagIE(InstagramPlaylistBaseIE): class InstagramTagIE(InstagramPlaylistBaseIE):
@ -656,22 +663,21 @@ class InstagramTagIE(InstagramPlaylistBaseIE):
}, },
}] }]
_QUERY_HASH = ('f92f56d47dc7a55b606908374b43a314',) _DOC_ID = 17875800862117404
@staticmethod
def _make_variables(tag_name, cursor):
return json.dumps({
'next': cursor,
'first': 12,
'tag_name': tag_name,
})
@staticmethod @staticmethod
def _parse_timeline_from(data): def _parse_timeline_from(data):
# extracts the media timeline data from a GraphQL result # extracts the media timeline data from a GraphQL result
return data['data']['hashtag']['edge_hashtag_to_media'] return data['data']['hashtag']['edge_hashtag_to_media']
@staticmethod
def _query_vars_for(data):
# returns a dictionary of variables to add to the timeline query based
# on the GraphQL of the original page
return {
'tag_name':
data['entry_data']['TagPage'][0]['graphql']['hashtag']['name'],
}
class InstagramStoryIE(InstagramBaseIE): class InstagramStoryIE(InstagramBaseIE):
_VALID_URL = r'https?://(?:www\.)?instagram\.com/stories/(?P<user>[^/?#]+)(?:/(?P<id>\d+))?' _VALID_URL = r'https?://(?:www\.)?instagram\.com/stories/(?P<user>[^/?#]+)(?:/(?P<id>\d+))?'