dify学习笔记

Posted by iceyao on Monday, August 26, 2024

1. dify简介

Dify是一个开源的LLM应用开发平台。其直观的界面结合了AI工作流、RAG管道、Agent、模型管理、可观测性功能等,让您可以快速从原型到生产。

2. dify核心功能:

2.1. 强大的AI工作流构建与测试

  • 用户可以直观地设计和构建 AI 工作流,根据不同的业务需求和场景进行定制化操作。
  • 能够对构建的工作流进行全面测试,确保其在实际应用中的稳定性和准确性。

2.2. 全面的模型支持

  • 无缝集成了数百种来自数十个推理提供商和自托管解决方案的专有 / 开源大型语言模型(LLM)。
  • 涵盖了多种不同类型的模型,为用户提供了丰富的选择,以满足不同的任务需求。

2.3. 便捷的提示 IDE

  • 提供了一个集成开发环境(IDE),方便用户编写和优化提示(prompt)。
  • 帮助用户更好地与语言模型进行交互,提高提示的质量和效果。

2.4. 强大的 RAG(检索增强生成)管道

  • 具备高效的检索增强生成功能,能够从大量的文本数据中检索相关信息,并结合语言模型生成更加准确和丰富的内容。
  • 提升了语言模型的输出质量和实用性。

2.5. 灵活的代理能力

  • 允许用户设置代理,以满足不同的网络环境和访问需求。
  • 增强了系统的灵活性和可扩展性。

2.6. 有效的LLMOps

  • 提供了一系列针对语言模型的操作和管理工具,包括模型部署、监控、优化等。
  • 帮助用户更好地管理和维护语言模型的运行,提高工作效率。

2.7. 后端即服务

  • 可以作为后端服务提供给其他应用程序使用,方便集成和扩展。
  • 为开发者提供了便捷的接口和工具,降低了开发成本和难度。

与LangChain、Flowise、OpenAI Assistant API对比:

功能 Dify.AI LangChain Flowise OpenAI Assistant API
编程方法 API + 应用程序导向 Python 代码 应用程序导向 API 导向
支持的 LLMs 丰富多样 丰富多样 丰富多样 仅限 OpenAI
RAG引擎
Agent
工作流
可观测性
企业功能(SSO/访问控制)
本地部署

3. 源码部署

通过源码部署,方便后续代码调试,了解具体实现原理

  1. 克隆dify源码
git clone https://github.com/langgenius/dify.git
  1. docker-compose部署外部依赖PostgresSQL/Redis/Weaviate等
cd dify/docker
cp middleware.env.example middleware.env
docker compose -f docker-compose.middleware.yaml up -d
  1. 准备dify虚拟环境
conda create --name dify python=3.12
conda activate dify
pip install poetry
  1. 启动dify-api/dify-worker服务
# 进入api目录
cd api/
# 拷贝环境变量文件
cp .env.example .env
# 生成随机密钥
openssl rand -base64 42
# 替换环境变量文件中的密钥
sed -i 's/SECRET_KEY=.*/SECRET_KEY=<your_value>/' .env
# 使用poetry安装依赖
poetry install
# 执行数据库迁移到最新版本
flask db upgrade

# 启动dify-api服务 
flask run --host 0.0.0.0 --port=5001 --debug

# 启动dify-worker服务
celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace
  1. 启动dify-web服务
cd web

# 安装依赖
npm install
# 拷贝环境变量文件
cp .env.example .env.local

# 编译
npm run build

# 启动前端服务
npm run start

附上一个vscode launch.json文件:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "dify-api",
            "type": "debugpy",
            "request": "launch",
            "module": "flask",
            "env": {
                "FLASK_APP": "app.py",
                "FLASK_ENV": "development",
                "GEVENT_SUPPORT": "true",
            },
            "args": [
                "run",
                "--host=0.0.0.0",
                "--port=5001",
                "--debug"
            ],
            "cwd": "${workspaceFolder}/api",
            "justMyCode": false
        },
        {
            "name": "dify-worker",
            "type": "debugpy",
            "request": "launch",
            "program": "/opt/anaconda3/envs/dify/bin/celery",
            "env": {
                "GEVENT_SUPPORT": "true",
            },
            "args": [
                "-A",
                "app.celery",
                "worker",
                "-P",
                "gevent",
                "-c",
                "1",
                "--loglevel=INFO",
                "-Q",
                "dataset,generation,mail,ops_trace"
            ],
            "cwd": "${workspaceFolder}/api",
            "justMyCode": false,
            "console": "integratedTerminal"
        },
        {
            "name": "dify-web",
            "type": "node",
            "request": "launch",
            "runtimeExecutable": "/opt/homebrew/bin/pnpm",
            "runtimeArgs": [
                "start"
            ],
            "cwd": "${workspaceFolder}/web",
            "console": "integratedTerminal",
            "internalConsoleOptions": "neverOpen",
        }
    ]
}

