11import re
2- from typing import Optional , Literal , cast
3- from .utils .get_public_ip import get_public_ip
4- from .utils .get_device_ip import get_device_ip
5- from .utils .curl import send_query
2+ from typing import Dict , Literal , Optional , cast
3+
64from .AnonymityResult import AnonymityResult
5+ from .utils .curl import send_query
6+ from .utils .get_device_ip import get_device_ip
7+ from .utils .get_public_ip import get_public_ip
8+
79
10+ # ==============================
11+ # Privacy headers (fallback only)
12+ # ==============================
813PRIVACY_HEADERS = {
914 "VIA" ,
1015 "X-FORWARDED-FOR" ,
1621 "PROXY-CONNECTION" ,
1722}
1823
19- # Compile regex once for speed
2024PRIVACY_REGEX = re .compile (
2125 r"(?i)\b(" + "|" .join (re .escape (h ) for h in PRIVACY_HEADERS ) + r")\b"
2226)
2327
2428
29+ # ==============================
30+ # AZENV PARSER
31+ # ==============================
32+ def parse_azenv_to_dict (response : str ) -> Dict [str , str ]:
33+ match = re .search (r"<pre>(.*?)</pre>" , response , re .S )
34+ if not match :
35+ return {}
36+
37+ content = match .group (1 )
38+ headers : Dict [str , str ] = {}
39+
40+ for line in content .splitlines ():
41+ if "=" in line :
42+ k , v = line .split ("=" , 1 )
43+ headers [k .strip ()] = v .strip ()
44+
45+ return headers
46+
47+
48+ def extract_first_ip (value : str ) -> str :
49+ return value .split ("," )[0 ].strip ()
50+
51+
52+ def classify_proxy_from_azenv (
53+ response : str ,
54+ expected_ip : Optional [str ] = None ,
55+ ) -> AnonymityResult :
56+ headers = parse_azenv_to_dict (response )
57+
58+ # �� Not AZEnv response
59+ if not headers :
60+ return AnonymityResult (anonymity = None )
61+
62+ remote_ip = headers .get ("REMOTE_ADDR" )
63+ xff = headers .get ("HTTP_X_FORWARDED_FOR" , "" )
64+ real_ip = extract_first_ip (xff ) if xff else None
65+
66+ # CDN fallback (Cloudflare etc.)
67+ cf_ip = headers .get ("HTTP_CF_CONNECTING_IP" )
68+
69+ proxy_headers = [
70+ "HTTP_VIA" ,
71+ "HTTP_CF_RAY" ,
72+ "HTTP_CDN_LOOP" ,
73+ "HTTP_X_FORWARDED_PROTO" ,
74+ ]
75+
76+ has_proxy_header = any (h in headers for h in proxy_headers )
77+
78+ # Best guess of real/public IP
79+ public_ip = real_ip or cf_ip or remote_ip
80+
81+ result = AnonymityResult (
82+ anonymity = None ,
83+ remote_addr = remote_ip ,
84+ public_ip = public_ip ,
85+ )
86+
87+ # ==============================
88+ # 🚨 Transparent (real IP leaked)
89+ # ==============================
90+ if real_ip :
91+ if expected_ip :
92+ if real_ip != expected_ip :
93+ result .anonymity = "Transparent"
94+ return result
95+ else :
96+ result .anonymity = "Transparent"
97+ return result
98+
99+ # ==============================
100+ # 🟡 Anonymous (proxy detected)
101+ # ==============================
102+ if has_proxy_header :
103+ result .anonymity = "Anonymous"
104+ return result
105+
106+ # ==============================
107+ # 🟢 Elite (clean)
108+ # ==============================
109+ if expected_ip :
110+ if remote_ip == expected_ip :
111+ result .anonymity = "Elite"
112+ else :
113+ result .anonymity = "Anonymous"
114+ else :
115+ result .anonymity = "Elite"
116+
117+ return result
118+
119+
120+ # ==============================
121+ # MAIN CLASS
122+ # ==============================
25123class ProxyAnonymity :
26124 def __init__ (self ):
27125 self .proxy_judges = [
@@ -36,28 +134,35 @@ def __init__(self):
36134 def get_anonymity (
37135 self , proxy : str , verbose : bool = False , timeout : int = 60000
38136 ) -> AnonymityResult :
137+
138+ # Normalize proxy
39139 if "://" in proxy :
40140 proxy = proxy .split ("://" , 1 )[1 ]
41- public_ip = get_public_ip (proxy_info = {"proxy" : proxy })
141+
142+ proxy_ip = proxy .split (":" , 1 )[0 ]
143+
42144 device_ip = get_device_ip ()
43- proxy_ip = proxy .split (":" , 1 )[0 ].split ("://" , 1 )[- 1 ]
145+ public_ip = get_public_ip (proxy_info = {"proxy" : proxy })
146+
44147 if verbose :
45148 print (
46- f"Checking anonymity for proxy { proxy } (public_ip= { public_ip } , device_ip= { device_ip } , proxy_ip={ proxy_ip } ) "
149+ f"[INFO] proxy= { proxy } device_ip= { device_ip } public_ip= { public_ip } proxy_ip={ proxy_ip } "
47150 )
48151
49152 body = None
50153 protocols = ["http" , "socks4" , "socks5" , "https" ]
51154 tls_versions = ["1.0" , "1.1" , "1.2" , "1.3" ]
52155
53- # Try all protocols / TLS / judges until one returns body
156+ # ==============================
157+ # Try all combinations
158+ # ==============================
54159 for protocol in protocols :
55160 proxy_url = f"{ protocol } ://{ proxy } "
56161
57162 for tls in tls_versions :
58163 for judge in self .proxy_judges :
59164 if verbose :
60- print (f"Query { judge } via { proxy_url } TLS { tls } " )
165+ print (f"[TRY] { judge } via { proxy_url } TLS { tls } " )
61166
62167 result = send_query (
63168 url = judge ,
@@ -72,25 +177,32 @@ def get_anonymity(
72177 if result .response :
73178 body = result .response
74179 break
180+
75181 if body :
76182 break
77183 if body :
78184 break
79185
186+ # ❌ No response at all
80187 if not body :
81188 return AnonymityResult (
82- anonymity = None , remote_addr = None , device_ip = device_ip
189+ anonymity = None ,
190+ remote_addr = None ,
191+ device_ip = device_ip ,
192+ public_ip = public_ip ,
83193 )
84194
85195 return self .parse_anonymity (
86196 body = body ,
197+ proxy = proxy ,
87198 public_ip = public_ip ,
88199 device_ip = device_ip ,
89- proxy = proxy ,
90200 verbose = verbose ,
91201 )
92202
93-
203+ # ==============================
204+ # CORE PARSER
205+ # ==============================
94206 def parse_anonymity (
95207 self ,
96208 body : str ,
@@ -99,22 +211,46 @@ def parse_anonymity(
99211 device_ip : Optional [str ] = None ,
100212 verbose : bool = False ,
101213 ) -> AnonymityResult :
102- has_privacy = bool (PRIVACY_REGEX .search (body ))
103214
104215 if public_ip is None :
105216 public_ip = get_public_ip (proxy_info = {"proxy" : proxy })
106217 if device_ip is None :
107218 device_ip = get_device_ip ()
108219
109- if public_ip == device_ip :
110- anonymity = "Transparent"
111- else :
112- anonymity = "Anonymous" if has_privacy else "Elite"
220+ proxy_ip = proxy .split (":" , 1 )[0 ]
221+
222+ # ==============================
223+ # 🚀 Primary: AZEnv classification
224+ # ==============================
225+ result = classify_proxy_from_azenv (
226+ response = body ,
227+ expected_ip = proxy_ip ,
228+ )
229+
230+ # Inject known values
231+ result .device_ip = device_ip
232+
233+ # Prefer detected public_ip, fallback to external
234+ if not result .public_ip :
235+ result .public_ip = public_ip
236+
237+ # ==============================
238+ # 🧠 Fallback (only if AZEnv failed)
239+ # ==============================
240+ if result .anonymity is None :
241+ has_privacy = bool (PRIVACY_REGEX .search (body ))
242+
243+ if public_ip == device_ip :
244+ result .anonymity = "Transparent"
245+ else :
246+ result .anonymity = "Anonymous" if has_privacy else "Elite"
113247
114248 if verbose :
115- minified_body = body .replace ("\n " , " " ).replace ("\r " , " " )
249+ minified = body .replace ("\n " , " " ).replace ("\r " , " " )
116250 print (
117- f"Anonymity result for proxy { proxy } : { anonymity } (public_ip={ public_ip } , device_ip={ device_ip } , has_privacy_headers={ has_privacy } ), body={ minified_body } ..."
251+ f"[RESULT] proxy={ proxy } → { result .anonymity } "
252+ f"(remote={ result .remote_addr } , public={ result .public_ip } , device={ result .device_ip } ) "
253+ f"body={ minified [:300 ]} ..."
118254 )
119255
120- return AnonymityResult ( anonymity = anonymity , remote_addr = None , device_ip = device_ip )
256+ return result
0 commit comments