BigQuery MCP 服务器:实现数据集前缀过滤

BigQuery MCP 服务器:实现数据集前缀过滤

使用 TypeScript 构建 BigQuery MCP 服务器,通过数据集前缀过滤控制 AI 代理的数据访问权限。

概述

Model Context Protocol(MCP,模型上下文协议)是一个专为 AI 代理与外部系统安全交互而设计的标准协议。本文将介绍如何将 BigQuery 集成为 MCP 服务器,并实现数据集前缀过滤功能,以控制 AI 代理的数据访问范围。

为什么需要数据集前缀过滤?

在大规模 BigQuery 环境中,可能存在数百个数据集。将所有数据集暴露给 AI 代理会带来以下问题:

  • 安全风险:对敏感数据授予不必要的访问权限
  • 性能下降:由于加载大量元数据导致的响应延迟
  • 可用性问题:不必要的信息导致上下文污染

通过数据集前缀过滤,只暴露具有特定前缀(例如:analytics_marketing_)的数据集,可以有效解决这些问题。

MCP 服务器基础架构

MCP 架构

MCP 基于客户端-服务器架构,使用 JSON-RPC 2.0 协议进行通信。

graph LR
    A[AI 代理<br/>Claude, GPT] --> B[MCP 客户端]
    B --> C[MCP 服务器]
    C --> D[BigQuery API]
    D --> E[Google Cloud<br/>数据集]

    style A fill:#e1f5ff
    style C fill:#fff4e1
    style E fill:#f0f0f0

核心组件

MCP 服务器由以下三个核心要素组成:

  1. Server(服务器):实现 MCP 协议并管理客户端连接
  2. Tools(工具):定义 AI 代理可以调用的函数
  3. Transport(传输层):通信渠道(stdio、SSE 等)
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

// 创建 MCP 服务器实例
const server = new Server(
  {
    name: "bigquery-mcp-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {}, // 启用工具功能
    },
  }
);

// 使用 Stdio 传输层进行连接
const transport = new StdioServerTransport();
await server.connect(transport);

BigQuery 客户端集成

认证配置

要使用 BigQuery API,需要配置 Google Cloud 服务账号认证。

import { BigQuery } from "@google-cloud/bigquery";

// 从环境变量加载项目 ID 和认证信息
const projectId = process.env.GOOGLE_CLOUD_PROJECT;
const keyFilename = process.env.GOOGLE_APPLICATION_CREDENTIALS;

// 初始化 BigQuery 客户端
const bigquery = new BigQuery({
  projectId,
  keyFilename, // 服务账号 JSON 密钥文件路径
});

环境变量设置

.env 文件中配置以下信息:

GOOGLE_CLOUD_PROJECT=your-project-id
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
DATASET_PREFIX=analytics_  # 要过滤的前缀

基本 API 操作

BigQuery 客户端支持以下基本操作:

// 查询数据集列表
async function listDatasets() {
  const [datasets] = await bigquery.getDatasets();
  return datasets.map(dataset => dataset.id);
}

// 查询表架构
async function getTableSchema(datasetId: string, tableId: string) {
  const [metadata] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .getMetadata();

  return metadata.schema.fields;
}

// 执行查询
async function executeQuery(query: string) {
  const [rows] = await bigquery.query({ query });
  return rows;
}

实现数据集前缀过滤

过滤逻辑

数据集前缀过滤在两个级别实现:

  1. 数据集列表过滤:从 API 响应中只返回与前缀匹配的数据集
  2. 访问控制:阻止对不匹配前缀的数据集的直接访问
// 从环境变量加载前缀配置
const DATASET_PREFIX = process.env.DATASET_PREFIX || "";

// 检查数据集 ID 是否以允许的前缀开头
function isAllowedDataset(datasetId: string): boolean {
  if (!DATASET_PREFIX) {
    return true; // 如果未设置前缀,则允许所有数据集
  }
  return datasetId.startsWith(DATASET_PREFIX);
}

// 返回过滤后的数据集列表
async function getFilteredDatasets() {
  const [datasets] = await bigquery.getDatasets();

  return datasets
    .map(dataset => dataset.id!)
    .filter(isAllowedDataset)
    .sort();
}

