构建基于网关与Delta Lake的LLM应用端到端身份感知数据访问架构


在构建企业级检索增强生成(RAG)应用时,一个无法回避的核心问题是:如何在一个共享的、包含敏感信息的数据源(如Delta Lake)之上,为不同身份、不同权限的用户提供安全、隔离的访问。将所有数据访问权限赋予LangChain应用本身,再由应用逻辑来过滤数据,这种做法在生产环境中是极其危险且不可审计的。

我们面临的具体挑战是:设计一个架构,能够将用户的身份从系统的最外层(API网关)一直传递到最内层(Delta Lake),并在数据查询的最后一刻强制执行最小权限原则。这意味着,LangChain应用本身不应持有任何高权限凭证,它仅仅是执行由上游授权的、范围受限的数据操作的代理。

方案A:应用层权限控制的陷阱

一个直接的想法是在LangChain应用内部实现权限逻辑。

graph TD
    User -- "携带用户ID" --> A[API Gateway]
    A -- "透传用户ID" --> B[LangChain Service]
    B -- "1. 验证用户ID\n2. 查询IAM获取权限\n3. 根据权限过滤数据" --> C{Delta Lake}
    subgraph LangChain Service
        direction LR
        D[用户认证] --> E[权限查询模块]
        E --> F[业务逻辑]
    end
    C -- "返回全量或大量数据" --> B

实现思路:

  1. API网关简单地将请求和用户标识(例如,用户ID)转发给后端的LangChain服务。
  2. LangChain服务接收到请求后,需要包含一个专门的模块。该模块根据用户ID,主动查询一个外部的身份与访问管理(IAM)系统,获取该用户的角色和权限策略。
  3. 在构建对Delta Lake的查询时,业务逻辑代码负责解析这些权限策略,并手动将它们转换为SQL的 WHERE 子句。
  4. LangChain服务使用一个拥有对整个Delta Lake表读权限的、统一的服务账户(Service Account)来执行查询。

优劣分析:

  • 优点:
    • 逻辑内聚:所有与权限相关的逻辑都集中在应用服务内部,对于小型项目来说,开发初期可能比较直观。
  • 缺点:
    • 违背最小权限原则: LangChain服务本身持有过高的权限。一旦该服务存在漏洞(例如,通过巧妙的Prompt注入绕过了业务逻辑的过滤),攻击者就可能访问到Delta Lake中的所有数据。
    • 审计困难: 从Delta Lake的审计日志来看,所有的数据访问都来自于同一个服务账户。无法追溯到是哪个终端用户触发了特定的数据查询,这在合规和安全审计上是致命的缺陷。
    • 职责不清与代码耦合: 业务逻辑与安全逻辑紧密耦合。每次权限模型变更,都可能需要修改核心业务代码,增加了维护成本和出错的风险。
    • 性能瓶颈: 每次请求都需要应用服务主动查询IAM系统,这会引入额外的网络延迟。

在真实项目中,这种模式很快就会暴露出其脆弱性。一个常见的错误是,开发人员为了修复一个紧急的业务bug,可能会无意中注释掉一段权限检查代码,从而打开一个巨大的安全漏洞。

方案B:基于JWT声明的策略下放与强制执行

这个方案的核心思想是转变信任模型:我们不再信任LangChain应用,而是将其视为一个“不可信”的执行环境。安全边界前移到API网关,由网关负责认证和授权,并将一个临时的、范围受限的“能力凭证”下发给下游服务。JSON Web Tokens (JWT) 是承载这个凭证的理想选择。

graph TD
    User -- "携带身份Token (如OIDC Token)" --> A[API Gateway]
    A -- "1. 验证用户Token\n2. 查询IAM获取策略\n3. 生成内部Scoped JWT" --> IAM[IAM/Policy Service]
    IAM -- "返回用户数据权限\n(e.g., department: 'finance')" --> A
    A -- "Proxy Request with\n'Authorization-Internal' header\n(containing Scoped JWT)" --> B[LangChain Service]
    B -- "1. 验证Scoped JWT\n2. 解析Claims\n3. 构建带权限的查询" --> DAL[Secure Data Access Layer]
    DAL -- "SELECT * FROM docs WHERE ...\nAND department = 'finance'" --> C{Delta Lake}
    C -- "返回过滤后的数据" --> DAL

