Compare commits

...

12 Commits

Author SHA1 Message Date
df
af9e72507e Implement n-param descrambling using JSInterp
Fixes #29326, closes #29790, closes #30004, closes #30024, closes #30052,
closes #30088, closes #30097, closes #30102, closes #30109, closes #30119,
closes #30125, closes #30128, closes #30162, closes #30173, closes #30186,
closes #30192, closes #30221, closes #30239, closes #30539, closes #30552.
2022-01-31 00:19:58 +00:00
dirkf
6ca7b77696 Refactor JSInterpreter._separate
yt-dlp/yt-dlp/@06dfe0a, improve _MATCHING_PARENS
2022-01-30 00:05:54 +00:00
dirkf
9d142109f4 Back-port test_youtube_signature.py from yt-dlp and fix JSInterp accordingly 2022-01-30 00:05:54 +00:00
dirkf
1ca673bd98 Fix splice to handle float
Needed for new youtube js player f1ca6900
Add 57dbe8077f (diff-729b57caa8d006426f6a8960c061f519a8b6658682284015e069745af52ffb07)
2022-01-30 00:05:54 +00:00
df
e1eae16b56 Handle default in switch better
Add a1fc7ca074
Thanks coletdjnz
2022-01-30 00:05:54 +00:00
df
96f87aaa3b Back-port JS interpreter upgrade from yt-dlp PR #1437 2022-01-30 00:05:54 +00:00
df
5f5de51a49 Add compat_map/filter and use the former 2022-01-30 00:05:36 +00:00
df
39ca35e765 Fix test_youtube_flat_playlist_extraction 2022-01-29 20:00:21 +00:00
df
d76d59d99d Remove obsolete non-working test_youtube_toptracks 2022-01-29 20:00:21 +00:00
df
2c2c2bd348 Fix test_youtube_mix 2022-01-29 20:00:21 +00:00
df
46e0a729b2 Remove obsolete test_youtube_course 2022-01-29 20:00:21 +00:00
df
57044eaceb Fix test_youtube_playlist_noplaylist 2022-01-29 20:00:21 +00:00
6 changed files with 666 additions and 170 deletions

View File

@ -112,6 +112,72 @@ class TestJSInterpreter(unittest.TestCase):
''') ''')
self.assertEqual(jsi.call_function('z'), 5) self.assertEqual(jsi.call_function('z'), 5)
def test_for_loop(self):
    """A counting ``for`` loop whose body increments ``a`` yields 10."""
    # Variant of the snippet using the post-increment operator:
    # function x() { a=0; for (i=0; i-10; i++) {a++} a }
    js_source = '''
function x() { a=0; for (i=0; i-10; i = i + 1) {a++} a }
'''
    interpreter = JSInterpreter(js_source)
    self.assertEqual(interpreter.call_function('x'), 10)
def test_switch(self):
    """``switch`` falls through until ``break``; unmatched values hit ``default``."""
    interpreter = JSInterpreter('''
function x(f) { switch(f){
case 1:f+=1;
case 2:f+=2;
case 3:f+=3;break;
case 4:f+=4;
default:f=0;
} return f }
''')
    # (argument, expected result) pairs, checked in the original order
    for argument, expected in ((1, 7), (3, 6), (5, 0)):
        self.assertEqual(interpreter.call_function('x', argument), expected)
def test_switch_default(self):
    """A ``default`` label placed mid-``switch`` still falls through to later cases."""
    interpreter = JSInterpreter('''
function x(f) { switch(f){
case 2: f+=2;
default: f-=1;
case 5:
case 6: f+=6;
case 0: break;
case 1: f+=1;
} return f }
''')
    # (argument, expected result) pairs, checked in the original order
    for argument, expected in ((1, 2), (5, 11), (9, 14)):
        self.assertEqual(interpreter.call_function('x', argument), expected)
def test_try(self):
    """The value returned inside the ``try`` block is the function result."""
    js_source = '''
function x() { try{return 10} catch(e){return 5} }
'''
    self.assertEqual(JSInterpreter(js_source).call_function('x'), 10)
def test_for_loop_continue(self):
    """``continue`` skips the rest of the loop body, so ``a`` stays 0."""
    js_source = '''
function x() { a=0; for (i=0; i-10; i++) { continue; a++ } a }
'''
    self.assertEqual(JSInterpreter(js_source).call_function('x'), 0)
def test_for_loop_break(self):
    """``break`` leaves the loop before ``a`` is incremented, so ``a`` stays 0."""
    js_source = '''
function x() { a=0; for (i=0; i-10; i++) { break; a++ } a }
'''
    self.assertEqual(JSInterpreter(js_source).call_function('x'), 0)
def test_literal_list(self):
    """Indexing a literal array returns the nested array at that position."""
    js_source = '''
function x() { [1, 2, "asdf", [5, 6, 7]][3] }
'''
    self.assertEqual(JSInterpreter(js_source).call_function('x'), [5, 6, 7])
def test_comma(self):
    """Both comma-separated assignments are applied before the ``return``."""
    js_source = '''
function x() { a=5; a -= 1, a+=3; return a }
'''
    self.assertEqual(JSInterpreter(js_source).call_function('x'), 7)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals from __future__ import unicode_literals