// 验证数据集访问权限
function validateDatasetAccess(datasetId: string): void {
  if (!isAllowedDataset(datasetId)) {
    throw new Error(
      `Access denied: Dataset '${datasetId}' does not match required prefix '${DATASET_PREFIX}'`
    );
  }
}

安全增强

在所有工具中一致地应用访问控制:

// 查询表列表时进行访问验证
async function listTables(datasetId: string) {
  validateDatasetAccess(datasetId); // 检查访问权限

  const [tables] = await bigquery.dataset(datasetId).getTables();
  return tables.map(table => ({
    tableId: table.id,
    type: table.metadata.type,
  }));
}

// 执行查询时提取并验证数据集
async function executeQuery(query: string) {
  // 从查询中提取引用的数据集
  const referencedDatasets = extractDatasetsFromQuery(query);

  // 检查所有引用的数据集的访问权限
  for (const datasetId of referencedDatasets) {
    validateDatasetAccess(datasetId);
  }

  const [rows] = await bigquery.query({ query });
  return rows;
}

// 从查询中提取数据集(使用正则表达式)
function extractDatasetsFromQuery(query: string): string[] {
  // 匹配 `project.dataset.table` 或 `dataset.table` 模式
  const pattern = /(?:FROM|JOIN)\s+(?:`?(?:\w+\.)?(\w+)\.\w+`?)/gi;
  const matches = [...query.matchAll(pattern)];
  return [...new Set(matches.map(m => m[1]))];
}

MCP 工具设计

工具定义原则

有效的 MCP 工具遵循以下原则:

  1. 单一职责:每个工具执行一个明确的任务
  2. 清晰的输入输出:使用 JSON Schema 定义参数和返回值
  3. 错误处理:提供清晰的错误消息
  4. 文档化:包含工具说明和示例

工具列表

BigQuery MCP 服务器提供以下四个工具:

1. list_datasets

返回过滤后的数据集列表。

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "list_datasets",
      description: `List BigQuery datasets${DATASET_PREFIX ? ` with prefix '${DATASET_PREFIX}'` : ''}`,
      inputSchema: {
        type: "object",
        properties: {},
        required: [],
      },
    },
    // ... 其他工具
  ],
}));

2. list_tables

返回特定数据集的表列表。

{
  name: "list_tables",
  description: "List tables in a BigQuery dataset",
  inputSchema: {
    type: "object",
    properties: {
      datasetId: {
        type: "string",
        description: "Dataset ID to list tables from",
      },
    },
    required: ["datasetId"],
  },
}

3. get_schema

返回表的架构信息。

{
  name: "get_schema",
  description: "Get schema information for a BigQuery table",
  inputSchema: {
    type: "object",
    properties: {
      datasetId: {
        type: "string",
        description: "Dataset ID",
      },
      tableId: {
        type: "string",
        description: "Table ID",
      },
    },
    required: ["datasetId", "tableId"],
  },
}

4. execute_query

执行 BigQuery SQL 查询。

{
  name: "execute_query",
  description: "Execute a BigQuery SQL query",
  inputSchema: {
    type: "object",
    properties: {
      query: {
        type: "string",
        description: "SQL query to execute",
      },
      maxResults: {
        type: "number",
        description: "Maximum number of results to return (default: 100)",
        default: 100,
      },
    },
    required: ["query"],
  },
}

实战实现

完整的 MCP 服务器代码

以下是应用了数据集前缀过滤的完整 BigQuery MCP 服务器实现:

#!/usr/bin/env node

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { BigQuery } from "@google-cloud/bigquery";
import dotenv from "dotenv";

// 加载环境变量
dotenv.config();

// 初始化 BigQuery 客户端
const projectId = process.env.GOOGLE_CLOUD_PROJECT;
const DATASET_PREFIX = process.env.DATASET_PREFIX || "";

if (!projectId) {
  throw new Error("GOOGLE_CLOUD_PROJECT environment variable is required");
}

const bigquery = new BigQuery({
  projectId,
  keyFilename: process.env.GOOGLE_APPLICATION_CREDENTIALS,
});

// 数据集访问验证函数
function isAllowedDataset(datasetId: string): boolean {
  if (!DATASET_PREFIX) return true;
  return datasetId.startsWith(DATASET_PREFIX);
}

