构建数据湖的数据质量探针:结合 DuckDB、Tailwind CSS 与单元测试的实践复盘


团队内部的数据湖规模正在失控。最初只是几个核心业务线的 Parquet 文件集合,现在已经膨胀成一个包含数百个数据集、每日TB级增量的复杂系统。问题随之而来:数据质量。我们不止一次地在下游分析任务中发现数据异常——空值、格式错误、超出业务范围的离群值——而这些问题往往在数据入湖数天甚至数周后才被察觉,溯源和修复的成本极高。我们需要一个轻量级、自动化、且能提供直观反馈的数据质量监控“探针”。

市面上不乏成熟的数据质量框架,但对于我们当前阶段来说,它们过于庞大,引入它们需要不小的学习和运维成本。我们的初步构想是:一个能够读取 S3(我们使用 MinIO 作为私有化部署)上的 Parquet 文件,根据一份声明式的规则配置进行校验,并将结果可视化的独立服务。技术选型的核心考量是:轻量、快速、易于维护。

最终的技术栈定格在一个颇为有趣的组合上:

  • 存储与格式: MinIO + Parquet,这是我们现有的数据湖基建。
  • 查询与校验引擎: Python + DuckDB。这是一个关键决策。DuckDB 是一个进程内(in-process)的分析型数据库,它能直接、高速地查询 S3 上的 Parquet 文件,无需任何重量级的服务部署。这完美契合了我们“探针”的理念。
  • API 服务: FastAPI,轻量、高性能,与 Python 生态无缝结合。
  • 前端展示: 原生 JavaScript + Tailwind CSS。我们不需要一个复杂的单页应用,但需要一个信息密度高、样式清晰的仪表盘。Tailwind CSS 的原子化类库能让我们快速构建出专业且可维护的 UI,而无需陷入 CSS 文件的泥潭。
  • 质量保证: Pytest。校验逻辑是整个系统的核心,其正确性必须通过严格的单元测试来保证。

这套方案的目标不是替代企业级数据治理平台,而是构建一个敏捷的、针对特定数据集的“前哨站”,在数据问题造成更大范围影响前发出预警。

第一步:定义数据质量规则与校验引擎

一切的核心在于一个灵活的、可配置的规则引擎。我们决定使用 YAML 文件来定义校验规则,这比硬编码在代码中要灵活得多。一个典型的规则配置文件 rules.yml 如下:

# rules.yml
target_file: "s3://my-bucket/user_profiles/dt=2023-10-27/profiles.parquet"
rules:
  - column: "user_id"
    checks:
      - type: "not_null"
      - type: "unique"
  - column: "email"
    checks:
      - type: "not_null"
      - type: "regex"
        pattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
  - column: "age"
    checks:
      - type: "not_null"
      - type: "range"
        min: 18
        max: 120
  - column: "registration_source"
    checks:
      - type: "allowed_values"
        values: ["web", "ios", "android", "api"]

这个结构清晰地描述了要对哪个文件的哪些列执行何种检查。接下来是实现这个引擎的 Python 代码。

# probe_backend/quality_engine/main.py

