1. dify简介
Dify是一个开源的LLM应用开发平台。其直观的界面结合了AI工作流、RAG管道、Agent、模型管理、可观测性功能等,让您可以快速从原型到生产。
2. dify核心功能:
2.1. 强大的AI工作流构建与测试
- 用户可以直观地设计和构建 AI 工作流,根据不同的业务需求和场景进行定制化操作。
- 能够对构建的工作流进行全面测试,确保其在实际应用中的稳定性和准确性。
2.2. 全面的模型支持
- 无缝集成了数百种来自数十个推理提供商和自托管解决方案的专有 / 开源大型语言模型(LLM)。
- 涵盖了多种不同类型的模型,为用户提供了丰富的选择,以满足不同的任务需求。
2.3. 便捷的提示 IDE
- 提供了一个集成开发环境(IDE),方便用户编写和优化提示(prompt)。
- 帮助用户更好地与语言模型进行交互,提高提示的质量和效果。
2.4. 强大的 RAG(检索增强生成)管道
- 具备高效的检索增强生成功能,能够从大量的文本数据中检索相关信息,并结合语言模型生成更加准确和丰富的内容。
- 提升了语言模型的输出质量和实用性。
2.5. 灵活的代理能力
- 允许用户设置代理,以满足不同的网络环境和访问需求。
- 增强了系统的灵活性和可扩展性。
2.6. 有效的LLMOps
- 提供了一系列针对语言模型的操作和管理工具,包括模型部署、监控、优化等。
- 帮助用户更好地管理和维护语言模型的运行,提高工作效率。
2.7. 后端即服务
- 可以作为后端服务提供给其他应用程序使用,方便集成和扩展。
- 为开发者提供了便捷的接口和工具,降低了开发成本和难度。
与LangChain、Flowise、OpenAI Assistant API对比:
功能 | Dify.AI | LangChain | Flowise | OpenAI Assistant API |
---|---|---|---|---|
编程方法 | API + 应用程序导向 | Python 代码 | 应用程序导向 | API 导向 |
支持的 LLMs | 丰富多样 | 丰富多样 | 丰富多样 | 仅限 OpenAI |
RAG引擎 | ✅ | ✅ | ✅ | ✅ |
Agent | ✅ | ✅ | ❌ | ✅ |
工作流 | ✅ | ❌ | ✅ | ❌ |
可观测性 | ✅ | ✅ | ❌ | ❌ |
企业功能(SSO/访问控制) | ✅ | ❌ | ❌ | ❌ |
本地部署 | ✅ | ✅ | ✅ | ❌ |
3. 源码部署
通过源码部署,方便后续代码调试,了解具体实现原理
- 克隆dify源码
git clone https://github.com/langgenius/dify.git
- docker-compose部署外部依赖PostgresSQL/Redis/Weaviate等
cd dify/docker
cp middleware.env.example middleware.env
docker compose -f docker-compose.middleware.yaml up -d
- 准备dify虚拟环境
conda create --name dify python=3.12
conda activate dify
pip install poetry
- 启动dify-api/dify-worker服务
# 进入api目录
cd api/
# 拷贝环境变量文件
cp .env.example .env
# 生成随机密钥
openssl rand -base64 42
# 替换环境变量文件中的密钥
sed -i 's/SECRET_KEY=.*/SECRET_KEY=<your_value>/' .env
# 使用poetry安装依赖
poetry install
# 执行数据库迁移到最新版本
flask db upgrade
# 启动dify-api服务
flask run --host 0.0.0.0 --port=5001 --debug
# 启动dify-worker服务
celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace
- 启动dify-web服务
cd web
# 安装依赖
npm install
# 拷贝环境变量文件
cp .env.example .env.local
# 编译
npm run build
# 启动前端服务
npm run start
附上一个vscode launch.json文件:
{
"version": "0.2.0",
"configurations": [
{
"name": "dify-api",
"type": "debugpy",
"request": "launch",
"module": "flask",
"env": {
"FLASK_APP": "app.py",
"FLASK_ENV": "development",
"GEVENT_SUPPORT": "true",
},
"args": [
"run",
"--host=0.0.0.0",
"--port=5001",
"--debug"
],
"cwd": "${workspaceFolder}/api",
"justMyCode": false
},
{
"name": "dify-worker",
"type": "debugpy",
"request": "launch",
"program": "/opt/anaconda3/envs/dify/bin/celery",
"env": {
"GEVENT_SUPPORT": "true",
},
"args": [
"-A",
"app.celery",
"worker",
"-P",
"gevent",
"-c",
"1",
"--loglevel=INFO",
"-Q",
"dataset,generation,mail,ops_trace"
],
"cwd": "${workspaceFolder}/api",
"justMyCode": false,
"console": "integratedTerminal"
},
{
"name": "dify-web",
"type": "node",
"request": "launch",
"runtimeExecutable": "/opt/homebrew/bin/pnpm",
"runtimeArgs": [
"start"
],
"cwd": "${workspaceFolder}/web",
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen",
}
]
}
如果源码编译dify-web失败的话,可以使用docker容器启动dify-web
docker run -it -d -p 3000:3000 -e CONSOLE_URL=http://127.0.0.1:5001 -e APP_URL=http://127.0.0.1:5001 langgenius/dify-web:main
4. 源码分析
以git commit:da67916843249390ee75438207042b215e752183
的代码为例
dify-api启动函数,整个项目基于flask框架实现
import os
import sys
def is_db_command():
"""
检查当前是否在执行 Flask 数据库相关命令
返回值:如果是数据库命令返回 True,否则返回 False
"""
if len(sys.argv) > 1 and sys.argv[0].endswith("flask") and sys.argv[1] == "db":
return True
return False
# 创建应用程序
if is_db_command():
# 如果是数据库命令,使用专门的工厂函数创建用于数据库迁移的应用实例
from app_factory import create_migrations_app
app = create_migrations_app()
else:
# JetBrains Python 调试器似乎与 gevent 不兼容,
# 因此在调试模式下需要禁用 gevent。
# 如果您使用 debugpy 并设置 GEVENT_SUPPORT=True,则可以使用 gevent 进行调试。
if (flask_debug := os.environ.get("FLASK_DEBUG", "0")) and flask_debug.lower() in {"false", "0", "no"}:
# 导入并初始化 gevent
from gevent import monkey # type: ignore
monkey.patch_all() # 使用 gevent 替换标准库中的阻塞操作
# 初始化 gRPC 的 gevent 支持
from grpc.experimental import gevent as grpc_gevent # type: ignore
grpc_gevent.init_gevent()
# 为 psycopg(PostgreSQL 驱动)添加 gevent 支持
import psycogreen.gevent # type: ignore
psycogreen.gevent.patch_psycopg()
# 创建常规应用实例
from app_factory import create_app
app = create_app()
# 获取 Celery 扩展实例
celery = app.extensions["celery"]
if __name__ == "__main__":
# 当直接运行此文件时,启动Flask开发服务器
app.run(host="0.0.0.0", port=5001)
api是如何启动的?dify-api启动基于Gevent库,在api的api/docker/entrypoint.sh
脚本中可以查看
...
if [[ "${DEBUG}" == "true" ]]; then
exec flask run --host=${DIFY_BIND_ADDRESS:-0.0.0.0} --port=${DIFY_PORT:-5001} --debug
else
exec gunicorn \
--bind "${DIFY_BIND_ADDRESS:-0.0.0.0}:${DIFY_PORT:-5001}" \
--workers ${SERVER_WORKER_AMOUNT:-1} \
--worker-class ${SERVER_WORKER_CLASS:-gevent} \
--timeout ${GUNICORN_TIMEOUT:-200} \
--preload \
app:app
...
Gevent与asyncio的区别:
对比维度 | Gevent | asyncio |
---|---|---|
实现机制 | 基于greenlet实现协程,通过monkey patching修改标准库 | 基于Python原生的async/await语法和事件循环 |
编程模型 | 同步编程风格,代码看起来像普通同步代码 | 异步编程风格,需要显式使用async/await关键字 |
生态系统 | 成熟稳定但生态相对较小,适合老项目改造 | Python 3.4后的标准库,生态系统不断扩大 |
使用场景 | 适合需要高并发但不想改变代码结构的项目 | 适合从零开始的新项目,充分利用现代异步特性 |
性能特点 | 由于monkey patching开销,某些场景性能略低 | 原生异步实现,理论性能更好,需要全栈异步支持 |
需要分析的功能特性列表汇总:
- API路由&Middleware机制
- 多租户&成员管理&角色管理
- LLM管理
- 对象存储管理
- 向量数据库管理
- Chatbot应用类型
- Agent应用类型
- 文本生成应用类型
- Chatflow应用类型
- Workflow应用类型
- 知识库
- 工具/插件(v1.0.0-beta重构了这块,跟主干代码解耦)
- …
API路由
dify-api使用Flask Blueprint来组织路由,主要分为以下几个模块:
- 主要路由模块:
- service_api_bp: 服务 API 路由 (/api/v1)
- web_bp: Web API 路由 (/api)
- console_app_bp: 控制台 API 路由 (/console/api)
- files_bp: 文件处理 API 路由
- inner_api_bp: 内部 API 路由
- 路由注册:
- 在
ext_blueprints.py
中统一注册所有 blueprint - 每个模块都配置了相应的 CORS 策略
- 使用 Flask-RESTful 来实现 RESTful API
- 主要 API 分组:
- 应用相关 API (/apps/*)
- 认证相关 API (/account/, /oauth/)
- 工作区相关 API
- 文件上传 API
- 对话/聊天相关 API
举例:看一个登录API实现
class LoginApi(Resource):
@setup_required
def post(self):
# 1. 参数验证
parser = reqparse.RequestParser()
parser.add_argument("email", type=email, required=True, location="json")
parser.add_argument("password", type=valid_password, required=True, location="json")
parser.add_argument("remember_me", type=bool, required=False, default=False)
parser.add_argument("invite_token", type=str, required=False)
parser.add_argument("language", type=str, required=False, default="en-US")
# 2. 账户状态检查
if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(args["email"]):
raise AccountInFreezeError()
# 3. 登录频率限制
if AccountService.is_login_error_rate_limit(args["email"]):
raise EmailPasswordLoginLimitError()
# 4. 邀请码验证
if invitation:
invitation = RegisterService.get_invitation_if_token_valid(
None, args["email"], invitation
)
# 5. 认证流程
try:
account = AccountService.authenticate(args["email"], args["password"])
except services.errors.account.AccountLoginError:
raise AccountBannedError()
except services.errors.account.AccountPasswordError:
AccountService.add_login_error_rate_limit(args["email"])
raise EmailOrPasswordMismatchError()
# 6. 工作区检查
tenants = TenantService.get_join_tenants(account)
if len(tenants) == 0:
return {"result": "fail", "data": "workspace not found"}
# 7. 生成令牌
token_pair = AccountService.login(
account=account,
ip_address=extract_remote_ip(request)
)
# 8. 重置错误计数
AccountService.reset_login_error_rate_limit(args["email"])
return {"result": "success", "data": token_pair.model_dump()}
Middleware实现
通过Flask扩展系统实现中间件功能,主要包括:
- 核心中间件:
- ext_timezone: 时区处理
- ext_logging: 日志系统
- ext_compress: 响应压缩
- ext_database: 数据库连接
- ext_redis: Redis 缓存
- ext_celery: 异步任务
- 安全相关中间件:
- ext_login: 用户认证
- ext_set_secretkey: 密钥设置
- ext_proxy_fix: 代理修复
- 功能性中间件:
- ext_storage: 存储服务
- ext_mail: 邮件服务
- ext_sentry: 错误追踪
- ext_app_metrics: 应用指标
- 中间件初始化:
- 在app_factory.py中通过 initialize_extensions()函数初始化所有中间件
- 中间件按照特定顺序加载,确保依赖关系
- 支持通过配置启用/禁用特定中间件
5.请求处理装饰器:
@setup_required
@login_required
@account_initialization_required
@validate_app_token
@cloud_edition_billing_resource_check
...
这些装饰器在路由处理前进行各种验证和检查
举例:来看一个用于API认证和授权的中间件
def validate_app_token(view: Optional[Callable] = None, *, fetch_user_arg: Optional[FetchUserArg] = None):
def decorator(view_func):
@wraps(view_func)
def decorated_view(*args, **kwargs):
# 1. 验证并获取 API Token
api_token = validate_and_get_api_token("app")
# 2. 应用状态检查
app_model = db.session.query(App).filter(App.id == api_token.app_id).first()
if not app_model:
raise Forbidden("The app no longer exists.")
if app_model.status != "normal":
raise Forbidden("The app's status is abnormal.")
if not app_model.enable_api:
raise Forbidden("The app's API service has been disabled.")
# 3. 租户状态检查
tenant = db.session.query(Tenant).filter(Tenant.id == app_model.tenant_id).first()
if tenant.status == TenantStatus.ARCHIVE:
raise Forbidden("The workspace's status is archived.")
# 4. 用户信息处理
if fetch_user_arg:
user_id = get_user_id_from_request(fetch_user_arg)
kwargs["end_user"] = create_or_update_end_user_for_user_id(
app_model, user_id
)
return view_func(*args, **kwargs)
return decorated_view
return decorator if view is None else decorator(view)
# Token验证实现
def validate_and_get_api_token(scope: str | None = None):
# 1. 验证 Authorization Header
auth_header = request.headers.get("Authorization")
if auth_header is None or " " not in auth_header:
raise Unauthorized("Authorization header must be provided")
# 2. 验证认证方案
auth_scheme, auth_token = auth_header.split(None, 1)
if auth_scheme.lower() != "bearer":
raise Unauthorized("Authorization scheme must be 'Bearer'")
# 3. Token 使用时间更新
current_time = datetime.now(UTC).replace(tzinfo=None)
cutoff_time = current_time - timedelta(minutes=1)
with Session(db.engine, expire_on_commit=False) as session:
# 4. 更新 Token 最后使用时间
update_stmt = (
update(ApiToken)
.where(
ApiToken.token == auth_token,
(ApiToken.last_used_at.is_(None) | (ApiToken.last_used_at < cutoff_time)),
ApiToken.type == scope,
)
.values(last_used_at=current_time)
.returning(ApiToken)
)
# 5. Token 有效性验证
api_token = result.scalar_one_or_none()
if not api_token:
stmt = select(ApiToken).where(
ApiToken.token == auth_token,
ApiToken.type == scope
)
api_token = session.scalar(stmt)
if not api_token:
raise Unauthorized("Access token is invalid")
return api_token
多租户&成员管理&角色管理
- 多租户系统的核心模型:
- Tenant(租户):
- 代表一个独立的工作空间
- 包含基本信息:id、name、plan(计划)、status(状态)等
- 可以配置自定义设置(custom_config)
- Account(账户):
- 代表系统中的用户
- 包含用户基本信息:name、email、password 等
- 可以属于多个租户
-
租户-成员关系管理:
- TenantAccountJoin:
- 实现租户和账户的多对多关系
- 记录用户在租户中的角色(role)
- 记录邀请人(invited_by)
- 通过 unique_tenant_account_join 约束确保一个用户在同一租户中只有一个角色
-
角色权限体系(TenantAccountRole):
class TenantAccountRole(enum.StrEnum):
OWNER = "owner" # 租户所有者
ADMIN = "admin" # 管理员
EDITOR = "editor" # 编辑者
NORMAL = "normal" # 普通成员
DATASET_OPERATOR = "dataset_operator" # 数据集操作员
每个角色都有特定的权限:
- is_privileged_role: 判断是否为特权角色(owner 或 admin)
- is_editing_role: 判断是否有编辑权限
- is_dataset_edit_role: 判断是否有数据集编辑权限
- 租户状态管理:
class TenantStatus(enum.StrEnum):
NORMAL = "normal" # 正常状态
ARCHIVE = "archive" # 归档状态
- 成员管理功能:
- 租户可以通过 get_accounts() 方法获取所有成员
- 用户可以通过 current_tenant 属性访问当前所在的租户
- 用户可以通过 current_role 属性获取在当前租户中的角色
- 提供了一系列角色判断方法:
- is_admin_or_owner
- is_admin
- is_editor
- is_dataset_editor
- is_dataset_operator
角色定义在代码中硬编码了,需要修改角色定义的地方方可实现,以下是一种思路实现,可能还有遗漏处:
- 在TenantAccountRole枚举类中添加新角色:
class TenantAccountRole(enum.StrEnum):
OWNER = "owner"
ADMIN = "admin"
EDITOR = "editor"
NORMAL = "normal"
DATASET_OPERATOR = "dataset_operator"
# 添加新角色,例如:
VIEWER = "viewer" # 只读角色
- 更新角色验证方法: 在 TenantAccountRole 类中的 is_valid_role 方法中添加新角色:
@staticmethod
def is_valid_role(role: str) -> bool:
return role in {
TenantAccountRole.OWNER,
TenantAccountRole.ADMIN,
TenantAccountRole.EDITOR,
TenantAccountRole.NORMAL,
TenantAccountRole.DATASET_OPERATOR,
TenantAccountRole.VIEWER, # 添加新角色
}
- 定义新角色的权限: 根据新角色的权限需求,在 TenantAccountRole 类中添加或修改相应的权限判断方法:
@staticmethod
def is_viewing_role(role: str) -> bool:
return role in {
TenantAccountRole.OWNER,
TenantAccountRole.ADMIN,
TenantAccountRole.EDITOR,
TenantAccountRole.NORMAL,
TenantAccountRole.VIEWER
}
- 在 Account 类中添加角色判断属性:
@property
def is_viewer(self):
return self._current_tenant.current_role == TenantAccountRole.VIEWER
- 同步更新 TenantAccountJoinRole 枚举
class TenantAccountJoinRole(enum.Enum):
OWNER = "owner"
ADMIN = "admin"
NORMAL = "normal"
DATASET_OPERATOR = "dataset_operator"
VIEWER = "viewer" # 添加新角色
- 数据库迁移: 由于角色是以字符串形式存储在数据库中,所以不需要修改数据库结构。但是建议添加数据库约束来确保角色值的有效性:
# 在 TenantAccountJoin 模型中添加角色约束
role = db.Column(db.String(16),
nullable=False,
server_default="normal",
db.CheckConstraint("role IN ('owner', 'admin', 'normal', 'dataset_operator', 'viewer')"))
更新相关的权限检查逻辑
LLM管理
Dify中LLM管理系统的具体实现过程:
- 基础架构设计
# 基础模型类
class AIModel:
model_type: ModelType # 模型类型(LLM/Embedding等)
# LLM基类
class LargeLanguageModel(AIModel):
model_type = ModelType.LLM
def invoke(self, model: str, credentials: dict, prompt_messages: list[PromptMessage],
model_parameters: dict = None, tools: list[PromptMessageTool] = None,
stop: list[str] = None, stream: bool = True, user: str = None) -> Union[LLMResult, Generator]:
"""调用LLM模型的核心方法"""
pass
def get_parameter_rules(self, model: str, credentials: dict) -> list[ParameterRule]:
"""获取模型参数规则"""
pass
def get_model_mode(self, model: str, credentials: dict = None) -> LLMMode:
"""获取模型模式(对话/补全)"""
pass
- 模型实例管理
class ModelInstance:
def __init__(self):
self.tenant_id: str # 租户ID
self.model_type: ModelType # 模型类型
self.provider: str # 提供商
self.model: str # 模型名称
self.credentials: dict # 认证信息
self.model_type_instance: Union[LargeLanguageModel, TextEmbeddingModel] # 具体模型实例
def get_llm_num_tokens(self, prompt_messages: list[PromptMessage],
tools: list[PromptMessageTool] = None) -> int:
"""计算token数量"""
pass
def invoke_text_embedding(self, texts: list[str], user: str = None,
input_type: EmbeddingInputType = EmbeddingInputType.DOCUMENT) -> TextEmbeddingResult:
"""调用文本嵌入"""
pass
- 模型管理器实现
class ModelManager:
def get_model_instance(self, tenant_id: str, model_type: ModelType,
provider: str, model: str) -> ModelInstance:
"""获取模型实例"""
# 1. 获取提供商配置
provider_configurations = self._get_provider_configurations(tenant_id)
# 2. 验证模型是否存在
if provider not in provider_configurations:
raise ValueError(f"Provider {provider} does not exist")
# 3. 创建模型实例
model_instance = ModelInstance(
tenant_id=tenant_id,
model_type=model_type,
provider=provider,
model=model
)
return model_instance
def invoke_llm(self, model_instance: ModelInstance,
prompt_messages: list[PromptMessage],
stream: bool = True,
user: str = None) -> Union[LLMResult, Generator]:
"""调用LLM模型"""
pass
- 具体LLM提供商实现
# OpenAI实现
class OpenAILargeLanguageModel(LargeLanguageModel):
def _invoke(self, model: str, credentials: dict,
prompt_messages: list[PromptMessage],
model_parameters: dict,
tools: list[PromptMessageTool] = None,
stop: list[str] = None,
stream: bool = True,
user: str = None) -> Union[LLMResult, Generator]:
"""实现OpenAI模型调用"""
# 1. 处理认证信息
client = OpenAI(**credentials)
# 2. 转换消息格式
messages = self._convert_messages(prompt_messages)
# 3. 调用API
response = client.chat.completions.create(
model=model,
messages=messages,
stream=stream,
**model_parameters
)
# 4. 处理响应
return self._handle_response(response, stream)
def remote_models(self, credentials: dict) -> list[AIModelEntity]:
"""获取远程可用模型列表"""
pass
- 模型配置管理
class ModelProperties(BaseModel):
context_size: int # 上下文窗口大小
max_tokens: int # 最大token数
mode: LLMMode # 模型模式
class ModelConfig(BaseModel):
properties: ModelProperties
features: list[ModelFeature] # 模型特性
# 模型配置示例
MODEL_CONFIGS = {
"gpt-4": ModelConfig(
properties=ModelProperties(
context_size=8192,
max_tokens=4096,
mode=LLMMode.CHAT
),
features=[
ModelFeature.AGENT_THOUGHT,
ModelFeature.TOOL_CALL
]
)
}
- 提供商管理
class ProviderManager:
def update_default_model_record(self, tenant_id: str,
model_type: ModelType,
provider: str,
model: str) -> TenantDefaultModel:
"""更新默认模型记录"""
# 1. 验证提供商
provider_configurations = self.get_configurations(tenant_id)
if provider not in provider_configurations:
raise ValueError(f"Provider {provider} does not exist")
# 2. 验证模型
available_models = provider_configurations.get_models(
model_type=model_type,
only_active=True
)
# 3. 更新数据库记录
default_model = TenantDefaultModel.query.filter(
tenant_id=tenant_id,
model_type=model_type
).first()
if default_model:
default_model.provider_name = provider
default_model.model_name = model
else:
default_model = TenantDefaultModel(
tenant_id=tenant_id,
model_type=model_type,
provider_name=provider,
model_name=model
)
db.session.commit()
return default_model
模型调用流程示例:
# 1. 创建模型实例
model_instance = model_manager.get_model_instance(
tenant_id="tenant_123",
model_type=ModelType.LLM,
provider="openai",
model="gpt-4"
)
# 2. 准备提示消息
messages = [
SystemPromptMessage(content="You are a helpful assistant"),
UserPromptMessage(content="Tell me about LLMs")
]
# 3. 设置模型参数
model_parameters = {
"temperature": 0.7,
"max_tokens": 1000
}
# 4. 调用模型
try:
response = model_instance.model_type_instance.invoke(
model="gpt-4",
credentials=model_instance.credentials,
prompt_messages=messages,
model_parameters=model_parameters,
stream=True
)
# 5. 处理流式响应
for chunk in response:
yield chunk.delta.content
except Exception as e:
logger.error(f"Model invocation failed: {str(e)}")
raise
对象存储管理
Dify中的对象存储系统主要用于以下几个方面:
- 文件上传管理
class FileService:
@staticmethod
def upload_file(
filename: str,
content: bytes,
mimetype: str,
user: Union[Account, EndUser, Any],
source: Literal["datasets"] | None = None,
source_url: str = "",
) -> UploadFile:
主要用于:
- 数据集文档上传
- 图片文件存储
- 其他用户上传的文件
- 存储实现支持多种对象存储服务:
class StorageType(StrEnum):
ALIYUN_OSS = "aliyun-oss" # 阿里云OSS
AZURE_BLOB = "azure-blob" # Azure Blob存储
BAIDU_OBS = "baidu-obs" # 百度对象存储
GOOGLE_STORAGE = "google-storage" # Google Cloud Storage
HUAWEI_OBS = "huawei-obs" # 华为对象存储
LOCAL = "local" # 本地存储
S3 = "s3" # AWS S3
TENCENT_COS = "tencent-cos" # 腾讯云COS
# ...等
- 基础存储接口定义:
class BaseStorage(ABC):
@abstractmethod
def save(self, filename, data):
"""保存文件"""
@abstractmethod
def load_once(self, filename: str) -> bytes:
"""一次性加载文件"""
@abstractmethod
def load_stream(self, filename: str) -> Generator:
"""流式加载文件"""
@abstractmethod
def download(self, filename, target_filepath):
"""下载文件"""
@abstractmethod
def exists(self, filename):
"""检查文件是否存在"""
@abstractmethod
def delete(self, filename):
"""删除文件"""
- 具体实现示例(以S3为例):
class AwsS3Storage(BaseStorage):
def __init__(self):
self.bucket_name = dify_config.S3_BUCKET_NAME
self.client = boto3.client(
"s3",
aws_secret_access_key=dify_config.S3_SECRET_KEY,
aws_access_key_id=dify_config.S3_ACCESS_KEY,
endpoint_url=dify_config.S3_ENDPOINT,
region_name=dify_config.S3_REGION
)
def save(self, filename, data):
self.client.put_object(
Bucket=self.bucket_name,
Key=filename,
Body=data
)
- 文件上传限制:
class FileApi(Resource):
def get(self):
return {
"file_size_limit": dify_config.UPLOAD_FILE_SIZE_LIMIT,
"batch_count_limit": dify_config.UPLOAD_FILE_BATCH_LIMIT,
"image_file_size_limit": dify_config.UPLOAD_IMAGE_FILE_SIZE_LIMIT,
"video_file_size_limit": dify_config.UPLOAD_VIDEO_FILE_SIZE_LIMIT,
"audio_file_size_limit": dify_config.UPLOAD_AUDIO_FILE_SIZE_LIMIT
}
- 文件数据模型:
class UploadFile(db.Model):
id: Mapped[str]
tenant_id: Mapped[str] # 租户ID
storage_type: Mapped[str] # 存储类型
key: Mapped[str] # 存储键
name: Mapped[str] # 文件名
size: Mapped[int] # 文件大小
extension: Mapped[str] # 扩展名
mime_type: Mapped[str] # MIME类型
created_by: Mapped[str] # 创建者
created_at: Mapped[datetime] # 创建时间
向量数据库管理
Dify中向量数据库管理的实现:
- 整体架构设计:
- 采用工厂模式和策略模式,通过 AbstractVectorFactory 抽象工厂类和 BaseVector 基类来管理不同的向量数据库实现
- 支持多种向量数据库,包括:PGVector、MyScale、Qdrant、ElasticSearch、OceanBase、TiDB等
- 统一的向量操作接口,包括:创建、添加、搜索、删除等基本操作
- 核心组件:
# 基类定义向量数据库的基本接口
class BaseVector(ABC):
@abstractmethod
def create(self, texts, embeddings, **kwargs) # 创建集合
@abstractmethod
def add_texts(self, documents, embeddings, **kwargs) # 添加文档
@abstractmethod
def search_by_vector(self, query_vector, **kwargs) # 向量搜索
@abstractmethod
def search_by_full_text(self, query, **kwargs) # 全文搜索
@abstractmethod
def delete(self) # 删除集合
主要功能实现: a) 向量集合管理:
- 创建集合时自动创建表结构和索引
- 支持设置维度、距离计算方式等参数
- 使用Redis锁避免并发创建问题
b) 文档管理:
- 支持批量添加文档和向量
- 文档包含文本内容、元数据、向量等信息
- 支持文档去重和ID管理
c) 向量检索:
- 支持KNN近邻搜索
- 可配置top_k、score_threshold等参数
- 支持不同的距离计算方式(cosine、l2等)
d) 全文检索:
- 部分实现支持全文搜索功能
- 可与向量检索结合使用
Chatbot应用类型
五种应用类型覆盖了从简单到复杂的不同使用场景,为用户提供了灵活的选择。其中:
- Chatbot和Completion是基础应用类型
- Agent提供了更智能的自主决策能力
- Chatflow和Workflow则提供了更高级的编排和自定义能力
介绍应用类型之前,了解下消息队列管理(所有类型共用),核心实现类:MessageBasedAppQueueManager
主要实现:
def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
message = MessageQueueMessage(
task_id=self._task_id,
message_id=self._message_id,
conversation_id=self._conversation_id,
app_mode=self._app_mode,
event=event,
)
self._q.put(message)
- 消息事件管理
- 队列操作封装
- 支持多种事件类型
- 错误处理机制
Chatbot应用类型核心实现类:ChatAppGenerator
,具体工作流程
- 初始化阶段
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
):
# 验证必要参数
if not args.get("query"):
raise ValueError("query is required")
# 获取会话信息
conversation = None
if args.get("conversation_id"):
conversation = self._get_conversation_by_user(...)
# 加载应用模型配置
app_model_config = self._get_app_model_config(...)
- 生成过程
- 处理文件上传配置
- 转换为应用配置(ChatAppConfigManager)
- 初始化追踪管理器(TraceQueueManager)
- 创建生成实体(ChatAppGenerateEntity)
- 消息处理
- 使用 MessageBasedAppQueueManager管理消息队列
- 支持流式响应和阻塞响应
- 通过线程处理生成任务
- 错误处理
- 处理验证错误(ValidationError)
- 处理授权错误(InvokeAuthorizationError)
- 处理任务停止错误(GenerateTaskStoppedError)
Agent应用类型 (Agent Chat)
核心实现类:AgentChatAppGenerator
- 初始化阶段
def generate(
self,
*,
app_model: App,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
):
# 验证流式响应
if not streaming:
raise ValueError("Agent Chat App does not support blocking mode")
# 验证必要参数
if not args.get("query"):
raise ValueError("query is required")
# 获取会话信息
conversation = None
if args.get("conversation_id"):
conversation = self._get_conversation_by_user(...)
- 代理配置过程
- 初始化代理配置(AgentChatAppConfigManager)
- 配置文件处理能力
- 设置工具使用权限
- 创建代理生成实体(AgentChatAppGenerateEntity)
- 任务执行
- 初始化追踪管理器(TraceQueueManager)
- 创建工作线程处理生成任务
- 支持工具调用和结果处理
- 维护代理状态和上下文
- 错误处理
- 处理验证错误(ValidationError)
- 处理授权错误(InvokeAuthorizationError)
- 处理任务中断(GenerateTaskStoppedError)
Chatflow应用类型 (Advanced Chat)
核心实现类:AdvancedChatAppGenerator
- 初始化阶段
def generate(
self,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
):
# 验证工作流
if not workflow:
raise ValueError("workflow is required")
# 验证输入
if not args.get("query"):
raise ValueError("query is required")
# 初始化上下文
contexts.tenant_id.set(app_model.tenant_id)
- 工作流配置
- 加载工作流配置
- 设置节点执行顺序
- 配置变量和参数
- 创建工作流生成实体(AdvancedChatAppGenerateEntity)
- 执行流程
- 初始化工作流运行环境
- 按序执行节点任务
- 处理节点间数据传递
- 维护执行状态和上下文
- 错误处理
- 处理节点执行错误
- 处理工作流中断
- 支持部分成功状态
Workflow应用类型
核心实现类:WorkflowAppGenerator
- 初始化阶段
def generate(
self,
app_model: App,
workflow_config: dict,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
):
# 验证工作流配置
if not workflow_config:
raise ValueError("workflow_config is required")
# 初始化工作流状态
workflow_state = WorkflowState(workflow_config)
- 工作流编排
- 解析工作流配置
- 构建执行图
- 设置数据流向
- 创建工作流执行实体
- 执行管理
- 使用 WorkflowAppQueueManager 管理执行队列
- 处理节点执行结果
- 支持条件分支和循环
- 维护执行状态
- 错误处理
- 处理节点失败
- 支持部分成功(QueueWorkflowPartialSuccessEvent)
- 处理完全失败(QueueWorkflowFailedEvent)
- 支持成功完成(QueueWorkflowSucceededEvent)
文本生成应用类型 (Completion)
核心实现类:CompletionAppGenerator
- 初始化阶段
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
):
# 验证输入文本
if not args.get("prompt"):
raise ValueError("prompt is required")
# 加载模型配置
model_config = self._get_model_config(app_model)
- 生成配置
- 设置模型参数
- 配置输出格式
- 设置生成约束
- 创建生成实体(CompletionGenerateEntity)
- 执行过程
- 调用模型生成
- 处理生成结果
- 格式化输出
- 保存生成历史
- 错误处理
- 处理模型调用错误
- 处理格式验证错误
- 处理生成限制错误
注:文本生成应用类型(Completion)已被标记为废弃,不建议在新项目中使用。建议使用 Chatflow 或 Workflow 类型替代。
如何添加一个内置工具
- 代码参考实现:https://github.com/langgenius/dify/pull/7991
- 官方指引:https://docs.dify.ai/guides/tools/quick-tool-integration
如何添加一个对象存储实现
代码参考实现:https://github.com/langgenius/dify/pull/8164
如何添加一个向量数据库
代码参考实现:https://github.com/langgenius/dify/pull/9287
如何添加一个LLM Model Provider
- 代码参考实现:https://github.com/langgenius/dify/pull/8428
- 官方指引:https://docs.dify.ai/guides/model-configuration/new-provider
如何添加一个Embedding Model Provider
- 代码参考实现:https://github.com/langgenius/dify/pull/8728
- 官方指引:https://docs.dify.ai/guides/model-configuration/predefined-model
参考链接
- https://github.com/langgenius/dify/blob/main/README_CN.md
- 如何在本地源码启动dify
- Dify Enterprise Helm Chart
- Dify Enterprise部署文档
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付
