import requests
import time
class InvaroAPI:
    """Small client for the Invaro document-parsing HTTP API.

    Every request carries a bearer-token ``Authorization`` header built from
    the API key given to the constructor. HTTP error responses are raised as
    ``requests.HTTPError`` instead of being parsed as if they had succeeded.
    """

    # Seconds to sleep between two status checks of the same job.
    DEFAULT_POLL_INTERVAL = 5

    def __init__(self, api_key, request_timeout=60):
        """Store the endpoint, auth header and per-request timeout.

        Args:
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            request_timeout: Value passed as ``timeout`` to every HTTP call.
                ``None`` disables the timeout (requests then waits forever).
        """
        self.base_url = "https://api.invaro.ai/api/v1"
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.request_timeout = request_timeout

    def _request(self, method, url, **kwargs):
        """Send an authenticated request and return the decoded JSON body.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        response = requests.request(
            method,
            url,
            headers=self.headers,
            timeout=self.request_timeout,
            **kwargs,
        )
        # Fail here with a clear HTTP error rather than later with a
        # KeyError while indexing into an error payload.
        response.raise_for_status()
        return response.json()

    def upload_document(self, file_path):
        """Upload the file at ``file_path`` and return its document id.

        The id is read from ``data.files[0].doc_id`` of the JSON response.
        """
        url = f"{self.base_url}/parse/upload"
        # The request must be sent while the file handle is still open.
        with open(file_path, "rb") as f:
            payload = self._request("POST", url, files={"files": f})
        return payload["data"]["files"][0]["doc_id"]

    def process_document(self, document_id, doc_type="statements"):
        """Start processing one uploaded document and return the job id.

        ``doc_type`` is used as a path segment: ``/parse/<doc_type>``.
        """
        url = f"{self.base_url}/parse/{doc_type}"
        payload = self._request("POST", url, json={"document_id": document_id})
        return payload["data"]["job_id"]

    def process_batch(self, document_ids, doc_type="statements"):
        """Start processing several uploaded documents in one batch.

        Returns the ``data`` object of the JSON response. The polling helper
        in this class reads ``batch_id`` and ``job_ids`` from it.
        """
        url = f"{self.base_url}/parse/{doc_type}/batch"
        body = {"files": [{"document_id": doc_id} for doc_id in document_ids]}
        return self._request("POST", url, json=body)["data"]

    def check_status(self, job_id, doc_type="statements"):
        """Return the full decoded JSON status response for ``job_id``."""
        url = f"{self.base_url}/parse/{doc_type}/{job_id}"
        return self._request("GET", url)

    def _wait_for_job(self, job_id, doc_type, poll_interval, max_wait,
                      status_label, failure_message):
        """Poll ``job_id`` until it reports ``completed`` and return the result.

        Args:
            job_id: Job to poll.
            doc_type: Document type used in the status URL.
            poll_interval: Seconds to sleep between polls.
            max_wait: Upper bound in seconds on the total wait, or ``None``
                to wait indefinitely.
            status_label: Text printed before each observed status.
            failure_message: Message of the error raised on ``failed``.

        Raises:
            RuntimeError: If the job reports status ``failed``.
            TimeoutError: If ``max_wait`` elapses before the job finishes.
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = None if max_wait is None else time.monotonic() + max_wait
        while True:
            result = self.check_status(job_id, doc_type)
            status = result["data"]["status"]
            print(f"{status_label}: {status}")
            if status == "completed":
                return result
            if status == "failed":
                raise RuntimeError(failure_message)
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Job {job_id} did not finish within {max_wait} seconds"
                )
            time.sleep(poll_interval)

    def process_with_polling(self, file_path, doc_type="statements", *,
                             poll_interval=DEFAULT_POLL_INTERVAL,
                             max_wait=None):
        """Upload one document, start processing and wait for the result.

        Args:
            file_path: Path of the file to upload.
            doc_type: Document type used in the processing/status URLs.
            poll_interval: Seconds to sleep between status checks.
            max_wait: Maximum seconds to wait for completion; ``None``
                (the default) waits indefinitely.

        Returns:
            The status response of the completed job.

        Raises:
            RuntimeError: If the job reports status ``failed``.
            TimeoutError: If ``max_wait`` elapses first.
        """
        document_id = self.upload_document(file_path)
        print(f"Document uploaded: {document_id}")
        job_id = self.process_document(document_id, doc_type)
        print(f"Processing started: {job_id}")
        return self._wait_for_job(
            job_id, doc_type, poll_interval, max_wait,
            status_label="Status",
            failure_message="Processing failed",
        )

    def process_batch_with_polling(self, file_paths, doc_type="statements", *,
                                   poll_interval=DEFAULT_POLL_INTERVAL,
                                   max_wait=None):
        """Upload several documents, process them as a batch and wait.

        Jobs are awaited one after another in the order returned by the
        batch call; the first failed job aborts the whole wait.

        Args:
            file_paths: Iterable of file paths to upload.
            doc_type: Document type used in the processing/status URLs.
            poll_interval: Seconds to sleep between status checks.
            max_wait: Maximum seconds to wait per job; ``None`` (the
                default) waits indefinitely.

        Returns:
            List of status responses, one per completed job. An empty
            ``file_paths`` yields ``[]`` without contacting the API.

        Raises:
            RuntimeError: If any job reports status ``failed``.
            TimeoutError: If a job exceeds ``max_wait``.
        """
        document_ids = [self.upload_document(path) for path in file_paths]
        if not document_ids:
            # Nothing was uploaded, so there is no batch to start.
            return []
        print(f"Documents uploaded: {document_ids}")
        batch_result = self.process_batch(document_ids, doc_type)
        print(f"Batch processing started: {batch_result['batch_id']}")
        return [
            self._wait_for_job(
                job_id, doc_type, poll_interval, max_wait,
                status_label=f"Job {job_id} status",
                failure_message=f"Processing failed for job {job_id}",
            )
            for job_id in batch_result["job_ids"]
        ]
def main():
    """Demonstrate the client: one single-document run, then one batch run.

    Both runs use the "statements" document type and print whatever the
    polling helpers return.
    """
    client = InvaroAPI("your_api_key")

    # Single document: upload, process and wait for completion.
    single_outcome = client.process_with_polling(
        "statement.pdf",
        "statements",
    )
    print("Single document result:", single_outcome)

    # Batch: upload each file, process them together and wait for all jobs.
    batch_paths = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
    batch_outcome = client.process_batch_with_polling(
        batch_paths,
        "statements",
    )
    print("Batch processing results:", batch_outcome)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()