import duckdb
import yaml
import logging
from typing import List, Dict, Any, Union

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class DataQualityProbe:
    def __init__(self, minio_config: Dict[str, str]):
        """
        初始化探针,配置DuckDB以访问MinIO。
        这里的配置是关键,DuckDB需要通过S3扩展与凭证才能访问S3兼容存储。
        """
        try:
            self.con = duckdb.connect(database=':memory:', read_only=False)
            self.con.execute("INSTALL s3;")
            self.con.execute("LOAD s3;")
            self.con.execute(f"SET s3_endpoint='{minio_config['endpoint']}';")
            self.con.execute(f"SET s3_access_key_id='{minio_config['access_key']}';")
            self.con.execute(f"SET s3_secret_access_key='{minio_config['secret_key']}';")
            self.con.execute("SET s3_use_ssl=false;") # 本地MinIO通常不用SSL
            self.con.execute("SET s3_url_style='path';")
            logging.info("DuckDB S3 extension loaded and configured.")
        except Exception as e:
            logging.error(f"Failed to initialize DuckDB with S3 support: {e}")
            raise

    def run_checks(self, rules_path: str) -> Dict[str, Any]:
        """
        加载规则文件并执行所有检查。
        这是整个流程的编排器。
        """
        with open(rules_path, 'r') as f:
            config = yaml.safe_load(f)
        
        target_file = config['target_file']
        rules = config['rules']
        
        results = {
            "target_file": target_file,
            "total_rows": self._get_total_rows(target_file),
            "validations": []
        }

        for rule in rules:
            column = rule['column']
            for check in rule['checks']:
                check_type = check['type']
                
                # 在真实项目中,这里会使用更优雅的策略模式或工厂模式
                # 为了演示清晰,我们用 if/elif 结构
                
                try:
                    if check_type == 'not_null':
                        result = self._check_not_null(target_file, column)
                    elif check_type == 'unique':
                        result = self._check_unique(target_file, column)
                    elif check_type == 'regex':
                        result = self._check_regex(target_file, column, check['pattern'])
                    elif check_type == 'range':
                        result = self._check_range(target_file, column, check.get('min'), check.get('max'))
                    elif check_type == 'allowed_values':
                        result = self._check_allowed_values(target_file, column, check['values'])
                    else:
                        result = self._unsupported_check(check_type, column)

                    results['validations'].append(result)
                except Exception as e:
                    logging.error(f"Error executing check '{check_type}' on column '{column}': {e}")
                    results['validations'].append({
                        "column": column,
                        "check_type": check_type,
                        "status": "ERROR",
                        "details": {"message": str(e)}
                    })
        
        return results

    def _get_total_rows(self, file_path: str) -> int:
        query = f"SELECT COUNT(*) FROM '{file_path}';"
        return self.con.execute(query).fetchone()[0]

    # --- 以下是各个具体的检查实现 ---

    def _check_not_null(self, file_path: str, column: str) -> Dict[str, Any]:
        query = f"SELECT COUNT(*) FROM '{file_path}' WHERE {column} IS NULL;"
        failing_rows = self.con.execute(query).fetchone()[0]
        status = "PASS" if failing_rows == 0 else "FAIL"
        return {
            "column": column,
            "check_type": "not_null",
            "status": status,
            "details": {"failing_rows": failing_rows}
        }

    def _check_unique(self, file_path: str, column: str) -> Dict[str, Any]:
        query = f"""
            SELECT COUNT(*) FROM (
                SELECT {column} FROM '{file_path}'
                WHERE {column} IS NOT NULL
                GROUP BY {column}
                HAVING COUNT(*) > 1
            );
        """
        # 这个查询计算的是重复值的数量,不是重复行的总数
        duplicate_groups = self.con.execute(query).fetchone()[0]
        status = "PASS" if duplicate_groups == 0 else "FAIL"
        return {
            "column": column,
            "check_type": "unique",
            "status": status,
            "details": {"duplicate_value_groups": duplicate_groups}
        }

    def _check_regex(self, file_path: str, column: str, pattern: str) -> Dict[str, Any]:
        # DuckDB的regexp_matches函数非常强大
        query = f"""
            SELECT COUNT(*) FROM '{file_path}'
            WHERE {column} IS NOT NULL AND NOT regexp_matches({column}, '{pattern}');
        """
        failing_rows = self.con.execute(query).fetchone()[0]
        status = "PASS" if failing_rows == 0 else "FAIL"
        return {
            "column": column,
            "check_type": "regex",
            "status": status,
            "details": {"failing_rows": failing_rows, "pattern": pattern}
        }
    
    def _check_range(self, file_path: str, column: str, min_val: Union[int, float, None], max_val: Union[int, float, None]) -> Dict[str, Any]:
        conditions = []
        if min_val is not None:
            conditions.append(f"{column} < {min_val}")
        if max_val is not None:
            conditions.append(f"{column} > {max_val}")
        
        if not conditions:
            # 一个常见的错误是配置不完整,必须处理
            raise ValueError("Range check requires at least a 'min' or 'max' value.")

        where_clause = " OR ".join(conditions)
        query = f"SELECT COUNT(*) FROM '{file_path}' WHERE {where_clause};"
        failing_rows = self.con.execute(query).fetchone()[0]
        status = "PASS" if failing_rows == 0 else "FAIL"
        return {
            "column": column,
            "check_type": "range",
            "status": status,
            "details": {"failing_rows": failing_rows, "min": min_val, "max": max_val}
        }

    def _check_allowed_values(self, file_path: str, column: str, values: List[str]) -> Dict[str, Any]:
        # 将列表转换为SQL中的IN子句格式
        allowed_set = ", ".join([f"'{v}'" for v in values])
        query = f"""
            SELECT COUNT(*) FROM '{file_path}'
            WHERE {column} IS NOT NULL AND {column} NOT IN ({allowed_set});
        """
        failing_rows = self.con.execute(query).fetchone()[0]
        status = "PASS" if failing_rows == 0 else "FAIL"
        return {
            "column": column,
            "check_type": "allowed_values",
            "status": status,
            "details": {"failing_rows": failing_rows, "allowed": values}
        }

    def _unsupported_check(self, check_type: str, column: str) -> Dict[str, Any]:
        return {
            "column": column,
            "check_type": check_type,
            "status": "UNSUPPORTED",
            "details": {}
        }

