from ergpt.kb import AsyncKnowledgeBaseClient, KnowledgeBaseClient API_TOKEN = "your_api_token_here" def sync_example(): client = KnowledgeBaseClient(api_token=API_TOKEN) kb = client.create_knowledge_base( name="Моя база знаний", description="Описание базы знаний", chunk_size=1000, chunk_overlap=200, ) print(f"Создана база знаний: {kb.id}") with open("document.pdf", "rb") as f: doc = client.upload_document(kb_id=kb.id, file=f) print(f"Загружен документ: {doc.id}, статус: {doc.status}") results = client.search( kb_id=kb.id, query="Как настроить авторизацию?", limit=5, score_threshold=0.2, ) for result in results.chunks: print(f"Документ: {result.filename}") for chunk in result.chunks: print(f" [score={chunk.score:.2f}] {chunk.text[:100]}...") chunks = client.get_document_chunks(document_id=doc.id, limit=10) for chunk in chunks.result: print(f"Чанк {chunk.chunk_id}: {chunk.text[:50]}...") client.delete_knowledge_base(kb.id) print("База знаний удалена") client.close() async def async_example(): async with AsyncKnowledgeBaseClient(api_token=API_TOKEN) as client: kb = await client.create_knowledge_base( name="Моя база знаний", description="Описание базы знаний", chunk_size=1000, chunk_overlap=200, ) print(f"Создана база знаний: {kb.id}") with open("document.pdf", "rb") as f: doc = await client.upload_document(kb_id=kb.id, file=f) print(f"Загружен документ: {doc.id}, статус: {doc.status}") results = await client.search( kb_id=kb.id, query="Как настроить авторизацию?", limit=5, score_threshold=0.2, ) for result in results.chunks: print(f"Документ: {result.filename}") for chunk in result.chunks: print(f" [score={chunk.score:.2f}] {chunk.text[:100]}...") chunks = await client.get_document_chunks(document_id=doc.id, limit=10) for chunk in chunks.result: print(f"Чанк {chunk.chunk_id}: {chunk.text[:50]}...") await client.delete_knowledge_base(kb.id) print("База знаний удалена") def list_example(): with KnowledgeBaseClient(api_token=API_TOKEN) as client: kbs = client.list_knowledge_bases(search="проект", current=0, page_size=20) print(f"Всего баз: {kbs.total}") for kb in kbs.result: print(f" {kb.name} ({kb.documents_count} документов)") docs = client.list_documents( kb_id="some-kb-id", statuses=["FAILED"], current=0, page_size=10, ) for doc in docs.result: print(f" {doc.filename}: {doc.status}, ошибка: {doc.error}") def bulk_operations_example(): with KnowledgeBaseClient(api_token=API_TOKEN) as client: client.delete_documents_bulk(["doc-id-1", "doc-id-2", "doc-id-3"]) print("Документы удалены") client.retry_document("failed-doc-id") print("Обработка перезапущена") if __name__ == "__main__": pass