sdk/examples.py
mrmamongo e641fe2e3c chore: add project config and usage examples
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-04-20 12:08:31 +03:00

102 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from ergpt.kb import AsyncKnowledgeBaseClient, KnowledgeBaseClient
API_TOKEN = "your_api_token_here"
def sync_example():
client = KnowledgeBaseClient(api_token=API_TOKEN)
kb = client.create_knowledge_base(
name="Моя база знаний",
description="Описание базы знаний",
chunk_size=1000,
chunk_overlap=200,
)
print(f"Создана база знаний: {kb.id}")
with open("document.pdf", "rb") as f:
doc = client.upload_document(kb_id=kb.id, file=f)
print(f"Загружен документ: {doc.id}, статус: {doc.status}")
results = client.search(
kb_id=kb.id,
query="Как настроить авторизацию?",
limit=5,
score_threshold=0.2,
)
for result in results.chunks:
print(f"Документ: {result.filename}")
for chunk in result.chunks:
print(f" [score={chunk.score:.2f}] {chunk.text[:100]}...")
chunks = client.get_document_chunks(document_id=doc.id, limit=10)
for chunk in chunks.result:
print(f"Чанк {chunk.chunk_id}: {chunk.text[:50]}...")
client.delete_knowledge_base(kb.id)
print("База знаний удалена")
client.close()
async def async_example():
async with AsyncKnowledgeBaseClient(api_token=API_TOKEN) as client:
kb = await client.create_knowledge_base(
name="Моя база знаний",
description="Описание базы знаний",
chunk_size=1000,
chunk_overlap=200,
)
print(f"Создана база знаний: {kb.id}")
with open("document.pdf", "rb") as f:
doc = await client.upload_document(kb_id=kb.id, file=f)
print(f"Загружен документ: {doc.id}, статус: {doc.status}")
results = await client.search(
kb_id=kb.id,
query="Как настроить авторизацию?",
limit=5,
score_threshold=0.2,
)
for result in results.chunks:
print(f"Документ: {result.filename}")
for chunk in result.chunks:
print(f" [score={chunk.score:.2f}] {chunk.text[:100]}...")
chunks = await client.get_document_chunks(document_id=doc.id, limit=10)
for chunk in chunks.result:
print(f"Чанк {chunk.chunk_id}: {chunk.text[:50]}...")
await client.delete_knowledge_base(kb.id)
print("База знаний удалена")
def list_example():
with KnowledgeBaseClient(api_token=API_TOKEN) as client:
kbs = client.list_knowledge_bases(search="проект", current=0, page_size=20)
print(f"Всего баз: {kbs.total}")
for kb in kbs.result:
print(f" {kb.name} ({kb.documents_count} документов)")
docs = client.list_documents(
kb_id="some-kb-id",
statuses=["FAILED"],
current=0,
page_size=10,
)
for doc in docs.result:
print(f" {doc.filename}: {doc.status}, ошибка: {doc.error}")
def bulk_operations_example():
with KnowledgeBaseClient(api_token=API_TOKEN) as client:
client.delete_documents_bulk(["doc-id-1", "doc-id-2", "doc-id-3"])
print("Документы удалены")
client.retry_document("failed-doc-id")
print("Обработка перезапущена")
if __name__ == "__main__":
pass