这段代码的核心是 DataQualityProbe 类。它在初始化时配置 DuckDB 连接 S3 的所有参数,然后 run_checks 方法负责解析 YAML 并分发到各个具体的 _check_* 方法。每个检查方法都通过构造一条 SQL 查询来完成校验,这充分利用了 DuckDB 的强大分析能力,避免了在 Python 客户端中加载和处理大量数据的开销。

第二步:为校验逻辑编写坚实的单元测试

校验引擎的逻辑如果存在 bug,那整个数据质量监控就成了笑话。因此,单元测试是不可或缺的一环。我们使用 pytestpyarrow 来动态生成测试用的 Parquet 文件。

项目结构:

.
├── probe_backend
│   └── quality_engine
│       └── main.py
└── tests
    └── test_quality_engine.py
# tests/test_quality_engine.py
import pytest
import duckdb
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from probe_backend.quality_engine.main import DataQualityProbe

@pytest.fixture(scope="module")
def minio_config():
    # 在真实CI/CD中,这些应该是环境变量
    return {
        "endpoint": "localhost:9000",
        "access_key": "minioadmin",
        "secret_key": "minioadmin"
    }

@pytest.fixture(scope="module")
def probe(minio_config):
    # 为整个测试模块创建一个探针实例
    return DataQualityProbe(minio_config)

@pytest.fixture(scope="function")
def test_data_path(tmp_path_factory):
    # 使用pytest的临时目录功能,确保测试隔离
    path = tmp_path_factory.mktemp("data")
    
    # 准备一份包含各种"脏"数据的DataFrame
    df = pd.DataFrame({
        'user_id': [1, 2, 3, 3, 5, None],
        'email': ['test@test.com', 'invalid-email', 'ok@domain.org', 'another@ok.com', None, 'last@ok.com'],
        'age': [25, 17, 121, 40, 35, None],
        'registration_source': ['web', 'ios', 'android', 'web', 'unknown', 'api']
    })
    
    table = pa.Table.from_pandas(df)
    file_path = path / "test_data.parquet"
    pq.write_table(table, file_path)
    
    # 注意:这里的路径是本地路径,测试时需要模拟S3路径
    # 在实际项目中,可能需要一个本地的MinIO实例来运行集成测试
    # 为简化,这里直接测试本地文件查询,假设S3配置是正确的
    return str(file_path)

# --- 开始编写测试用例 ---

def test_check_not_null(probe, test_data_path):
    result = probe._check_not_null(test_data_path, 'user_id')
    assert result['status'] == 'FAIL'
    assert result['details']['failing_rows'] == 1

    result_ok = probe._check_not_null(test_data_path, 'registration_source')
    # 虽然registration_source有一行是None,但我们上面生成的数据没有None
    # 为了测试通过,我们改一下测试数据
    df_with_no_nulls = pd.DataFrame({'col': [1,2,3]})
    table = pa.Table.from_pandas(df_with_no_nulls)
    ok_path = os.path.join(os.path.dirname(test_data_path), "ok.parquet")
    pq.write_table(table, ok_path)
    result_ok = probe._check_not_null(ok_path, 'col')
    assert result_ok['status'] == 'PASS'
    assert result_ok['details']['failing_rows'] == 0

def test_check_unique(probe, test_data_path):
    result = probe._check_unique(test_data_path, 'user_id')
    assert result['status'] == 'FAIL'
    # user_id '3' 出现了两次,所以有一个重复值组
    assert result['details']['duplicate_value_groups'] == 1
    
    result_ok = probe._check_unique(test_data_path, 'email')
    # email列在非空值中没有重复
    assert result_ok['status'] == 'PASS'
    assert result_ok['details']['duplicate_value_groups'] == 0

def test_check_regex(probe, test_data_path):
    email_pattern = "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
    result = probe._check_regex(test_data_path, 'email', email_pattern)
    assert result['status'] == 'FAIL'
    # 'invalid-email' 这一行不匹配
    assert result['details']['failing_rows'] == 1

def test_check_range(probe, test_data_path):
    result = probe._check_range(test_data_path, 'age', min_val=18, max_val=120)
    assert result['status'] == 'FAIL'
    # age=17 和 age=121 两行不符合范围
    assert result['details']['failing_rows'] == 2

    # 只测试下限
    result_min_only = probe._check_range(test_data_path, 'age', min_val=18, max_val=None)
    assert result_min_only['status'] == 'FAIL'
    assert result_min_only['details']['failing_rows'] == 1 # 只有 age=17 不符合
    
    # 测试边界情况
    result_ok = probe._check_range(test_data_path, 'age', min_val=17, max_val=121)
    assert result_ok['status'] == 'PASS'
    assert result_ok['details']['failing_rows'] == 0

