ArcGIS处理DEM栅格并入库

最近从课题组那里接手了一堆数据集,准备做数据入库处理。我拿到的是 1km(0.083°) 分辨率的 DEM 栅格数据、TIF 栅格数据等,包括地形、地表特征、气象参数等。由于我拿到的数据是偏向于建筑节能研究使用的数据,还需要经过一些前期处理,以及坐标确认。

0. 什么是 DEM 和 TIF?

DEM 是一种表示地形高度数据的数字模型。它通常是栅格格式,每个像元(像素)表示地球表面某一点的海拔高度。TIF 是一种通用的图像文件格式,它可以存储各种类型的图像数据,包括照片、地图、扫描图像等。简单来说,TIF 是 DEM 数据的多种储存格式之一。

1. 添加数据

使用 ArcGIS Map 在图层处右键「添加数据」,将地形、地表、气象参数的 .tif 文件全部添加进来。在 ArcMap 中,在 ArcToolbox 中找到 转换工具 > 栅格 > 栅格转点,使用 “栅格转点” (Raster to Point) 工具,将 .tif 栅格数据转换为点要素图层,转换完后你就会得到 .shp 文件,这个就是点文件。

我的数据点从地形和基地坐标来看,就大约有一千五百万个点。这个过程很漫长,得等好几个小时。我也不知道什么原因,同时电脑的 CPU、内存、硬盘读写占用都不太高,你可以去喝杯咖啡休息休息。我想,可能是由于 ArcGIS 在处理大量数据时,尤其是对点要素添加经纬度字段(即计算几何)的操作,可能会变得非常慢。尽管 CPU、内存和硬盘使用率看似不高。

如此重复,把所有的 .tif 数据都进行这样的处理。如果你的 .tif 数据包括了多个字段,很遗憾,ArcGIS 每次栅格转点只能支持一个「字段」。

2. 导入 .shp

在保留之前的基地坐标图层,然后继续在图层处右键「添加数据」,把所有的 .shp 文件添加进去,右键打开「属性表」,你就能看见所有的离散点以及对应的值。

其中,FID(Feature ID)是 ArcGIS 自动生成的字段,代表 Feature ID(要素 ID)。它是每个要素(点、线、面)的唯一标识符。FID 字段通常是从 0 开始递增的整数(类似于 MySQL 这类数据库的自增 id),再导出时可以忽略掉它。

如果你有逐个导出了多个「字段」的栅格转点,你可以将这些 .shp 文件整合起来,最后再一起生成经纬度。在 ArcToolbox 中,展开 Data Management Tools > General > Merge(这里我切英文版本了,因为这里我出现了 ERROR 999999: 执行函数时出错的问题,可能跟路径和环境语言有关),合并 .shp。当然你也可以逐个生成经纬度后逐个导出。

3. 生成经纬度

当你打开「属性表」的时候,可以发现是没有经纬度的,因为此时的数据在基地坐标上(我的基地坐标是 WGS 1984 坐标系),然后我们来生成经纬度。

在属性表中,点击右上角的「表选项」,选择「添加字段」(Add Field)。创建两个新字段,分别命名为 Longitude 和 Latitude,数据类型选择 Double(双精度)。

右键点击刚刚创建的 Longitude 字段,选择「计算几何」(Calculate Geometry)。在对话框中选择 X 坐标(Longitude),单位选择度。同理,对 Latitude 字段重复上述操作,但选择 Y 坐标(Latitude)。

同样,数据量大的话,需要经过漫长的等待,再去续杯咖啡吧~

3.1 坐标变换

如果有的图层用的不是一个坐标系,例如我的 LUCC.shp 地表分类栅格数据集,需要在生成经纬度时进行坐标转换。

4. 导出文本文件

在导出格式中就文本文件比较友好,最后由 Python 处理文本文件导入到数据库就行。

5. 导入数据库

这是一个 .txt 数据导入 MySQL 的参考代码,这里使用了多线程和线程池,你可以根据你服务器的配置进行调整。max_workers 是最大线程数,默认设置了 5;batch_size 是批量插入数据的大小,batch_size = 10000 是一次性写入 10000 条数据。支持断点续传,会自动从数据库最后一个数据接着上传。

import mysql.connector
from concurrent.futures import ThreadPoolExecutor
import threading
import time
from decimal import Decimal

# MySQL 数据库连接信息
db_config = {
    'host': '数据库地址',
    'user': '用户名',
    'password': '密码',
    'database': '数据库名',
    'port': 3306
}

# 定义批量插入的大小
batch_size = 10000

# 全局计数器
success_count = 0
failure_count = 0
lock = threading.Lock()
stop_event = threading.Event()