如果源码编译dify-web失败的话,可以使用docker容器启动dify-web

docker run -it -d -p 3000:3000 -e CONSOLE_URL=http://127.0.0.1:5001 -e APP_URL=http://127.0.0.1:5001 langgenius/dify-web:main

4. 源码分析

以git commit:da67916843249390ee75438207042b215e752183的代码为例

dify-api启动函数,整个项目基于flask框架实现

import os
import sys


def is_db_command():
    """
    检查当前是否在执行 Flask 数据库相关命令
    返回值:如果是数据库命令返回 True,否则返回 False
    """
    if len(sys.argv) > 1 and sys.argv[0].endswith("flask") and sys.argv[1] == "db":
        return True
    return False


# 创建应用程序
if is_db_command():
    # 如果是数据库命令,使用专门的工厂函数创建用于数据库迁移的应用实例
    from app_factory import create_migrations_app

    app = create_migrations_app()
else:
    # JetBrains Python 调试器似乎与 gevent 不兼容,
    # 因此在调试模式下需要禁用 gevent。
    # 如果您使用 debugpy 并设置 GEVENT_SUPPORT=True,则可以使用 gevent 进行调试。
    if (flask_debug := os.environ.get("FLASK_DEBUG", "0")) and flask_debug.lower() in {"false", "0", "no"}:
        # 导入并初始化 gevent
        from gevent import monkey  # type: ignore
        monkey.patch_all()  # 使用 gevent 替换标准库中的阻塞操作

        # 初始化 gRPC 的 gevent 支持
        from grpc.experimental import gevent as grpc_gevent  # type: ignore
        grpc_gevent.init_gevent()

        # 为 psycopg(PostgreSQL 驱动)添加 gevent 支持
        import psycogreen.gevent  # type: ignore
        psycogreen.gevent.patch_psycopg()

    # 创建常规应用实例
    from app_factory import create_app
    app = create_app()
    # 获取 Celery 扩展实例
    celery = app.extensions["celery"]

if __name__ == "__main__":
    # 当直接运行此文件时,启动Flask开发服务器
    app.run(host="0.0.0.0", port=5001)

api是如何启动的?dify-api启动基于Gevent库,在api的api/docker/entrypoint.sh脚本中可以查看

...
  if [[ "${DEBUG}" == "true" ]]; then
    exec flask run --host=${DIFY_BIND_ADDRESS:-0.0.0.0} --port=${DIFY_PORT:-5001} --debug
  else
    exec gunicorn \
      --bind "${DIFY_BIND_ADDRESS:-0.0.0.0}:${DIFY_PORT:-5001}" \
      --workers ${SERVER_WORKER_AMOUNT:-1} \
      --worker-class ${SERVER_WORKER_CLASS:-gevent} \
      --timeout ${GUNICORN_TIMEOUT:-200} \
      --preload \
      app:app
...

Gevent与asyncio的区别:

对比维度 Gevent asyncio
实现机制 基于greenlet实现协程,通过monkey patching修改标准库 基于Python原生的async/await语法和事件循环
编程模型 同步编程风格,代码看起来像普通同步代码 异步编程风格,需要显式使用async/await关键字
生态系统 成熟稳定但生态相对较小,适合老项目改造 Python 3.4后的标准库,生态系统不断扩大
使用场景 适合需要高并发但不想改变代码结构的项目 适合从零开始的新项目,充分利用现代异步特性
性能特点 由于monkey patching开销,某些场景性能略低 原生异步实现,理论性能更好,需要全栈异步支持

需要分析的功能特性列表汇总:

  • API路由&Middleware机制
  • 多租户&成员管理&角色管理
  • LLM管理
  • 对象存储管理
  • 向量数据库管理
  • Chatbot应用类型
  • Agent应用类型
  • 文本生成应用类型
  • Chatflow应用类型
  • Workflow应用类型
  • 知识库
  • 工具/插件(v1.0.0-beta重构了这块,跟主干代码解耦)

API路由

