hello word
28 min read
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
JSON数据源IP归属地查询工具 - API池增强版(修复版)
功能:从JSON的`root>outbounds`数组中提取IP数据,使用API池自动切换查询归属地
输出格式:服务器tag | IP地址 | 中文归属地
"""
import os
import json
import requests
import time
from datetime import datetime
from random import shuffle
# ==================== 用户配置区域 ====================
# 请在此处修改配置,无需修改代码主体部分
# JSON数据文件路径(支持绝对路径和相对路径)
JSON_FILE_PATH = r"C:\Users\c\Videos\ip\subscription.txt" # 推荐:原始字符串
# JSON_FILE_PATH = "C:\\Users\\c\\Videos\\ip\\subscription.txt" # 双反斜杠
# JSON_FILE_PATH = "C:/Users/c/Videos/ip/subscription.txt" # 正斜杠
# 从第几个元素开始提取IP(索引从0开始)
START_INDEX = 2 # 跳过前2项(0-1),从第2项开始
# 结果输出目录(相对路径)
OUTPUT_DIR = "results" # 查询结果将保存在此目录
# API查询配置
API_TIMEOUT = 10 # 每个API的超时时间(秒)
API_DELAY = 1.0 # API查询间隔(秒)
MAX_RETRIES = 3 # 每个IP的最大重试次数
# ======================================================
class IPAPIPool:
"""IP查询API池,自动切换API直到成功"""
def __init__(self):
# 国际IP查询API列表(已测试稳定,替换国内不可用API)
self.api_pool = [
# ip-api.com(免费版,测试成功,响应0.722秒,支持中文)
{
'name': 'ip-api.com',
'url': 'http://ip-api.com/json/',
'params': {'fields': 'status,country,regionName,city,isp', 'lang': 'zh-CN'},
'parse': self.parse_ip_api_com
},
# api.ip.sb(测试可用,响应0.537秒,支持IPv6)
{
'name': 'api.ip.sb',
'url': 'https://api.ip.sb/geoip/',
'params': {},
'parse': self.parse_ip_sb
},
# ipapi.co(测试可用,免费1000次/天)
{
'name': 'ipapi.co',
'url': 'https://ipapi.co/',
'params': {'format': 'json'},
'parse': self.parse_ipapi_co
},
# IPinfo.io(行业稳定,需注册获取免费令牌,暂时注释)
# {
# 'name': 'IPinfo.io',
# 'url': 'https://ipinfo.io/',
# 'params': {'token': 'YOUR_TOKEN'}, # 替换为实际令牌
# 'parse': self.parse_ipinfo_io
# }
]
# # 随机打乱API顺序,避免集中请求同一个API
# shuffle(self.api_pool)
def query(self, ip):
"""查询IP归属地,自动切换API直到成功"""
for api in self.api_pool:
try:
print(f"🔍 尝试使用{api['name']}查询 {ip}...")
response = requests.get(
f"{api['url']}{ip}",
params=api['params'],
timeout=10,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
)
response.raise_for_status()
location, isp = api['parse'](response)
if location:
print(f"✅ {api['name']}查询成功")
return location, isp
except Exception as e:
print(f"❌ {api['name']}失败: {str(e)}")
continue
return "查询失败", "未知"
def parse_ip_api_com(self, response):
"""解析ip-api.com返回结果(测试成功)"""
try:
data = response.json()
if data.get('status') == 'success':
country = data.get('country', '未知')
province = data.get('regionName', '未知')
city = data.get('city', '未知')
isp = data.get('isp', '未知')
return f"{country}{province}{city}", isp
return "未知", "未知"
except Exception as e:
print(f"解析ip-api.com失败:{e}")
return "未知", "未知"
def parse_ip_sb(self, response):
"""解析api.ip.sb返回结果(测试可用)"""
try:
data = response.json()
country = data.get('country', '未知')
province = data.get('region', '未知')
city = data.get('city', '未知')
isp = data.get('isp', '未知')
return f"{country}{province}{city}", isp
except Exception as e:
print(f"解析api.ip.sb失败:{e}")
return "未知", "未知"
def parse_ipapi_co(self, response):
"""解析ipapi.co返回结果(测试可用)"""
try:
# 先检查响应内容是否为JSON
if response.headers.get('Content-Type', '').startswith('application/json'):
data = response.json()
country = data.get('country_name', '未知')
province = data.get('region', '未知')
city = data.get('city', '未知')
isp = data.get('org', '未知')
return f"{country}{province}{city}", isp
else:
print(f"⚠️ ipapi.co返回非JSON格式:{response.text[:100]}...")
return "未知", "未知"
except Exception as e:
print(f"解析ipapi.co失败:{e}")
return "未知", "未知"
def parse_ipinfo_io(self, response):
"""解析IPinfo.io返回结果(行业稳定)"""
try:
data = response.json()
country = data.get('country', '未知')
province = data.get('region', '未知')
city = data.get('city', '未知')
isp = data.get('org', '未知')
return f"{country}{province}{city}", isp
except Exception as e:
print(f"解析IPinfo.io失败:{e}")
return "未知", "未知"
class IPLocationQuery:
"""IP归属地查询器(使用API池自动切换)"""
def __init__(self, json_file_path, output_dir, start_index=2):
self.json_file_path = json_file_path
self.output_dir = output_dir
self.start_index = start_index # 从第几个元素开始提取
self.server_list = [] # 存储 [tag, ip] 对
self.api_pool = IPAPIPool() # 初始化API池
def is_valid_ipv4(self, ip):
"""严格校验IPv4地址格式"""
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit():
return False
num = int(part)
if num < 0 or num > 255:
return False
if len(part) > 1 and part.startswith('0'): # 禁止前导零
return False
return True
def extract_ip_from_json(self):
"""从JSON的`root>outbounds`数组中提取IP数据"""
if not os.path.exists(self.json_file_path):
error_msg = f"❌ JSON文件 {self.json_file_path} 不存在"
print(error_msg)
return False
try:
with open(self.json_file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 定位到outbounds数组
if not isinstance(data, dict) or 'outbounds' not in data:
print("❌ JSON结构中未找到outbounds数组")
return False
outbounds = data.get('outbounds', [])
if not isinstance(outbounds, list):
print("❌ outbounds不是数组类型")
return False
total_items = len(outbounds)
if self.start_index >= total_items:
print(f"❌ 开始索引{self.start_index}超出数组长度{total_items}")
return False
# 从指定索引开始提取IP数据
extracted_count = 0
skipped_count = 0
skipped_indices = []
print(f"📊 从索引{self.start_index}开始提取IP数据(共{total_items-self.start_index}项)")
for idx, item in enumerate(outbounds[self.start_index:], start=self.start_index):
if not isinstance(item, dict):
skipped_count += 1
skipped_indices.append(idx)
continue
# 从settings中获取IP地址(Xray/V2Ray格式)
ip = None
if 'settings' in item and isinstance(item['settings'], dict):
settings = item['settings']
# 处理VLESS/Vmess格式
if 'vnext' in settings and isinstance(settings['vnext'], list):
for vnext in settings['vnext']:
if 'address' in vnext:
ip = str(vnext['address']).strip()
break
# 处理Shadowsocks格式
elif 'servers' in settings and isinstance(settings['servers'], list):
for server in settings['servers']:
if 'address' in server:
ip = str(server['address']).strip()
break
# 处理Trojan格式
elif 'servers' in settings and isinstance(settings['servers'], list):
for server in settings['servers']:
if 'address' in server:
ip = str(server['address']).strip()
break
# 直接从address字段获取IP(简单格式)
elif 'address' in item:
ip = str(item['address']).strip()
# 直接从server字段获取IP(兼容之前格式)
elif 'server' in item:
ip = str(item['server']).strip()
# 获取服务器名(优先从tag字段,其次是name,最后是索引)
tag = None
if 'tag' in item:
tag = str(item['tag']).strip()
elif 'name' in item:
tag = str(item['name']).strip()
else:
tag = f"服务器{idx}"
if ip and self.is_valid_ipv4(ip):
self.server_list.append([tag, ip])
extracted_count += 1
print(f"✅ 提取成功: [{idx}] {tag} -> {ip}")
elif ip:
skipped_count += 1
skipped_indices.append(idx)
print(f"⚠️ 跳过无效IP: [{idx}] {tag} -> {ip}")
else:
skipped_count += 1
skipped_indices.append(idx)
print(f"⚠️ 未找到IP: [{idx}] {tag}")
if not self.server_list:
print("⚠️ 未找到有效IPv4地址,将使用示例数据")
self.server_list = [
["示例服务器-1", "114.114.114.114"],
["示例服务器-2", "8.8.8.8"],
["示例服务器-3", "220.181.38.148"]
]
print(f"\n📊 提取完成:成功 {extracted_count} 个,跳过 {skipped_count} 个")
if skipped_indices:
print(f"⚠️ 跳过的索引: {skipped_indices}")
return True
except json.JSONDecodeError as e:
error_msg = f"❌ JSON解析失败:{str(e)}"
print(error_msg)
return False
except Exception as e:
error_msg = f"❌ 读取文件失败:{str(e)}"
print(error_msg)
return False
def query_ip_location(self):
"""批量查询IP归属地并实时显示结果"""
results = []
print(f"\n🌍 开始查询 {len(self.server_list)} 个服务器IP归属地...")
print("="*60)
for idx, (tag, ip) in enumerate(self.server_list, 1):
try:
# 限速控制
if idx > 1:
time.sleep(API_DELAY)
location, isp = self.api_pool.query(ip)
output_line = f"{tag} | {ip} | {location}"
print(f"✅ {output_line}")
results.append({
'tag': tag,
'ip': ip,
'location': location,
'isp': isp
})
except Exception as e:
output_line = f"{tag} | {ip} | 查询异常: {str(e)}"
print(f"❌ {output_line}")
results.append({
'tag': tag,
'ip': ip,
'location': f"查询异常: {str(e)}",
'isp': '未知'
})
return results
def format_results(self, results):
"""格式化查询结果为精简文本"""
formatted = [
f"服务器IP归属地查询结果",
f"JSON文件: {os.path.basename(self.json_file_path)}",
f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
f"从索引{self.start_index}开始提取",
f"{'='*50}",
""
]
for result in results:
formatted.append(f"{result['tag']} | {result['ip']} | {result['location']}")
if result['isp'] and result['isp'] != '未知':
formatted.append(f" 网络运营商: {result['isp']}")
formatted.append(f"{'-'*60}")
return "\n".join(formatted)
def save_results(self, formatted_results):
"""保存结果到文件"""
os.makedirs(self.output_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = os.path.join(self.output_dir, f"server_ip_pool_{timestamp}.txt")
try:
# 保存历史归档
with open(output_file, 'w', encoding='utf-8') as f:
f.write(formatted_results)
# 保存最新结果
latest_file = "server_ip_pool_latest.txt"
with open(latest_file, 'w', encoding='utf-8') as f:
f.write(formatted_results)
success_msg = (
f"\n🎉 结果已保存至:\n"
f" 📁 最新结果: {latest_file}\n"
f" 📁 历史归档: {output_file}"
)
print(f"\n{success_msg}")
return True, success_msg
except Exception as e:
error_msg = f"❌ 保存失败: {str(e)}"
print(f"\n{error_msg}")
return False, error_msg
def create_sample_json(self):
"""创建示例JSON文件(Xray/V2Ray格式)"""
sample_data = {
"outbounds": [
{
"tag": "direct",
"protocol": "freedom",
"settings": {
"domainStrategy": "UseIP"
}
},
{
"tag": "block",
"protocol": "blackhole",
"settings": {}
},
{
"tag": "xray-1",
"protocol": "vless",
"settings": {
"vnext": [
{
"address": "114.114.114.114",
"port": 443,
"users": [
{
"id": "xxxx-xxxx-xxxx-xxxx",
"encryption": "none"
}
]
}
]
}
},
{
"tag": "xray-2",
"protocol": "vmess",
"settings": {
"vnext": [
{
"address": "8.8.8.8",
"port": 443,
"users": [
{
"id": "xxxx-xxxx-xxxx-xxxx",
"alterId": 0
}
]
}
]
}
},
{
"tag": "xray-3",
"protocol": "shadowsocks",
"settings": {
"servers": [
{
"address": "220.181.38.148",
"port": 443,
"method": "chacha20-ietf-poly1305",
"password": "xxxx"
}
]
}
}
]
}
try:
# 先检查并创建目录
dir_path = os.path.dirname(self.json_file_path)
if dir_path and not os.path.exists(dir_path):
os.makedirs(dir_path, exist_ok=True)
print(f"💡 创建目录: {dir_path}")
with open(self.json_file_path, 'w', encoding='utf-8') as f:
json.dump(sample_data, f, ensure_ascii=False, indent=2)
print(f"💡 示例JSON文件已创建: {self.json_file_path}")
return True
except PermissionError:
error_msg = f"❌ 没有权限写入到目录: {os.path.dirname(self.json_file_path)}"
print(error_msg)
# 备选方案:创建到当前工作目录
current_dir = os.getcwd()
sample_file = os.path.join(current_dir, "sample_subscription.txt")
with open(sample_file, 'w', encoding='utf-8') as f:
json.dump(sample_data, f, ensure_ascii=False, indent=2)
print(f"💡 示例文件已创建到当前目录: {sample_file}")
print(f"⚠️ 请手动将文件复制到指定位置: {self.json_file_path}")
return False
except Exception as e:
error_msg = f"❌ 创建示例文件失败: {str(e)}"
print(error_msg)
# 备选方案:创建到当前工作目录
current_dir = os.getcwd()
sample_file = os.path.join(current_dir, "sample_subscription.txt")
try:
with open(sample_file, 'w', encoding='utf-8') as f:
json.dump(sample_data, f, ensure_ascii=False, indent=2)
print(f"💡 示例文件已创建到当前目录: {sample_file}")
print(f"⚠️ 请手动将文件复制到指定位置: {self.json_file_path}")
except Exception as e2:
print(f"❌ 创建备选示例文件也失败: {str(e2)}")
return False
def run(self):
"""主执行流程"""
print("🚀 JSON数据源IP归属地查询工具 - API池增强版")
print(f"📁 JSON文件路径: {self.json_file_path}")
print(f"📁 从索引{self.start_index}开始提取IP数据")
print(f"📁 API池包含: {', '.join(api['name'] for api in self.api_pool.api_pool)}")
print("-" * 60)
# 检查并创建示例文件
if not os.path.exists(self.json_file_path):
print(f"💡 JSON文件不存在,将创建示例文件(Xray/V2Ray格式)")
if not self.create_sample_json():
print("⚠️ 示例文件创建失败,请手动准备JSON文件")
# 直接使用内置示例数据进行演示
print("📋 将使用内置示例数据进行演示")
self.server_list = [
["xray-1", "114.114.114.114"],
["xray-2", "8.8.8.8"],
["xray-3", "220.181.38.148"]
]
# 继续执行查询流程
results = self.query_ip_location()
formatted = self.format_results(results)
self.save_results(formatted)
return True
# 提取IP和服务器名
if not self.extract_ip_from_json():
return False
if not self.server_list:
print("❌ 未找到有效服务器数据")
return False
# 查询IP归属地并实时显示
results = self.query_ip_location()
# 格式化并保存结果
formatted = self.format_results(results)
success, message = self.save_results(formatted)
if success:
print(f"\n🎉 任务完成!共处理 {len(self.server_list)} 个服务器")
return success
def main():
"""主函数"""
query_tool = IPLocationQuery(JSON_FILE_PATH, OUTPUT_DIR, START_INDEX)
success = query_tool.run()
if success:
print("\n🌟 程序正常退出")
exit(0)
else:
print("\n💥 程序异常退出")
exit(1)
if __name__ == "__main__":
main()
JSON数据源IP归属地查询工具 - API池增强版(修复版)
功能:从JSON的`root>outbounds`数组中提取IP数据,使用API池自动切换查询归属地
输出格式:服务器tag | IP地址 | 中文归属地