实现思路:

  1. API网关(策略执行点):

    • 用户使用其主身份凭证(如OIDC Id Token)请求网关。
    • 网关验证该凭证的有效性。
    • 验证通过后,网关作为特权客户端,向IAM服务查询该用户的具体数据访问策略(例如,{ "table": "financial_reports", "filter": "department = 'finance' AND region = 'emea'" })。
    • 网关根据查询到的策略,生成一个全新的、短生命周期的内部JWT(Scoped JWT)。这个JWT的Claims中包含了用户被授权访问的数据范围。
    • 网关将原始请求代理到LangChain服务,并将这个内部JWT通过一个特定的HTTP Header(如 Authorization-Internal)传递过去。
  2. LangChain服务(不可信执行者):

    • 服务启动时加载用于验证内部JWT的公钥。
    • 对于每个请求,服务从Header中提取内部JWT,并使用公钥验证其签名和时效性。
    • 验证通过后,解析JWT的Claims,获得授权的数据过滤器。
    • 服务在执行任何数据操作时,必须将这些过滤器应用到查询中。它自身不持有任何数据库的长期凭证。
  3. 安全数据访问层(强制执行):

    • 在LangChain应用中,创建一个专门的层来与Delta Lake交互。这个层的核心职责是接收业务查询和从JWT解析出的安全过滤器,然后将两者安全地合并成最终的SQL查询。

最终选择与理由:

我们坚定地选择方案B。尽管其实现复杂度更高,但它提供了一个在架构层面就确保安全的模型。

  • 安全可信边界清晰: 安全的边界在API网关。LangChain应用被降级为一个受限的沙箱环境,即使其自身代码被攻破,损害也被限制在JWT所允许的极小范围内。
  • 职责分离: 网关负责“认证”和“授权”,LangChain应用负责“业务逻辑”,数据访问层负责“策略强制执行”。各司其职,易于维护和审计。
  • 可审计性: 对Delta Lake的访问可以配置为需要特定的、由JWT动态生成的短期凭证,或者在应用层日志中记录下JWT的唯一标识(jti),从而将每一次数据查询都精确关联到终端用户和其当时被授予的临时权限。

核心实现概览

我们将使用Python生态来实现这个架构。API网关使用FastAPI,LangChain服务也使用Python,数据访问将通过pyspark与Delta Lake交互。

1. API网关:策略注入与JWT生成

假设我们有一个IAM服务,可以通过用户ID获取其数据策略。

gateway/main.py

import os
import time
import jwt
import httpx
from fastapi import FastAPI, Request, HTTPException, Response
from fastapi.responses import StreamingResponse

# --- 配置 ---
# 在生产环境中,这些应该来自环境变量或配置服务
# 用于签发内部JWT的私钥
JWT_PRIVATE_KEY = os.environ.get("INTERNAL_JWT_PRIVATE_KEY")
JWT_ALGORITHM = "RS256"
LANGCHAIN_SERVICE_URL = "http://langchain-app:8001"
IAM_SERVICE_URL = "http://iam-service:9000"

app = FastAPI()
http_client = httpx.AsyncClient()