function validateDatasetAccess(datasetId: string): void {
  if (!isAllowedDataset(datasetId)) {
    throw new Error(
      `Access denied: Dataset '${datasetId}' does not match required prefix '${DATASET_PREFIX}'`
    );
  }
}

// 从查询中提取数据集
function extractDatasetsFromQuery(query: string): string[] {
  const pattern = /(?:FROM|JOIN)\s+(?:`?(?:\w+\.)?(\w+)\.\w+`?)/gi;
  const matches = [...query.matchAll(pattern)];
  return [...new Set(matches.map(m => m[1]))];
}

// 创建 MCP 服务器
const server = new Server(
  {
    name: "bigquery-mcp-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// 工具列表处理器
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "list_datasets",
      description: `List BigQuery datasets${DATASET_PREFIX ? ` with prefix '${DATASET_PREFIX}'` : ''}`,
      inputSchema: {
        type: "object",
        properties: {},
        required: [],
      },
    },
    {
      name: "list_tables",
      description: "List tables in a BigQuery dataset",
      inputSchema: {
        type: "object",
        properties: {
          datasetId: {
            type: "string",
            description: "Dataset ID to list tables from",
          },
        },
        required: ["datasetId"],
      },
    },
    {
      name: "get_schema",
      description: "Get schema information for a BigQuery table",
      inputSchema: {
        type: "object",
        properties: {
          datasetId: {
            type: "string",
            description: "Dataset ID",
          },
          tableId: {
            type: "string",
            description: "Table ID",
          },
        },
        required: ["datasetId", "tableId"],
      },
    },
    {
      name: "execute_query",
      description: "Execute a BigQuery SQL query",
      inputSchema: {
        type: "object",
        properties: {
          query: {
            type: "string",
            description: "SQL query to execute",
          },
          maxResults: {
            type: "number",
            description: "Maximum number of results to return",
            default: 100,
          },
        },
        required: ["query"],
      },
    },
  ],
}));

// 工具调用处理器
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  try {
    switch (name) {
      case "list_datasets": {
        const [datasets] = await bigquery.getDatasets();
        const filtered = datasets
          .map(d => d.id!)
          .filter(isAllowedDataset)
          .sort();

        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(filtered, null, 2),
            },
          ],
        };
      }

      case "list_tables": {
        const { datasetId } = args as { datasetId: string };
        validateDatasetAccess(datasetId);

        const [tables] = await bigquery.dataset(datasetId).getTables();
        const tableList = tables.map(t => ({
          tableId: t.id,
          type: t.metadata.type,
        }));

        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(tableList, null, 2),
            },
          ],
        };
      }

      case "get_schema": {
        const { datasetId, tableId } = args as {
          datasetId: string;
          tableId: string;
        };
        validateDatasetAccess(datasetId);

        const [metadata] = await bigquery
          .dataset(datasetId)
          .table(tableId)
          .getMetadata();

        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(metadata.schema.fields, null, 2),
            },
          ],
        };
      }

      case "execute_query": {
        const { query, maxResults = 100 } = args as {
          query: string;
          maxResults?: number;
        };

        // 验证查询中引用的数据集
        const referencedDatasets = extractDatasetsFromQuery(query);
        for (const datasetId of referencedDatasets) {
          validateDatasetAccess(datasetId);
        }

        const [rows] = await bigquery.query({
          query,
          maxResults,
        });

        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(rows, null, 2),
            },
          ],
        };
      }

      default:
        throw new Error(`Unknown tool: ${name}`);
    }
  } catch (error) {
    return {
      content: [
        {
          type: "text",
          text: `Error: ${error instanceof Error ? error.message : String(error)}`,
        },
      ],
      isError: true,
    };
  }
});

// 启动服务器
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("BigQuery MCP server running on stdio");
}

main().catch((error) => {
  console.error("Fatal error:", error);
  process.exit(1);
});

package.json 配置

