Extending Connectors

Airweave’s connector system is designed to be extensible, allowing you to create custom source and destination connectors for your specific needs.

After you’ve cloned or forked the Airweave repository, you can extend the platform by adding your own connectors to the repo. Airweave will automatically pick up your new sources and destinations.

We are working on making this easier in the near future. The new Airweave CLI + connector SDK will allow you to add new sources and destinations without having to clone the repository.

The general platform module structure is as follows:

backend/app/platform
├── sources/
│   ├── slack.py
│   └── ghibli.py # See example implementation below
├── entities/
│   ├── slack.py
│   └── ghibli.py # See example implementation below
├── destinations/
│   ├── weaviate.py
│   ├── weaviate_native.py
│   ├── pinecone.py
│   └── custom.py # See example implementation below
└── ... # Other platform modules

Source Connectors

Source connectors define how data is extracted from various sources and converted into entities for processing.

Here is an example of the Slack connector implementation.

Slack source connector
# Imports follow the same module paths as the Ghibli example further down.
from app.platform.auth.schemas import AuthType
from app.platform.decorators import source
from app.platform.sources._base import BaseSource


@source("Slack", "slack", AuthType.oauth2)
class SlackSource(BaseSource):
    """Slack source implementation.

    This connector retrieves data from Slack such as Channels, Users, and Messages,
    then yields them as entities using their respective Slack entity schemas.
    """

    @classmethod
    async def create(cls, access_token: str) -> "SlackSource":
        """Create a new Slack source instance."""
        instance = cls()
        instance.access_token = access_token
        return instance
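This guide does not show how the @source decorator works internally. As a generic illustration of the registration pattern that decorators like it commonly follow, here is a self-contained sketch. Everything in it (register_source, SOURCE_REGISTRY, ExampleSource, and the underscore-prefixed attributes) is a hypothetical name chosen for this example, not Airweave's actual code.

```python
from typing import Callable, Dict, Type

# Hypothetical registry mapping a connector's short name to its class.
SOURCE_REGISTRY: Dict[str, Type] = {}


def register_source(name: str, short_name: str, auth_type: str) -> Callable[[Type], Type]:
    """Illustrative decorator: attach metadata to a class and register it."""

    def decorator(cls: Type) -> Type:
        # Store the metadata on the class so other code can look it up later.
        cls._name = name
        cls._short_name = short_name
        cls._auth_type = auth_type
        SOURCE_REGISTRY[short_name] = cls
        return cls

    return decorator


@register_source("Example", "example", "none")
class ExampleSource:
    """Placeholder connector used only to demonstrate registration."""


if __name__ == "__main__":
    print(sorted(SOURCE_REGISTRY))
```

With a pattern like this, importing the module that defines a connector is enough to make it discoverable by short name, which is one plausible way a platform can pick up new connectors without extra wiring.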

Creating a Source Connector

After you’ve cloned or forked the Airweave repository, you can create a new source connector by adding a new file to the backend/app/platform/sources directory.

To get a third-party connector working, you need to implement the following:

  • Source class that extends BaseSource and is decorated with @source.
  • generate_entities: This method should yield Entity objects. These can be anything from a JSON object to a PDF document.
  • create: This method should return an instance of your source class.
  • Accompanying Entity schemas. You can find the Slack example here. These help Airweave understand how to process the data and standardize it across your syncs.

If you’re using OAuth2 and would like Airweave to handle the OAuth2 flow for you, implement create so that it accepts an access token, and add your integration to the dev.integrations.yaml file. We’re working on making this easier in the near future. For now, we recommend reaching out to us!
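As a minimal sketch of a create method that accepts an access token, the class below stores the token and turns it into a standard OAuth2 bearer header. TokenSource and auth_headers are hypothetical names, and the class deliberately does not extend BaseSource so that the example runs on its own.

```python
import asyncio


class TokenSource:
    """Hypothetical source whose create() receives an OAuth2 access token."""

    def __init__(self) -> None:
        self.access_token = ""

    @classmethod
    async def create(cls, access_token: str) -> "TokenSource":
        """Build an instance and keep the token for later requests."""
        if not access_token:
            raise ValueError("access_token must not be empty")
        instance = cls()
        instance.access_token = access_token
        return instance

    def auth_headers(self) -> dict[str, str]:
        """Return the bearer Authorization header for outgoing requests."""
        return {"Authorization": f"Bearer {self.access_token}"}


if __name__ == "__main__":
    source = asyncio.run(TokenSource.create("example-token"))
    print(source.auth_headers())
```

Rejecting an empty token in create surfaces a misconfigured OAuth2 setup at construction time rather than on the first failed request.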

Here’s an example of a custom source connector implementation. In this example, we’re fetching data from the Studio Ghibli API and yielding it as entities. These entities are then processed and written to your destinations.

Custom source connector implementation
from typing import AsyncGenerator
import httpx

from app.platform.entities._base import ChunkEntity
from app.platform.decorators import source
from app.platform.sources._base import BaseSource
from app.platform.auth.schemas import AuthType


@source("Studio Ghibli", "ghibli", AuthType.none)
class GhibliSource(BaseSource):
    """Studio Ghibli source implementation.

    This connector fetches film data from the Studio Ghibli API and yields it as entities.
    Each entity represents a Ghibli film with its details.
    """

    BASE_URL = "https://ghibli.rest/films"

    @classmethod
    async def create(cls) -> "GhibliSource":
        """Create a new Ghibli source instance."""
        return cls()

    async def generate_entities(self) -> AsyncGenerator[ChunkEntity, None]:
        """Generate entities from the Ghibli API.

        Each entity contains information about a Studio Ghibli film.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(self.BASE_URL)
            response.raise_for_status()
            films = response.json()

            for film in films:
                yield ChunkEntity(
                    source_name="ghibli",
                    entity_id=film["id"],
                    content=film["description"],
                    metadata={
                        "title": film["title"],
                        "original_title": film["original_title"],
                        "director": film["director"],
                        "release_date": film["release_date"],
                        "running_time": film["running_time"],
                        "rt_score": film["rt_score"],
                    },
                )
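Because generate_entities is an async generator, callers consume it with async for. The sketch below shows that consumption pattern without needing Airweave or network access. FakeFilmSource and the hard-coded FILMS list are hypothetical stand-ins for GhibliSource and the API response, and plain dicts stand in for ChunkEntity. How Airweave itself drives the generator during a sync is not covered here.

```python
import asyncio
from typing import AsyncGenerator

# Placeholder records standing in for the JSON returned by the films endpoint.
FILMS = [
    {"id": "1", "title": "Film A", "description": "Placeholder description A."},
    {"id": "2", "title": "Film B", "description": "Placeholder description B."},
]


class FakeFilmSource:
    """Stand-in source with the same create/generate_entities shape."""

    @classmethod
    async def create(cls) -> "FakeFilmSource":
        return cls()

    async def generate_entities(self) -> AsyncGenerator[dict, None]:
        for film in FILMS:
            # A plain dict replaces ChunkEntity so the sketch runs on its own.
            yield {
                "source_name": "fake_films",
                "entity_id": film["id"],
                "content": film["description"],
                "metadata": {"title": film["title"]},
            }


async def collect_entities() -> list[dict]:
    """Create the source and drain its generator into a list."""
    source = await FakeFilmSource.create()
    return [entity async for entity in source.generate_entities()]


if __name__ == "__main__":
    for entity in asyncio.run(collect_entities()):
        print(entity["entity_id"], entity["metadata"]["title"])
```

A stand-in like this is also a convenient starting point for unit tests, since it exercises the entity mapping without calling the real API.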

Source Connector Features

Your source connector can implement additional features:

# Import path as used in the other examples in this guide.
from airweave.connectors import SourceConnector


class CustomSourceConnector(SourceConnector):
    async def validate_config(self):
        """Validate the connector configuration."""
        if not self.config.api_key:
            raise ValueError("API key is required")

    async def test_connection(self):
        """Test the connection to your data source."""
        try:
            # fetch_data is assumed to be a method you define on your connector.
            await self.fetch_data(limit=1)
            return True
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise ConnectionError(f"Failed to connect: {e}") from e

    async def get_schema(self):
        """Return the schema of your data source."""
        return {
            "type": "object",
            "properties": {
                "content": {"type": "string"},
                "metadata": {
                    "type": "object",
                    "properties": {
                        "id": {"type": "string"},
                        "type": {"type": "string"},
                    },
                },
            },
        }
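One way to use hooks like validate_config and test_connection is a pre-flight check that runs before a sync starts. The sketch below is self-contained: ExampleConfig, ExampleConnector, the reachable flag, and preflight are hypothetical names for illustration, and the connector is a stand-in that does not extend SourceConnector.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ExampleConfig:
    """Hypothetical configuration object."""

    api_key: str = ""


class ExampleConnector:
    """Stand-in exposing the same optional hooks as the class above."""

    def __init__(self, config: ExampleConfig, reachable: bool = True) -> None:
        self.config = config
        self.reachable = reachable  # simulates whether the data source responds

    async def validate_config(self) -> None:
        if not self.config.api_key:
            raise ValueError("API key is required")

    async def test_connection(self) -> bool:
        if not self.reachable:
            raise ConnectionError("Failed to connect: source unreachable")
        return True


async def preflight(connector: ExampleConnector) -> list[str]:
    """Run the cheap config check first, then the connection test."""
    try:
        await connector.validate_config()
    except ValueError as exc:
        return [f"config: {exc}"]
    try:
        await connector.test_connection()
    except ConnectionError as exc:
        return [f"connection: {exc}"]
    return []


if __name__ == "__main__":
    problems = asyncio.run(preflight(ExampleConnector(ExampleConfig())))
    print(problems or "ready to sync")
```

Validating configuration before touching the network keeps the failure message specific: a missing API key is reported as a configuration problem instead of a generic connection error.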

Destination Connectors

Destination connectors handle how processed entities are stored in your target system.

Creating a Destination Connector

from airweave.connectors import DestinationConnector
from airweave.types import Entity, DestinationConfig


class CustomDestinationConnector(DestinationConnector):
    def __init__(self, config: DestinationConfig):
        super().__init__(config)
        self.config = config

    async def store_entities(self, entities: list[Entity]):
        """Store entities in your destination system."""
        # Your custom storage logic here
        for entity in entities:
            await self.store_entity(entity)

    async def store_entity(self, entity: Entity):
        """Store a single entity."""
        # Example implementation
        pass

Destination Connector Features

Implement additional features for your destination connector:

from typing import Optional

from airweave.connectors import DestinationConnector


class CustomDestinationConnector(DestinationConnector):
    async def initialize(self):
        """Initialize your destination system."""
        # Example: Create necessary indexes
        pass

    async def cleanup(self):
        """Clean up resources."""
        # Example: Close connections
        pass

    async def query(self, query: str, filter: Optional[dict] = None):
        """Implement custom query logic."""
        # Your custom query implementation
        pass
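The initialize, cleanup, and query hooks fit together as a simple lifecycle. The sketch below uses a try/finally block so that cleanup runs even when storing or querying fails. InMemoryDestination and run_sync are hypothetical names, entities are plain dicts, and the class is a stand-in that does not extend DestinationConnector.

```python
import asyncio
from typing import Optional


class InMemoryDestination:
    """Hypothetical destination that keeps entities in a Python list."""

    def __init__(self) -> None:
        self.items: list[dict] = []
        self.is_open = False

    async def initialize(self) -> None:
        self.is_open = True

    async def store_entities(self, entities: list[dict]) -> None:
        if not self.is_open:
            raise RuntimeError("initialize() must be called before storing")
        self.items.extend(entities)

    async def query(self, query: str, filter: Optional[dict] = None) -> list[dict]:
        """Naive substring search with an optional exact-match metadata filter."""
        wanted = filter or {}
        return [
            item
            for item in self.items
            if query.lower() in item["content"].lower()
            and all(item["metadata"].get(k) == v for k, v in wanted.items())
        ]

    async def cleanup(self) -> None:
        self.is_open = False


async def run_sync(destination: InMemoryDestination, entities: list[dict], query: str) -> list[dict]:
    """Initialize, store, query, and always clean up afterwards."""
    await destination.initialize()
    try:
        await destination.store_entities(entities)
        return await destination.query(query)
    finally:
        await destination.cleanup()


if __name__ == "__main__":
    docs = [{"content": "Alpha doc", "metadata": {"type": "a"}}]
    print(asyncio.run(run_sync(InMemoryDestination(), docs, "alpha")))
```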

Example Source Implementation

# sources/api/rest.py
import aiohttp

from airweave.connectors import SourceConnector
from airweave.types import Entity


class RestApiSource(SourceConnector):
    async def generate_entities(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.config.url) as response:
                # Fail early on HTTP errors instead of parsing an error body.
                response.raise_for_status()
                data = await response.json()

        for item in data:
            yield Entity(
                content=item["content"],
                metadata={
                    "source": "rest-api",
                    "id": item["id"],
                },
            )

Destinations

The destinations directory contains implementations for different storage systems:

destinations/
├── vector_stores/
│   ├── weaviate.py
│   └── pinecone.py
├── search/
│   └── elasticsearch.py
└── custom/
    └── your_implementation.py

Example Destination Implementation

# destinations/vector_stores/custom_store.py
from airweave.connectors import DestinationConnector


class CustomVectorStore(DestinationConnector):
    async def store_entities(self, entities):
        # get_embeddings and self.client are assumed to be provided by your connector.
        vectors = await self.get_embeddings(entities)

        async with self.client as client:
            # Pair each entity with its embedding and upsert them one by one.
            for entity, vector in zip(entities, vectors):
                await client.upsert(
                    vector=vector,
                    metadata=entity.metadata,
                )

Best Practices

  1. Error Handling

    • Implement robust error handling
    • Provide clear error messages
    • Use appropriate exception types

  2. Configuration

    • Validate configuration early
    • Use type hints for configuration
    • Document required configuration

  3. Performance

    • Implement batching where appropriate
    • Use async/await for I/O operations
    • Consider rate limiting

  4. Testing

    • Write unit tests for your connector
    • Implement integration tests
    • Test edge cases and error conditions
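Two of the practices above, batching and handling transient errors, can be sketched with the standard library alone. The helpers below, batched and with_retries, are hypothetical names for this example and are not part of Airweave. The retry helper assumes that transient failures surface as ConnectionError.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly shorter, batch.
        yield batch


async def with_retries(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.1,
) -> T:
    """Retry `operation` on ConnectionError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return await operation()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller see the error
            await asyncio.sleep(base_delay * 2 ** attempt)
    # Only reached when the loop never ran, i.e. attempts < 1.
    raise ValueError("attempts must be at least 1")


if __name__ == "__main__":
    print(list(batched(range(5), 2)))
```

In a destination connector, you might combine the two by wrapping each batch write in with_retries, so that a single transient failure does not abort the whole sync.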

Next Steps