最近从课题组那里接手了一堆数据集,准备做数据入库处理。我拿到的是 1km(0.083°) 分辨率的 DEM 栅格数据、TIF 栅格数据等,包括地形、地表特征、气象参数等。由于我拿到的数据是偏向于建筑节能研究使用的数据,还需要经过一些前期处理,以及坐标确认。
0. 什么是 DEM 和 TIF?
DEM 是一种表示地形高度数据的数字模型。它通常是栅格格式,每个像元(像素)表示地球表面某一点的海拔高度。TIF 是一种通用的图像文件格式,它可以存储各种类型的图像数据,包括照片、地图、扫描图像等。简单来说,TIF 是 DEM 数据的多种储存格式之一。
1. 添加数据
使用 ArcGIS Map 在图层处右键「添加数据」,将地形、地表、气象参数的 .tif 文件全部添加进来。在 ArcMap 中,在 ArcToolbox 中找到 转换工具 > 栅格 > 栅格转点,使用 “栅格转点” (Raster to Point) 工具,将 .tif 栅格数据转换为点要素图层,转换完后你就会得到 .shp 文件,这个就是点文件。
我的数据点从地形和基地坐标来看,就大约有一千五百万个点。这个过程很漫长,得等好几个小时。我也不知道什么原因,同时电脑的 CPU、内存、硬盘读写占用都不太高,你可以去喝杯咖啡休息休息。我想,可能是由于 ArcGIS 在处理大量数据时,尤其是对点要素添加经纬度字段(即计算几何)的操作,可能会变得非常慢。尽管 CPU、内存和硬盘使用率看似不高。
如此重复,把所有的 .tif 数据都进行这样的处理。如果你的 .tif 数据包括了多个字段,很遗憾,ArcGIS 每次栅格转点只能支持一个「字段」。
2. 导入 .shp
在保留之前的基地坐标图层,然后继续在图层处右键「添加数据」,把所有的 .shp 文件添加进去,右键打开「属性表」,你就能看见所有的离散点以及对应的值。
其中,FID(Feature ID)是 ArcGIS 自动生成的字段,代表 Feature ID(要素 ID)。它是每个要素(点、线、面)的唯一标识符。FID 字段通常是从 0 开始递增的整数(类似于 MySQL 这类数据库的自增 id),再导出时可以忽略掉它。
如果你有逐个导出了多个「字段」的栅格转点,你可以将这些 .shp 文件整合起来,最后再一起生成经纬度。在 ArcToolbox 中,展开 Data Management Tools > General > Merge(这里我切英文版本了,因为这里我出现了 ERROR 999999: 执行函数时出错的问题,可能跟路径和环境语言有关),合并 .shp。当然你也可以逐个生成经纬度后逐个导出。
3. 生成经纬度
当你打开「属性表」的时候,可以发现是没有经纬度的,因为此时的数据在基地坐标上(我的基地坐标是 WGS 1984 坐标系),然后我们来生成经纬度。
在属性表中,点击右上角的「表选项」,选择「添加字段」(Add Field)。创建两个新字段,分别命名为 Longitude 和 Latitude,数据类型选择 Double(双精度)。
右键点击刚刚创建的 Longitude 字段,选择「计算几何」(Calculate Geometry)。在对话框中选择 X 坐标(Longitude),单位选择度。同理,对 Latitude 字段重复上述操作,但选择 Y 坐标(Latitude)。
同样,数据量大的话,需要经过漫长的等待,再去续杯咖啡吧~
3.1 坐标变换
如果有的图层用的不是一个坐标系,例如我的 LUCC.shp 地表分类栅格数据集,需要在生成经纬度时进行坐标转换。
4. 导出文本文件
在导出格式中就文本文件比较友好,最后由 Python 处理文本文件导入到数据库就行。
5. 导入数据库
这是一个 .txt 数据导入 MySQL 的参考代码,这里使用了多线程和线程池,你可以根据你服务器的配置进行调整。max_workers 是最大线程数,默认设置了 5;batch_size 是批量插入数据的大小,batch_size = 10000 是一次性写入 10000 条数据。支持断点续传,会自动从数据库最后一个数据接着上传。
import mysql.connector
from concurrent.futures import ThreadPoolExecutor
import threading
import time
from decimal import Decimal
# MySQL 数据库连接信息
db_config = {
'host': '数据库地址',
'user': '用户名',
'password': '密码',
'database': '数据库名',
'port': 3306
}
# 定义批量插入的大小
batch_size = 10000
# 全局计数器
success_count = 0
failure_count = 0
lock = threading.Lock()
stop_event = threading.Event()
# 获取数据库中最后一个 pointid
def get_last_pointid():
try:
connection = mysql.connector.connect(**db_config)
cursor = connection.cursor()
cursor.execute("SELECT MAX(pointid) FROM db_fundamental_data")
result = cursor.fetchone()
cursor.close()
connection.close()
return result[0] if result[0] is not None else None
except Exception as e:
print(f"Error querying last pointid: {e}")
return None
# 插入数据的函数
def insert_data_batch(data_batch):
global success_count, failure_count
try:
connection = mysql.connector.connect(**db_config)
cursor = connection.cursor()
insert_query = """
INSERT INTO db_fundamental_data (pointid, elevation, longitude, latitude, aspect, lucc, temp1, s1, rh1, rad1, geohash, temp_samehash, location)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, ST _GeomFromText(%s))
ON DUPLICATE KEY UPDATE
elevation = VALUES(elevation), longitude = VALUES(longitude), latitude = VALUES(latitude),
aspect = VALUES(aspect), lucc = VALUES(lucc), temp1 = VALUES(temp1),
s1 = VALUES(s1), rh1 = VALUES(rh1), rad1 = VALUES(rad1),
geohash = VALUES(geohash), temp_samehash = VALUES(temp_samehash),
location = VALUES(location)
"""
cursor.executemany(insert_query, data_batch)
connection.commit()
with lock:
success_count += len(data_batch)
except Exception as e:
with lock:
failure_count += len(data_batch)
print(f"Error inserting batch: {e}")
finally:
cursor.close()
connection.close()
# 读取文件并分批插入数据
def process_file(filename, start_pointid=None):
data_batch = []
start_upload = start_pointid is None # 如果没有指定起始 pointid,则从头开始上传
with open(filename, 'r') as file:
header = next(file) # 读取并保留表头
for line in file:
values = line.strip().split(',')
pointid = int(values[1])
# 如果指定了起始 pointid,跳过未到达的记录
if start_pointid and pointid <= start_pointid:
continue # 继续跳过直到找到大于 start_pointid 的记录
# 达到起始点或没有指定起始点时,开始处理数据
elevation = int(values[2])
longitude = Decimal(values[3]) # 使用 Decimal 处理高精度
latitude = Decimal(values[4]) # 使用 Decimal 处理高精度
aspect = int(values[5])
lucc = int(values[6])
temp1 = float(values[7]) if values[7] != '' else None
s1 = float(values[8]) if values[8] != '' else None
rh1 = float(values[9]) if values[9] != '' else None
rad1 = float(values[10]) if values[10] != '' else None
geohash = values[11] # geohash 字段
temp_samehash = values[12] # temp_samehash 字段
location = values[13] # location 字段
data_batch.append((pointid, elevation, longitude, latitude, aspect, lucc, temp1, s1, rh1, rad1, geohash, temp_samehash, location))
if len(data_batch) >= batch_size:
insert_data_batch(data_batch)
data_batch.clear()
# 插入剩余的数据
if data_batch:
insert_data_batch(data_batch)
# 标记为完成
stop_event.set()
# 定期打印导入进度的函数
def print_progress():
while not stop_event.is_set():
with lock:
print(f"已导入 {success_count} 行, 失败 {failure_count} 行")
time.sleep(10)
# 导入完成后的最终输出
with lock:
print(f"导入完成: 成功导入 {success_count} 行, 失败 {failure_count} 行")
if __name__ == '__main__':
filename = 'Combine.txt'
# 查询数据库中最后的 pointid
last_pointid = get_last_pointid()
# 提示信息
if last_pointid:
print(f"数据库中已存在数据,最后一个 pointid 为 {last_pointid},从该 pointid 后开始上传。")
else:
print("数据库中没有数据,将从文件的第一行开始上传。")
# 启动一个线程来打印导入进度
progress_thread = threading.Thread(target=print_progress)
progress_thread.start()
# 使用线程池进行并发插入
with ThreadPoolExecutor(max_workers=5) as executor:
executor.submit(process_file, filename, last_pointid)
# 等待所有插入任务完成
progress_thread.join()
大量数据导入后,可能会造成 MySQL 产生大量的日志,你可能需要手动进行清除。例如删除 2024年8月1日之前的日志 :
PURGE BINARY LOGS BEFORE '2024-08-01 00:00:00';
你可以在 MySQL 配置文件(my.cnf
或 my.ini
)中添加以下参数,自动清理超过指定天数的日志:
expire_logs_days = 7