Python开发者实战指南:从零部署Apache Doris集群到数据操作全流程

Python开发者实战指南:从零部署Apache Doris集群到数据操作全流程
在数据分析与实时数仓领域Apache Doris 凭借其极速的查询性能和易用的特性正成为越来越多开发者和企业的选择。对于 Python 开发者而言无论是进行数据科学探索、构建数据应用还是集成到现有的 Python 技术栈中掌握 Doris 的部署与使用都至关重要。本文将手把手带你完成从零部署 Doris 集群到使用 Python 连接并进行数据操作的完整流程涵盖部署避坑指南、核心概念解析以及实战代码示例助你快速将 Doris 应用到实际项目中。1. Doris 核心概念与架构解析在开始部署之前理解 Doris 的基本架构和核心概念能帮助我们更好地规划集群和排查问题。1.1 什么是 Apache DorisApache Doris 是一个基于 MPP大规模并行处理架构的高性能、实时的分析型数据库。它最初由百度开发并开源旨在解决海量数据的实时分析问题。其主要特点包括极速查询 针对多表关联、复杂聚合等分析型查询进行了深度优化响应速度极快。易用性强 兼容 MySQL 协议用户可以使用标准的 SQL 和常见的 BI 工具如 Tableau, Superset直接连接。统一架构 简化了传统 Hadoop 生态中复杂的 Lambda 架构一个系统同时支持高吞吐的批量数据导入和低延迟的实时数据查询。弹性扩展 支持在线动态增删节点扩容缩容对业务透明。对于 Python 开发者这意味着你可以像操作 MySQL 一样使用pymysql或sqlalchemy等库来操作 Doris大大降低了学习成本。1.2 Doris 核心架构FE 与 BEDoris 采用 FrontendFE和 BackendBE分离的架构理解这两类节点是部署和运维的基础。Frontend (FE):角色 Doris 的“大脑”负责元数据管理、集群调度、查询解析和规划。类型 分为 Leader FE、Follower FE 和 Observer FE。Leader FE 主节点负责元数据的写入和集群管理。一个集群只有一个 Leader。Follower FE 追随者节点参与选举可提供读服务用于高可用。Observer FE 观察者节点仅同步元数据用于扩展集群的读能力。对外接口 接收 MySQL 客户端连接默认端口为9030。Backend (BE):角色 Doris 的“肌肉”负责数据存储、查询执行和计算。功能 数据以 Tablet数据分片为单位存储在 BE 上查询时由 FE 将任务调度到多个 BE 上并行执行。对外接口 用于数据导入、BE 间通信等默认端口为9050。一个生产环境的最小高可用集群通常包含1个 Leader FE 2个 Follower FE 3个 BE。对于学习和测试我们可以从单机部署开始。2. 环境准备与部署规划2.1 软硬件环境要求在开始安装前请确保你的环境满足以下基本要求操作系统 CentOS 7、Ubuntu 16.04 或 macOS。本文以CentOS 7.9为例。Java Doris FE 和 BE 都依赖 Java 运行环境。推荐安装JDK 81.8并确保JAVA_HOME环境变量已正确设置。# 检查Java版本 java -version # 输出应类似openjdk version 1.8.0_352硬件测试环境 至少 2核 CPU4GB 内存20GB 磁盘空间。生产环境 建议 BE 节点配备 SSD 磁盘和大内存FE 节点对磁盘 IO 要求不高但需要稳定。网络 集群内所有机器需要时钟同步可使用 NTP且网络互通防火墙需开放相关端口FE: 8030, 9020, 9030; BE: 8040, 9050, 9060。2.2 部署模式选择根据你的资源情况可以选择以下部署模式单机伪集群 在一台机器上启动多个 FE 和 BE 进程用于功能测试和学习。单机单节点 一台机器只启动一个 FE 和一个 BE最简单但无高可用。多机分布式集群 多台机器分别部署 FE 和 BE用于生产环境。为了全面学习我们将演示单机伪集群的部署方式这能让你在一台机器上体验 Doris 的核心功能。后续扩展到多机集群只需修改 IP 配置即可。3. 单机伪集群部署实战我们将在一台 CentOS 7.9 的机器上部署 1个 FE (Leader) 和 2个 BE。3.1 下载与解压首先访问 Apache Doris 官网下载页 获取最新稳定版二进制包。本文以apache-doris-2.0.4-bin-x64.tar.gz为例。# 1. 创建安装目录并进入 sudo mkdir -p /opt/doris sudo chown whoami:whoami /opt/doris cd /opt/doris # 2. 下载请替换为实际下载链接 wget https://archive.apache.org/dist/doris/2.0.4/apache-doris-2.0.4-bin-x64.tar.gz # 3. 解压 tar -zxvf apache-doris-2.0.4-bin-x64.tar.gz cd apache-doris-2.0.4-bin-x64解压后目录结构如下apache-doris-2.0.4-bin-x64/ ├── be/ # Backend 目录 ├── fe/ # Frontend 目录 ├── udf/ # 用户自定义函数目录 └── dependencies/ # 第三方依赖3.2 配置并启动 Frontend (FE)步骤1配置 FE进入 FE 的配置文件目录并复制模板配置文件。cd /opt/doris/apache-doris-2.0.4-bin-x64/fe cp conf/fe.conf.template conf/fe.conf编辑conf/fe.conf主要关注以下配置项单机伪集群需修改# 取消注释并配置元数据目录 meta_dir ${DORIS_HOME}/doris-meta # 取消注释并配置 FE 的 IP 地址请改为本机内网 IP不要用 127.0.0.1 priority_networks 192.168.1.100/24 # 单机伪集群需要指定所有 FE 的地址。这里我们只启动一个所以只写自己。 # 格式fe_host:edit_log_port 默认 edit_log_port 是 9010。 # 如果是多机部署需要列出所有 Follower FE 的 host:port。 # 例如192.168.1.100:9010,192.168.1.101:9010,192.168.1.102:9010对于单机伪集群测试一个更简单的做法是直接使用127.0.0.1但生产环境必须使用真实 IP。步骤2启动 FE# 在 fe 目录下执行 ./bin/start_fe.sh --daemon检查是否启动成功# 查看日志看到 thrift server started 字样即表示成功 tail -f log/fe.log # 或者查看进程 jps | grep PaloFe步骤3初次访问并设置 root 密码FE 启动后默认会在9030端口监听 MySQL 协议。我们使用 MySQL 客户端连接。# 安装 MySQL 客户端如果尚未安装 sudo yum install mysql -y # 连接 Doris FE初始无密码 mysql -h 127.0.0.1 -P 9030 -uroot连接成功后执行以下 SQL 修改 root 密码并退出SET PASSWORD FOR root PASSWORD(your_new_password); -- 例如SET PASSWORD FOR root PASSWORD(Doris2024); exit;3.3 配置并启动第一个 Backend (BE)步骤1配置 BEcd /opt/doris/apache-doris-2.0.4-bin-x64/be cp conf/be.conf.template conf/be.conf编辑conf/be.conf# 配置 BE 数据存储目录确保目录存在且有权限 storage_root_path /opt/doris/data1;/opt/doris/data2 # 配置 BE 的 IP 地址同样改为本机内网 IP priority_networks 192.168.1.100/24 # 配置 webserver 端口可选用于监控 webserver_port 8040 # 配置 heartbeat_service 端口BE 与 FE 通信 heartbeat_service_port 9050 # 配置 brpc 端口BE 间通信 brpc_port 8060创建数据目录sudo mkdir -p /opt/doris/data1 /opt/doris/data2 sudo chown -R whoami:whoami /opt/doris/data*步骤2启动 BE./bin/start_be.sh --daemon检查是否启动成功tail -f log/be.log # 看到 heartbeat success 等信息表示启动正常 jps | grep doris_be3.4 将 BE 节点添加到集群BE 启动后需要告知 FE 这个新节点的存在。# 再次使用 MySQL 客户端连接 Doris使用新密码 mysql -h 127.0.0.1 -P 9030 -uroot -p执行以下 SQL 添加 BE 节点192.168.1.100:9050是 BE 的heartbeat_service_portALTER SYSTEM ADD BACKEND 192.168.1.100:9050;查询 BE 状态确认添加成功且健康状态为trueSHOW BACKENDS\G输出中Alive列应为true。3.5 启动第二个 BE伪集群为了模拟多 BE 环境我们在同一台机器上启动第二个 BE 进程。需要复制一份 BE 目录并修改配置。# 1. 复制 BE 目录 cd /opt/doris/apache-doris-2.0.4-bin-x64 cp -r be be2 # 2. 修改 be2 的配置文件 cd be2 vim conf/be.conf修改以下端口避免与第一个 BE 冲突# 修改这些端口号例如在原端口上加偏移量 be_port 9060 # 原 be 是 9060 检查第一个be的配置默认是9060这里改为9061 webserver_port 8041 # 原 8040 - 8041 heartbeat_service_port 9051 # 原 9050 - 9051 brpc_port 8061 # 原 8060 - 8061 # 修改数据存储路径 storage_root_path /opt/doris/data3;/opt/doris/data4创建新的数据目录sudo mkdir -p /opt/doris/data3 /opt/doris/data4 sudo chown -R whoami:whoami /opt/doris/data*步骤3启动第二个 BE 并添加到集群./bin/start_be.sh --daemon再次连接 MySQL 客户端添加第二个 BEALTER SYSTEM ADD BACKEND 192.168.1.100:9051; SHOW BACKENDS\G现在你应该能看到两个Alive状态为true的 BE 节点。4. 基础使用通过 MySQL 客户端操作 Doris集群搭建完成后我们可以通过 MySQL 客户端进行基本的数据库操作这有助于理解 Doris 的 SQL 语法。4.1 创建数据库与用户-- 1. 创建数据库 CREATE DATABASE IF NOT EXISTS demo_db; USE demo_db; -- 2. 创建普通用户并授权 CREATE USER test_user IDENTIFIED BY Test123!; GRANT ALL ON demo_db.* TO test_user;4.2 创建表与数据模型Doris 支持多种数据模型Duplicate, Aggregate, Unique这里以最常用的Aggregate 模型为例创建一张用户行为表。-- 3. 创建表 CREATE TABLE IF NOT EXISTS user_behavior ( user_id INT NOT NULL, date DATE NOT NULL, city VARCHAR(20) DEFAULT NULL, device VARCHAR(10) DEFAULT NULL, -- 维度列 category_id INT NOT NULL, -- 指标列使用聚合函数 pv BIGINT SUM DEFAULT 0, -- 页面浏览量求和 cost BIGINT SUM DEFAULT 0 -- 消费金额求和 ) ENGINEolap AGGREGATE KEY(user_id, date, city, device, category_id) -- 聚合键 DISTRIBUTED BY HASH(user_id) BUCKETS 10 -- 分桶方式 PROPERTIES ( replication_num 2 -- 副本数我们有两个BE所以设置为2 );关键概念解释AGGREGATE KEY 指定了数据的聚合维度。查询时所有未在 KEY 中指定的列如pv,cost会根据其定义的聚合函数如SUM进行预聚合这极大地提升了查询性能。DISTRIBUTED BY HASH ... BUCKETS 指定了数据在多个 BE 间的分布方式。user_id相同的行会被分配到同一个桶Bucket中。replication_num 数据副本数用于高可用和负载均衡。设置为 2 意味着数据会在两个 BE 上各存一份。4.3 数据导入与查询Doris 支持多种数据导入方式这里演示最简单的INSERT INTO和流式导入Stream Load。方式一使用 INSERT INTO适合小批量测试数据INSERT INTO user_behavior (user_id, date, city, device, category_id, pv, cost) VALUES (1001, 2024-05-01, 北京, Mobile, 1, 5, 50), (1001, 2024-05-01, 北京, PC, 2, 3, 30), (1002, 2024-05-01, 上海, Mobile, 1, 2, 20), (1001, 2024-05-02, 北京, Mobile, 1, 1, 10);方式二使用 Stream Load适合文件导入首先准备一个数据文件data.csv1003,2024-05-01,广州,PC,3,10,100 1004,2024-05-02,深圳,Mobile,2,8,80然后通过 curl 命令执行 Stream Loadcurl --location-trusted -u test_user:Test123! \ -H label:label1 \ -H column_separator:, \ -T /path/to/data.csv \ http://192.168.1.100:8030/api/demo_db/user_behavior/_stream_load8030是 FE 的 HTTP 端口用于接收数据导入请求。执行查询-- 查看所有数据 SELECT * FROM user_behavior ORDER BY date, user_id; -- 按用户和日期聚合查询 SELECT user_id, date, SUM(pv) as total_pv, SUM(cost) as total_cost FROM user_behavior GROUP BY user_id, date ORDER BY total_pv DESC; -- 条件查询 SELECT city, SUM(pv) as city_pv FROM user_behavior WHERE date 2024-05-01 GROUP BY city;5. 使用 Python 连接与操作 Doris这是 Python 开发者最关心的部分。我们将使用pymysql这个通用的 MySQL 客户端库来连接 Doris因为 Doris 完全兼容 MySQL 协议。5.1 环境准备与依赖安装确保你的 Python 环境已安装pymysql。pip install pymysql # 如果需要使用 pandas 进行数据分析也可以安装 pip install pandas sqlalchemy5.2 基础连接与查询创建一个 Python 脚本doris_demo.py# doris_demo.py import pymysql import pandas as pd from sqlalchemy import create_engine # 1. 使用 pymysql 进行连接和操作 def connect_with_pymysql(): connection None try: # 建立连接 connection pymysql.connect( host192.168.1.100, # 你的 FE 节点 IP port9030, # FE 的 MySQL 协议端口 usertest_user, passwordTest123!, databasedemo_db, # 指定数据库 charsetutf8mb4, cursorclasspymysql.cursors.DictCursor # 返回字典格式的结果 ) print(连接 Doris 成功) with connection.cursor() as cursor: # 执行查询 sql SELECT user_id, date, city, SUM(pv) as total_pv FROM user_behavior GROUP BY user_id, date, city cursor.execute(sql) results cursor.fetchall() print(查询结果) for row in results: print(row) # 执行插入 insert_sql INSERT INTO user_behavior (user_id, date, city, device, category_id, pv, cost) VALUES (%s, %s, %s, %s, %s, %s, %s) data (1005, 2024-05-03, 杭州, Mobile, 4, 7, 70) cursor.execute(insert_sql, data) connection.commit() # 提交事务 print(数据插入成功) except pymysql.Error as e: print(f数据库操作失败: {e}) finally: if connection: connection.close() print(连接已关闭。) if __name__ __main__: connect_with_pymysql()5.3 使用 SQLAlchemy 与 Pandas 进行数据分析对于数据科学工作流结合sqlalchemy和pandas会更加方便。# 2. 使用 SQLAlchemy 和 Pandas def connect_with_sqlalchemy(): # 创建连接字符串格式mysqlpymysql://user:passwordhost:port/database engine create_engine(mysqlpymysql://test_user:Test123!192.168.1.100:9030/demo_db) try: # 将查询结果直接读入 pandas DataFrame query SELECT city, device, SUM(pv) as total_pv, SUM(cost) as total_cost, AVG(cost) as avg_cost_per_pv FROM user_behavior WHERE date BETWEEN 2024-05-01 AND 2024-05-03 GROUP BY city, device ORDER BY total_pv DESC df pd.read_sql(query, engine) print(使用 Pandas 读取的数据) print(df) print(\n数据类型) print(df.dtypes) # 进行简单的数据分析 print(f\n总PV: {df[total_pv].sum()}) print(f总消费: {df[total_cost].sum()}) print(f\n按城市汇总PV) print(df.groupby(city)[total_pv].sum()) # 将处理后的数据写回 Doris 的新表可选 # df.to_sql(analysis_result, conengine, if_existsreplace, indexFalse) except Exception as e: print(f操作失败: {e}) finally: engine.dispose() if __name__ __main__: connect_with_pymysql() print(\n *50 \n) connect_with_sqlalchemy()5.4 使用 Stream Load 进行高效数据导入对于大批量数据使用INSERT语句效率很低。推荐使用Stream LoadPython 可以通过requests库调用其 HTTP API。# doris_stream_load.py import requests import json def stream_load_to_doris(csv_file_path, table_nameuser_behavior): 使用 Stream Load 方式导入 CSV 文件到 Doris url fhttp://192.168.1.100:8030/api/demo_db/{table_name}/_stream_load headers { Authorization: Basic base64.b64encode(btest_user:Test123!).decode(), Expect: 100-continue, label: fstream_load_{int(time.time())}, # Label 必须唯一 column_separator: ,, } try: with open(csv_file_path, rb) as f: response requests.put(url, headersheaders, dataf, timeout30) result response.json() print(Stream Load 响应, json.dumps(result, indent2)) if result.get(Status) Success: print(f数据导入成功导入行数{result.get(NumberLoadedRows)}) else: print(f数据导入失败错误信息{result.get(ErrorURL)}) # 可以访问 ErrorURL 查看详细错误 except Exception as e: print(f导入过程发生异常: {e}) if __name__ __main__: # 准备一个大的 CSV 文件进行测试 stream_load_to_doris(/path/to/your/large_data.csv)6. 常见问题与排查指南在部署和使用 Doris 过程中你可能会遇到以下问题。6.1 部署启动问题问题现象可能原因排查思路与解决方案FE 启动失败日志报java.net.BindException: Address already in use端口被占用。FE 默认使用 8030(HTTP), 9020(BRPC), 9030(MySQL), 9010(Edit Log) 等端口。1.netstat -tlnp | grep 端口号查看占用进程。2. 停止冲突进程或修改fe.conf中的对应端口如query_port,rpc_port。BE 启动失败日志报Fail to get master client fromBE 无法连接到 FE。1. 检查 FE 是否已成功启动 (jps | grep PaloFe)。2. 检查fe.conf和be.conf中的priority_networks配置确保 IP 正确且网络互通。3. 检查防火墙是否开放了 FE 的9010和9030端口。SHOW BACKENDS显示Alive为falseBE 与 FE 心跳失败。1. 检查 BE 日志log/be.INFO看是否有错误。2. 检查 BE 的heartbeat_service_port(默认9050) 是否被防火墙阻挡。3. 在 FE 节点尝试telnet BE_IP 9050测试连通性。创建表时失败报Failed to create partition副本数replication_num设置大于可用 BE 数量。1.SHOW BACKENDS确认可用 BE 数量。2. 建表时设置PROPERTIES (“replication_num” “1”)或增加 BE 节点。6.2 Python 连接与操作问题问题现象可能原因排查思路与解决方案pymysql.err.OperationalError: (2003, “Can’t connect to MySQL server”)网络不通或 FE 未启动。1. 检查 FE 的9030端口是否监听netstat -tlnp | grep 9030。2. 检查客户端到 FE 服务器的网络和防火墙规则。3. 确认连接参数host, port, user, password正确。pymysql.err.ProgrammingError: (1142, “INSERT command denied to user”)用户权限不足。1. 使用 root 用户登录检查并授予相应用户对数据库和表的权限GRANT ALL ON demo_db.* TO ‘test_user’;2. 执行FLUSH PRIVILEGES;。Stream Load 返回Status: Fail,Message: “Label [xxx] already used”Stream Load 的 Label 重复。Label 用于保证数据导入的幂等性。1. 确保每次 Stream Load 请求使用唯一的 Label。可以使用时间戳或 UUID。2. 如果确实需要重试可以等待一段时间Label 默认保留1小时或更换 Label。查询速度慢1. 数据未合理分桶。2. 没有合适的索引。3. 聚合模型未有效利用。1. 检查表的分桶键DISTRIBUTED BY是否选择了高基数列如 user_id。2. 对常用查询条件列创建 Rollup 索引物化视图。3. 确保查询条件能命中聚合模型的预聚合结果。6.3 数据导入与查询优化建议批量导入 务必使用Stream Load、Broker Load或Insert Into with VALUES批量插入避免单条INSERT。合理分桶 分桶数建议在10-100个之间每个 Bucket 的数据量在100MB-1GB为宜。分桶键应选择高基数列值种类多且常作为查询条件或连接条件。利用 Rollup 对于频繁的、固定维度的聚合查询可以创建 Rollup物化视图来显著提升查询速度。例如为上文的user_behavior表创建一个按city, date预聚合的 RollupALTER TABLE user_behavior ADD ROLLUP rollup_city_date (city, date, pv, cost); -- 构建 Rollup BUILD ROLLUP rollup_city_date ON user_behavior;之后查询SELECT city, date, SUM(pv) FROM user_behavior GROUP BY city, date;会自动命中 Rollup速度极快。**避免 SELECT *** 分析型数据库列存特性明显只查询需要的列可以大幅减少 IO。7. 生产环境最佳实践与进阶学习当你准备将 Doris 用于生产环境时以下建议至关重要。7.1 集群规划与高可用FE 高可用 生产环境至少部署1 Leader 2 Follower。使用 VIP 或负载均衡器对外提供统一的连接地址9030端口避免单点故障。BE 横向扩展 BE 是存储和计算节点可以轻松水平扩展。建议从3个BE开始根据数据量和查询压力逐步增加。资源隔离 如果机器资源充足可以考虑将 FE 和 BE 部署在不同的物理机或虚拟机上避免资源竞争。监控与告警 集成 Prometheus 和 Grafana 监控 Doris 集群状态FE/BE 存活、查询延迟、内存使用等。Doris 提供了标准的 Metrics 接口。7.2 数据模型与表设计模型选择Aggregate 模型 适用于有预聚合需求的报表场景如 SUM, COUNT, MAX。Unique 模型 适用于有主键唯一约束的场景实现数据的 Upsert更新或插入。Duplicate 模型 适用于需要保留原始明细数据的日志分析场景无聚合。分区与分桶使用PARTITION BY RANGE对时间列进行分区可以高效地进行数据生命周期管理如按天删除旧分区。分桶是数据在节点间分布和并行计算的基础设计需谨慎。索引策略 除了前缀索引由建表时列的顺序决定合理使用BITMAP索引适用于低基数列的等值查询和Bloom Filter索引适用于高基数列的等值查询。7.3 Python 工程化集成连接池管理 在生产应用中使用DBUtils或SQLAlchemy的连接池来管理数据库连接避免频繁创建连接的开销。from sqlalchemy import create_engine from sqlalchemy.pool import QueuePool engine create_engine( mysqlpymysql://user:passhost:port/db, poolclassQueuePool, pool_size5, max_overflow10, pool_pre_pingTrue # 防止连接失效 )错误重试与降级 网络波动或集群维护可能导致短暂不可用代码中应有重试机制和降级策略。配置外部化 将数据库连接信息、表名等配置放在环境变量或配置文件中不要硬编码在代码里。7.4 后续学习路径掌握基础部署和使用后你可以进一步探索数据导入 学习Broker Load从 HDFS/S3 导入、Routine Load从 Kafka 实时导入、Spark Load通过 Spark 进行 ETL 后导入。查询优化 学习使用EXPLAIN命令分析查询计划理解Shuffle Join、Broadcast Join等分布式查询原理。备份与恢复 学习使用BACKUP和RESTORE命令进行集群数据备份。多租户与资源隔离 学习使用Resource Group来管理不同用户的查询资源避免慢查询影响线上业务。从单机测试到分布式生产部署从基础 SQL 到 Python 深度集成Apache Doris 为 Python 开发者提供了一个强大而友好的实时分析数据库选择。关键在于理解其架构原理遵循最佳实践进行建模和查询从而在数据驱动的应用中发挥其最大价值。