diff --git a/youtube_api_cmd.py b/youtube_api_cmd.py index a5f4170..f013f52 100644 --- a/youtube_api_cmd.py +++ b/youtube_api_cmd.py @@ -1,46 +1,74 @@ + +#!/usr/bin/env python3 + """ -*- coding: utf-8 -*- ======================== Python YouTube API ======================== - Developed by: Chirag Rathod (Srce Cde) Email: chiragr83@gmail.com - -======================== """ - import json +import jsonlines import sys from urllib import * import argparse from urllib.parse import urlparse, urlencode, parse_qs from urllib.request import urlopen +import jsonlines + +saved_ids = [] +with jsonlines.open('../filtered_domains.jsonl', 'r') as reader: + for video in reader: + saved_ids.append(video["vid"]) YOUTUBE_COMMENT_URL = 'https://www.googleapis.com/youtube/v3/commentThreads' YOUTUBE_SEARCH_URL = 'https://www.googleapis.com/youtube/v3/search' - class YouTubeApi(): - def load_comments(self, mat): - for item in mat["items"]: - comment = item["snippet"]["topLevelComment"] - author = comment["snippet"]["authorDisplayName"] - text = comment["snippet"]["textDisplay"] - print("Comment by {}: {}".format(author, text)) - if 'replies' in item.keys(): - for reply in item['replies']['comments']: - rauthor = reply['snippet']['authorDisplayName'] - rtext = reply["snippet"]["textDisplay"] + def get_video_comment(self, vid): + print(vid) + def load_comments(self, vid): + + for item in mat["items"]: + comment = item["snippet"]["topLevelComment"] + author = comment["snippet"]["authorDisplayName"] + text = comment["snippet"]["textDisplay"] + with jsonlines.open('comments.jsonl', 'a') as writer: + writer.write( + { + "vID": vid, + "cID": str(comment["id"]), + "author": str(author), + "comment": str(text) + + } + ) + if 'replies' in item.keys(): + for reply in item['replies']['comments']: + + + rauthor = reply['snippet']['authorDisplayName'] + rtext = reply["snippet"]["textDisplay"] + + with jsonlines.open('comments.jsonl', 'a') as writer: + writer.write( + { + "vid": vid, + "cid": str(reply["id"]), + "author": str(rauthor), + "comment": str(rtext) + + } + ) + - print("\n\tReply by {}: {}".format(rauthor, rtext), "\n") - - def get_video_comment(self): parser = argparse.ArgumentParser() mxRes = 20 - vid = str() + parser.add_argument("--c", help="calls comment function by keyword function", action='store_true') parser.add_argument("--max", help="number of comments to return") parser.add_argument("--videourl", help="Required URL for which comments to return") @@ -51,19 +79,10 @@ def get_video_comment(self): if not args.max: args.max = mxRes - if not args.videourl: - exit("Please specify video URL using the --videourl=parameter.") if not args.key: exit("Please specify API key using the --key=parameter.") - try: - video_id = urlparse(str(args.videourl)) - q = parse_qs(video_id.query) - vid = q["v"][0] - - except: - print("Invalid YouTube URL") parms = { 'part': 'snippet,replies', @@ -81,17 +100,19 @@ def get_video_comment(self): nextPageToken = mat.get("nextPageToken") print("\nPage : 1") print("------------------------------------------------------------------") - self.load_comments(mat) + load_comments(self, vid) while nextPageToken: parms.update({'pageToken': nextPageToken}) matches = self.openURL(YOUTUBE_COMMENT_URL, parms) mat = json.loads(matches) nextPageToken = mat.get("nextPageToken") + if i > 10: + sys.exit() print("\nPage : ", i) print("------------------------------------------------------------------") - self.load_comments(mat) + load_comments(self, vid) i += 1 except KeyboardInterrupt: @@ -100,24 +121,24 @@ def get_video_comment(self): except: print("Cannot Open URL or Fetch comments at a moment") - def load_search_res(self, search_response): - videos, channels, playlists = [], [], [] - for search_result in search_response.get("items", []): - if search_result["id"]["kind"] == "youtube#video": - videos.append("{} ({})".format(search_result["snippet"]["title"], - search_result["id"]["videoId"])) - elif search_result["id"]["kind"] == "youtube#channel": - channels.append("{} ({})".format(search_result["snippet"]["title"], - search_result["id"]["channelId"])) - elif search_result["id"]["kind"] == "youtube#playlist": - playlists.append("{} ({})".format(search_result["snippet"]["title"], - search_result["id"]["playlistId"])) - - print("Videos:\n", "\n".join(videos), "\n") - print("Channels:\n", "\n".join(channels), "\n") - print("Playlists:\n", "\n".join(playlists), "\n") - def search_keyword(self): + + def load_search_res(self): + for search_result in search_response.get("items", []): + if search_result["id"]["kind"] == "youtube#video": + videos.append("{} ({})".format(search_result["snippet"]["title"], + search_result["id"]["videoId"])) + elif search_result["id"]["kind"] == "youtube#channel": + channels.append("{} ({})".format(search_result["snippet"]["title"], + search_result["id"]["channelId"])) + elif search_result["id"]["kind"] == "youtube#playlist": + playlists.append("{} ({})".format(search_result["snippet"]["title"], + search_result["id"]["playlistId"])) + + print("Videos:\n", "\n".join(videos), "\n") + print("Channels:\n", "\n".join(channels), "\n") + print("Playlists:\n", "\n".join(playlists), "\n") + parser = argparse.ArgumentParser() mxRes = 20 parser.add_argument("--s", help="calls the search by keyword function", action='store_true') @@ -150,9 +171,12 @@ def search_keyword(self): nextPageToken = search_response.get("nextPageToken") + videos = [] + channels = [] + playlists = [] print("\nPage : 1 --- Region : {}".format(args.r)) print("------------------------------------------------------------------") - self.load_search_res(search_response) + load_search_res(self) while nextPageToken: parms.update({'pageToken': nextPageToken}) @@ -163,7 +187,7 @@ def search_keyword(self): print("Page : {} --- Region : {}".format(i, args.r)) print("------------------------------------------------------------------") - self.load_search_res(search_response) + load_search_res(self) i += 1 @@ -173,16 +197,17 @@ def search_keyword(self): except: print("Cannot Open URL or Fetch comments at a moment") - def load_channel_vid(self, search_response): - videos = [] - for search_result in search_response.get("items", []): - if search_result["id"]["kind"] == "youtube#video": - videos.append("{} ({})".format(search_result["snippet"]["title"], - search_result["id"]["videoId"])) + def channel_videos(self): - print("###Videos:###\n", "\n".join(videos), "\n") + def load_channel_vid(self): + + for search_result in search_response.get("items", []): + if search_result["id"]["kind"] == "youtube#video": + videos.append("{} ({})".format(search_result["snippet"]["title"], + search_result["id"]["videoId"])) + + print("###Videos:###\n", "\n".join(videos), "\n") - def channel_videos(self): parser = argparse.ArgumentParser() mxRes = 20 parser.add_argument("--sc", help="calls the search by channel by keyword function", action='store_true') @@ -213,28 +238,32 @@ def channel_videos(self): search_response = json.loads(matches) + videos = [] i = 2 nextPageToken = search_response.get("nextPageToken") print("\nPage : 1") print("------------------------------------------------------------------") - self.load_channel_vid(search_response) + load_channel_vid(self) while nextPageToken: + parms.update({'pageToken': nextPageToken}) matches = self.openURL(YOUTUBE_SEARCH_URL, parms) search_response = json.loads(matches) nextPageToken = search_response.get("nextPageToken") + if i > 10: + sys.exit() print("Page : ", i) print("------------------------------------------------------------------") - self.load_channel_vid(search_response) + load_channel_vid(self) i += 1 - except KeyboardInterrupt: + except KeyboardI3nterrupt: print("User Aborted the Operation") except: @@ -254,7 +283,8 @@ def main(): if str(sys.argv[1]) == "--s": y.search_keyword() elif str(sys.argv[1]) == "--c": - y.get_video_comment() + for vid in saved_ids: + y.get_video_comment(vid) elif str(sys.argv[1]) == "--sc": y.channel_videos() else: