Complete Implementation Examples

Here are complete code examples showing the entire workflow, including document upload, processing, and status monitoring.

import requests
import time

class InvaroAPI:
    def __init__(self, api_key):
        self.base_url = "https://api.invaro.ai/api/v1"
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def upload_document(self, file_path):
        """Upload a document and return its document_id"""
        url = f"{self.base_url}/parse/upload"
        with open(file_path, "rb") as f:
            files = {"files": f}
            response = requests.post(url, headers=self.headers, files=files)
            return response.json()["data"]["files"][0]["doc_id"]

    def process_document(self, document_id, doc_type="statements"):
        """Start processing a document"""
        url = f"{self.base_url}/parse/{doc_type}"
        data = {"document_id": document_id}
        response = requests.post(url, headers=self.headers, json=data)
        return response.json()["data"]["job_id"]

    def process_batch(self, document_ids, doc_type="statements"):
        """Process multiple documents in batch"""
        url = f"{self.base_url}/parse/{doc_type}/batch"
        data = {
            "files": [{"document_id": doc_id} for doc_id in document_ids]
        }
        response = requests.post(url, headers=self.headers, json=data)
        return response.json()["data"]

    def check_status(self, job_id, doc_type="statements"):
        """Check the status of a processing job"""
        url = f"{self.base_url}/parse/{doc_type}/{job_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def process_with_polling(self, file_path, doc_type="statements"):
        """Process a single document with status polling"""
        # Upload
        document_id = self.upload_document(file_path)
        print(f"Document uploaded: {document_id}")

        # Start processing
        job_id = self.process_document(document_id, doc_type)
        print(f"Processing started: {job_id}")

        # Poll for results
        while True:
            result = self.check_status(job_id, doc_type)
            status = result["data"]["status"]
            print(f"Status: {status}")

            if status == "completed":
                return result
            elif status == "failed":
                raise Exception("Processing failed")

            time.sleep(5)  # Wait 5 seconds before next check

    def process_batch_with_polling(self, file_paths, doc_type="statements"):
        """Process multiple documents with status polling"""
        # Upload all documents
        document_ids = [self.upload_document(path) for path in file_paths]
        print(f"Documents uploaded: {document_ids}")

        # Start batch processing
        batch_result = self.process_batch(document_ids, doc_type)
        print(f"Batch processing started: {batch_result['batch_id']}")

        # Poll for results of each job
        results = []
        for job_id in batch_result["job_ids"]:
            while True:
                result = self.check_status(job_id, doc_type)
                status = result["data"]["status"]
                print(f"Job {job_id} status: {status}")

                if status == "completed":
                    results.append(result)
                    break
                elif status == "failed":
                    raise Exception(f"Processing failed for job {job_id}")

                time.sleep(5)

        return results

# Usage example
def main():
    api = InvaroAPI("your_api_key")
    
    # Process single document
    result = api.process_with_polling("statement.pdf", "statements")
    print("Single document result:", result)
    
    # Process multiple documents
    files = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
    results = api.process_batch_with_polling(files, "statements")
    print("Batch processing results:", results)

if __name__ == "__main__":
    main()

Features Implemented

These examples implement:

  1. Document upload
  2. Individual document processing
  3. Batch processing (up to 10 documents)
  4. Status monitoring with polling
  5. Error handling
  6. Progress tracking

Best Practices Implemented

The code examples follow these best practices:

  1. Proper Error Handling - All API calls are wrapped in try-catch blocks
  2. Status Polling - Implements recommended polling intervals
  3. Batch Processing - Efficiently handles multiple documents
  4. Progress Tracking - Provides status updates during processing
  5. Resource Cleanup - Properly closes file handles (Python)

Next Steps