# Allow direct execution # Allow direct execution
@ -9,11 +10,10 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test.helper import FakeYDL from test.helper import FakeYDL
from youtube_dl.extractor import ( from youtube_dl.extractor import (
YoutubeIE,
YoutubePlaylistIE, YoutubePlaylistIE,
YoutubeTabIE, YoutubeTabIE,
YoutubeIE,
) )
@ -25,38 +25,23 @@ class TestYoutubeLists(unittest.TestCase):
def test_youtube_playlist_noplaylist(self): def test_youtube_playlist_noplaylist(self):
dl = FakeYDL() dl = FakeYDL()
dl.params['noplaylist'] = True dl.params['noplaylist'] = True
dl.params['format'] = 'best'
ie = YoutubePlaylistIE(dl) ie = YoutubePlaylistIE(dl)
result = ie.extract('https://www.youtube.com/watch?v=FXxLjLQi3Fg&list=PLwiyx1dc3P2JR9N8gQaQN_BCvlSlap7re') result = ie.extract('https://www.youtube.com/watch?v=FXxLjLQi3Fg&list=PLwiyx1dc3P2JR9N8gQaQN_BCvlSlap7re')
self.assertEqual(result['_type'], 'url') self.assertEqual(result['_type'], 'url')
result = dl.extract_info(result['url'], download=False, ie_key=result.get('ie_key'), process=False)
self.assertEqual(YoutubeIE().extract_id(result['url']), 'FXxLjLQi3Fg') self.assertEqual(YoutubeIE().extract_id(result['url']), 'FXxLjLQi3Fg')
def test_youtube_course(self):
dl = FakeYDL()
ie = YoutubePlaylistIE(dl)
# TODO find a > 100 (paginating?) videos course
result = ie.extract('https://www.youtube.com/course?list=ECUl4u3cNGP61MdtwGTqZA0MreSaDybji8')
entries = list(result['entries'])
self.assertEqual(YoutubeIE().extract_id(entries[0]['url']), 'j9WZyLZCBzs')
self.assertEqual(len(entries), 25)
self.assertEqual(YoutubeIE().extract_id(entries[-1]['url']), 'rYefUsYuEp0')
def test_youtube_mix(self): def test_youtube_mix(self):
dl = FakeYDL() dl = FakeYDL()
ie = YoutubePlaylistIE(dl) dl.params['format'] = 'best'
result = ie.extract('https://www.youtube.com/watch?v=W01L70IGBgE&index=2&list=RDOQpdSVF_k_w') ie = YoutubeTabIE(dl)
entries = result['entries'] result = dl.extract_info('https://www.youtube.com/watch?v=uVJ0Il5WvbE&list=PLhQjrBD2T381k8ul4WQ8SQ165XqY149WW',
download=False, ie_key=ie.ie_key(), process=True)
entries = (result or {}).get('entries', [{'id': 'not_found', }])
self.assertTrue(len(entries) >= 50) self.assertTrue(len(entries) >= 50)
original_video = entries[0] original_video = entries[0]
self.assertEqual(original_video['id'], 'OQpdSVF_k_w') self.assertEqual(original_video['id'], 'uVJ0Il5WvbE')
def test_youtube_toptracks(self):
print('Skipping: The playlist page gives error 500')
return
dl = FakeYDL()
ie = YoutubePlaylistIE(dl)
result = ie.extract('https://www.youtube.com/playlist?list=MCUS')
entries = result['entries']
self.assertEqual(len(entries), 100)
def test_youtube_flat_playlist_extraction(self): def test_youtube_flat_playlist_extraction(self):
dl = FakeYDL() dl = FakeYDL()
@ -67,7 +52,7 @@ class TestYoutubeLists(unittest.TestCase):
entries = list(result['entries']) entries = list(result['entries'])
self.assertTrue(len(entries) == 1) self.assertTrue(len(entries) == 1)
video = entries[0] video = entries[0]
self.assertEqual(video['_type'], 'url_transparent') self.assertEqual(video['_type'], 'url')
self.assertEqual(video['ie_key'], 'Youtube') self.assertEqual(video['ie_key'], 'Youtube')
self.assertEqual(video['id'], 'BaW_jenozKc') self.assertEqual(video['id'], 'BaW_jenozKc')
self.assertEqual(video['url'], 'BaW_jenozKc') self.assertEqual(video['url'], 'BaW_jenozKc')

View File