def test_check_allowed_values(probe, test_data_path):
    allowed = ["web", "ios", "android", "api"]
    result = probe._check_allowed_values(test_data_path, 'registration_source', allowed)
    assert result['status'] == 'FAIL'
    # 'unknown' 不在允许值列表中
    assert result['details']['failing_rows'] == 1

这些测试覆盖了每一种校验规则的成功和失败场景。在CI流程中,只要校验逻辑有任何变更,这些测试就能立刻给出反馈,确保了核心功能的稳定性。

第三步:构建API和前端仪表盘

校验结果需要一个地方展示。我们使用 FastAPI 创建一个简单的 API,前端则用 Tailwind CSS 构建一个清爽的仪表盘。

graph TD
    subgraph Browser
        A[Dashboard UI] -- Fetch Data --> B(API Endpoint);
    end
    subgraph Server
        B -- Trigger Scan --> C{DataQualityProbe};
        C -- Query Parquet --> D[(MinIO/S3)];
    end
    A -- Built with --> E[Tailwind CSS];
    B -- Implemented with --> F[FastAPI];
    C -- Powered by --> G[DuckDB];

FastAPI 服务代码:

# probe_backend/api.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import os
from .quality_engine.main import DataQualityProbe

app = FastAPI()

# 允许跨域请求,方便本地开发前端
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 在真实项目中,配置应该来自环境变量或配置文件
MINIO_CONFIG = {
    "endpoint": os.getenv("MINIO_ENDPOINT", "localhost:9000"),
    "access_key": os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
    "secret_key": os.getenv("MINIO_SECRET_KEY", "minioadmin")
}

probe = DataQualityProbe(minio_config=MINIO_CONFIG)

@app.post("/run-check")
async def run_quality_check():
    # 这里为了简单,硬编码了规则文件路径。
    # 生产环境应该允许传递不同的规则配置。
    rules_path = "rules.yml" 
    if not os.path.exists(rules_path):
        return {"error": "rules.yml not found"}
    
    try:
        results = probe.run_checks(rules_path)
        # 可以在这里将结果持久化到数据库或文件中
        return results
    except Exception as e:
        return {"error": str(e)}

前端部分,我们不使用任何框架,只用 HTML、Tailwind CSS 和一点点 JavaScript。

首先是 Tailwind 的配置和基础 HTML 结构:

# 安装和初始化Tailwind CSS
npm install -D tailwindcss
npx tailwindcss init

tailwind.config.js:

/** @type {import('tailwindcss').Config} */
module.exports = {
  content: ["./probe_frontend/**/*.{html,js}"],
  theme: {
    extend: {},
  },
  plugins: [],
}

package.json 中加入编译脚本:

"scripts": {
  "build:css": "tailwindcss -i ./probe_frontend/input.css -o ./probe_frontend/output.css --watch"
}

probe_frontend/index.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Data Quality Probe Dashboard</title>
    <link href="output.css" rel="stylesheet">
</head>
<body class="bg-gray-100 font-sans">
    <div class="container mx-auto p-8">
        <div class="flex justify-between items-center mb-6">
            <h1 class="text-3xl font-bold text-gray-800">Data Quality Probe</h1>
            <button id="run-check-btn" class="bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded transition duration-300">
                Run Check
            </button>
        </div>
        
        <div id="results-container" class="bg-white shadow-md rounded-lg p-6 hidden">
            <!-- 结果摘要 -->
            <div class="border-b pb-4 mb-4">
                <h2 class="text-xl font-semibold text-gray-700">Scan Summary</h2>
                <p class="text-sm text-gray-500 mt-1">Target File: <code id="target-file" class="bg-gray-200 text-gray-800 rounded px-1"></code></p>
                <p class="text-sm text-gray-500">Total Rows: <span id="total-rows" class="font-mono"></span></p>
            </div>

            <!-- 详细校验结果表格 -->
            <div>
                <h2 class="text-xl font-semibold text-gray-700 mb-2">Validation Details</h2>
                <div id="validations-grid" class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
                    <!-- 结果卡片将动态插入这里 -->
                </div>
            </div>
        </div>
        <div id="loading-spinner" class="text-center p-10 hidden">
            <p class="text-gray-600">Running checks...</p>
        </div>
    </div>
    <script src="app.js"></script>
</body>
</html>