# 获取数据库中最后一个 pointid
def get_last_pointid():
    try:
        connection = mysql.connector.connect(**db_config)
        cursor = connection.cursor()
        cursor.execute("SELECT MAX(pointid) FROM db_fundamental_data")
        result = cursor.fetchone()
        cursor.close()
        connection.close()
        return result[0] if result[0] is not None else None
    except Exception as e:
        print(f"Error querying last pointid: {e}")
        return None

# 插入数据的函数
def insert_data_batch(data_batch):
    global success_count, failure_count
    try:
        connection = mysql.connector.connect(**db_config)
        cursor = connection.cursor()
        insert_query = """
        INSERT INTO db_fundamental_data (pointid, elevation, longitude, latitude, aspect, lucc, temp1, s1, rh1, rad1, geohash, temp_samehash, location) 
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, ST _GeomFromText(%s))
        ON DUPLICATE KEY UPDATE 
        elevation = VALUES(elevation), longitude = VALUES(longitude), latitude = VALUES(latitude), 
        aspect = VALUES(aspect), lucc = VALUES(lucc), temp1 = VALUES(temp1), 
        s1 = VALUES(s1), rh1 = VALUES(rh1), rad1 = VALUES(rad1), 
        geohash = VALUES(geohash), temp_samehash = VALUES(temp_samehash), 
        location = VALUES(location)
        """
        cursor.executemany(insert_query, data_batch)
        connection.commit()
        with lock:
            success_count += len(data_batch)
    except Exception as e:
        with lock:
            failure_count += len(data_batch)
        print(f"Error inserting batch: {e}")
    finally:
        cursor.close()
        connection.close()

# 读取文件并分批插入数据
def process_file(filename, start_pointid=None):
    data_batch = []
    start_upload = start_pointid is None  # 如果没有指定起始 pointid,则从头开始上传
    with open(filename, 'r') as file:
        header = next(file)  # 读取并保留表头
        for line in file:
            values = line.strip().split(',')
            pointid = int(values[1])
            
            # 如果指定了起始 pointid,跳过未到达的记录
            if start_pointid and pointid <= start_pointid:
                continue  # 继续跳过直到找到大于 start_pointid 的记录
            
            # 达到起始点或没有指定起始点时,开始处理数据
            elevation = int(values[2])
            longitude = Decimal(values[3])  # 使用 Decimal 处理高精度
            latitude = Decimal(values[4])   # 使用 Decimal 处理高精度
            aspect = int(values[5])
            lucc = int(values[6])
            temp1 = float(values[7]) if values[7] != '' else None
            s1 = float(values[8]) if values[8] != '' else None
            rh1 = float(values[9]) if values[9] != '' else None
            rad1 = float(values[10]) if values[10] != '' else None
            geohash = values[11]  # geohash 字段
            temp_samehash = values[12]  # temp_samehash 字段
            location = values[13]  # location 字段
            data_batch.append((pointid, elevation, longitude, latitude, aspect, lucc, temp1, s1, rh1, rad1, geohash, temp_samehash, location))
            
            if len(data_batch) >= batch_size:
                insert_data_batch(data_batch)
                data_batch.clear()

    # 插入剩余的数据
    if data_batch:
        insert_data_batch(data_batch)
    
    # 标记为完成
    stop_event.set()

# 定期打印导入进度的函数
def print_progress():
    while not stop_event.is_set():
        with lock:
            print(f"已导入 {success_count} 行, 失败 {failure_count} 行")
        time.sleep(10)
    
    # 导入完成后的最终输出
    with lock:
        print(f"导入完成: 成功导入 {success_count} 行, 失败 {failure_count} 行")

if __name__ == '__main__':
    filename = 'Combine.txt'
    
    # 查询数据库中最后的 pointid
    last_pointid = get_last_pointid()
    
    # 提示信息
    if last_pointid:
        print(f"数据库中已存在数据,最后一个 pointid 为 {last_pointid},从该 pointid 后开始上传。")
    else:
        print("数据库中没有数据,将从文件的第一行开始上传。")
    
    # 启动一个线程来打印导入进度
    progress_thread = threading.Thread(target=print_progress)
    progress_thread.start()
    
    # 使用线程池进行并发插入
    with ThreadPoolExecutor(max_workers=5) as executor:
        executor.submit(process_file, filename, last_pointid)
    
    # 等待所有插入任务完成
    progress_thread.join()

大量数据导入后,可能会造成 MySQL 产生大量的日志,你可能需要手动进行清除。例如删除 2024年8月1日之前的日志 :

PURGE BINARY LOGS BEFORE '2024-08-01 00:00:00';

你可以在 MySQL 配置文件(my.cnfmy.ini)中添加以下参数,自动清理超过指定天数的日志:

expire_logs_days = 7
暂无评论

发送评论 编辑评论


|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