Skip to main content

hello word

JSON数据源IP归属地查询工具 - API池增强版(修复版

py tools get ip local

28 min read

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
JSON数据源IP归属地查询工具 - API池增强版(修复版)
功能:从JSON的`root>outbounds`数组中提取IP数据,使用API池自动切换查询归属地
输出格式:服务器tag | IP地址 | 中文归属地
"""
import os
import json
import requests
import time
from datetime import datetime
from random import shuffle
# ==================== 用户配置区域 ====================
# 请在此处修改配置,无需修改代码主体部分
# JSON数据文件路径(支持绝对路径和相对路径)
JSON_FILE_PATH = r"C:\Users\c\Videos\ip\subscription.txt"  # 推荐:原始字符串
# JSON_FILE_PATH = "C:\\Users\\c\\Videos\\ip\\subscription.txt"  # 双反斜杠
# JSON_FILE_PATH = "C:/Users/c/Videos/ip/subscription.txt"  # 正斜杠
# 从第几个元素开始提取IP(索引从0开始)
START_INDEX = 2  # 跳过前2项(0-1),从第2项开始
# 结果输出目录(相对路径)
OUTPUT_DIR = "results"  # 查询结果将保存在此目录
# API查询配置
API_TIMEOUT = 10  # 每个API的超时时间(秒)
API_DELAY = 1.0  # API查询间隔(秒)
MAX_RETRIES = 3  # 每个IP的最大重试次数
# ======================================================
class IPAPIPool:
    """IP查询API池,自动切换API直到成功"""
    
    def __init__(self):
        # 国际IP查询API列表(已测试稳定,替换国内不可用API)
        self.api_pool = [
            # ip-api.com(免费版,测试成功,响应0.722秒,支持中文)
            {
                'name': 'ip-api.com',
                'url': 'http://ip-api.com/json/',
                'params': {'fields': 'status,country,regionName,city,isp', 'lang': 'zh-CN'},
                'parse': self.parse_ip_api_com
            },
            # api.ip.sb(测试可用,响应0.537秒,支持IPv6)
            {
                'name': 'api.ip.sb',
                'url': 'https://api.ip.sb/geoip/',
                'params': {},
                'parse': self.parse_ip_sb
            },
            # ipapi.co(测试可用,免费1000次/天)
            {
                'name': 'ipapi.co',
                'url': 'https://ipapi.co/',
                'params': {'format': 'json'},
                'parse': self.parse_ipapi_co
            },
            # IPinfo.io(行业稳定,需注册获取免费令牌,暂时注释)
            # {
            #     'name': 'IPinfo.io',
            #     'url': 'https://ipinfo.io/',
            #     'params': {'token': 'YOUR_TOKEN'},  # 替换为实际令牌
            #     'parse': self.parse_ipinfo_io
            # }
        ]
#        # 随机打乱API顺序,避免集中请求同一个API
#        shuffle(self.api_pool)
    
    def query(self, ip):
        """查询IP归属地,自动切换API直到成功"""
        for api in self.api_pool:
            try:
                print(f"🔍 尝试使用{api['name']}查询 {ip}...")
                response = requests.get(
                    f"{api['url']}{ip}",
                    params=api['params'],
                    timeout=10,
                    headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
                )
                response.raise_for_status()
                location, isp = api['parse'](response)
                if location:
                    print(f"✅ {api['name']}查询成功")
                    return location, isp
            except Exception as e:
                print(f"❌ {api['name']}失败: {str(e)}")
                continue
        return "查询失败", "未知"
    
    def parse_ip_api_com(self, response):
        """解析ip-api.com返回结果(测试成功)"""
        try:
            data = response.json()
            if data.get('status') == 'success':
                country = data.get('country', '未知')
                province = data.get('regionName', '未知')
                city = data.get('city', '未知')
                isp = data.get('isp', '未知')
                return f"{country}{province}{city}", isp
            return "未知", "未知"
        except Exception as e:
            print(f"解析ip-api.com失败:{e}")
            return "未知", "未知"
    
    def parse_ip_sb(self, response):
        """解析api.ip.sb返回结果(测试可用)"""
        try:
            data = response.json()
            country = data.get('country', '未知')
            province = data.get('region', '未知')
            city = data.get('city', '未知')
            isp = data.get('isp', '未知')
            return f"{country}{province}{city}", isp
        except Exception as e:
            print(f"解析api.ip.sb失败:{e}")
            return "未知", "未知"
    
    def parse_ipapi_co(self, response):
        """解析ipapi.co返回结果(测试可用)"""
        try:
            # 先检查响应内容是否为JSON
            if response.headers.get('Content-Type', '').startswith('application/json'):
                data = response.json()
                country = data.get('country_name', '未知')
                province = data.get('region', '未知')
                city = data.get('city', '未知')
                isp = data.get('org', '未知')
                return f"{country}{province}{city}", isp
            else:
                print(f"⚠️ ipapi.co返回非JSON格式:{response.text[:100]}...")
                return "未知", "未知"
        except Exception as e:
            print(f"解析ipapi.co失败:{e}")
            return "未知", "未知"
    
    def parse_ipinfo_io(self, response):
        """解析IPinfo.io返回结果(行业稳定)"""
        try:
            data = response.json()
            country = data.get('country', '未知')
            province = data.get('region', '未知')
            city = data.get('city', '未知')
            isp = data.get('org', '未知')
            return f"{country}{province}{city}", isp
        except Exception as e:
            print(f"解析IPinfo.io失败:{e}")
            return "未知", "未知"
class IPLocationQuery:
    """IP归属地查询器(使用API池自动切换)"""
    
    def __init__(self, json_file_path, output_dir, start_index=2):
        self.json_file_path = json_file_path
        self.output_dir = output_dir
        self.start_index = start_index  # 从第几个元素开始提取
        self.server_list = []  # 存储 [tag, ip] 对
        self.api_pool = IPAPIPool()  # 初始化API池
        
    def is_valid_ipv4(self, ip):
        """严格校验IPv4地址格式"""
        parts = ip.split('.')
        if len(parts) != 4:
            return False
        for part in parts:
            if not part.isdigit():
                return False
            num = int(part)
            if num < 0 or num > 255:
                return False
            if len(part) > 1 and part.startswith('0'):  # 禁止前导零
                return False
        return True
    
    def extract_ip_from_json(self):
        """从JSON的`root>outbounds`数组中提取IP数据"""
        if not os.path.exists(self.json_file_path):
            error_msg = f"❌ JSON文件 {self.json_file_path} 不存在"
            print(error_msg)
            return False
        
        try:
            with open(self.json_file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            # 定位到outbounds数组
            if not isinstance(data, dict) or 'outbounds' not in data:
                print("❌ JSON结构中未找到outbounds数组")
                return False
            
            outbounds = data.get('outbounds', [])
            if not isinstance(outbounds, list):
                print("❌ outbounds不是数组类型")
                return False
            
            total_items = len(outbounds)
            if self.start_index >= total_items:
                print(f"❌ 开始索引{self.start_index}超出数组长度{total_items}")
                return False
            
            # 从指定索引开始提取IP数据
            extracted_count = 0
            skipped_count = 0
            skipped_indices = []
            
            print(f"📊 从索引{self.start_index}开始提取IP数据(共{total_items-self.start_index}项)")
            
            for idx, item in enumerate(outbounds[self.start_index:], start=self.start_index):
                if not isinstance(item, dict):
                    skipped_count += 1
                    skipped_indices.append(idx)
                    continue
                    
                # 从settings中获取IP地址(Xray/V2Ray格式)
                ip = None
                if 'settings' in item and isinstance(item['settings'], dict):
                    settings = item['settings']
                    # 处理VLESS/Vmess格式
                    if 'vnext' in settings and isinstance(settings['vnext'], list):
                        for vnext in settings['vnext']:
                            if 'address' in vnext:
                                ip = str(vnext['address']).strip()
                                break
                    # 处理Shadowsocks格式
                    elif 'servers' in settings and isinstance(settings['servers'], list):
                        for server in settings['servers']:
                            if 'address' in server:
                                ip = str(server['address']).strip()
                                break
                    # 处理Trojan格式
                    elif 'servers' in settings and isinstance(settings['servers'], list):
                        for server in settings['servers']:
                            if 'address' in server:
                                ip = str(server['address']).strip()
                                break
                # 直接从address字段获取IP(简单格式)
                elif 'address' in item:
                    ip = str(item['address']).strip()
                # 直接从server字段获取IP(兼容之前格式)
                elif 'server' in item:
                    ip = str(item['server']).strip()
                
                # 获取服务器名(优先从tag字段,其次是name,最后是索引)
                tag = None
                if 'tag' in item:
                    tag = str(item['tag']).strip()
                elif 'name' in item:
                    tag = str(item['name']).strip()
                else:
                    tag = f"服务器{idx}"
                
                if ip and self.is_valid_ipv4(ip):
                    self.server_list.append([tag, ip])
                    extracted_count += 1
                    print(f"✅ 提取成功: [{idx}] {tag} -> {ip}")
                elif ip:
                    skipped_count += 1
                    skipped_indices.append(idx)
                    print(f"⚠️ 跳过无效IP: [{idx}] {tag} -> {ip}")
                else:
                    skipped_count += 1
                    skipped_indices.append(idx)
                    print(f"⚠️ 未找到IP: [{idx}] {tag}")
            
            if not self.server_list:
                print("⚠️ 未找到有效IPv4地址,将使用示例数据")
                self.server_list = [
                    ["示例服务器-1", "114.114.114.114"],
                    ["示例服务器-2", "8.8.8.8"],
                    ["示例服务器-3", "220.181.38.148"]
                ]
            
            print(f"\n📊 提取完成:成功 {extracted_count} 个,跳过 {skipped_count} 个")
            if skipped_indices:
                print(f"⚠️ 跳过的索引: {skipped_indices}")
            
            return True
            
        except json.JSONDecodeError as e:
            error_msg = f"❌ JSON解析失败:{str(e)}"
            print(error_msg)
            return False
        except Exception as e:
            error_msg = f"❌ 读取文件失败:{str(e)}"
            print(error_msg)
            return False
    
    def query_ip_location(self):
        """批量查询IP归属地并实时显示结果"""
        results = []
        print(f"\n🌍 开始查询 {len(self.server_list)} 个服务器IP归属地...")
        print("="*60)
        
        for idx, (tag, ip) in enumerate(self.server_list, 1):
            try:
                # 限速控制
                if idx > 1:
                    time.sleep(API_DELAY)
                
                location, isp = self.api_pool.query(ip)
                output_line = f"{tag} | {ip} | {location}"
                print(f"✅ {output_line}")
                
                results.append({
                    'tag': tag,
                    'ip': ip,
                    'location': location,
                    'isp': isp
                })
                
            except Exception as e:
                output_line = f"{tag} | {ip} | 查询异常: {str(e)}"
                print(f"❌ {output_line}")
                results.append({
                    'tag': tag,
                    'ip': ip,
                    'location': f"查询异常: {str(e)}",
                    'isp': '未知'
                })
        
        return results
    
    def format_results(self, results):
        """格式化查询结果为精简文本"""
        formatted = [
            f"服务器IP归属地查询结果",
            f"JSON文件: {os.path.basename(self.json_file_path)}",
            f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"从索引{self.start_index}开始提取",
            f"{'='*50}",
            ""
        ]
        
        for result in results:
            formatted.append(f"{result['tag']} | {result['ip']} | {result['location']}")
            if result['isp'] and result['isp'] != '未知':
                formatted.append(f"   网络运营商: {result['isp']}")
            formatted.append(f"{'-'*60}")
        
        return "\n".join(formatted)
    
    def save_results(self, formatted_results):
        """保存结果到文件"""
        os.makedirs(self.output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = os.path.join(self.output_dir, f"server_ip_pool_{timestamp}.txt")
        
        try:
            # 保存历史归档
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(formatted_results)
            
            # 保存最新结果
            latest_file = "server_ip_pool_latest.txt"
            with open(latest_file, 'w', encoding='utf-8') as f:
                f.write(formatted_results)
                
            success_msg = (
                f"\n🎉 结果已保存至:\n"
                f"   📁 最新结果: {latest_file}\n"
                f"   📁 历史归档: {output_file}"
            )
            print(f"\n{success_msg}")
            return True, success_msg
            
        except Exception as e:
            error_msg = f"❌ 保存失败: {str(e)}"
            print(f"\n{error_msg}")
            return False, error_msg
    
    def create_sample_json(self):
        """创建示例JSON文件(Xray/V2Ray格式)"""
        sample_data = {
            "outbounds": [
                {
                    "tag": "direct",
                    "protocol": "freedom",
                    "settings": {
                        "domainStrategy": "UseIP"
                    }
                },
                {
                    "tag": "block",
                    "protocol": "blackhole",
                    "settings": {}
                },
                {
                    "tag": "xray-1",
                    "protocol": "vless",
                    "settings": {
                        "vnext": [
                            {
                                "address": "114.114.114.114",
                                "port": 443,
                                "users": [
                                    {
                                        "id": "xxxx-xxxx-xxxx-xxxx",
                                        "encryption": "none"
                                    }
                                ]
                            }
                        ]
                    }
                },
                {
                    "tag": "xray-2",
                    "protocol": "vmess",
                    "settings": {
                        "vnext": [
                            {
                                "address": "8.8.8.8",
                                "port": 443,
                                "users": [
                                    {
                                        "id": "xxxx-xxxx-xxxx-xxxx",
                                        "alterId": 0
                                    }
                                ]
                            }
                        ]
                    }
                },
                {
                    "tag": "xray-3",
                    "protocol": "shadowsocks",
                    "settings": {
                        "servers": [
                            {
                                "address": "220.181.38.148",
                                "port": 443,
                                "method": "chacha20-ietf-poly1305",
                                "password": "xxxx"
                            }
                        ]
                    }
                }
            ]
        }
        
        try:
            # 先检查并创建目录
            dir_path = os.path.dirname(self.json_file_path)
            if dir_path and not os.path.exists(dir_path):
                os.makedirs(dir_path, exist_ok=True)
                print(f"💡 创建目录: {dir_path}")
            
            with open(self.json_file_path, 'w', encoding='utf-8') as f:
                json.dump(sample_data, f, ensure_ascii=False, indent=2)
            print(f"💡 示例JSON文件已创建: {self.json_file_path}")
            return True
        except PermissionError:
            error_msg = f"❌ 没有权限写入到目录: {os.path.dirname(self.json_file_path)}"
            print(error_msg)
            # 备选方案:创建到当前工作目录
            current_dir = os.getcwd()
            sample_file = os.path.join(current_dir, "sample_subscription.txt")
            with open(sample_file, 'w', encoding='utf-8') as f:
                json.dump(sample_data, f, ensure_ascii=False, indent=2)
            print(f"💡 示例文件已创建到当前目录: {sample_file}")
            print(f"⚠️ 请手动将文件复制到指定位置: {self.json_file_path}")
            return False
        except Exception as e:
            error_msg = f"❌ 创建示例文件失败: {str(e)}"
            print(error_msg)
            # 备选方案:创建到当前工作目录
            current_dir = os.getcwd()
            sample_file = os.path.join(current_dir, "sample_subscription.txt")
            try:
                with open(sample_file, 'w', encoding='utf-8') as f:
                    json.dump(sample_data, f, ensure_ascii=False, indent=2)
                print(f"💡 示例文件已创建到当前目录: {sample_file}")
                print(f"⚠️ 请手动将文件复制到指定位置: {self.json_file_path}")
            except Exception as e2:
                print(f"❌ 创建备选示例文件也失败: {str(e2)}")
            return False
    
    def run(self):
        """主执行流程"""
        print("🚀 JSON数据源IP归属地查询工具 - API池增强版")
        print(f"📁 JSON文件路径: {self.json_file_path}")
        print(f"📁 从索引{self.start_index}开始提取IP数据")
        print(f"📁 API池包含: {', '.join(api['name'] for api in self.api_pool.api_pool)}")
        print("-" * 60)
        
        # 检查并创建示例文件
        if not os.path.exists(self.json_file_path):
            print(f"💡 JSON文件不存在,将创建示例文件(Xray/V2Ray格式)")
            if not self.create_sample_json():
                print("⚠️ 示例文件创建失败,请手动准备JSON文件")
                # 直接使用内置示例数据进行演示
                print("📋 将使用内置示例数据进行演示")
                self.server_list = [
                    ["xray-1", "114.114.114.114"],
                    ["xray-2", "8.8.8.8"],
                    ["xray-3", "220.181.38.148"]
                ]
                # 继续执行查询流程
                results = self.query_ip_location()
                formatted = self.format_results(results)
                self.save_results(formatted)
                return True
        
        # 提取IP和服务器名
        if not self.extract_ip_from_json():
            return False
        
        if not self.server_list:
            print("❌ 未找到有效服务器数据")
            return False
        
        # 查询IP归属地并实时显示
        results = self.query_ip_location()
        
        # 格式化并保存结果
        formatted = self.format_results(results)
        success, message = self.save_results(formatted)
        
        if success:
            print(f"\n🎉 任务完成!共处理 {len(self.server_list)} 个服务器")
        
        return success
def main():
    """主函数"""
    query_tool = IPLocationQuery(JSON_FILE_PATH, OUTPUT_DIR, START_INDEX)
    success = query_tool.run()
    
    if success:
        print("\n🌟 程序正常退出")
        exit(0)
    else:
        print("\n💥 程序异常退出")
        exit(1)
if __name__ == "__main__":
    main()

JSON数据源IP归属地查询工具 - API池增强版(修复版)
功能:从JSON的`root>outbounds`数组中提取IP数据,使用API池自动切换查询归属地
输出格式:服务器tag | IP地址 | 中文归属地