Avro Integration

Tessera supports Apache Avro schemas for Kafka topics, event streams, and other data assets that use Avro serialization.

Installation

Avro support requires the optional fastavro dependency:

pip install tessera-contracts[avro]

Or with uv:

uv add tessera-contracts[avro]

Without fastavro, Tessera falls back to basic structural validation.

Publishing Avro Contracts

Via API

curl -X POST "$TESSERA_URL/api/v1/assets/{asset_id}/contracts" \
  -H "Authorization: Bearer $TESSERA_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "schema": {
      "type": "record",
      "name": "UserEvent",
      "namespace": "com.example.events",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
      ]
    },
    "schema_format": "avro",
    "compatibility_mode": "backward"
  }'
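
For scripts that cannot shell out to curl, the same request can be assembled with Python's standard library. This is a minimal sketch: `build_publish_request` is a hypothetical helper, not part of the Tessera SDK, and the endpoint path and payload fields are copied from the curl example above.

```python
import json
import urllib.request


def build_publish_request(base_url, api_key, asset_id, schema,
                          compatibility_mode="backward"):
    """Build (but do not send) the POST request from the curl example."""
    payload = {
        "schema": schema,
        "schema_format": "avro",
        "compatibility_mode": compatibility_mode,
    }
    return urllib.request.Request(
        url=f"{base_url}/api/v1/assets/{asset_id}/contracts",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` sends the request. Keeping construction separate from sending makes the payload easy to inspect in tests.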

Via Python SDK

from tessera_sdk import TesseraClient

client = TesseraClient()

# Create an asset for the Kafka topic
# (`team` is assumed to be an existing team object)
asset = client.assets.create(
    fqn="kafka.events.user_created",
    owner_team_id=team.id,
    resource_type="kafka_topic"
)

# Publish Avro contract
result = client.assets.publish_contract(
    asset_id=asset.id,
    schema={
        "type": "record",
        "name": "UserCreatedEvent",
        "namespace": "com.example.events",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "email", "type": "string"},
            {"name": "name", "type": ["null", "string"], "default": None}
        ]
    },
    schema_format="avro",
    version="1.0.0"
)

Avro Schema Requirements

Tessera validates that Avro schemas conform to the Avro specification:

Record Types

Records must have:

- type: Must be "record"
- name: Schema name (required)
- fields: Array of field definitions (required)

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}
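
The record requirements above can be checked with a few lines of plain Python. This is only an illustrative sketch of a structural check: `check_record_schema` is a hypothetical helper, not Tessera's actual validator, and it ignores everything beyond the three required keys.

```python
def check_record_schema(schema):
    """Return a list of problems found in an Avro record schema dict."""
    problems = []
    if schema.get("type") != "record":
        problems.append('type must be "record"')
    name = schema.get("name")
    if not isinstance(name, str) or not name:
        problems.append("name is required")
    fields = schema.get("fields")
    if not isinstance(fields, list):
        problems.append("fields must be an array")
        return problems
    # Every field definition needs at least a name and a type
    for i, field in enumerate(fields):
        if not isinstance(field, dict) or "name" not in field or "type" not in field:
            problems.append(f"fields[{i}] needs both name and type")
    return problems
```

An empty list means the schema passed this basic check; it does not mean the schema is fully valid Avro.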

Enum Types

{
  "type": "enum",
  "name": "Status",
  "symbols": ["PENDING", "ACTIVE", "DELETED"]
}

Array Types

{
  "type": "array",
  "items": "string"
}

Map Types

{
  "type": "map",
  "values": "long"
}

Union Types (Optional Fields)

{
  "name": "middle_name",
  "type": ["null", "string"],
  "default": null
}
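
Optional fields follow the same shape every time, so a small helper can keep them consistent when schemas are built in Python. The sketch below uses a hypothetical `optional_field` function; it lists "null" first so the null default matches the first branch of the union, as in the example above.

```python
import json


def optional_field(name, avro_type):
    """Build a nullable Avro field definition with a null default."""
    return {"name": name, "type": ["null", avro_type], "default": None}


field = optional_field("middle_name", "string")
# Python's None serializes to JSON null
print(json.dumps(field))
# {"name": "middle_name", "type": ["null", "string"], "default": null}
```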

Breaking Change Detection

Tessera detects breaking changes in Avro schemas based on the compatibility mode:

Backward Compatibility (Default)

Breaking changes:

- Removing a field without a default value
- Adding a required field without a default
- Changing field type to incompatible type
- Removing enum symbols

Compatible changes:

- Adding optional fields (with defaults)
- Adding new enum symbols
- Widening numeric types (int -> long)
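
The field-level backward rules can be expressed as a small diff over two record schemas. This is a sketch of the rules as listed, not Tessera's implementation: `backward_breaking_changes` is a hypothetical function, it only looks at top-level fields, it treats int -> long as the only allowed widening, and it does not cover enum symbols.

```python
# Type changes treated as compatible; only int -> long is named in the rules
WIDENINGS = (("int", "long"),)


def backward_breaking_changes(old, new):
    """List breaking changes between two record schemas (backward mode)."""
    old_fields = {f["name"]: f for f in old["fields"]}
    new_fields = {f["name"]: f for f in new["fields"]}
    breaking = []
    for name, field in old_fields.items():
        if name not in new_fields:
            # Removing a field is breaking unless it had a default
            if "default" not in field:
                breaking.append(f"removed field without default: {name}")
            continue
        old_type, new_type = field["type"], new_fields[name]["type"]
        if old_type != new_type and (old_type, new_type) not in WIDENINGS:
            breaking.append(f"incompatible type change: {name}")
    for name, field in new_fields.items():
        # Adding a field is breaking unless it carries a default
        if name not in old_fields and "default" not in field:
            breaking.append(f"added field without default: {name}")
    return breaking
```

An empty result means none of the listed field rules were violated.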

Forward Compatibility

Breaking changes:

- Adding fields (consumers don't know about them)
- Adding enum symbols
- Widening types

Compatible changes:

- Removing optional fields
- Removing enum symbols

Full Compatibility

A change is breaking if it affects either readers or writers, that is, if it would be breaking under backward or forward compatibility.
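
Enum symbols make the relationship between the three modes easy to see, because adding and removing symbols fall on opposite sides. The sketch below encodes only the enum rules listed in this section. `enum_change_is_breaking` is a hypothetical function, and the mode strings "forward" and "full" are assumed by analogy with the "backward" value used in the API example.

```python
def enum_change_is_breaking(old_symbols, new_symbols, mode):
    """Classify an enum symbol change under the rules listed above."""
    removed = set(old_symbols) - set(new_symbols)
    added = set(new_symbols) - set(old_symbols)
    backward_break = bool(removed)  # backward: removing symbols is breaking
    forward_break = bool(added)     # forward: adding symbols is breaking
    if mode == "backward":
        return backward_break
    if mode == "forward":
        return forward_break
    if mode == "full":
        # full: breaking if either direction breaks
        return backward_break or forward_break
    raise ValueError(f"unknown compatibility mode: {mode}")
```

Under these rules, any change to the symbol set is breaking in full mode.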

Impact Analysis

Check the impact of schema changes before publishing:

impact = client.assets.check_impact(
    asset_id=asset.id,
    proposed_schema={
        "type": "record",
        "name": "UserCreatedEvent",
        "fields": [
            {"name": "id", "type": "string"},
            # Removed 'email' field - breaking change!
            {"name": "name", "type": ["null", "string"], "default": None}
        ]
    },
    schema_format="avro"
)

if not impact.safe_to_publish:
    print(f"Breaking changes: {impact.breaking_changes}")
    print(f"Affected consumers: {impact.affected_consumers}")

Schema Registry Integration

Tessera can work alongside Confluent Schema Registry or other Avro registries:

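import json

import requests

# Fetch the latest schema version from the registry
# (SCHEMA_REGISTRY_URL, client, and asset are assumed to be defined already)
response = requests.get(
    f"{SCHEMA_REGISTRY_URL}/subjects/user-events-value/versions/latest"
)
response.raise_for_status()
# The registry returns the schema as a JSON-encoded string
avro_schema = response.json()["schema"]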

# Publish to Tessera
client.assets.publish_contract(
    asset_id=asset.id,
    schema=json.loads(avro_schema),
    schema_format="avro",
    version="1.0.0"
)
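
The `json.loads` call matters because the registry response carries the schema as a JSON-encoded string rather than as a nested object. The offline sketch below illustrates this with a sample body shaped like a Schema Registry response; the values are made up for illustration.

```python
import json

# Sample body shaped like a Schema Registry "latest version" response;
# the values are invented for this example.
registry_body = {
    "subject": "user-events-value",
    "version": 3,
    "id": 42,
    "schema": '{"type": "record", "name": "UserEvent", '
              '"fields": [{"name": "id", "type": "string"}]}',
}

# "schema" is a string, so it needs its own json.loads before publishing
avro_schema = json.loads(registry_body["schema"])
print(type(registry_body["schema"]).__name__, "->", type(avro_schema).__name__)
# str -> dict
print(avro_schema["name"])
# UserEvent
```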

CI/CD Integration

Validate Avro schemas in your CI pipeline:

# GitHub Actions
- name: Validate Avro Schema
  run: |
    RESPONSE=$(curl -s -X POST "$TESSERA_URL/api/v1/assets/$ASSET_ID/impact" \
      -H "Authorization: Bearer $TESSERA_API_KEY" \
      -H "Content-Type: application/json" \
      -d '{
        "proposed_schema": '"$(cat schema.avsc)"',
        "schema_format": "avro"
      }')

    SAFE=$(echo "$RESPONSE" | jq -r '.safe_to_publish')
    if [ "$SAFE" != "true" ]; then
      echo "Breaking changes detected!"
      echo "$RESPONSE" | jq '.breaking_changes'
      exit 1
    fi
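
If the pipeline already runs Python, the pass/fail decision from the shell step can live in a small function instead of jq. This is a sketch under the assumption that the impact response contains the `safe_to_publish` and `breaking_changes` keys used above; `exit_code_for` is a hypothetical helper.

```python
import json


def exit_code_for(response):
    """Return 0 when the impact response is safe to publish, else 1."""
    if response.get("safe_to_publish") is True:
        return 0
    print("Breaking changes detected!")
    print(json.dumps(response.get("breaking_changes", []), indent=2))
    return 1
```

A CI step could pipe the curl output into a script that calls `sys.exit(exit_code_for(json.load(sys.stdin)))`. A missing or malformed `safe_to_publish` value fails the build, matching the shell check against the literal string "true".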