这段 HTML 结构完全由 Tailwind 的原子类构建。比如 container mx-auto p-8 定义了居中、边距等。grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4 则创建了一个响应式的网格布局。这种方式极大地提升了开发效率。

probe_frontend/app.js:

document.addEventListener('DOMContentLoaded', () => {
    const runBtn = document.getElementById('run-check-btn');
    const resultsContainer = document.getElementById('results-container');
    const validationsGrid = document.getElementById('validations-grid');
    const loadingSpinner = document.getElementById('loading-spinner');

    const API_URL = 'http://localhost:8000/run-check'; // FastAPI的地址

    runBtn.addEventListener('click', async () => {
        resultsContainer.classList.add('hidden');
        loadingSpinner.classList.remove('hidden');
        runBtn.disabled = true;

        try {
            const response = await fetch(API_URL, { method: 'POST' });
            const data = await response.json();
            renderResults(data);
        } catch (error) {
            console.error('Error fetching data:', error);
            alert('Failed to run checks. See console for details.');
        } finally {
            loadingSpinner.classList.add('hidden');
            runBtn.disabled = false;
        }
    });

    function renderResults(data) {
        if (data.error) {
            alert(`An error occurred: ${data.error}`);
            return;
        }

        document.getElementById('target-file').textContent = data.target_file;
        document.getElementById('total-rows').textContent = data.total_rows;
        
        validationsGrid.innerHTML = ''; // 清空旧结果

        data.validations.forEach(v => {
            const statusColor = v.status === 'PASS' ? 'bg-green-100 text-green-800' : 
                                v.status === 'FAIL' ? 'bg-red-100 text-red-800' : 
                                'bg-yellow-100 text-yellow-800';
            
            const card = `
                <div class="border border-gray-200 rounded-lg p-4 flex flex-col justify-between">
                    <div>
                        <div class="flex justify-between items-start">
                            <p class="font-bold text-gray-700">${v.column}</p>
                            <span class="text-xs font-semibold px-2 py-1 rounded-full ${statusColor}">
                                ${v.status}
                            </span>
                        </div>
                        <p class="text-sm text-gray-500">${v.check_type}</p>
                    </div>
                    <div class="mt-4 text-sm text-gray-600 bg-gray-50 p-2 rounded">
                        ${renderDetails(v.details)}
                    </div>
                </div>
            `;
            validationsGrid.insertAdjacentHTML('beforeend', card);
        });

        resultsContainer.classList.remove('hidden');
    }

    function renderDetails(details) {
        if (!details || Object.keys(details).length === 0) return 'No details.';
        return Object.entries(details)
            .map(([key, value]) => `<div><strong>${key}:</strong> <span class="font-mono">${JSON.stringify(value)}</span></div>`)
            .join('');
    }
});

这个简单的 JS 文件负责调用 API、显示加载状态,并动态地将结果渲染成卡片。每个卡片的样式,如状态徽章的颜色 (bg-green-100 text-green-800),也是由 Tailwind 的类直接控制,逻辑清晰,无需编写任何额外的 CSS。

局限性与未来迭代方向

我们构建的这个数据质量探针已经能够满足初步的需求:轻量、快速、结果直观。但作为一名务实的工程师,必须清醒地认识到它当前的局限性:

  1. 同步执行: API 调用是同步的。如果 Parquet 文件非常大,查询会耗时很久,导致前端超时。一个改进方向是将其改造为异步任务,API 立即返回一个任务 ID,前端通过该 ID 轮询结果。
  2. 结果无持久化: 每次运行的结果都只是临时返回,没有历史记录。这使得我们无法追踪数据质量随时间的变化趋势。引入一个简单的 SQLite 或 PostgreSQL 数据库来存储历史运行结果是必要的下一步。
  3. 规则管理: 目前规则还是一个本地 YAML 文件。一个更成熟的系统应该提供 UI 来管理和编辑这些规则,并将它们存储在数据库中。
  4. 有限的校验类型: 当前的校验类型都是单列表内的。更复杂的数据质量问题,如跨列一致性(IF a > 10 THEN b IS NOT NULL)或跨表一致性(table_A.user_id 必须存在于 table_B.user_id 中),当前引擎无法支持。扩展引擎以支持更复杂的、自定义的 SQL 检查将是未来的一个重要迭代。

尽管存在这些局限,这个项目成功地验证了技术选型的正确性。DuckDB 作为嵌入式查询引擎的能力令人印象深刻,Tailwind CSS 极大地加速了数据密集型 UI 的开发,而严格的单元测试则为整个系统的可靠性提供了基石。它作为一个起点,为我们后续构建更完善的数据治理工具集铺平了道路。


  目录