{
  "name": "bigquery-mcp-server",
  "version": "1.0.0",
  "type": "module",
  "bin": {
    "bigquery-mcp-server": "./build/index.js"
  },
  "scripts": {
    "build": "tsc",
    "start": "node build/index.js",
    "dev": "tsx src/index.ts"
  },
  "dependencies": {
    "@google-cloud/bigquery": "^7.9.0",
    "@modelcontextprotocol/sdk": "^0.6.0",
    "dotenv": "^16.4.5"
  },
  "devDependencies": {
    "@types/node": "^22.10.5",
    "tsx": "^4.19.2",
    "typescript": "^5.7.3"
  }
}

TypeScript 配置

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "outDir": "./build",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true,
    "declaration": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules", "build"]
}

安全性与优化

安全最佳实践

1. 保护认证信息

// ❌ 硬编码的认证信息(绝对禁止)
const bigquery = new BigQuery({
  projectId: "my-project",
  credentials: {
    client_email: "service@project.iam.gserviceaccount.com",
    private_key: "-----BEGIN PRIVATE KEY-----\n...",
  },
});

// ✅ 使用环境变量
const bigquery = new BigQuery({
  projectId: process.env.GOOGLE_CLOUD_PROJECT,
  keyFilename: process.env.GOOGLE_APPLICATION_CREDENTIALS,
});

2. 最小权限原则

仅授予服务账号必需的最小权限:

# BigQuery Data Viewer(只读)
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
  --role="roles/bigquery.dataViewer"

# BigQuery Job User(查询执行)
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
  --role="roles/bigquery.jobUser"

3. 防止 SQL 注入

使用 BigQuery 参数化查询:

// ❌ 字符串拼接(SQL 注入风险)
const query = `SELECT * FROM dataset.table WHERE id = '${userId}'`;

// ✅ 参数化查询
const query = {
  query: "SELECT * FROM `dataset.table` WHERE id = @userId",
  params: { userId: userId },
};

const [rows] = await bigquery.query(query);

性能优化

1. 结果缓存

虽然 BigQuery 默认会缓存查询结果 24 小时,但也可以在 MCP 服务器层实现缓存:

import { LRUCache } from "lru-cache";

// 元数据缓存(最多 100 个条目,1 小时 TTL)
const schemaCache = new LRUCache<string, any>({
  max: 100,
  ttl: 1000 * 60 * 60, // 1 小时
});

async function getCachedSchema(datasetId: string, tableId: string) {
  const cacheKey = `${datasetId}.${tableId}`;

  // 从缓存查询
  let schema = schemaCache.get(cacheKey);

  if (!schema) {
    // 缓存未命中:调用 API
    const [metadata] = await bigquery
      .dataset(datasetId)
      .table(tableId)
      .getMetadata();

    schema = metadata.schema.fields;
    schemaCache.set(cacheKey, schema);
  }

  return schema;
}

2. 查询优化

// 结果限制和超时设置
async function executeOptimizedQuery(query: string, maxResults = 100) {
  const options = {
    query,
    maxResults,
    timeoutMs: 30000, // 30 秒超时
    useLegacySql: false, // 使用标准 SQL
    useQueryCache: true, // 启用查询缓存
  };

  const [rows] = await bigquery.query(options);
  return rows;
}

3. 并行处理

在查询多个数据集的表列表时使用并行处理:

async function listAllTables(datasetIds: string[]) {
  // 并行查询所有数据集的表
  const tablePromises = datasetIds.map(async (datasetId) => {
    const [tables] = await bigquery.dataset(datasetId).getTables();
    return {
      datasetId,
      tables: tables.map(t => t.id),
    };
  });

  return await Promise.all(tablePromises);
}

错误处理

1. 清晰的错误消息

function handleBigQueryError(error: any): string {
  if (error.code === 404) {
    return "Resource not found. Please check dataset/table name.";
  }

  if (error.code === 403) {
    return "Access denied. Please check your permissions.";
  }

  if (error.message?.includes("Syntax error")) {
    return `SQL syntax error: ${error.message}`;
  }

  return `BigQuery error: ${error.message || String(error)}`;
}

2. 重试逻辑

针对暂时性网络错误的重试机制:

