import os
import ftplib
import ssl
import gzip
import shutil
import time
from threading import Thread
import sys
# Run configuration (hard-coded; despite the `sys` import, no command-line
# arguments are actually parsed here)
year = 2024  # target year
start_doy = 1  # first day-of-year to fetch
end_doy = 366  # last day-of-year to fetch (366 covers leap years)
local_base_dir = os.path.abspath(r"../data/rinex")  # download root directory
# Station list (full Hong Kong SatRef network kept here for reference):
# station_list = [
#     "hkcl", "hkfn", "hkkt", "hklm", "hklt", "hkmw", "hknp", "hkoh",
#     "hkpc", "hkqt", "hksc", "hksl", "hkss", "hkst", "hktk", "hkws", "kyc1", "t430"
# ]
station_list = [
    "hkcl"
]
# FTPS server address and port (990 = implicit FTPS)
host = 'rinex.geodetic.gov.hk'
port = 990
class ImplicitFTP_TLS(ftplib.FTP_TLS):
    """FTP_TLS subclass for *implicit* FTPS (TLS from the first byte, port 990).

    ftplib's FTP_TLS only speaks explicit FTPS (AUTH TLS after a plain
    connect). Here the control socket is wrapped in TLS the moment it is
    assigned, which is what implicit-FTPS servers expect.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._sock = None
        # NOTE(review): PROTOCOL_TLSv1_2 is deprecated and this context does no
        # certificate verification — kept as-is to match original behaviour.
        self.context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)

    @property
    def sock(self):
        """The current control socket (always TLS-wrapped once set)."""
        return self._sock

    @sock.setter
    def sock(self, value):
        # Wrap any plain socket in TLS before storing it; None and
        # already-wrapped sockets pass through untouched.
        if value is None or isinstance(value, ssl.SSLSocket):
            self._sock = value
        else:
            self._sock = self.context.wrap_socket(value)
def connect_with_retry(host, port):
    """Connect and log in to the FTPS server, retrying forever until success.

    Returns a logged-in ImplicitFTP_TLS client with a protected data channel
    (PROT P). Waits 5 seconds between failed attempts; never gives up.
    """
    while True:
        try:
            client = ImplicitFTP_TLS()
            client.connect(host=host, port=port)
            client.login()
            client.prot_p()
        except Exception as exc:  # best-effort: retry on any connection failure
            print(f"连接失败: {exc},正在重新尝试连接...")
            time.sleep(5)
        else:
            print("FTP连接成功")
            return client
def download_with_timeout(ftp, remote_file_path, local_file_path, timeout=300):
    """Download a remote file with a wall-clock timeout.

    The blocking ``retrbinary`` call runs in a daemon worker thread; if it has
    not finished within `timeout` seconds the function gives up (the worker
    cannot be killed, but as a daemon it no longer blocks interpreter exit —
    fix: the original non-daemon thread could hang the whole process).

    Args:
        ftp: a connected FTP client exposing ``retrbinary``.
        remote_file_path: path of the file on the server.
        local_file_path: where to write the downloaded bytes.
        timeout: seconds to wait before declaring the transfer dead.

    Returns:
        True when the file was fully downloaded; False on timeout OR when the
        transfer raised (fix: the original returned True even when
        ``retrbinary`` failed, so failed transfers were treated as successes
        and never retried by the caller).
    """
    outcome = {"ok": False}

    def download():
        try:
            with open(local_file_path, 'wb') as local_file:
                ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
            outcome["ok"] = True
            print(f"文件下载成功: {remote_file_path}")
        except Exception as e:
            print(f"下载失败: {remote_file_path} - {e}")

    # daemon=True so a hung transfer cannot keep the process alive at exit
    download_thread = Thread(target=download, daemon=True)
    download_thread.start()
    download_thread.join(timeout=timeout)
    if download_thread.is_alive():
        print(f"文件下载超时,终止: {remote_file_path}")
        return False
    return outcome["ok"]
def safe_nlst(ftp, remote_dir, retries=3):
    """List a remote directory's contents, reconnecting and retrying on failure.

    Returns the file list on success, or an empty list once all retries are
    exhausted. NOTE(review): reconnection uses the module-level `host`/`port`
    globals, not the connection the caller passed in.
    """
    for attempt in range(1, retries + 1):
        try:
            ftp.cwd(remote_dir)
            entries = ftp.nlst()
        except Exception as e:
            print(f"无法列出目录或访问失败: {remote_dir} - {e},正在重试 ({attempt}/{retries})")
            if attempt < retries:
                # Refresh the connection before the next attempt.
                ftp = connect_with_retry(host, port)
            else:
                print(f"目录访问失败,跳过: {remote_dir}")
                return []
        else:
            print(f"成功列出目录内容: {remote_dir}")
            return entries
def rename_file(original_file_name, year):
    """Convert a RINEX-3 long file name to the short ``ssssdddf.yyd`` form.

    Args:
        original_file_name: RINEX-3 long name, with or without a trailing
            '.gz'. Fix: the original unconditionally chopped the last 3
            characters, so callers that had already stripped '.gz' lost 3
            extra characters (harmless only because the fields below are
            extracted by index from the front, but misleading).
        year: unused; kept for backward compatibility with existing callers.

    Returns:
        The short name, e.g.
        'HKCL00HKG_R_20240010000_01D_30S_MO.crx.gz' -> 'hkcl0010.24d'.

    Raises:
        ValueError: if the name is too short to carry the year/DOY fields.
    """
    base_name = original_file_name
    if base_name.endswith('.gz'):
        base_name = base_name[:-3]
    if len(base_name) < 19:
        raise ValueError(f"文件名长度不足,无法提取必要信息: {original_file_name}")
    # Characters 14-18 hold the two-digit year followed by the three-digit
    # day-of-year (e.g. '24001' for 2024, DOY 001).
    digits = base_name[14:19]
    year_suffix = int(digits[:2])
    doy_str = digits[2:5]
    # Station ID is the first 4 characters, lower-cased.
    station_name = base_name[:4].lower()
    # ssssdddf.yyd: f = session '0', 'd' = Hatanaka-compressed observation.
    # :02d keeps the zero padding the original f-string dropped for years < 2010.
    return f"{station_name}{doy_str}0.{year_suffix:02d}d"
def rename_and_extract(local_file_path, local_doy_dir):
    """Decompress a downloaded .gz archive into its renamed RINEX short form.

    Paths not ending in '.gz' are ignored. On success the archive is deleted;
    on any failure the archive is kept and the error is printed.
    """
    if not local_file_path.endswith('.gz'):
        return
    try:
        with gzip.open(local_file_path, 'rb') as f_in:
            stem = os.path.basename(local_file_path)[:-3]  # drop '.gz'
            target_name = rename_file(stem, year)
            target_path = os.path.join(local_doy_dir, target_name)
            with open(target_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
            os.remove(local_file_path)
            print(f"文件解压并重命名成功: {target_name}")
    except Exception as e:
        print(f"解压失败: {local_file_path} - {e}")
def file_exists(local_doy_dir, file_name):
    """Return True if the extracted short-named file for `file_name` is already on disk."""
    stem = file_name[:-3]  # drop the '.gz' suffix
    expected_path = os.path.join(local_doy_dir, rename_file(stem, year))
    return os.path.exists(expected_path)
def download_and_process_file(ftp, remote_file_path, local_file_path, local_doy_dir, retries=3, timeout=300):
    """Download one file (with timeout) and decompress/rename it, retrying on failure.

    Skips all work when the extracted file already exists locally. After each
    failed attempt the FTP connection is re-established — but only if another
    attempt remains (fix: the original reconnected even after the final
    attempt, leaking one useless connection per permanently-failed file).

    Args:
        ftp: a connected FTP client (replaced internally after a failure).
        remote_file_path: server-side path of the .gz archive.
        local_file_path: local path to write the archive to.
        local_doy_dir: directory receiving the extracted, renamed file.
        retries: number of download attempts.
        timeout: per-attempt download timeout in seconds.

    Returns:
        True on success (or when the file already exists), False when all
        retries were exhausted. Existing callers ignore the return value.
    """
    file_name = os.path.basename(remote_file_path)
    if file_exists(local_doy_dir, file_name):
        print(f"文件已存在,跳过下载: {file_name}")
        return True
    for attempt in range(retries):
        try:
            success = download_with_timeout(ftp, remote_file_path, local_file_path, timeout=timeout)
            if not success:
                raise TimeoutError(f"文件下载超时: {remote_file_path}")
            rename_and_extract(local_file_path, local_doy_dir)
            return True
        except TimeoutError as e:
            print(f"下载超时: {remote_file_path},正在重试 ({attempt + 1}/{retries}) - {e}")
        except Exception as e:
            print(f"下载或处理失败: {remote_file_path} - {e},正在重试 ({attempt + 1}/{retries})")
        # Reconnect only when another attempt will actually use the connection.
        if attempt < retries - 1:
            ftp = connect_with_retry(host, port)
    print(f"文件下载最终失败: {remote_file_path}")
    return False
def download_rinex_data(host, port, year, start_doy, end_doy):
    """Download RINEX data for every station and every day in [start_doy, end_doy].

    For each day a local DOY folder is created under `local_base_dir`, then each
    station's remote 30s directory is listed and every .gz file downloaded and
    extracted. A fresh FTP connection is opened per listing and per file — slow
    but robust against the server dropping long-lived sessions.
    """
    for doy in range(start_doy, end_doy + 1):
        ddd = f"{doy:03d}"
        local_doy_dir = os.path.join(local_base_dir, ddd)
        # exist_ok avoids the check-then-create race of the original code
        os.makedirs(local_doy_dir, exist_ok=True)
        for station_folder in station_list:
            remote_station_dir = f"/rinex3/{year}/{ddd}/{station_folder}/30s"
            print(f"正在处理测站: {station_folder},远程目录: {remote_station_dir}")
            ftp = connect_with_retry(host, port)
            file_list = safe_nlst(ftp, remote_station_dir)
            # Fix: quit() on a connection that died inside safe_nlst's retries
            # used to raise and abort the entire run; close best-effort instead.
            try:
                ftp.quit()
            except Exception:
                pass
            if not file_list:
                continue
            for file_name in file_list:
                if not file_name.endswith('.gz'):
                    continue
                remote_file_path = f"{remote_station_dir}/{file_name}"
                local_file_path = os.path.join(local_doy_dir, file_name)
                print(f"正在下载文件: {remote_file_path}")
                ftp = connect_with_retry(host, port)
                download_and_process_file(ftp, remote_file_path, local_file_path, local_doy_dir)
def check_and_clean_incomplete_files(local_base_dir):
    """Remove leftover archive/extracted pairs from interrupted extractions.

    A successful extraction deletes the .gz archive, so finding both the
    archive and its extracted counterpart in the same folder means the pair
    is suspect; both are deleted so the file is re-downloaded cleanly later.
    """
    for root, _, files in os.walk(local_base_dir):
        for file_name in files:
            if not file_name.endswith('.gz'):
                continue
            archive_path = os.path.join(root, file_name)
            extracted_path = os.path.join(root, rename_file(file_name[:-3], year))
            if not os.path.exists(extracted_path):
                continue
            print(f"发现不完整的文件对: 压缩包 {archive_path} 和解压文件 {extracted_path}")
            try:
                os.remove(archive_path)
                os.remove(extracted_path)
                print(f"已删除不完整文件对: {archive_path} 和 {extracted_path}")
            except Exception as e:
                print(f"删除文件失败: {archive_path} 或 {extracted_path} - {e}")
def reattempt_missing_stations(host, port, year, station_list, local_base_dir):
    """Re-download station data that is missing or incomplete on disk.

    Scans every day-of-year folder under `local_base_dir`; a station is
    considered missing when no extracted file of its own exists, or when one
    of its .gz archives is still lying around (interrupted extraction).
    """
    # Extracted files end in '.yyd', e.g. '.24d' for 2024. Fix: the original
    # hard-coded '.23d', so for any year other than 2023 every station always
    # looked missing and everything was re-downloaded on each run.
    obs_suffix = f".{year % 100:02d}d"
    for ddd in os.listdir(local_base_dir):
        local_doy_dir = os.path.join(local_base_dir, ddd)
        if not os.path.isdir(local_doy_dir):  # skip stray non-directory entries
            continue
        print(f"检查年积日 {ddd} 文件夹内容...")
        local_files = os.listdir(local_doy_dir)
        missing_stations = []
        for station in station_list:
            prefix = station.lower()
            has_extracted = any(
                f.lower().startswith(prefix) and f.endswith(obs_suffix)
                for f in local_files
            )
            # Fix: the original tested for *any* .gz in the folder, flagging
            # every station whenever one unrelated archive remained; scope the
            # check to this station's own files (case-insensitive, since the
            # server names archives in upper case).
            has_leftover_gz = any(
                f.lower().startswith(prefix) and f.endswith('.gz')
                for f in local_files
            )
            if not has_extracted or has_leftover_gz:
                missing_stations.append(station)
        if not missing_stations:
            print(f"年积日 {ddd} 所有测站数据已完整。")
            continue
        print(f"年积日 {ddd} 缺失测站数据: {missing_stations}")
        for station_folder in missing_stations:
            remote_station_dir = f"/rinex3/{year}/{ddd}/{station_folder}/30s"
            print(f"重新处理测站: {station_folder},远程目录: {remote_station_dir}")
            ftp = connect_with_retry(host, port)
            try:
                file_list = safe_nlst(ftp, remote_station_dir)
                ftp.quit()
            except Exception as e:
                print(f"无法访问远程目录: {remote_station_dir} - {e}")
                continue
            for file_name in file_list:
                if file_name.endswith('.gz'):
                    remote_file_path = f"{remote_station_dir}/{file_name}"
                    local_file_path = os.path.join(local_doy_dir, file_name)
                    print(f"正在重新下载文件: {remote_file_path}")
                    ftp = connect_with_retry(host, port)
                    download_and_process_file(ftp, remote_file_path, local_file_path, local_doy_dir)
def main():
    """Run the full pipeline: initial download, cleanup, then a re-download pass."""
    download_rinex_data(host, port, year, start_doy, end_doy)  # initial download
    check_and_clean_incomplete_files(local_base_dir)  # purge incomplete archive/file pairs
    reattempt_missing_stations(host, port, year, station_list, local_base_dir)  # re-fetch gaps


if __name__ == "__main__":
    # Guarding the entry point keeps `import`ing this module side-effect free.
    main()
# GNSS-ISSEG  (stray trailing label — commented out; as bare code it raised a NameError at runtime)