import json
import httpx
import os
from typing import Any
from mcp.server.fastmcp import FastMCP
import mcp.types as types

# 初始化 MCP 服务器
mcp = FastMCP(name="WeatherServer", host="0.0.0.0", port=8000)

# 从环境变量获取 API 密钥
API_KEY = os.getenv("OPENWEATHER_API_KEY")
if not API_KEY:
    raise ValueError("❌ 未设置 OPENWEATHER_API_KEY 环境变量")

# OpenWeather API 配置
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
USER_AGENT = "weather-server/1.0"

async def fetch_weather(city: str) -> dict[str, Any]:
    """
    从 OpenWeather API 获取天气信息。
    :param city: 城市名称（支持英文或中文）
    :return: 天气数据字典
    """
    # 支持中文城市名查询
    if not city.isascii():
        # 中文城市名需要转换为拼音
        from pypinyin import pinyin, Style
        city_pinyin = ''.join([item[0] for item in pinyin(city, style=Style.NORMAL)])
        city = city_pinyin

    params = {
        "q": city,
        "appid": API_KEY,
        "units": "metric",
        "lang": "zh_cn"
    }
    headers = {"User-Agent": USER_AGENT}

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                OPENWEATHER_API_BASE, 
                params=params, 
                headers=headers, 
                timeout=30.0
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            error_msg = f"HTTP 错误: {e.response.status_code}"
            if e.response.status_code == 404:
                error_msg = "未找到该城市，请检查城市名称是否正确"
            elif e.response.status_code == 401:
                error_msg = "API 密钥无效，请检查 OPENWEATHER_API_KEY"
            return {"error": error_msg}
        except httpx.TimeoutException:
            return {"error": "请求超时，请稍后重试"}
        except Exception as e:
            return {"error": f"请求失败: {str(e)}"}

def format_weather(data: dict[str, Any]) -> str:
    """
    将天气数据格式化为易读文本。
    :param data: 天气数据字典
    :return: 格式化后的天气信息字符串
    """
    # 如果数据中包含错误信息，直接返回错误提示
    if "error" in data:
        return f"⚠️ {data['error']}"

    # 提取数据时做容错处理
    city = data.get("name", "未知")
    country = data.get("sys", {}).get("country", "未知")
    temp = data.get("main", {}).get("temp", "N/A")
    feels_like = data.get("main", {}).get("feels_like", "N/A")
    humidity = data.get("main", {}).get("humidity", "N/A")
    pressure = data.get("main", {}).get("pressure", "N/A")
    wind_speed = data.get("wind", {}).get("speed", "N/A")
    wind_deg = data.get("wind", {}).get("deg", "N/A")
    cloudiness = data.get("clouds", {}).get("all", "N/A")
    
    # 天气描述
    weather_list = data.get("weather", [{}])
    description = weather_list[0].get("description", "未知") if weather_list else "未知"
    
    # 日出日落时间
    sunrise = data.get("sys", {}).get("sunrise", None)
    sunset = data.get("sys", {}).get("sunset", None)
    
    from datetime import datetime
    if sunrise:
        sunrise = datetime.fromtimestamp(sunrise).strftime("%H:%M")
    if sunset:
        sunset = datetime.fromtimestamp(sunset).strftime("%H:%M")

    # 格式化输出
    return (
        f"🌍 {city}, {country}\n"
        f"🌤 天气: {description}\n"
        f"🌡 温度: {temp}°C (体感 {feels_like}°C)\n"
        f"💧 湿度: {humidity}%\n"
        f"📊 气压: {pressure} hPa\n"
        f"🌬 风速: {wind_speed} m/s, 风向: {wind_deg}°\n"
        f"☁️ 云量: {cloudiness}%\n"
        f"🌅 日出: {sunrise or '未知'}, 日落: {sunset or '未知'}"
    )

async def query_weather(
    name: str, 
    arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    查询指定城市的天气信息。
    :param city: 城市名称（支持英文或中文）
    :return: 格式化后的天气信息
    """
    city = arguments.get("city", "")
    if not city:
        return [types.TextContent(type="text", text="❌ 请提供城市名称")]
    
    data = await fetch_weather(city)
    formatted = format_weather(data)
    return [types.TextContent(type="text", text=formatted)]

async def query_weather_by_coords(
    name: str, 
    arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    根据经纬度查询天气信息。
    :param lat: 纬度
    :param lon: 经度
    :return: 格式化后的天气信息
    """
    lat = arguments.get("lat")
    lon = arguments.get("lon")
    
    if lat is None or lon is None:
        return [types.TextContent(type="text", text="❌ 请提供纬度和经度")]
    
    params = {
        "lat": lat,
        "lon": lon,
        "appid": API_KEY,
        "units": "metric",
        "lang": "zh_cn"
    }
    headers = {"User-Agent": USER_AGENT}

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                OPENWEATHER_API_BASE, 
                params=params, 
                headers=headers, 
                timeout=30.0
            )
            response.raise_for_status()
            data = response.json()
            formatted = format_weather(data)
            return [types.TextContent(type="text", text=formatted)]
        except Exception as e:
            return [types.TextContent(type="text", text=f"⚠️ 查询失败: {str(e)}")]

async def list_tools() -> list[types.Tool]:
    """
    列出所有可用的天气查询工具。
    """
    return [
        types.Tool(
            name="query_weather",
            description="查询指定城市的天气信息，支持中文和英文城市名",
            inputSchema={
                "type": "object",
                "required": ["city"],
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "城市名称（如：北京、Tokyo、New York）",
                    }
                },
            }
        ),
        types.Tool(
            name="query_weather_by_coords",
            description="根据经纬度坐标查询天气信息",
            inputSchema={
                "type": "object",
                "required": ["lat", "lon"],
                "properties": {
                    "lat": {
                        "type": "number",
                        "description": "纬度（-90 到 90）",
                    },
                    "lon": {
                        "type": "number",
                        "description": "经度（-180 到 180）",
                    },
                },
            }
        )
    ]

async def dispatch(
    name: str, 
    arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    根据工具名称调度对应的天气查询函数。
    """
    match name:
        case "query_weather":
            return await query_weather(name, arguments)
        case "query_weather_by_coords":
            return await query_weather_by_coords(name, arguments)
        case _:
            return [types.TextContent(type="text", text=f"⚠️ 未知工具: {name}")]

# 注册工具列表函数
mcp._mcp_server.list_tools()(list_tools)
# 注册工具调度函数
mcp._mcp_server.call_tool()(dispatch)

if __name__ == "__main__":
    mcp.run(transport='stdio')