dify-api使用Flask Blueprint来组织路由,主要分为以下几个模块:

  1. 主要路由模块:
  • service_api_bp: 服务 API 路由 (/api/v1)
  • web_bp: Web API 路由 (/api)
  • console_app_bp: 控制台 API 路由 (/console/api)
  • files_bp: 文件处理 API 路由
  • inner_api_bp: 内部 API 路由
  1. 路由注册:
  • ext_blueprints.py中统一注册所有 blueprint
  • 每个模块都配置了相应的 CORS 策略
  • 使用 Flask-RESTful 来实现 RESTful API
  1. 主要 API 分组:
  • 应用相关 API (/apps/*)
  • 认证相关 API (/account/, /oauth/)
  • 工作区相关 API
  • 文件上传 API
  • 对话/聊天相关 API

举例:看一个登录API实现

class LoginApi(Resource):
    @setup_required
    def post(self):
        # 1. 参数验证
        parser = reqparse.RequestParser()
        parser.add_argument("email", type=email, required=True, location="json")
        parser.add_argument("password", type=valid_password, required=True, location="json")
        parser.add_argument("remember_me", type=bool, required=False, default=False)
        parser.add_argument("invite_token", type=str, required=False)
        parser.add_argument("language", type=str, required=False, default="en-US")
        
        # 2. 账户状态检查
        if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(args["email"]):
            raise AccountInFreezeError()
            
        # 3. 登录频率限制
        if AccountService.is_login_error_rate_limit(args["email"]):
            raise EmailPasswordLoginLimitError()
            
        # 4. 邀请码验证
        if invitation:
            invitation = RegisterService.get_invitation_if_token_valid(
                None, args["email"], invitation
            )
            
        # 5. 认证流程
        try:
            account = AccountService.authenticate(args["email"], args["password"])
        except services.errors.account.AccountLoginError:
            raise AccountBannedError()
        except services.errors.account.AccountPasswordError:
            AccountService.add_login_error_rate_limit(args["email"])
            raise EmailOrPasswordMismatchError()
            
        # 6. 工作区检查
        tenants = TenantService.get_join_tenants(account)
        if len(tenants) == 0:
            return {"result": "fail", "data": "workspace not found"}
            
        # 7. 生成令牌
        token_pair = AccountService.login(
            account=account, 
            ip_address=extract_remote_ip(request)
        )
        
        # 8. 重置错误计数
        AccountService.reset_login_error_rate_limit(args["email"])
        return {"result": "success", "data": token_pair.model_dump()}

Middleware实现

通过Flask扩展系统实现中间件功能,主要包括:

  1. 核心中间件:
  • ext_timezone: 时区处理
  • ext_logging: 日志系统
  • ext_compress: 响应压缩
  • ext_database: 数据库连接
  • ext_redis: Redis 缓存
  • ext_celery: 异步任务
  1. 安全相关中间件:
  • ext_login: 用户认证
  • ext_set_secretkey: 密钥设置
  • ext_proxy_fix: 代理修复
  1. 功能性中间件:
  • ext_storage: 存储服务
  • ext_mail: 邮件服务
  • ext_sentry: 错误追踪
  • ext_app_metrics: 应用指标
  1. 中间件初始化:
  • 在app_factory.py中通过 initialize_extensions()函数初始化所有中间件
  • 中间件按照特定顺序加载,确保依赖关系
  • 支持通过配置启用/禁用特定中间件

5.请求处理装饰器:

@setup_required
@login_required
@account_initialization_required
@validate_app_token
@cloud_edition_billing_resource_check
...

这些装饰器在路由处理前进行各种验证和检查

举例:来看一个用于API认证和授权的中间件

def validate_app_token(view: Optional[Callable] = None, *, fetch_user_arg: Optional[FetchUserArg] = None):
    def decorator(view_func):
        @wraps(view_func)
        def decorated_view(*args, **kwargs):
            # 1. 验证并获取 API Token
            api_token = validate_and_get_api_token("app")
            
            # 2. 应用状态检查
            app_model = db.session.query(App).filter(App.id == api_token.app_id).first()
            if not app_model:
                raise Forbidden("The app no longer exists.")
            if app_model.status != "normal":
                raise Forbidden("The app's status is abnormal.")
            if not app_model.enable_api:
                raise Forbidden("The app's API service has been disabled.")
                
            # 3. 租户状态检查
            tenant = db.session.query(Tenant).filter(Tenant.id == app_model.tenant_id).first()
            if tenant.status == TenantStatus.ARCHIVE:
                raise Forbidden("The workspace's status is archived.")
                
            # 4. 用户信息处理
            if fetch_user_arg:
                user_id = get_user_id_from_request(fetch_user_arg)
                kwargs["end_user"] = create_or_update_end_user_for_user_id(
                    app_model, user_id
                )
                
            return view_func(*args, **kwargs)
        return decorated_view
    return decorator if view is None else decorator(view)
# Token验证实现
def validate_and_get_api_token(scope: str | None = None):
    # 1. 验证 Authorization Header
    auth_header = request.headers.get("Authorization")
    if auth_header is None or " " not in auth_header:
        raise Unauthorized("Authorization header must be provided")
        
    # 2. 验证认证方案
    auth_scheme, auth_token = auth_header.split(None, 1)
    if auth_scheme.lower() != "bearer":
        raise Unauthorized("Authorization scheme must be 'Bearer'")
        
    # 3. Token 使用时间更新
    current_time = datetime.now(UTC).replace(tzinfo=None)
    cutoff_time = current_time - timedelta(minutes=1)
    
    with Session(db.engine, expire_on_commit=False) as session:
        # 4. 更新 Token 最后使用时间
        update_stmt = (
            update(ApiToken)
            .where(
                ApiToken.token == auth_token,
                (ApiToken.last_used_at.is_(None) | (ApiToken.last_used_at < cutoff_time)),
                ApiToken.type == scope,
            )
            .values(last_used_at=current_time)
            .returning(ApiToken)
        )
        
        # 5. Token 有效性验证
        api_token = result.scalar_one_or_none()
        if not api_token:
            stmt = select(ApiToken).where(
                ApiToken.token == auth_token, 
                ApiToken.type == scope
            )
            api_token = session.scalar(stmt)
            if not api_token:
                raise Unauthorized("Access token is invalid")
                
        return api_token

多租户&成员管理&角色管理

  1. 多租户系统的核心模型:
  • Tenant(租户):
    • 代表一个独立的工作空间
    • 包含基本信息:id、name、plan(计划)、status(状态)等
    • 可以配置自定义设置(custom_config)
  • Account(账户):
    • 代表系统中的用户
    • 包含用户基本信息:name、email、password 等
    • 可以属于多个租户
  1. 租户-成员关系管理:

    • TenantAccountJoin:
    • 实现租户和账户的多对多关系
    • 记录用户在租户中的角色(role)
    • 记录邀请人(invited_by)
    • 通过 unique_tenant_account_join 约束确保一个用户在同一租户中只有一个角色
  2. 角色权限体系(TenantAccountRole):

class TenantAccountRole(enum.StrEnum):
    OWNER = "owner"      # 租户所有者
    ADMIN = "admin"      # 管理员
    EDITOR = "editor"    # 编辑者
    NORMAL = "normal"    # 普通成员
    DATASET_OPERATOR = "dataset_operator"  # 数据集操作员

每个角色都有特定的权限:

  • is_privileged_role: 判断是否为特权角色(owner 或 admin)
  • is_editing_role: 判断是否有编辑权限
  • is_dataset_edit_role: 判断是否有数据集编辑权限
  1. 租户状态管理:
class TenantStatus(enum.StrEnum):
    NORMAL = "normal"   # 正常状态
    ARCHIVE = "archive" # 归档状态
  1. 成员管理功能:
  • 租户可以通过 get_accounts() 方法获取所有成员
  • 用户可以通过 current_tenant 属性访问当前所在的租户
  • 用户可以通过 current_role 属性获取在当前租户中的角色
  • 提供了一系列角色判断方法:
    • is_admin_or_owner
    • is_admin
    • is_editor
    • is_dataset_editor
    • is_dataset_operator

角色定义在代码中硬编码了,需要修改角色定义的地方方可实现,以下是一种思路实现,可能还有遗漏处:

  1. 在TenantAccountRole枚举类中添加新角色:
class TenantAccountRole(enum.StrEnum):
    OWNER = "owner"
    ADMIN = "admin"
    EDITOR = "editor"
    NORMAL = "normal"
    DATASET_OPERATOR = "dataset_operator"
    # 添加新角色,例如:
    VIEWER = "viewer"  # 只读角色
  1. 更新角色验证方法: 在 TenantAccountRole 类中的 is_valid_role 方法中添加新角色:
@staticmethod
def is_valid_role(role: str) -> bool:
    return role in {
        TenantAccountRole.OWNER,
        TenantAccountRole.ADMIN,
        TenantAccountRole.EDITOR,
        TenantAccountRole.NORMAL,
        TenantAccountRole.DATASET_OPERATOR,
        TenantAccountRole.VIEWER,  # 添加新角色
    }
  1. 定义新角色的权限: 根据新角色的权限需求,在 TenantAccountRole 类中添加或修改相应的权限判断方法:
@staticmethod
def is_viewing_role(role: str) -> bool:
    return role in {
        TenantAccountRole.OWNER,
        TenantAccountRole.ADMIN,
        TenantAccountRole.EDITOR,
        TenantAccountRole.NORMAL,
        TenantAccountRole.VIEWER
    }
  1. 在 Account 类中添加角色判断属性:
@property
def is_viewer(self):
    return self._current_tenant.current_role == TenantAccountRole.VIEWER
  1. 同步更新 TenantAccountJoinRole 枚举
class TenantAccountJoinRole(enum.Enum):
    OWNER = "owner"
    ADMIN = "admin"
    NORMAL = "normal"
    DATASET_OPERATOR = "dataset_operator"
    VIEWER = "viewer"  # 添加新角色
  1. 数据库迁移: 由于角色是以字符串形式存储在数据库中,所以不需要修改数据库结构。但是建议添加数据库约束来确保角色值的有效性:
# 在 TenantAccountJoin 模型中添加角色约束
role = db.Column(db.String(16), 
                nullable=False, 
                server_default="normal",
                db.CheckConstraint("role IN ('owner', 'admin', 'normal', 'dataset_operator', 'viewer')"))

更新相关的权限检查逻辑

LLM管理

Dify中LLM管理系统的具体实现过程:

  1. 基础架构设计
# 基础模型类
class AIModel:
    model_type: ModelType  # 模型类型(LLM/Embedding等)

# LLM基类
class LargeLanguageModel(AIModel):
    model_type = ModelType.LLM
    
    def invoke(self, model: str, credentials: dict, prompt_messages: list[PromptMessage], 
              model_parameters: dict = None, tools: list[PromptMessageTool] = None,
              stop: list[str] = None, stream: bool = True, user: str = None) -> Union[LLMResult, Generator]:
        """调用LLM模型的核心方法"""
        pass
        
    def get_parameter_rules(self, model: str, credentials: dict) -> list[ParameterRule]:
        """获取模型参数规则"""
        pass
        
    def get_model_mode(self, model: str, credentials: dict = None) -> LLMMode:
        """获取模型模式(对话/补全)"""
        pass
  1. 模型实例管理
class ModelInstance:
    def __init__(self):
        self.tenant_id: str  # 租户ID
        self.model_type: ModelType  # 模型类型
        self.provider: str  # 提供商
        self.model: str  # 模型名称
        self.credentials: dict  # 认证信息
        self.model_type_instance: Union[LargeLanguageModel, TextEmbeddingModel]  # 具体模型实例
        
    def get_llm_num_tokens(self, prompt_messages: list[PromptMessage], 
                          tools: list[PromptMessageTool] = None) -> int:
        """计算token数量"""
        pass
        
    def invoke_text_embedding(self, texts: list[str], user: str = None,
                            input_type: EmbeddingInputType = EmbeddingInputType.DOCUMENT) -> TextEmbeddingResult:
        """调用文本嵌入"""
        pass
  1. 模型管理器实现
class ModelManager:
    def get_model_instance(self, tenant_id: str, model_type: ModelType,
                          provider: str, model: str) -> ModelInstance:
        """获取模型实例"""
        # 1. 获取提供商配置
        provider_configurations = self._get_provider_configurations(tenant_id)
        
        # 2. 验证模型是否存在
        if provider not in provider_configurations:
            raise ValueError(f"Provider {provider} does not exist")
            
        # 3. 创建模型实例
        model_instance = ModelInstance(
            tenant_id=tenant_id,
            model_type=model_type,
            provider=provider,
            model=model
        )
        
        return model_instance
        
    def invoke_llm(self, model_instance: ModelInstance, 
                   prompt_messages: list[PromptMessage],
                   stream: bool = True,
                   user: str = None) -> Union[LLMResult, Generator]:
        """调用LLM模型"""
        pass
  1. 具体LLM提供商实现
# OpenAI实现
class OpenAILargeLanguageModel(LargeLanguageModel):
    def _invoke(self, model: str, credentials: dict,
                prompt_messages: list[PromptMessage],
                model_parameters: dict,
                tools: list[PromptMessageTool] = None,
                stop: list[str] = None,
                stream: bool = True,
                user: str = None) -> Union[LLMResult, Generator]:
        """实现OpenAI模型调用"""
        # 1. 处理认证信息
        client = OpenAI(**credentials)
        
        # 2. 转换消息格式
        messages = self._convert_messages(prompt_messages)
        
        # 3. 调用API
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            stream=stream,
            **model_parameters
        )
        
        # 4. 处理响应
        return self._handle_response(response, stream)
        
    def remote_models(self, credentials: dict) -> list[AIModelEntity]:
        """获取远程可用模型列表"""
        pass
  1. 模型配置管理
class ModelProperties(BaseModel):
    context_size: int  # 上下文窗口大小
    max_tokens: int    # 最大token数
    mode: LLMMode     # 模型模式

class ModelConfig(BaseModel):
    properties: ModelProperties
    features: list[ModelFeature]  # 模型特性

# 模型配置示例
MODEL_CONFIGS = {
    "gpt-4": ModelConfig(
        properties=ModelProperties(
            context_size=8192,
            max_tokens=4096,
            mode=LLMMode.CHAT
        ),
        features=[
            ModelFeature.AGENT_THOUGHT,
            ModelFeature.TOOL_CALL
        ]
    )
}
  1. 提供商管理
class ProviderManager:
    def update_default_model_record(self, tenant_id: str, 
                                  model_type: ModelType,
                                  provider: str, 
                                  model: str) -> TenantDefaultModel:
        """更新默认模型记录"""
        # 1. 验证提供商
        provider_configurations = self.get_configurations(tenant_id)
        if provider not in provider_configurations:
            raise ValueError(f"Provider {provider} does not exist")
            
        # 2. 验证模型
        available_models = provider_configurations.get_models(
            model_type=model_type,
            only_active=True
        )
        
        # 3. 更新数据库记录
        default_model = TenantDefaultModel.query.filter(
            tenant_id=tenant_id,
            model_type=model_type
        ).first()
        
        if default_model:
            default_model.provider_name = provider
            default_model.model_name = model
        else:
            default_model = TenantDefaultModel(
                tenant_id=tenant_id,
                model_type=model_type,
                provider_name=provider,
                model_name=model
            )
            
        db.session.commit()
        return default_model

模型调用流程示例:

# 1. 创建模型实例
model_instance = model_manager.get_model_instance(
    tenant_id="tenant_123",
    model_type=ModelType.LLM,
    provider="openai",
    model="gpt-4"
)

# 2. 准备提示消息
messages = [
    SystemPromptMessage(content="You are a helpful assistant"),
    UserPromptMessage(content="Tell me about LLMs")
]

# 3. 设置模型参数
model_parameters = {
    "temperature": 0.7,
    "max_tokens": 1000
}

# 4. 调用模型
try:
    response = model_instance.model_type_instance.invoke(
        model="gpt-4",
        credentials=model_instance.credentials,
        prompt_messages=messages,
        model_parameters=model_parameters,
        stream=True
    )
    
    # 5. 处理流式响应
    for chunk in response:
        yield chunk.delta.content
        
except Exception as e:
    logger.error(f"Model invocation failed: {str(e)}")
    raise

对象存储管理

Dify中的对象存储系统主要用于以下几个方面:

  1. 文件上传管理
class FileService:
    @staticmethod
    def upload_file(
        filename: str,
        content: bytes,
        mimetype: str,
        user: Union[Account, EndUser, Any],
        source: Literal["datasets"] | None = None,
        source_url: str = "",
    ) -> UploadFile:

主要用于:

  • 数据集文档上传
  • 图片文件存储
  • 其他用户上传的文件
  1. 存储实现支持多种对象存储服务:
class StorageType(StrEnum):
    ALIYUN_OSS = "aliyun-oss"  # 阿里云OSS
    AZURE_BLOB = "azure-blob"  # Azure Blob存储
    BAIDU_OBS = "baidu-obs"    # 百度对象存储
    GOOGLE_STORAGE = "google-storage"  # Google Cloud Storage
    HUAWEI_OBS = "huawei-obs"  # 华为对象存储
    LOCAL = "local"            # 本地存储
    S3 = "s3"                 # AWS S3
    TENCENT_COS = "tencent-cos" # 腾讯云COS
    # ...等
  1. 基础存储接口定义:
class BaseStorage(ABC):
    @abstractmethod
    def save(self, filename, data):
        """保存文件"""
        
    @abstractmethod  
    def load_once(self, filename: str) -> bytes:
        """一次性加载文件"""
        
    @abstractmethod
    def load_stream(self, filename: str) -> Generator:
        """流式加载文件"""
        
    @abstractmethod
    def download(self, filename, target_filepath):
        """下载文件"""
        
    @abstractmethod
    def exists(self, filename):
        """检查文件是否存在"""
        
    @abstractmethod
    def delete(self, filename):
        """删除文件"""
  1. 具体实现示例(以S3为例):
class AwsS3Storage(BaseStorage):
    def __init__(self):
        self.bucket_name = dify_config.S3_BUCKET_NAME
        self.client = boto3.client(
            "s3",
            aws_secret_access_key=dify_config.S3_SECRET_KEY,
            aws_access_key_id=dify_config.S3_ACCESS_KEY,
            endpoint_url=dify_config.S3_ENDPOINT,
            region_name=dify_config.S3_REGION
        )
        
    def save(self, filename, data):
        self.client.put_object(
            Bucket=self.bucket_name,
            Key=filename,
            Body=data
        )
  1. 文件上传限制:
class FileApi(Resource):
    def get(self):
        return {
            "file_size_limit": dify_config.UPLOAD_FILE_SIZE_LIMIT,
            "batch_count_limit": dify_config.UPLOAD_FILE_BATCH_LIMIT,
            "image_file_size_limit": dify_config.UPLOAD_IMAGE_FILE_SIZE_LIMIT,
            "video_file_size_limit": dify_config.UPLOAD_VIDEO_FILE_SIZE_LIMIT,
            "audio_file_size_limit": dify_config.UPLOAD_AUDIO_FILE_SIZE_LIMIT
        }
  1. 文件数据模型:
class UploadFile(db.Model):
    id: Mapped[str]
    tenant_id: Mapped[str]  # 租户ID
    storage_type: Mapped[str]  # 存储类型
    key: Mapped[str]  # 存储键
    name: Mapped[str]  # 文件名
    size: Mapped[int]  # 文件大小
    extension: Mapped[str]  # 扩展名
    mime_type: Mapped[str]  # MIME类型
    created_by: Mapped[str]  # 创建者
    created_at: Mapped[datetime]  # 创建时间

向量数据库管理

Dify中向量数据库管理的实现:

  1. 整体架构设计:
  • 采用工厂模式和策略模式,通过 AbstractVectorFactory 抽象工厂类和 BaseVector 基类来管理不同的向量数据库实现
  • 支持多种向量数据库,包括:PGVector、MyScale、Qdrant、ElasticSearch、OceanBase、TiDB等
  • 统一的向量操作接口,包括:创建、添加、搜索、删除等基本操作
  1. 核心组件:
# 基类定义向量数据库的基本接口
class BaseVector(ABC):
    @abstractmethod
    def create(self, texts, embeddings, **kwargs)  # 创建集合
    @abstractmethod 
    def add_texts(self, documents, embeddings, **kwargs) # 添加文档
    @abstractmethod
    def search_by_vector(self, query_vector, **kwargs) # 向量搜索
    @abstractmethod
    def search_by_full_text(self, query, **kwargs) # 全文搜索
    @abstractmethod
    def delete(self) # 删除集合

主要功能实现: a) 向量集合管理:

  • 创建集合时自动创建表结构和索引
  • 支持设置维度、距离计算方式等参数
  • 使用Redis锁避免并发创建问题

b) 文档管理:

  • 支持批量添加文档和向量
  • 文档包含文本内容、元数据、向量等信息
  • 支持文档去重和ID管理

c) 向量检索:

  • 支持KNN近邻搜索
  • 可配置top_k、score_threshold等参数
  • 支持不同的距离计算方式(cosine、l2等)

d) 全文检索:

  • 部分实现支持全文搜索功能
  • 可与向量检索结合使用

Chatbot应用类型

五种应用类型覆盖了从简单到复杂的不同使用场景,为用户提供了灵活的选择。其中:

  • Chatbot和Completion是基础应用类型
  • Agent提供了更智能的自主决策能力
  • Chatflow和Workflow则提供了更高级的编排和自定义能力

介绍应用类型之前,了解下消息队列管理(所有类型共用),核心实现类:MessageBasedAppQueueManager 主要实现:

def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
    message = MessageQueueMessage(
        task_id=self._task_id,
        message_id=self._message_id,
        conversation_id=self._conversation_id,
        app_mode=self._app_mode,
        event=event,
    )
    self._q.put(message)
  • 消息事件管理
  • 队列操作封装
  • 支持多种事件类型
  • 错误处理机制

Chatbot应用类型核心实现类:ChatAppGenerator,具体工作流程

  1. 初始化阶段
def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: bool = True,
):
    # 验证必要参数
    if not args.get("query"):
        raise ValueError("query is required")
    
    # 获取会话信息
    conversation = None
    if args.get("conversation_id"):
        conversation = self._get_conversation_by_user(...)
        
    # 加载应用模型配置
    app_model_config = self._get_app_model_config(...)
  1. 生成过程
  • 处理文件上传配置
  • 转换为应用配置(ChatAppConfigManager)
  • 初始化追踪管理器(TraceQueueManager)
  • 创建生成实体(ChatAppGenerateEntity)
  1. 消息处理
  • 使用 MessageBasedAppQueueManager管理消息队列
  • 支持流式响应和阻塞响应
  • 通过线程处理生成任务
  1. 错误处理
  • 处理验证错误(ValidationError)
  • 处理授权错误(InvokeAuthorizationError)
  • 处理任务停止错误(GenerateTaskStoppedError)

Agent应用类型 (Agent Chat)

核心实现类:AgentChatAppGenerator

  1. 初始化阶段
def generate(
    self,
    *,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: bool = True,
):
    # 验证流式响应
    if not streaming:
        raise ValueError("Agent Chat App does not support blocking mode")
    
    # 验证必要参数
    if not args.get("query"):
        raise ValueError("query is required")
        
    # 获取会话信息
    conversation = None
    if args.get("conversation_id"):
        conversation = self._get_conversation_by_user(...)
  1. 代理配置过程
  • 初始化代理配置(AgentChatAppConfigManager)
  • 配置文件处理能力
  • 设置工具使用权限
  • 创建代理生成实体(AgentChatAppGenerateEntity)
  1. 任务执行
  • 初始化追踪管理器(TraceQueueManager)
  • 创建工作线程处理生成任务
  • 支持工具调用和结果处理
  • 维护代理状态和上下文
  1. 错误处理
  • 处理验证错误(ValidationError)
  • 处理授权错误(InvokeAuthorizationError)
  • 处理任务中断(GenerateTaskStoppedError)

Chatflow应用类型 (Advanced Chat)

核心实现类:AdvancedChatAppGenerator

  1. 初始化阶段
def generate(
    self,
    app_model: App,
    workflow: Workflow,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: bool = True,
):
    # 验证工作流
    if not workflow:
        raise ValueError("workflow is required")
        
    # 验证输入
    if not args.get("query"):
        raise ValueError("query is required")
        
    # 初始化上下文
    contexts.tenant_id.set(app_model.tenant_id)
  1. 工作流配置
  • 加载工作流配置
  • 设置节点执行顺序
  • 配置变量和参数
  • 创建工作流生成实体(AdvancedChatAppGenerateEntity)
  1. 执行流程
  • 初始化工作流运行环境
  • 按序执行节点任务
  • 处理节点间数据传递
  • 维护执行状态和上下文
  1. 错误处理
  • 处理节点执行错误
  • 处理工作流中断
  • 支持部分成功状态

Workflow应用类型

核心实现类:WorkflowAppGenerator

  1. 初始化阶段
def generate(
    self,
    app_model: App,
    workflow_config: dict,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
):
    # 验证工作流配置
    if not workflow_config:
        raise ValueError("workflow_config is required")
        
    # 初始化工作流状态
    workflow_state = WorkflowState(workflow_config)
  1. 工作流编排
  • 解析工作流配置
  • 构建执行图
  • 设置数据流向
  • 创建工作流执行实体
  1. 执行管理
  • 使用 WorkflowAppQueueManager 管理执行队列
  • 处理节点执行结果
  • 支持条件分支和循环
  • 维护执行状态
  1. 错误处理
  • 处理节点失败
  • 支持部分成功(QueueWorkflowPartialSuccessEvent)
  • 处理完全失败(QueueWorkflowFailedEvent)
  • 支持成功完成(QueueWorkflowSucceededEvent)

文本生成应用类型 (Completion)

核心实现类:CompletionAppGenerator

  1. 初始化阶段
def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
):
    # 验证输入文本
    if not args.get("prompt"):
        raise ValueError("prompt is required")
        
    # 加载模型配置
    model_config = self._get_model_config(app_model)
  1. 生成配置
  • 设置模型参数
  • 配置输出格式
  • 设置生成约束
  • 创建生成实体(CompletionGenerateEntity)
  1. 执行过程
  • 调用模型生成
  • 处理生成结果
  • 格式化输出
  • 保存生成历史
  1. 错误处理
  • 处理模型调用错误
  • 处理格式验证错误
  • 处理生成限制错误

注:文本生成应用类型(Completion)已被标记为废弃,不建议在新项目中使用。建议使用 Chatflow 或 Workflow 类型替代。

如何添加一个内置工具

如何添加一个对象存储实现

代码参考实现:https://github.com/langgenius/dify/pull/8164

如何添加一个向量数据库

代码参考实现:https://github.com/langgenius/dify/pull/9287

如何添加一个LLM Model Provider

如何添加一个Embedding Model Provider

参考链接

「真诚赞赏,手留余香」

爱折腾的工程师

真诚赞赏,手留余香

使用微信扫描二维码完成支付