Skip to content
191 changes: 138 additions & 53 deletions snscrape/modules/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import snscrape.base
import typing
import urllib.parse
import unittest
import threading

_logger = logging.getLogger(__name__)
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
Expand Down Expand Up @@ -152,7 +154,7 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0]))
continue

if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '')
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
Expand All @@ -161,48 +163,23 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
# resp = self._get(image[0])
# encoded_string = base64.b64encode(resp.content)
# Individual photo or video link
continue

if link.text.startswith('@'):
mentions.append(link.text.strip('@'))
continue

if link.text.startswith('#'):
hashtags.append(link.text.strip('#'))
continue

if 'tgme_widget_message_voice_player' in link.get('class', []):
media.append(_parse_voice_message(link))

if 'tgme_widget_message_video_player' in link.get('class', []):
media.append(_parse_video_message(link))

href = urllib.parse.urljoin(pageUrl, link['href'])
if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl):
outlinks.append(href)

for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
audioUrl = voicePlayer.find('audio')['src']
durationStr = voicePlayer.find('time').text
duration = _durationStrToSeconds(durationStr)
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]

media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))

for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
iTag = videoPlayer.find('i')
if iTag is None:
videoUrl = None
videoThumbnailUrl = None
else:
style = iTag['style']
videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
videoTag = videoPlayer.find('video')
videoUrl = None if videoTag is None else videoTag['src']
mKwargs = {
'thumbnailUrl': videoThumbnailUrl,
'url': videoUrl,
}
timeTag = videoPlayer.find('time')
if timeTag is None:
cls = Gif
else:
cls = Video
durationStr = videoPlayer.find('time').text
mKwargs['duration'] = _durationStrToSeconds(durationStr)
media.append(cls(**mKwargs))

linkPreview = None
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
kwargs = {}
Expand All @@ -219,8 +196,6 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
else:
_logger.warning(f'Could not process link preview image on {url}')
linkPreview = LinkPreview(**kwargs)
if kwargs['href'] in outlinks:
outlinks.remove(kwargs['href'])

viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
views = None if viewsSpan is None else _parse_num(viewsSpan.text)
Expand All @@ -239,24 +214,20 @@ def get_items(self):
return
nextPageUrl = ''
while True:
if soup.find("div", class_ = "tme_no_messages_found"):
break
yield from self._soup_to_items(soup, r.url)
try:
if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1':
dateElt = soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)
if dateElt and 'href' in dateElt.attrs:
urlPieces = dateElt['href'].split('/')
if urlPieces and urlPieces[-1] == '1':
# if message 1 is the first message in the page, terminate scraping
break
except:
pass
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
if not pageLink:
# some pages are missing a "tme_messages_more" tag, causing early termination
if '=' not in nextPageUrl:
nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href']
nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20
if nextPostIndex > 20:
pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'}
else:
break
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
if pageLink := soup.find('link', attrs = {'rel': 'prev'}, href = True):
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
else:
nextPostIndex = int(soup.find('div', attrs = {'class': 'tgme_widget_message', 'data-post': True})["data-post"].split("/")[-1])
nextPageUrl = urllib.parse.urljoin(r.url, r.url.split('?')[0] + f'?before={nextPostIndex}')
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = _telegramResponseOkCallback)
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
Expand Down Expand Up @@ -333,4 +304,118 @@ def _telegramResponseOkCallback(r):
if r.status_code == 200:
return (True, None)
return (False, f'{r.status_code=}')


def _parse_voice_message(voicePlayer):
audioUrl = voicePlayer.find('audio')['src']
durationStr = voicePlayer.find('time').text
duration = _durationStrToSeconds(durationStr)
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
return VoiceMessage(url = audioUrl, duration = duration, bars = barHeights)

def _parse_video_message(videoPlayer):
iTag = videoPlayer.find('i')
if iTag is None:
videoUrl = None
videoThumbnailUrl = None
else:
style = iTag['style']
videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
videoTag = videoPlayer.find('video')
videoUrl = None if videoTag is None else videoTag['src']
mKwargs = {
'thumbnailUrl': videoThumbnailUrl,
'url': videoUrl,
}
timeTag = videoPlayer.find('time')
if timeTag is None:
# Example of duration-less GIF: https://t.me/thisisatestchannel19451923/3
cls = Gif
else:
cls = Video
durationStr = videoPlayer.find('time').text
mKwargs['duration'] = _durationStrToSeconds(durationStr)
return cls(**mKwargs)


class TestTelegramChannelScraper(unittest.TestCase):
"""Run suite by directly running this file."""

@staticmethod
def execute_with_timeout(func, timeout=10):
"""
Executes a function in a separate thread and enforces a timeout.
If provided function throws an error, it's re-raised in main thread.
Used to detect infinite loops in finite time, works cross-platform.

:param func: The function to execute. This function should accept no arguments.
:param timeout: The timeout in seconds.
"""
exceptions=[]
def func_passing_exceptions():
try:
func()
except Exception as e:
exceptions.append((e.__class__, e, e.__traceback__))

thread = threading.Thread(target=func_passing_exceptions)
thread.start()
thread.join(timeout=timeout)

if exceptions:
exc_class, exc_instance, traceback = exceptions[0]
raise exc_class(exc_instance).with_traceback(traceback)

if thread.is_alive():
raise TimeoutError(f"Function didn't complete within {timeout} seconds")

def test_scraping_termination_missing_prev(self):
"""Test scraping always terminates, even if the page's prev link is missing."""

def scrape_two_pages():
scraper = TelegramChannelScraper('WLM_USA_TEXAS?before=3766')
items = list()
num_items_on_page = 20
for item in scraper.get_items():
items.append(item)
if len(items) > 2 * num_items_on_page:
break

self.execute_with_timeout(scrape_two_pages)

def test_scraping_termination_small_post_count(self):
"""Test scraping always terminates, even with small number of posts. This channel's highest ID is 28."""

def scrape_small_channel():
scraper = TelegramChannelScraper('AKCPB')
items = list(scraper.get_items())
return items

self.execute_with_timeout(scrape_small_channel)

def test_scraping_termination_pages_without_posts(self):
"""Test scraping gracefully handles pages without any posts."""

def scrape_empty_page():
scraper = TelegramChannelScraper('BREAKDCODE?before=3')
for _ in scraper.get_items():
pass

self.execute_with_timeout(scrape_empty_page)

def test_media_order_preservation(self):
"""Test scraped media appears in the same order as in the post."""
scraper = TelegramChannelScraper('nexta_live?before=43103')
item = next(scraper.get_items(), None)
self.assertIsNotNone(item, "Failed to scrape any posts.")

# This particular post is known to include media [Video, Photo, Video]
self.assertEqual(item.url, "https://t.me/s/nexta_live/43102")

expected_types = [Video, Photo, Video]
actual_types = [type(media) for media in item.media] if item.media else []

self.assertEqual(actual_types, expected_types, "Media did not appear in the expected order.")


if __name__ == '__main__':
unittest.main()