async function retryableQuery(
  query: string,
  maxRetries = 3,
  delayMs = 1000
) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await bigquery.query({ query });
    } catch (error: any) {
      // 检查是否为可重试错误
      const isRetryable =
        error.code === 500 ||
        error.code === 503 ||
        error.message?.includes("timeout");

      if (!isRetryable || attempt === maxRetries) {
        throw error;
      }

      // 指数退避等待
      await new Promise(resolve =>
        setTimeout(resolve, delayMs * Math.pow(2, attempt - 1))
      );
    }
  }
}

运行与测试

构建和运行

# 1. 安装依赖
npm install

# 2. TypeScript 构建
npm run build

# 3. 设置环境变量
cat > .env << EOF
GOOGLE_CLOUD_PROJECT=your-project-id
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
DATASET_PREFIX=analytics_
EOF

# 4. 开发模式运行
npm run dev

# 5. 生产模式运行
npm start

Claude Desktop 集成

要将 MCP 服务器连接到 Claude Desktop,需要修改配置文件:

macOS~/Library/Application Support/Claude/claude_desktop_config.json

Windows%APPDATA%\Claude\claude_desktop_config.json

{
  "mcpServers": {
    "bigquery": {
      "command": "node",
      "args": [
        "/absolute/path/to/bigquery-mcp-server/build/index.js"
      ],
      "env": {
        "GOOGLE_CLOUD_PROJECT": "your-project-id",
        "GOOGLE_APPLICATION_CREDENTIALS": "/path/to/service-account.json",
        "DATASET_PREFIX": "analytics_"
      }
    }
  }
}

测试场景

1. 查询数据集列表

用户:显示 BigQuery 的数据集列表

Claude:[调用 list_datasets 工具]

结果:
[
  "analytics_events",
  "analytics_users",
  "analytics_sessions"
]

2. 确认表架构

用户:告诉我 analytics_events 数据集中 events 表的架构

Claude:[调用 get_schema 工具]

结果:
[
  {
    "name": "event_date",
    "type": "DATE",
    "mode": "REQUIRED"
  },
  {
    "name": "event_name",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "user_id",
    "type": "STRING",
    "mode": "NULLABLE"
  }
]

3. 执行查询

用户:统计今天发生的事件数量

Claude:[调用 execute_query 工具]

查询:
SELECT event_name, COUNT(*) as count
FROM `analytics_events.events`
WHERE event_date = CURRENT_DATE()
GROUP BY event_name
ORDER BY count DESC
LIMIT 10

4. 访问控制测试

用户:显示 finance_data 数据集的表

Claude:[调用 list_tables 工具]

错误:Access denied: Dataset 'finance_data' does not match
required prefix 'analytics_'

结论

通过实现应用了数据集前缀过滤的 BigQuery MCP 服务器,可以获得以下优势:

核心成果

  1. 安全增强:明确限制 AI 代理的数据访问范围
  2. 性能提升:通过最小化不必要的元数据加载来改善响应速度
  3. 可用性改进:仅暴露相关数据集,提升上下文质量
  4. 灵活配置:通过环境变量轻松更改前缀

扩展可能性

基于此实现,可以添加以下功能:

  • 多前缀支持:允许多个前缀模式(例如:analytics_*marketing_*
  • 基于角色的访问控制(RBAC):为不同用户/组应用不同前缀
  • 审计日志:记录和监控所有查询执行
  • 成本跟踪:衡量每个查询的 BigQuery 使用成本
  • 查询模板:将常用查询模式作为资源提供

后续步骤

  1. 生产部署:Docker 容器化和 Kubernetes 部署
  2. 监控设置:使用 Prometheus/Grafana 收集性能指标
  3. 集成测试:为各种场景编写自动化测试
  4. 文档编写:创建 API 文档和用户指南

使用 MCP 的 BigQuery 集成为 AI 代理安全高效地分析数据提供了基础。数据集前缀过滤是显著提升此类集成的安全性和性能的核心功能。

参考资料

官方文档

相关资源

社区

阅读其他语言版本

这篇文章有帮助吗?

您的支持能帮助我创作更好的内容。请我喝杯咖啡吧!☕

关于作者

JK

Kim Jangwook

AI/LLM专业全栈开发者

凭借10年以上的Web开发经验,构建AI代理系统、LLM应用程序和自动化解决方案。分享Claude Code、MCP和RAG系统的实践经验。