# 伪IAM服务客户端
async def get_user_policy_from_iam(user_id: str) -> dict:
    """
    模拟查询IAM服务以获取用户的数据访问策略。
    真实场景下,这里会有认证和复杂的策略逻辑。
    """
    try:
        response = await http_client.get(f"{IAM_SERVICE_URL}/policy/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        # 记录错误日志
        print(f"Error fetching policy for user {user_id}: {e}")
        raise HTTPException(status_code=403, detail="Could not retrieve user policy.")
    except Exception as e:
        print(f"Unexpected error with IAM service: {e}")
        raise HTTPException(status_code=500, detail="Internal error with IAM service.")


def create_internal_jwt(user_id: str, policy: dict) -> str:
    """根据用户策略生成范围受限的内部JWT。"""
    payload = {
        "iss": "api-gateway",
        "aud": "langchain-service",
        "sub": user_id,
        "exp": int(time.time()) + 300,  # 5分钟有效期
        "iat": int(time.time()),
        "jti": os.urandom(16).hex(), # JWT唯一标识,用于日志追踪
        "policy": policy, # 核心:将权限策略注入JWT
    }
    encoded_jwt = jwt.encode(payload, JWT_PRIVATE_KEY, algorithm=JWT_ALGORITHM)
    return encoded_jwt

@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(request: Request, path: str):
    """
    代理所有请求到LangChain服务,并注入内部JWT。
    """
    # 1. 认证外部用户
    # 真实场景: 从 Authorization header 中解析 OIDC/OAuth2 token 并验证
    # 这里为了演示,我们从一个自定义header获取用户ID
    user_id = request.headers.get("X-User-Id")
    if not user_id:
        raise HTTPException(status_code=401, detail="Missing X-User-Id header for authentication.")

    # 2. 从IAM获取策略
    policy = await get_user_policy_from_iam(user_id)
    if not policy.get("allow", False):
         raise HTTPException(status_code=403, detail="User is not allowed access.")

    # 3. 生成内部JWT
    internal_token = create_internal_jwt(user_id, policy)

    # 4. 构造代理请求
    langchain_url = f"{LANGCHAIN_SERVICE_URL}/{path}"
    
    headers = dict(request.headers)
    # 移除主机头,由httpx处理
    headers.pop("host", None) 
    # 注入内部token,覆盖可能存在的外部Authorization头
    headers["Authorization-Internal"] = f"Bearer {internal_token}"
    
    # 流式处理请求体
    req = http_client.build_request(
        method=request.method,
        url=langchain_url,
        headers=headers,
        params=request.query_params,
        content=request.stream(),
    )

    try:
        # 5. 发送请求并流式返回响应
        resp = await http_client.send(req, stream=True)
        return StreamingResponse(
            resp.aiter_raw(),
            status_code=resp.status_code,
            headers=resp.headers,
        )
    except httpx.ConnectError:
        raise HTTPException(status_code=503, detail="LangChain service is unavailable.")

if __name__ == "__main__":
    # 在启动前确保密钥已设置
    if not JWT_PRIVATE_KEY:
        raise ValueError("INTERNAL_JWT_PRIVATE_KEY environment variable not set.")
    # In a real app, use uvicorn to run this.
    # uvicorn gateway.main:app --host 0.0.0.0 --port 8000
    pass

2. LangChain应用:JWT验证与安全数据访问层

LangChain应用现在必须从信任外部输入转变为只信任内部JWT的声明。

langchain_app/security.py

import os
import jwt
from functools import wraps
from fastapi import Request, HTTPException

# --- 配置 ---
# 用于验证内部JWT的公钥
JWT_PUBLIC_KEY = os.environ.get("INTERNAL_JWT_PUBLIC_KEY")
JWT_ALGORITHM = "RS256"
JWT_AUDIENCE = "langchain-service"

class AuthContext:
    """用于在请求上下文中传递已验证的权限策略。"""
    def __init__(self, claims: dict):
        self.user_id: str = claims.get("sub")
        self.policy: dict = claims.get("policy", {})
        self.jwt_id: str = claims.get("jti")

    def get_delta_filter_clause(self, table_alias: str = None) -> str:
        """
        根据策略动态生成SQL WHERE子句。
        这是将声明转化为数据层强制执行策略的关键。
        """
        filter_str = self.policy.get("delta_filter")
        if not filter_str:
            # 默认策略:如果JWT中没有明确的过滤器,则拒绝所有查询
            # 这是一个安全默认值,防止配置错误导致数据泄露
            return "1 = 0" 
            
        # 简单的占位符替换,防止SQL注入
        # 生产级代码应使用更复杂的AST解析或验证
        if table_alias:
            return filter_str.replace("{TABLE_ALIAS}", f"{table_alias}.")
        else:
            # 确保占位符不存在,否则可能导致错误
            if "{TABLE_ALIAS}" in filter_str:
                 raise ValueError("Table alias required but not provided for filter clause.")
            return filter_str


def token_required(request: Request) -> AuthContext:
    """
    FastAPI Dependency: 验证内部JWT并返回AuthContext。
    """
    token_header = request.headers.get("Authorization-Internal")
    if not token_header or not token_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Internal token is missing or malformed.")
    
    token = token_header.split(" ")[1]
    
    try:
        decoded_payload = jwt.decode(
            token,
            JWT_PUBLIC_KEY,
            algorithms=[JWT_ALGORITHM],
            audience=JWT_AUDIENCE
        )
        return AuthContext(claims=decoded_payload)
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Internal token has expired.")
    except jwt.InvalidAudienceError:
        raise HTTPException(status_code=401, detail="Invalid token audience.")
    except jwt.PyJWTError as e:
        # 记录具体的JWT错误
        print(f"JWT validation failed: {e}")
        raise HTTPException(status_code=401, detail="Invalid internal token.")

langchain_app/data_retriever.py

from pyspark.sql import SparkSession
from langchain_core.retrievers import BaseRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from typing import List

from .security import AuthContext

class SecureDeltaLakeRetriever(BaseRetriever):
    """
    一个实现了端到端安全的自定义LangChain Retriever。
    """
    spark: SparkSession
    delta_table_path: str
    auth_context: AuthContext
    content_column: str = "content"
    metadata_columns: List[str] = []

    class Config:
        arbitrary_types_allowed = True

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """
        执行带有安全过滤器的检索。
        这里的核心是 `auth_context.get_delta_filter_clause()`.
        """
        # 1. 从AuthContext获取安全过滤器
        # 这里的`t`是我们在SQL查询中为表设置的别名
        security_filter = self.auth_context.get_delta_filter_clause(table_alias="t")

        # 2. 这里的相似性搜索逻辑被简化了
        # 真实场景中,你会用向量搜索(如ANN)先找到候选ID,
        # 然后再用安全SQL查询过滤这些ID。
        # 为了演示,我们假设基于文本内容进行过滤。
        # 注意:不要将用户输入的`query`直接拼接到SQL中,防止注入。
        # 使用参数化查询。
        
        # 假设我们检索包含查询关键词的文档
        # 这里的 `query` 需要被安全地处理
        escaped_query = query.replace("'", "''")

        # 3. 构建最终的安全查询
        final_sql = f"""
            SELECT
                {self.content_column},
                {', '.join(self.metadata_columns) if self.metadata_columns else '1 as dummy'}
            FROM delta.`{self.delta_table_path}` AS t
            WHERE
                ({security_filter})
                AND {self.content_column} LIKE '%{escaped_query}%'
            LIMIT 10
        """
        
        print(f"Executing secure query for user {self.auth_context.user_id} (JTI: {self.auth_context.jwt_id}):")
        print(final_sql)

        try:
            df = self.spark.sql(final_sql)
            results = df.collect()
        except Exception as e:
            # 记录数据库查询错误
            print(f"Spark SQL query failed: {e}")
            return []

        # 4. 转换为LangChain Document格式
        documents = []
        for row in results:
            metadata = {col: row[col] for col in self.metadata_columns}
            doc = Document(page_content=row[self.content_column], metadata=metadata)
            documents.append(doc)
            
        return documents

langchain_app/main.py

from fastapi import FastAPI, Depends
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pyspark.sql import SparkSession

from .security import AuthContext, token_required
from .data_retriever import SecureDeltaLakeRetriever

# --- 配置 ---
DELTA_LAKE_PATH = "/path/to/your/delta_table"

app = FastAPI()

# 初始化SparkSession。在生产环境中,这应该是一个长期运行的会话。
spark = SparkSession.builder \
    .appName("SecureLangChainApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 初始化LLM
llm = ChatOpenAI(model="gpt-4o")

template = """
Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

@app.post("/chat")
def chat_endpoint(request: dict, auth: AuthContext = Depends(token_required)):
    """
    一个使用安全Retriever的RAG聊天端点。
    """
    question = request.get("question")
    if not question:
        return {"error": "question is required"}, 400

    # 关键步骤:在运行时,用从JWT解析出的AuthContext来实例化Retriever
    secure_retriever = SecureDeltaLakeRetriever(
        spark=spark,
        delta_table_path=DELTA_LAKE_PATH,
        auth_context=auth,
        content_column="text_content",
        metadata_columns=["document_id", "department", "region"]
    )

    def format_docs(docs):
        return "\n\n".join(doc.page_content for doc in docs)

    # 构建RAG链
    rag_chain = (
        {"context": secure_retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

    # 这里的日志非常重要,记录下是哪个JWT ID触发了本次调用
    print(f"Invoking RAG chain for user {auth.user_id} with JWT ID {auth.jwt_id}")
    
    result = rag_chain.invoke(question)
    
    return {"answer": result}

if __name__ == "__main__":
    # In a real app, use uvicorn to run this.
    # uvicorn langchain_app.main:app --host 0.0.0.0 --port 8001
    pass

架构的扩展性与局限性

这个架构模式为构建安全的企业级LLM应用打下了坚实的基础。

扩展性:

  • 多租户数据隔离: 通过在JWT的policy中定义tenant_id,可以轻松地将此架构扩展到多租户场景,delta_filter会变成 tenant_id = '...' AND ...
  • 列级别安全性: 虽然当前实现是行级别的,但可以通过修改SecureDeltaLakeRetriever来支持列级别安全。JWT可以包含允许访问的列列表,Retriever在构建SELECT语句时动态选择列。
  • 与不同IAM系统集成: API网关是唯一与特定IAM系统交互的组件。更换IAM系统(如Okta, Auth0, 或者内部系统)只需要修改网关的策略获取逻辑,下游服务完全无感。
  • 策略的复杂性: JWT中的delta_filter可以支持更复杂的逻辑,例如 (department = 'finance' OR is_public = true),只要它是有效的SQL WHERE子句。

局限性:

  • JWT大小限制: 如果权限策略非常复杂,生成的JWT可能会变得很大,超出HTTP Header的大小限制。在这种情况下,可以考虑使用“引用令牌(Reference Token)”模式,即JWT中只包含一个策略ID,LangChain服务再用这个ID去一个可信的策略缓存服务中获取完整的策略。
  • 密钥管理: 内部JWT的私钥和公钥管理至关重要。私钥必须安全地存储在API网关中,并且需要有定期的轮换机制。
  • 性能开销: 每次请求都需要进行一次JWT的签名(网关)和验签(应用)。虽然RS256的验签性能很高,但在极高的QPS下,这仍然是一项不可忽视的CPU开销。
  • 数据源依赖: get_delta_filter_clause 方法的实现与数据源是Delta Lake(及其SQL方言)强相关。如果要支持其他数据源(如Elasticsearch),就需要为每种数据源实现一个相应的“策略到查询”的转换器。

  目录