Architecture Selection and Implementation of a Data Lake Query Layer for an Internal Developer Platform


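We face an increasingly serious challenge: our internal developer platform (IDP) must give hundreds of developers fast query access to petabytes of logs, metrics, and traces stored in the data lake. The existing approach, which lets developers query directly through Presto/Trino clients, has exposed three core problems: unpredictable performance, runaway cost, and no unified governance. A single complex analytical query can consume a large share of cluster resources and starve other critical automated jobs, and unrestricted SELECT * queries drive up compute cost. We need to build a new unified query layer.

Defining the Problem: Architectural Choices for a Unified Query Layer

There are two mainstream architectures to choose from.

Option A: A Monolithic, Full-Featured Query Service

This architecture builds a single full-featured service, usually on a mature enterprise framework such as Java Spring Boot or Python Django.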

graph TD
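    subgraph "Monolithic query service (Java/Python)"
        A1[API endpoints]
        A2[Authentication and authorization]
        A3[Query parsing and optimization]
        A4[Data lake engine connector]
        A5[Result set transformation and caching]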
    end

    U1[Web Dashboard] --> A1
    U2[Android App] --> A1
    U3[CLI Tool] --> A1
    A1 --> A2 --> A3 --> A4 --> A5 --> A1
    A4 --> DL[(Data Lake)]
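
  • Advantages:

    1. Simpler development: A single codebase and a uniform technology stack make initial development, deployment, and maintenance relatively simple.
    2. Transactional consistency: Complex logic and state management are easier to handle inside one service when they are needed.
  • Disadvantages:

    1. Performance bottleneck: This is a pitfall that real projects almost always hit. Complex, long-running analytical queries from the Web Dashboard compete for resources with the millisecond-latency root-cause lookups that the Android App runs for the SRE team. A monolith shares its thread pool and database connection pool, so one slow query can drag down the whole service.
    2. Rigid technology stack: Choices optimized for the Web UI (such as a deeply integrated template engine) are dead weight for pure API clients (such as the Android App), and vice versa.
    3. Release risk: Even a small change requires redeploying the entire service, which raises release risk and hurts agility.

Option B: A Lightweight Query Gateway with Dedicated Clients

This option favors separation of concerns. We build an extremely lightweight, high-performance API gateway whose sole responsibility is to proxy data lake queries. Every client, whether Web, mobile, or CLI, reaches the data lake through this gateway.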

graph TD
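    subgraph "Dedicated clients"
        C1[Web Dashboard]
        C2[Android App for SRE]
        C3[CLI Tool]
    end

    subgraph "Lightweight query gateway (Go-Fiber)"
        B1[API Endpoints]
        B2[Lightweight authentication and authorization]
        B3[Rate limiting and quotas]
        B4[Query routing and pass-through]
    end

    subgraph "Backend query engine"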
        E1[Presto/Trino]
    end
    
    C1 --> B1
    C2 --> B1
    C3 --> B1
    B1 --> B2 --> B3 --> B4 --> E1
    E1 --> DL[(Data Lake)]
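
  • Advantages:

    1. Performance isolation and optimization: The gateway itself can be very fast because it only performs protocol translation and request forwarding. We can build it in Go, a language with strong concurrency support and a small memory footprint (for example with the Fiber framework). More importantly, we can set different rate limits and priorities for different client types or request paths.
    2. Technology flexibility: The gateway uses Go, the Web frontend can use React/Vue, and the Android client uses Kotlin. Each team picks the stack that fits its scenario and develops and deploys independently.
    3. High availability and maintainability: The gateway has simple logic and few core dependencies, which makes a highly available deployment easier. Client iterations do not affect one another.
  • Disadvantages:

    1. Architectural complexity: More components are introduced, and multiple repositories and deployment pipelines must be managed.
    2. API contract: The gateway's API design is critical. It becomes a strong contract between teams, and a poorly defined one is expensive to change later.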

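Final Decision

For a production environment that values long-term stability and scalability, Option B is the more pragmatic choice. The up-front complexity buys future flexibility and performance guarantees. We will build the query gateway with Go-Fiber, which is known for its performance and offers a pleasant developer experience. We will also show the core implementation fragments of the Android client and the Web frontend to give a complete view of this architecture.

Core Implementation: Building the Go-Fiber Query Gateway

The gateway's core responsibilities are to receive requests, verify identity, execute queries, and return results.

1. Project Structure and Configuration

A typical Go project layout looks like this: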

/idp-query-gateway
|-- /cmd/server
|   `-- main.go
|-- /internal
|   |-- /api
|   |   |-- handler.go
|   |   `-- routes.go
|   |-- /config
|   |   `-- config.go
|   |-- /query
|   |   |-- presto_service.go
|   |   `-- service.go
|-- go.mod
|-- go.sum
|-- Dockerfile
`-- config.yaml

The config.yaml file holds all configuration so that nothing is hard-coded.

server:
  port: ":8080"

auth:
  # In a real project, use a more secure key management system like Vault
  jwt_secret: "a-very-secret-and-long-key-for-hs256"

query_engine:
  type: "presto"
  presto:
    # host:port only; NewPrestoService prepends "http://" when it builds the DSN
    host: "presto-coordinator:8080"
    user: "idp-gateway"
    source: "idp-query-gateway"
    catalog: "hive"
    schema: "default"
    timeout_seconds: 120

2. Program Entry Point and Middleware

cmd/server/main.go is the entry point. Here we load the configuration, set up the Fiber instance, and register routes.

package main

import (
	"log"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/logger"
	"github.com/gofiber/fiber/v2/middleware/recover"

	"idp-query-gateway/internal/api"
	"idp-query-gateway/internal/config"
	"idp-query-gateway/internal/query"
)

func main() {
	// 1. Load configuration
	cfg, err := config.LoadConfig(".")
	if err != nil {
		log.Fatalf("could not load config: %v", err)
	}

	// 2. Initialize dependencies
	// A real implementation would use a proper DB connection pool from the presto driver.
	// For this example, we encapsulate config details within the service.
	queryService, err := query.NewPrestoService(cfg.QueryEngine.Presto)
	if err != nil {
		log.Fatalf("failed to create query service: %v", err)
	}
	queryHandler := api.NewQueryHandler(queryService)

	// 3. Setup Fiber app
	app := fiber.New(fiber.Config{
		ErrorHandler: api.GlobalErrorHandler, // Custom global error handler
	})

	// 4. Register middlewares
	app.Use(recover.New())
	app.Use(logger.New(logger.Config{
		Format: "[${ip}]:${port} ${status} - ${method} ${path}\n",
	}))

	// 5. Setup routing
	api.SetupRoutes(app, queryHandler, cfg.Auth)

	// 6. Start server
	log.Printf("Starting server on port %s", cfg.Server.Port)
	if err := app.Listen(cfg.Server.Port); err != nil {
		log.Fatalf("failed to start server: %v", err)
	}
}

3. API Routes and Authentication

internal/api/routes.go defines all API routes and applies JWT middleware to protect the query endpoints.

package api

import (
	jwtware "github.com/gofiber/contrib/jwt"
	"github.com/gofiber/fiber/v2"
	"idp-query-gateway/internal/config"
)

// SetupRoutes configures the application's routes.
func SetupRoutes(app *fiber.App, handler *QueryHandler, authConfig config.AuthConfig) {
	api := app.Group("/api")
	v1 := api.Group("/v1")

	// Public route, e.g., for health checks
	v1.Get("/health", func(c *fiber.Ctx) error {
		return c.JSON(fiber.Map{"status": "ok"})
	})

	// Private group for query endpoints
	queryGroup := v1.Group("/query")
	
	// Configure JWT middleware
	// This ensures that only authenticated requests can access the query endpoints.
	queryGroup.Use(jwtware.New(jwtware.Config{
		SigningKey: jwtware.SigningKey{Key: []byte(authConfig.JWTSecret)},
		// A custom error handler for JWT failures provides better client feedback.
		ErrorHandler: jwtError,
	}))

	queryGroup.Post("/", handler.ExecuteQuery)
}

func jwtError(c *fiber.Ctx, err error) error {
	if err.Error() == "Missing or malformed JWT" {
		return c.Status(fiber.StatusBadRequest).
			JSON(fiber.Map{"status": "error", "message": "Missing or malformed JWT", "data": nil})
	}
	return c.Status(fiber.StatusUnauthorized).
		JSON(fiber.Map{"status": "error", "message": "Invalid or expired JWT", "data": nil})
}

4. Core Query Handler

internal/api/handler.go contains the request-handling logic.

package api

import (
	"context"
	"log"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/golang-jwt/jwt/v5"

	"idp-query-gateway/internal/query"
)

// QueryRequest defines the structure for incoming query requests.
type QueryRequest struct {
	Query string `json:"query"`
	// Client-side timeout in seconds. The server can enforce a max timeout.
	Timeout int `json:"timeout_seconds"` 
}

// QueryHandler holds the business logic for query operations.
type QueryHandler struct {
	service query.Service
}

func NewQueryHandler(s query.Service) *QueryHandler {
	return &QueryHandler{service: s}
}

// ExecuteQuery handles the query execution request.
func (h *QueryHandler) ExecuteQuery(c *fiber.Ctx) error {
	// 1. Parse and validate request body
	var req QueryRequest
	if err := c.BodyParser(&req); err != nil {
		return fiber.NewError(fiber.StatusBadRequest, "Invalid request body")
	}
	if req.Query == "" {
		return fiber.NewError(fiber.StatusBadRequest, "Query cannot be empty")
	}

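	// 2. Extract user identity from JWT for auditing/logging purposes.
	// A real-world application would use this for fine-grained access control.
	// Checked assertions avoid a panic when the token lacks a string "username" claim.
	token, ok := c.Locals("user").(*jwt.Token)
	if !ok {
		return fiber.NewError(fiber.StatusUnauthorized, "Missing authentication token")
	}
	claims, _ := token.Claims.(jwt.MapClaims)
	username, ok := claims["username"].(string)
	if !ok || username == "" {
		return fiber.NewError(fiber.StatusUnauthorized, "Token has no username claim")
	}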

	// 3. Set a request context with a timeout.
	// We honor the client's timeout but cap it to prevent abuse.
	clientTimeout := time.Duration(req.Timeout) * time.Second
	serverMaxTimeout := 120 * time.Second
	if clientTimeout <= 0 || clientTimeout > serverMaxTimeout {
		clientTimeout = serverMaxTimeout
	}
	ctx, cancel := context.WithTimeout(c.Context(), clientTimeout)
	defer cancel()

	// 4. Log the query attempt. Structured logging is preferred in production.
	log.Printf("User '%s' is executing query with timeout %v", username, clientTimeout)

	// 5. Delegate to the query service
	result, err := h.service.RunQuery(ctx, req.Query)
	if err != nil {
		// The error will be handled by the global error handler.
		return err
	}

	return c.Status(fiber.StatusOK).JSON(fiber.Map{
		"status": "success",
		"data":   result,
	})
}

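5. Query Service Abstraction and Implementation

To keep the code clean and testable, we define a Service interface and provide a concrete implementation for Presto. This also makes a future switch to ClickHouse or another engine easier.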

internal/query/service.go:

package query

import "context"

// QueryResult represents a generic result from the query engine.
// A more robust implementation might use typed columns and rows.
type QueryResult struct {
	Columns []string        `json:"columns"`
	Rows    [][]interface{} `json:"rows"`
}

// Service is the interface for interacting with a query engine.
type Service interface {
	RunQuery(ctx context.Context, sql string) (*QueryResult, error)
}

internal/query/presto_service.go (a simplified implementation; production code would use a mature Presto Go driver):

package query

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	// Production code should use a robust driver like "trino-go-client" or "prestgo"
	_ "github.com/prestodb/presto-go-client/presto"
	"idp-query-gateway/internal/config"
)

type prestoService struct {
	db *sql.DB
	// ... other presto specific configs
}

func NewPrestoService(cfg config.PrestoConfig) (Service, error) {
	dsn := fmt.Sprintf("http://%s@%s?source=%s&catalog=%s&schema=%s",
		cfg.User, cfg.Host, cfg.Source, cfg.Catalog, cfg.Schema)
	
	db, err := sql.Open("presto", dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to presto: %w", err)
	}

	// It's crucial to configure the connection pool for production workloads.
	db.SetMaxOpenConns(20)
	db.SetMaxIdleConns(10)
	db.SetConnMaxLifetime(time.Hour)

	return &prestoService{db: db}, nil
}

// The parameter is named query rather than sql to avoid shadowing the database/sql package.
func (s *prestoService) RunQuery(ctx context.Context, query string) (*QueryResult, error) {
	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		// Context deadline exceeded is a common error we need to handle gracefully.
		if ctx.Err() == context.DeadlineExceeded {
			return nil, fmt.Errorf("query timed out: %w", ctx.Err())
		}
		return nil, fmt.Errorf("presto query failed: %w", err)
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return nil, fmt.Errorf("failed to get columns: %w", err)
	}

	var results [][]interface{}
	for rows.Next() {
		values := make([]interface{}, len(columns))
		valuePtrs := make([]interface{}, len(columns))
		for i := range columns {
			valuePtrs[i] = &values[i]
		}

		if err := rows.Scan(valuePtrs...); err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}
		results = append(results, values)
	}
	
	if err = rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating rows: %w", err)
	}

	return &QueryResult{Columns: columns, Rows: results}, nil
}

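Dedicated Client Implementation: Android SRE Tool

The Android client is not meant to be a full-featured dashboard. It gives on-call engineers a tool for quick diagnosis by calling the gateway to run predefined, high-priority queries.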

SreQueryRepository.kt:

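import okhttp3.OkHttpClient
import okhttp3.logging.HttpLoggingInterceptor
import retrofit2.Response
import retrofit2.Retrofit
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.Body
import retrofit2.http.Header
import retrofit2.http.POST
import java.io.IOException
import java.util.concurrent.TimeUnit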

// Data classes matching the Go service's JSON response
data class QueryPayload(val query: String, val timeout_seconds: Int = 15)
data class QueryResponse(val status: String, val data: QueryResult?)
data class QueryResult(val columns: List<String>, val rows: List<List<Any>>)

// Retrofit API interface
interface IdpGatewayApi {
    @POST("api/v1/query/")
    suspend fun executeQuery(@Header("Authorization") token: String, @Body payload: QueryPayload): Response<QueryResponse>
}

// A simplified repository class for handling data operations
class SreQueryRepository {

    private val api: IdpGatewayApi

    init {
        val logging = HttpLoggingInterceptor().apply {
            level = HttpLoggingInterceptor.Level.BODY
        }

        val client = OkHttpClient.Builder()
            .addInterceptor(logging)
            // A short timeout for critical path queries
            .connectTimeout(5, TimeUnit.SECONDS)
            .readTimeout(20, TimeUnit.SECONDS)
            .build()

        val retrofit = Retrofit.Builder()
            .baseUrl("http://your-gateway-host:8080/")
            .client(client)
            .addConverterFactory(GsonConverterFactory.create())
            .build()

        api = retrofit.create(IdpGatewayApi::class.java)
    }

    // Example of a focused, pre-canned query for SREs
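    suspend fun getP0ErrorCountForService(serviceName: String, jwtToken: String): Result<Long> {
        // serviceName is interpolated into the SQL below, so accept only a conservative
        // character set (an assumed naming rule) to rule out SQL injection.
        if (!Regex("[A-Za-z0-9_.-]+").matches(serviceName)) {
            return Result.failure(IllegalArgumentException("Invalid service name"))
        }
        // This is a pre-defined, optimized query. It's not ad-hoc.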
        val sql = """
            SELECT count(*) 
            FROM logs 
            WHERE service = '$serviceName' 
            AND level = 'ERROR' 
            AND error_code LIKE 'P0-%'
            AND event_timestamp > now() - interval '5' minute
        """.trimIndent()

        val payload = QueryPayload(query = sql)
        val bearerToken = "Bearer $jwtToken"

        return try {
            val response = api.executeQuery(bearerToken, payload)
            if (response.isSuccessful && response.body()?.status == "success") {
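                // firstOrNull avoids an exception on an empty result; Gson decodes JSON numbers as Double.
                val count = (response.body()?.data?.rows?.firstOrNull()?.firstOrNull() as? Number)?.toLong() ?: 0L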
                Result.success(count)
            } else {
                // In a real app, parse the error body for a meaningful message.
                Result.failure(IOException("API Error: ${response.code()}"))
            }
        } catch (e: Exception) {
            Result.failure(e)
        }
    }
}

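Dedicated Client Implementation: Web Frontend and Testing

The Web frontend is a complex dashboard that lets users build arbitrary queries. That complexity makes component maintainability and testing critical.

Style Isolation with CSS Modules

In a large IDP, several teams may contribute different frontend modules. CSS Modules are an effective way to avoid style collisions.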

QueryBuilder.module.css:

.queryBuilder {
  border: 1px solid #333;
  padding: 16px;
  background-color: #1e1e1e;
  border-radius: 4px;
}

.filterGroup {
  margin-top: 12px;
  padding-left: 16px;
  border-left: 2px solid #4a4a4a;
}

.addButton {
  background-color: #007acc;
  color: white;
  border: none;
  padding: 8px 12px;
  cursor: pointer;
}

.addButton:hover {
  background-color: #005f9e;
}

Usage in a React component:

import React from 'react';
import styles from './QueryBuilder.module.css';

// Named export, matching the `import { QueryBuilder }` used in the test file
export const QueryBuilder = () => {
  // ... component logic
  return (
    <div className={styles.queryBuilder}>
      {/* ... */}
      <div className={styles.filterGroup}>
        {/* ... filter rows */}
      </div>
      <button className={styles.addButton}>Add Filter</button>
    </div>
  );
};

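Unit Testing Complex Components with Jest

The QueryBuilder component can contain fairly complex logic, including state management, user interaction, and API calls. Jest tests are key to keeping its quality in check.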

QueryBuilder.test.js:

import React from 'react';
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import '@testing-library/jest-dom';
import { QueryBuilder } from './QueryBuilder';
import { api } from './api'; // Assuming an api module for fetching data

// Mock the API module to avoid actual network requests
jest.mock('./api');

describe('QueryBuilder', () => {
    
    beforeEach(() => {
        // Reset mocks before each test
        api.executeQuery.mockClear();
    });

    test('should render initial state correctly', () => {
        render(<QueryBuilder />);
        expect(screen.getByText('SELECT')).toBeInTheDocument();
        expect(screen.getByRole('button', { name: /Add Filter/i })).toBeInTheDocument();
    });

    test('should add a new filter row when "Add Filter" is clicked', () => {
        render(<QueryBuilder />);
        const addButton = screen.getByRole('button', { name: /Add Filter/i });
        fireEvent.click(addButton);

        // Expect two filter rows now (assuming one is there by default)
        const filterFields = screen.getAllByPlaceholderText(/field/i);
        expect(filterFields).toHaveLength(2);
    });

    test('should update query state when user types in a filter', () => {
        render(<QueryBuilder />);
        const fieldInput = screen.getByPlaceholderText(/field/i);
        const valueInput = screen.getByPlaceholderText(/value/i);

        fireEvent.change(fieldInput, { target: { value: 'service_name' } });
        fireEvent.change(valueInput, { target: { value: 'api-gateway' } });
        
        // This test would be more robust if we could inspect component state
        // or if there was a visible output of the generated query.
        // For example, if a text area showed the query:
        // expect(screen.getByTestId('query-output')).toHaveValue(
        //   "SELECT * FROM logs WHERE service_name = 'api-gateway'"
        // );
    });

    test('should call api.executeQuery with the correct query on submit', async () => {
        api.executeQuery.mockResolvedValue({ 
            data: { columns: ['col1'], rows: [['val1']] } 
        });

        render(<QueryBuilder />);

        // Simulate user input
        fireEvent.change(screen.getByPlaceholderText(/field/i), { target: { value: 'status' } });
        fireEvent.change(screen.getByPlaceholderText(/value/i), { target: { value: '500' } });

        const submitButton = screen.getByRole('button', { name: /Run Query/i });
        fireEvent.click(submitButton);

        await waitFor(() => {
            expect(api.executeQuery).toHaveBeenCalledTimes(1);
            // Verify the payload sent to the API is correct
            expect(api.executeQuery).toHaveBeenCalledWith(
                expect.stringContaining("WHERE status = '500'")
            );
        });

        // Verify that results are rendered; findByText waits for the async re-render
        expect(await screen.findByText('col1')).toBeInTheDocument();
        expect(await screen.findByText('val1')).toBeInTheDocument();
    });
});

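Extensibility and Limitations of the Architecture

The gateway architecture gives us a good path for future extension. For example, we can add caching at the gateway layer and cache the results of high-frequency queries to reduce load on the data lake. We can also introduce a new query engine (such as ClickHouse) for real-time analytics, and the gateway can route each request to the most suitable engine based on its characteristics while staying transparent to clients.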

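This architecture has limitations. First, the API gateway itself becomes a critical single point and needs multi-replica deployment behind a load balancer to stay highly available. Second, API contract management becomes critical, because any incompatible change affects every client. Finally, this design optimizes the access layer of querying and does nothing for slow queries inside the data lake itself. If the underlying data model is poorly designed or lacks suitable indexes and partitioning, queries will still be slow. The next round of optimization has to go into the data lake: materialized views, precomputed aggregates, or even a dedicated OLAP database as a high-performance complement.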