@ -14,9 +14,10 @@ import string
from test.helper import FakeYDL from test.helper import FakeYDL
from youtube_dl.extractor import YoutubeIE from youtube_dl.extractor import YoutubeIE
from youtube_dl.jsinterp import JSInterpreter
from youtube_dl.compat import compat_str, compat_urlretrieve from youtube_dl.compat import compat_str, compat_urlretrieve
_TESTS = [ _SIG_TESTS = [
( (
'https://s.ytimg.com/yts/jsbin/html5player-vflHOr_nV.js', 'https://s.ytimg.com/yts/jsbin/html5player-vflHOr_nV.js',
86, 86,
@ -64,6 +65,25 @@ _TESTS = [
) )
] ]
_NSIG_TESTS = [
(
'https://www.youtube.com/s/player/9216d1f7/player_ias.vflset/en_US/base.js',
'SLp9F5bwjAdhE9F-', 'gWnb9IK2DJ8Q1w',
),
(
'https://www.youtube.com/s/player/f8cb7a3b/player_ias.vflset/en_US/base.js',
'oBo2h5euWy6osrUt', 'ivXHpm7qJjJN',
),
(
'https://www.youtube.com/s/player/2dfe380c/player_ias.vflset/en_US/base.js',
'oBo2h5euWy6osrUt', '3DIBbn3qdQ',
),
(
'https://www.youtube.com/s/player/f1ca6900/player_ias.vflset/en_US/base.js',
'cu3wyu6LQn2hse', 'jvxetvmlI9AN9Q',
),
]
class TestPlayerInfo(unittest.TestCase): class TestPlayerInfo(unittest.TestCase):
def test_youtube_extract_player_info(self): def test_youtube_extract_player_info(self):
@ -95,35 +115,54 @@ class TestSignature(unittest.TestCase):
os.mkdir(self.TESTDATA_DIR) os.mkdir(self.TESTDATA_DIR)
def make_tfunc(url, sig_input, expected_sig): def t_factory(name, sig_func, url_pattern):
m = re.match(r'.*-([a-zA-Z0-9_-]+)(?:/watch_as3|/html5player)?\.[a-z]+$', url) def make_tfunc(url, sig_input, expected_sig):
assert m, '%r should follow URL format' % url m = url_pattern.match(url)
test_id = m.group(1) assert m, '%r should follow URL format' % url
test_id = m.group('id')
def test_func(self): def test_func(self):
basename = 'player-%s.js' % test_id basename = 'player-{0}-{1}.js'.format(name, test_id)
fn = os.path.join(self.TESTDATA_DIR, basename) fn = os.path.join(self.TESTDATA_DIR, basename)
if not os.path.exists(fn): if not os.path.exists(fn):
compat_urlretrieve(url, fn) compat_urlretrieve(url, fn)
with io.open(fn, encoding='utf-8') as testf:
jscode = testf.read()
self.assertEqual(sig_func(jscode, sig_input), expected_sig)
ydl = FakeYDL() test_func.__name__ = str('test_{0}_js_{1}'.format(name, test_id))
ie = YoutubeIE(ydl) setattr(TestSignature, test_func.__name__, test_func)
with io.open(fn, encoding='utf-8') as testf: return make_tfunc
jscode = testf.read()
func = ie._parse_sig_js(jscode)
src_sig = (
compat_str(string.printable[:sig_input])
if isinstance(sig_input, int) else sig_input)
got_sig = func(src_sig)
self.assertEqual(got_sig, expected_sig)
test_func.__name__ = str('test_signature_js_' + test_id)
setattr(TestSignature, test_func.__name__, test_func)
for test_spec in _TESTS: def signature(jscode, sig_input):
make_tfunc(*test_spec) func = YoutubeIE(FakeYDL())._parse_sig_js(jscode)
src_sig = (
compat_str(string.printable[:sig_input])
if isinstance(sig_input, int) else sig_input)
return func(src_sig)
def n_sig(jscode, sig_input):
    """Run the player's "n" function from ``jscode`` on ``sig_input``.

    The function name is located with a hard-coded regex; the named
    function is then called through JSInterpreter and its result returned.

    Raises AssertionError with an explicit message when the name cannot be
    found, instead of handing ``None`` to the interpreter.
    """
    # Pending implementation of _extract_n_function_name() or similar in
    # youtube.py, hard-code here
    # funcname = YoutubeIE(FakeYDL())._extract_n_function_name(jscode)
    import re
    found = re.search(r'[=(,&|](\w+)\(\w+\),\w+\.set\("n",', jscode)
    if not found:
        # Explicit raise rather than `assert`, so it survives python -O
        raise AssertionError('Unable to locate the n function name in the player JS')
    return JSInterpreter(jscode).call_function(found.group(1), sig_input)
make_sig_test = t_factory(
'signature', signature, re.compile(r'.*-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player)?\.[a-z]+$'))
for test_spec in _SIG_TESTS:
make_sig_test(*test_spec)
make_nsig_test = t_factory(
'nsig', n_sig, re.compile(r'.+/player/(?P<id>[a-zA-Z0-9_-]+)/.+.js$'))
for test_spec in _NSIG_TESTS:
make_nsig_test(*test_spec)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -21,6 +21,10 @@ import subprocess
import sys import sys
import xml.etree.ElementTree import xml.etree.ElementTree
try:
import collections.abc as compat_collections_abc
except ImportError:
import collections as compat_collections_abc
try: try:
import urllib.request as compat_urllib_request import urllib.request as compat_urllib_request
@ -2962,6 +2966,25 @@ else:
compat_Struct = struct.Struct compat_Struct = struct.Struct
# compat_map/filter() returning an iterator, supposedly the
# same versioning as for zip below
try:
from future_builtins import map as compat_map
except ImportError:
try:
from itertools import imap as compat_map
except ImportError:
compat_map = map
try:
from future_builtins import filter as compat_filter
except ImportError:
try:
from itertools import ifilter as compat_filter
except ImportError:
compat_filter = filter
try: try:
from future_builtins import zip as compat_zip from future_builtins import zip as compat_zip
except ImportError: # not 2.6+ or is 3.x except ImportError: # not 2.6+ or is 3.x
@ -3006,6 +3029,7 @@ __all__ = [
'compat_b64decode', 'compat_b64decode',
'compat_basestring', 'compat_basestring',
'compat_chr', 'compat_chr',
'compat_collections_abc',
'compat_cookiejar', 'compat_cookiejar',
'compat_cookiejar_Cookie', 'compat_cookiejar_Cookie',
'compat_cookies', 'compat_cookies',
@ -3015,6 +3039,7 @@ __all__ = [
'compat_etree_fromstring', 'compat_etree_fromstring',
'compat_etree_register_namespace', 'compat_etree_register_namespace',
'compat_expanduser', 'compat_expanduser',
'compat_filter',
'compat_get_terminal_size', 'compat_get_terminal_size',
'compat_getenv', 'compat_getenv',
'compat_getpass', 'compat_getpass',
@ -3026,6 +3051,7 @@ __all__ = [
'compat_integer_types', 'compat_integer_types',
'compat_itertools_count', 'compat_itertools_count',
'compat_kwargs', 'compat_kwargs',
'compat_map',
'compat_numeric_types', 'compat_numeric_types',
'compat_ord', 'compat_ord',
'compat_os_name', 'compat_os_name',

View File

@ -13,6 +13,7 @@ from .common import InfoExtractor, SearchInfoExtractor
from ..compat import ( from ..compat import (
compat_chr, compat_chr,
compat_HTTPError, compat_HTTPError,
compat_map as map,
compat_parse_qs, compat_parse_qs,
compat_str, compat_str,
compat_urllib_parse_unquote_plus, compat_urllib_parse_unquote_plus,
@ -1253,6 +1254,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
raise ExtractorError('Cannot identify player %r' % player_url) raise ExtractorError('Cannot identify player %r' % player_url)
return id_m.group('id') return id_m.group('id')
def _get_player_code(self, video_id, player_url, player_id=None):
if not player_id:
player_id = self._extract_player_info(player_url)
if player_id not in self._code_cache:
self._code_cache[player_id] = self._download_webpage(
player_url, video_id,
note='Downloading player ' + player_id,
errnote='Download of %s failed' % player_url)
return self._code_cache[player_id]
def _extract_signature_function(self, video_id, player_url, example_sig): def _extract_signature_function(self, video_id, player_url, example_sig):
player_id = self._extract_player_info(player_url) player_id = self._extract_player_info(player_url)
@ -1265,12 +1277,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
if cache_spec is not None: if cache_spec is not None:
return lambda s: ''.join(s[i] for i in cache_spec) return lambda s: ''.join(s[i] for i in cache_spec)
if player_id not in self._code_cache: code = self._get_player_code(video_id, player_url, player_id)
self._code_cache[player_id] = self._download_webpage(
player_url, video_id,
note='Downloading player ' + player_id,
errnote='Download of %s failed' % player_url)
code = self._code_cache[player_id]
res = self._parse_sig_js(code) res = self._parse_sig_js(code)
test_string = ''.join(map(compat_chr, range(len(example_sig)))) test_string = ''.join(map(compat_chr, range(len(example_sig))))
@ -1349,11 +1356,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
if player_url is None: if player_url is None:
raise ExtractorError('Cannot decrypt signature without player_url') raise ExtractorError('Cannot decrypt signature without player_url')
if player_url.startswith('//'):
player_url = 'https:' + player_url
elif not re.match(r'https?://', player_url):
player_url = compat_urlparse.urljoin(
'https://www.youtube.com', player_url)
try: try:
player_id = (player_url, self._signature_cache_id(s)) player_id = (player_url, self._signature_cache_id(s))
if player_id not in self._player_cache: if player_id not in self._player_cache:
@ -1370,6 +1372,88 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
raise ExtractorError( raise ExtractorError(
'Signature extraction failed: ' + tb, cause=e) 'Signature extraction failed: ' + tb, cause=e)
def _extract_player_url(self, webpage):
    """Find the player JS URL in ``webpage`` and return it in absolute form.

    Returns None when no URL is found. Protocol-relative URLs get an
    ``https:`` prefix; other non-http(s) URLs are joined onto
    https://www.youtube.com.
    """
    found = self._search_regex(
        r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
        webpage or '', 'player URL', fatal=False)
    if not found:
        return None
    if found.startswith('//'):
        return 'https:' + found
    if re.match(r'https?://', found):
        return found
    return compat_urlparse.urljoin('https://www.youtube.com', found)
# from yt-dlp
# See also:
# 1. https://github.com/ytdl-org/youtube-dl/issues/29326#issuecomment-894619419
# 2. https://code.videolan.org/videolan/vlc/-/blob/4fb284e5af69aa9ac2100ccbdd3b88debec9987f/share/lua/playlist/youtube.lua#L116
# 3. https://github.com/ytdl-org/youtube-dl/issues/30097#issuecomment-950157377
def _extract_n_function_name(self, jscode):
    """Return the name of the "n" descrambling function found in ``jscode``."""
    name_patterns = (
        r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z0-9$]{3})\([a-zA-Z0-9]\)',
    )
    return self._search_regex(
        name_patterns, jscode,
        'Initial JS player n function name', group='nfunc')
def _extract_n_function(self, video_id, player_url):
    """Return a callable that descrambles an "n" challenge for this player.

    The function code is looked up in the downloader cache under
    'youtube-nsig' / player id. On a miss it is extracted from the player
    JS and stored there for later runs.
    """
    player_id = self._extract_player_info(player_url)
    func_code = self._downloader.cache.load('youtube-nsig', player_id)

    if func_code:
        jsi = JSInterpreter(func_code)
    else:
        # player_id is already known from above; no need to re-parse the URL
        jscode = self._get_player_code(video_id, player_url, player_id)
        funcname = self._extract_n_function_name(jscode)
        jsi = JSInterpreter(jscode)
        func_code = jsi.extract_function_code(funcname)
        self._downloader.cache.store('youtube-nsig', player_id, func_code)

    if self._downloader.params.get('youtube_print_sig_code'):
        self.to_screen('Extracted nsig function from {0}:\n{1}\n'.format(player_id, func_code[1]))

    # The challenge string is passed as the function's single argument
    return lambda s: jsi.extract_function_from_code(*func_code)([s])
def _n_descramble(self, n_param, player_url, video_id):
    """Compute the response to YT's "n" parameter challenge

    Args:
    n_param     -- challenge string that is the value of the
                   URL's "n" query parameter
    player_url  -- URL of YT player JS
    video_id

    Returns the descrambled value; results and the per-player function
    are memoised in self._player_cache. Any failure is re-raised as
    ExtractorError carrying the formatted traceback.
    """
    # The response depends on the player as well as on the challenge, so
    # both are part of the cache key.
    sig_id = ('nsig_value', player_url, n_param)
    if sig_id in self._player_cache:
        return self._player_cache[sig_id]

    try:
        player_id = ('nsig', player_url)
        if player_id not in self._player_cache:
            self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
        func = self._player_cache[player_id]
        self._player_cache[sig_id] = func(n_param)
        if self._downloader.params.get('verbose', False):
            self._downloader.to_screen('[debug] [%s] %s' % (self.IE_NAME, 'Decrypted nsig {0} => {1}'.format(n_param, self._player_cache[sig_id])))
        return self._player_cache[sig_id]
    except Exception as e:
        raise ExtractorError(traceback.format_exc(), cause=e, video_id=video_id)
def _unthrottle_format_urls(self, video_id, player_url, formats):
for fmt in formats:
parsed_fmt_url = compat_urlparse.urlparse(fmt['url'])
qs = compat_urlparse.parse_qs(parsed_fmt_url.query)
n_param = qs.get('n')
if not n_param:
continue
n_param = n_param[-1]
n_response = self._n_descramble(n_param, player_url, video_id)
if n_response:
qs['n'] = [n_response]
fmt['url'] = compat_urlparse.urlunparse(
parsed_fmt_url._replace(query=compat_urllib_parse_urlencode(qs, True)))
def _mark_watched(self, video_id, player_response): def _mark_watched(self, video_id, player_response):
playback_url = url_or_none(try_get( playback_url = url_or_none(try_get(
player_response, player_response,
@ -1631,11 +1715,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
if not (sc and fmt_url and encrypted_sig): if not (sc and fmt_url and encrypted_sig):
continue continue
if not player_url: if not player_url:
if not webpage: player_url = self._extract_player_url(webpage)
continue
player_url = self._search_regex(
r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
webpage, 'player URL', fatal=False)
if not player_url: if not player_url:
continue continue
signature = self._decrypt_signature(sc['s'][0], video_id, player_url) signature = self._decrypt_signature(sc['s'][0], video_id, player_url)
@ -1781,6 +1861,10 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
is_live = video_details.get('isLive') is_live = video_details.get('isLive')
owner_profile_url = microformat.get('ownerProfileUrl') owner_profile_url = microformat.get('ownerProfileUrl')
if not player_url:
player_url = self._extract_player_url(webpage)
self._unthrottle_format_urls(video_id, player_url, formats)
info = { info = {
'id': video_id, 'id': video_id,
'title': self._live_title(video_title) if is_live else video_title, 'title': self._live_title(video_title) if is_live else video_title,

View File

@ -8,6 +8,16 @@ from .utils import (
ExtractorError, ExtractorError,
remove_quotes, remove_quotes,
) )
from .compat import (
compat_collections_abc,
compat_str,
)
MutableMapping = compat_collections_abc.MutableMapping
class Nonlocal:
    """Plain attribute holder, used where a closure needs to rebind a name."""
_OPERATORS = [ _OPERATORS = [
('|', operator.or_), ('|', operator.or_),
@ -22,10 +32,56 @@ _OPERATORS = [
('*', operator.mul), ('*', operator.mul),
] ]
_ASSIGN_OPERATORS = [(op + '=', opfunc) for op, opfunc in _OPERATORS] _ASSIGN_OPERATORS = [(op + '=', opfunc) for op, opfunc in _OPERATORS]
_ASSIGN_OPERATORS.append(('=', lambda cur, right: right)) _ASSIGN_OPERATORS.append(('=', (lambda cur, right: right)))
_NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*' _NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*'
_MATCHING_PARENS = dict(zip(*zip('()', '{}', '[]')))
class JS_Break(ExtractorError):
    """Raised when the interpreter evaluates a JS ``break`` statement."""

    def __init__(self):
        super(JS_Break, self).__init__('Invalid break')
class JS_Continue(ExtractorError):
    """Raised when the interpreter evaluates a JS ``continue`` statement."""

    def __init__(self):
        super(JS_Continue, self).__init__('Invalid continue')
class LocalNameSpace(MutableMapping):
    """Mapping view over a stack of scopes, searched in the order given.

    Lookups return the value from the first scope containing the key.
    Assignment updates the first scope that already has the key, or else
    creates it in the first scope of the stack. Deletion is not supported.
    """

    def __init__(self, *stack):
        self.stack = tuple(stack)

    def __getitem__(self, key):
        for scope in self.stack:
            if key in scope:
                return scope[key]
        raise KeyError(key)

    def __setitem__(self, key, value):
        for scope in self.stack:
            if key in scope:
                scope[key] = value
                break
        else:
            # Not defined anywhere yet: create it in the first scope
            self.stack[0][key] = value
        return value

    def __delitem__(self, key):
        raise NotImplementedError('Deleting is not supported')

    def __iter__(self):
        for scope in self.stack:
            for scope_item in iter(scope):
                yield scope_item

    def __len__(self):
        # Must take no extra argument to work with len(); count what
        # __iter__ yields, since a generator has no len() of its own.
        return sum(1 for _ in self)

    def __repr__(self):
        return 'LocalNameSpace%s' % (self.stack, )
class JSInterpreter(object): class JSInterpreter(object):
def __init__(self, code, objects=None): def __init__(self, code, objects=None):
@ -34,11 +90,56 @@ class JSInterpreter(object):
self.code = code self.code = code
self._functions = {} self._functions = {}
self._objects = objects self._objects = objects
self.__named_object_counter = 0
def _named_object(self, namespace, obj):
self.__named_object_counter += 1
name = '__youtube_dl_jsinterp_obj%s' % (self.__named_object_counter, )
namespace[name] = obj
return name
@staticmethod
def _separate(expr, delim=',', max_split=None):
    """Yield the pieces of ``expr`` split on ``delim`` outside any brackets.

    A delimiter only counts when every (), {} and [] counter is zero.
    At most ``max_split`` splits are made when it is given; the remainder
    is yielded as the final piece. An empty ``expr`` yields nothing.
    Quote characters get no special treatment here.
    """
    if not expr:
        return
    # One counter per closing bracket, keyed by the closing character
    depth = dict.fromkeys(_MATCHING_PARENS.values(), 0)
    last_delim_idx = len(delim) - 1
    piece_start = matched = emitted = 0
    for idx, char in enumerate(expr):
        closer = _MATCHING_PARENS.get(char)
        if closer is not None:
            depth[closer] += 1
        elif char in depth:
            depth[char] -= 1
        if char != delim[matched] or any(depth.values()):
            # Not (the next character of) a top-level delimiter
            matched = 0
            continue
        if matched != last_delim_idx:
            # Part-way through a multi-character delimiter
            matched += 1
            continue
        yield expr[piece_start: idx - last_delim_idx]
        piece_start, matched = idx + 1, 0
        emitted += 1
        if max_split and emitted >= max_split:
            break
    yield expr[piece_start:]
@staticmethod
def _separate_at_paren(expr, delim):
    """Split ``expr`` at the first top-level ``delim`` (a closing bracket).

    Returns (inner, rest): the text before the split with its first
    character dropped, and the text after it, both stripped. Raises
    ExtractorError when no split point is found.
    """
    pieces = list(JSInterpreter._separate(expr, delim, 1))
    if len(pieces) < 2:
        raise ExtractorError('No terminating paren {0} in {1}'.format(delim, expr))
    head, tail = pieces[0], pieces[1]
    return head[1:].strip(), tail.strip()
def interpret_statement(self, stmt, local_vars, allow_recursion=100): def interpret_statement(self, stmt, local_vars, allow_recursion=100):
if allow_recursion < 0: if allow_recursion < 0:
raise ExtractorError('Recursion limit reached') raise ExtractorError('Recursion limit reached')
sub_statements = list(self._separate(stmt, ';'))
stmt = (sub_statements or ['']).pop()
for sub_stmt in sub_statements:
ret, should_abort = self.interpret_statement(sub_stmt, local_vars, allow_recursion - 1)
if should_abort:
return ret
should_abort = False should_abort = False
stmt = stmt.lstrip() stmt = stmt.lstrip()
stmt_m = re.match(r'var\s', stmt) stmt_m = re.match(r'var\s', stmt)
@ -61,25 +162,124 @@ class JSInterpreter(object):
if expr == '': # Empty expression if expr == '': # Empty expression
return None return None
if expr.startswith('('): if expr.startswith('{'):
parens_count = 0 inner, outer = self._separate_at_paren(expr, '}')
for m in re.finditer(r'[()]', expr): inner, should_abort = self.interpret_statement(inner, local_vars, allow_recursion - 1)
if m.group(0) == '(': if not outer or should_abort:
parens_count += 1 return inner
else:
parens_count -= 1
if parens_count == 0:
sub_expr = expr[1:m.start()]
sub_result = self.interpret_expression(
sub_expr, local_vars, allow_recursion)
remaining_expr = expr[m.end():].strip()
if not remaining_expr:
return sub_result
else:
expr = json.dumps(sub_result) + remaining_expr
break
else: else:
raise ExtractorError('Premature end of parens in %r' % expr) expr = json.dumps(inner) + outer
if expr.startswith('('):
inner, outer = self._separate_at_paren(expr, ')')
inner = self.interpret_expression(inner, local_vars, allow_recursion)
if not outer:
return inner
else:
expr = json.dumps(inner) + outer
if expr.startswith('['):
inner, outer = self._separate_at_paren(expr, ']')
name = self._named_object(local_vars, [
self.interpret_expression(item, local_vars, allow_recursion)
for item in self._separate(inner)])
expr = name + outer
m = re.match(r'try\s*', expr)
if m:
if expr[m.end()] == '{':
try_expr, expr = self._separate_at_paren(expr[m.end():], '}')
else:
try_expr, expr = expr[m.end() - 1:], ''
ret, should_abort = self.interpret_statement(try_expr, local_vars, allow_recursion - 1)
if should_abort:
return ret
return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
m = re.match(r'(?:(?P<catch>catch)|(?P<for>for)|(?P<switch>switch))\s*\(', expr)
md = m.groupdict() if m else {}
if md.get('catch'):
# We ignore the catch block
_, expr = self._separate_at_paren(expr, '}')
return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
elif md.get('for'):
def raise_constructor_error(c):
raise ExtractorError(
'Premature return in the initialization of a for loop in {0!r}'.format(c))
constructor, remaining = self._separate_at_paren(expr[m.end() - 1:], ')')
if remaining.startswith('{'):
body, expr = self._separate_at_paren(remaining, '}')
else:
m = re.match(r'switch\s*\(', remaining) # FIXME
if m:
switch_val, remaining = self._separate_at_paren(remaining[m.end() - 1:], ')')
body, expr = self._separate_at_paren(remaining, '}')
body = 'switch(%s){%s}' % (switch_val, body)
else:
body, expr = remaining, ''
start, cndn, increment = self._separate(constructor, ';')
if self.interpret_statement(start, local_vars, allow_recursion - 1)[1]:
raise_constructor_error(constructor)
while True:
if not self.interpret_expression(cndn, local_vars, allow_recursion):
break
try:
ret, should_abort = self.interpret_statement(body, local_vars, allow_recursion - 1)
if should_abort:
return ret
except JS_Break:
break
except JS_Continue:
pass
if self.interpret_statement(increment, local_vars, allow_recursion - 1)[1]:
raise_constructor_error(constructor)
return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
elif md.get('switch'):
switch_val, remaining = self._separate_at_paren(expr[m.end() - 1:], ')')
switch_val = self.interpret_expression(switch_val, local_vars, allow_recursion)
body, expr = self._separate_at_paren(remaining, '}')
items = body.replace('default:', 'case default:').split('case ')[1:]
for default in (False, True):
matched = False
for item in items:
case, stmt = [i.strip() for i in self._separate(item, ':', 1)]
if default:
matched = matched or case == 'default'
elif not matched:
matched = (case != 'default'
and switch_val == self.interpret_expression(case, local_vars, allow_recursion))
if not matched:
continue
try:
ret, should_abort = self.interpret_statement(stmt, local_vars, allow_recursion - 1)
if should_abort:
return ret
except JS_Break:
break
if matched:
break
return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
# Comma separated statements
sub_expressions = list(self._separate(expr))
expr = sub_expressions.pop().strip() if sub_expressions else ''
for sub_expr in sub_expressions:
self.interpret_expression(sub_expr, local_vars, allow_recursion)
for m in re.finditer(r'''(?x)
(?P<pre_sign>\+\+|--)(?P<var1>%(_NAME_RE)s)|
(?P<var2>%(_NAME_RE)s)(?P<post_sign>\+\+|--)''' % globals(), expr):
var = m.group('var1') or m.group('var2')
start, end = m.span()
sign = m.group('pre_sign') or m.group('post_sign')
ret = local_vars[var]
local_vars[var] += 1 if sign[0] == '+' else -1
if m.group('pre_sign'):
ret = local_vars[var]
expr = expr[:start] + json.dumps(ret) + expr[end:]
for op, opfunc in _ASSIGN_OPERATORS: for op, opfunc in _ASSIGN_OPERATORS:
m = re.match(r'''(?x) m = re.match(r'''(?x)
@ -88,14 +288,13 @@ class JSInterpreter(object):
(?P<expr>.*)$''' % (_NAME_RE, re.escape(op)), expr) (?P<expr>.*)$''' % (_NAME_RE, re.escape(op)), expr)
if not m: if not m:
continue continue
right_val = self.interpret_expression( right_val = self.interpret_expression(m.group('expr'), local_vars, allow_recursion)
m.group('expr'), local_vars, allow_recursion - 1)
if m.groupdict().get('index'): if m.groupdict().get('index'):
lvar = local_vars[m.group('out')] lvar = local_vars[m.group('out')]
idx = self.interpret_expression( idx = self.interpret_expression(m.group('index'), local_vars, allow_recursion)
m.group('index'), local_vars, allow_recursion) if not isinstance(idx, int):
assert isinstance(idx, int) raise ExtractorError('List indices must be integers: %s' % (idx, ))
cur = lvar[idx] cur = lvar[idx]
val = opfunc(cur, right_val) val = opfunc(cur, right_val)
lvar[idx] = val lvar[idx] = val
@ -109,8 +308,13 @@ class JSInterpreter(object):
if expr.isdigit(): if expr.isdigit():
return int(expr) return int(expr)
if expr == 'break':
raise JS_Break()
elif expr == 'continue':
raise JS_Continue()
var_m = re.match( var_m = re.match(
r'(?!if|return|true|false)(?P<name>%s)$' % _NAME_RE, r'(?!if|return|true|false|null)(?P<name>%s)$' % _NAME_RE,
expr) expr)
if var_m: if var_m:
return local_vars[var_m.group('name')] return local_vars[var_m.group('name')]
@ -124,91 +328,161 @@ class JSInterpreter(object):
r'(?P<in>%s)\[(?P<idx>.+)\]$' % _NAME_RE, expr) r'(?P<in>%s)\[(?P<idx>.+)\]$' % _NAME_RE, expr)
if m: if m:
val = local_vars[m.group('in')] val = local_vars[m.group('in')]
idx = self.interpret_expression( idx = self.interpret_expression(m.group('idx'), local_vars, allow_recursion)
m.group('idx'), local_vars, allow_recursion - 1)
return val[idx] return val[idx]
def raise_expr_error(where, op, exp):
raise ExtractorError('Premature {0} return of {1} in {2!r}'.format(where, op, exp))
for op, opfunc in _OPERATORS:
separated = list(self._separate(expr, op))
if len(separated) < 2:
continue
right_val = separated.pop()
left_val = op.join(separated)
left_val, should_abort = self.interpret_statement(
left_val, local_vars, allow_recursion - 1)
if should_abort:
raise_expr_error('left-side', op, expr)
right_val, should_abort = self.interpret_statement(
right_val, local_vars, allow_recursion - 1)
if should_abort:
raise_expr_error('right-side', op, expr)
return opfunc(left_val or 0, right_val)
m = re.match( m = re.match(
r'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*(?:\(+(?P<args>[^()]*)\))?$' % _NAME_RE, r'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*' % _NAME_RE,
expr) expr)
if m: if m:
variable = m.group('var') variable = m.group('var')
member = remove_quotes(m.group('member') or m.group('member2')) nl = Nonlocal()
arg_str = m.group('args')
if variable in local_vars: nl.member = remove_quotes(m.group('member') or m.group('member2'))
obj = local_vars[variable] arg_str = expr[m.end():]
if arg_str.startswith('('):
arg_str, remaining = self._separate_at_paren(arg_str, ')')
else: else:
if variable not in self._objects: arg_str, remaining = None, arg_str
self._objects[variable] = self.extract_object(variable)
obj = self._objects[variable]
if arg_str is None: def assertion(cndn, msg):
# Member access """ assert, but without risk of getting optimized out """
if member == 'length': if not cndn:
return len(obj) raise ExtractorError('{0} {1}: {2}'.format(nl.member, msg, expr))
return obj[member]
assert expr.endswith(')') def eval_method():
# Function call # nonlocal member
if arg_str == '': member = nl.member
argvals = tuple() if variable == 'String':
else: obj = compat_str
argvals = tuple([ elif variable in local_vars:
obj = local_vars[variable]
else:
if variable not in self._objects:
self._objects[variable] = self.extract_object(variable)
obj = self._objects[variable]
if arg_str is None:
# Member access
if member == 'length':
return len(obj)
return obj[member]
# Function call
argvals = [
self.interpret_expression(v, local_vars, allow_recursion) self.interpret_expression(v, local_vars, allow_recursion)
for v in arg_str.split(',')]) for v in self._separate(arg_str)]
if member == 'split': if obj == compat_str:
assert argvals == ('',) if member == 'fromCharCode':
return list(obj) assertion(argvals, 'takes one or more arguments')
if member == 'join': return ''.join(map(chr, argvals))
assert len(argvals) == 1 raise ExtractorError('Unsupported string method %s' % (member, ))
return argvals[0].join(obj)
if member == 'reverse':
assert len(argvals) == 0
obj.reverse()
return obj
if member == 'slice':
assert len(argvals) == 1
return obj[argvals[0]:]
if member == 'splice':
assert isinstance(obj, list)
index, howMany = argvals
res = []
for i in range(index, min(index + howMany, len(obj))):
res.append(obj.pop(index))
return res
return obj[member](argvals) if member == 'split':
assertion(argvals, 'takes one or more arguments')
assertion(argvals == [''], 'with arguments is not implemented')
return list(obj)
elif member == 'join':
assertion(isinstance(obj, list), 'must be applied on a list')
assertion(len(argvals) == 1, 'takes exactly one argument')
return argvals[0].join(obj)
elif member == 'reverse':
assertion(not argvals, 'does not take any arguments')
obj.reverse()
return obj
elif member == 'slice':
assertion(isinstance(obj, list), 'must be applied on a list')
assertion(len(argvals) == 1, 'takes exactly one argument')
return obj[argvals[0]:]
elif member == 'splice':
assertion(isinstance(obj, list), 'must be applied on a list')
assertion(argvals, 'takes one or more arguments')
index, howMany = map(int, (argvals + [len(obj)])[:2])
if index < 0:
index += len(obj)
add_items = argvals[2:]
res = []
for i in range(index, min(index + howMany, len(obj))):
res.append(obj.pop(index))
for i, item in enumerate(add_items):
obj.insert(index + i, item)
return res
elif member == 'unshift':
assertion(isinstance(obj, list), 'must be applied on a list')
assertion(argvals, 'takes one or more arguments')
for item in reversed(argvals):
obj.insert(0, item)
return obj
elif member == 'pop':
assertion(isinstance(obj, list), 'must be applied on a list')
assertion(not argvals, 'does not take any arguments')
if not obj:
return
return obj.pop()
elif member == 'push':
assertion(argvals, 'takes one or more arguments')
obj.extend(argvals)
return obj
elif member == 'forEach':
assertion(argvals, 'takes one or more arguments')
assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
f, this = (argvals + [''])[:2]
return [f((item, idx, obj), this=this) for idx, item in enumerate(obj)]
elif member == 'indexOf':
assertion(argvals, 'takes one or more arguments')
assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
idx, start = (argvals + [0])[:2]
try:
return obj.index(idx, start)
except ValueError:
return -1
for op, opfunc in _OPERATORS: if isinstance(obj, list):
m = re.match(r'(?P<x>.+?)%s(?P<y>.+)' % re.escape(op), expr) member = int(member)
if not m: nl.member = member
continue return obj[member](argvals)
x, abort = self.interpret_statement(
m.group('x'), local_vars, allow_recursion - 1)
if abort:
raise ExtractorError(
'Premature left-side return of %s in %r' % (op, expr))
y, abort = self.interpret_statement(
m.group('y'), local_vars, allow_recursion - 1)
if abort:
raise ExtractorError(
'Premature right-side return of %s in %r' % (op, expr))
return opfunc(x, y)
m = re.match( if remaining:
r'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr) return self.interpret_expression(
self._named_object(local_vars, eval_method()) + remaining,
local_vars, allow_recursion)
else:
return eval_method()
m = re.match(r'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr)
if m: if m:
fname = m.group('func') fname = m.group('func')
argvals = tuple([ argvals = tuple([
int(v) if v.isdigit() else local_vars[v] int(v) if v.isdigit() else local_vars[v]
for v in m.group('args').split(',')]) if len(m.group('args')) > 0 else tuple() for v in self._separate(m.group('args'))])
if fname not in self._functions: if fname in local_vars:
return local_vars[fname](argvals)
elif fname not in self._functions:
self._functions[fname] = self.extract_function(fname) self._functions[fname] = self.extract_function(fname)
return self._functions[fname](argvals) return self._functions[fname](argvals)
raise ExtractorError('Unsupported JS expression %r' % expr) if expr:
raise ExtractorError('Unsupported JS expression %r' % expr)
def extract_object(self, objname): def extract_object(self, objname):
_FUNC_NAME_RE = r'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')''' _FUNC_NAME_RE = r'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
@ -233,30 +507,52 @@ class JSInterpreter(object):
return obj return obj
def extract_function_code(self, funcname):
    """Locate the definition of the JS function `funcname` in self.code.

    Three spellings are recognised: ``function NAME(...)``,
    ``NAME = function(...)`` preceded by ``{``, ``;`` or ``,``, and
    ``var NAME = function(...)``.

    @param funcname  name of the JS function to look up
    @returns         argnames, code
    @raises          ExtractorError if no definition is found
    """
    func_m = re.search(
        r'''(?x)
            (?:function\s+%(f_n)s|[{;,]\s*%(f_n)s\s*=\s*function|var\s+%(f_n)s\s*=\s*function)\s*
            \((?P<args>[^)]*)\)\s*
            (?P<code>\{(?:(?!};)[^"]|"([^"]|\\")*")+\})''' % {'f_n': re.escape(funcname), },
        self.code)
    # The guard has to run before the match object is used: re.search()
    # returns None on failure, and dereferencing that would surface as an
    # AttributeError instead of the ExtractorError callers expect.
    if func_m is None:
        raise ExtractorError('Could not find JS function %r' % funcname)
    # refine the match
    # NOTE(review): presumably _separate_at_paren trims the regex's
    # approximate match down to the balanced {...} body -- confirm.
    code, _ = self._separate_at_paren(func_m.group('code'), '}')
    # Strip whitespace so that `function f(a, b)` binds 'b' rather than ' b',
    # matching what extract_function_from_code does for nested functions.
    argnames = [name.strip() for name in func_m.group('args').split(',')]
    return argnames, code
def extract_function(self, funcname):
    """Return a Python callable for the JS function named `funcname`.

    The argument names and body text come from extract_function_code();
    the callable itself is produced by extract_function_from_code().
    """
    argnames, code = self.extract_function_code(funcname)
    return self.extract_function_from_code(argnames, code)
def extract_function_from_code(self, argnames, code, *global_stack):
    """Build a callable from JS source text, handling nested functions.

    Each anonymous ``function(...) {...}`` found in `code` is converted
    recursively and replaced in the text by the string that
    _named_object() returns for it; the rewritten text is then handed to
    build_function().

    @param argnames      argument names of the function being built
    @param code          JS source text of the function body
    @param global_stack  enclosing scopes, forwarded after this function's
                         own scope dict
    @returns             whatever build_function() returns
    """
    scope = {}
    anon_func_re = r'function\((?P<args>[^)]*)\)\s*{'
    match = re.search(anon_func_re, code)
    while match is not None:
        head_start, head_end = match.span()
        # head_end - 1 is the opening '{' matched by the pattern above
        inner_body, tail = self._separate_at_paren(code[head_end - 1:], '}')
        inner_args = [arg.strip() for arg in match.group('args').split(',')]
        inner_func = self.extract_function_from_code(
            inner_args, inner_body, scope, *global_stack)
        # NOTE(review): presumably _named_object stores inner_func in `scope`
        # and returns the generated name -- confirm against its definition.
        placeholder = self._named_object(scope, inner_func)
        code = code[:head_start] + placeholder + tail
        match = re.search(anon_func_re, code)
    return self.build_function(argnames, code, scope, *global_stack)
def call_function(self, funcname, *args):
    """Extract the JS function `funcname` and call it with `args`.

    The positional arguments are passed to the extracted callable as a
    single tuple.
    """
    js_func = self.extract_function(funcname)
    return js_func(args)
def build_function(self, argnames, code, *global_stack):
    """Wrap the JS statement text `code` in a Python callable.

    @param argnames      names that the call's positional values are bound to
    @param code          JS statements; newlines are removed and the text is
                         split with _separate() on ';'
    @param global_stack  optional scope objects; the first is used as this
                         function's local variables (a fresh dict if none is
                         given), the rest are passed to LocalNameSpace after it
    @returns             resf(args, **kwargs), where args is a sequence of
                         values and kwargs are extra variables to bind
    """
    global_stack = list(global_stack) or [{}]
    local_vars = global_stack.pop(0)

    def resf(args, **kwargs):
        # local_vars is captured, not copied: bindings persist between calls
        local_vars.update(dict(zip(argnames, args)))
        local_vars.update(kwargs)
        var_stack = LocalNameSpace(local_vars, *global_stack)
        # If the loop below never runs, return None instead of failing with
        # UnboundLocalError on `return ret`.
        ret = None
        for stmt in self._separate(code.replace('\n', ''), ';'):
            ret, should_abort = self.interpret_statement(stmt, var_stack)
            # stop at the first statement that signals an abort
            # (presumably a JS `return` -- confirm in interpret_statement)
            if should_abort:
                break
        return ret
    return resf