Copying Data between Faros Graphs

The Faros GraphQL Source for Airbyte reads data from a Faros graph, and the Faros Destination for Airbyte writes data to a Faros graph. Combining the two lets you copy data between graphs, even when the source and destination graphs use different schema versions (V1 or V2).

The source spec provides several options for different use cases. Most of them are self-explanatory, but the following are worth explaining:

query: An optional query to execute against the input graph. If it is not specified, the source generates queries for all models, and its output is expected to contain enough information to completely recreate (clone) the graph.

result_model: Where to place each query result in its corresponding output record:

  • Nested: { vcs { pullRequests { nodes: [<record>] } } }
  • Flat: { vcs_PullRequest: <record> }

This parameter controls the output format of the records. "Nested" mirrors the structure of the query results against the graph and is mostly intended for cases where the destination applies a transformation using a V1 JSONata expression. "Flat" should be used when the destination is expected to write the output records without any transformation (unless the JSONata expression is written with that structure in mind).

Examples

The examples below use the airbyte-local-cli to copy data between Faros graphs.

Copy a graph into another graph

Note: All combinations of source and destination graph schema versions (V1 or V2) are supported.

./airbyte-local.sh  \
  --src 'farosai/airbyte-faros-graphql-source' \
  --src.api_url $FAROS_API_URL \
  --src.api_key $FAROS_API_KEY \
  --src.graph 'your-source-graph' \
  --src.graphql_api 'v2' \
  --src.result_model 'Flat' \
  --check-connection \
  --dst 'farosai/airbyte-faros-destination' \
  --dst.edition_configs '{"edition":"cloud", "graph":"your-destination-graph", "graphql_api": "v2"}' \
  --dst.edition_configs.api_url $FAROS_API_URL \
  --dst.edition_configs.api_key $FAROS_API_KEY

Filtering data

You can optionally restrict the copy to a subset of models, e.g. org_Team and org_Employee, by adding the following source option:

  --src.models_filter '["org_Team","org_Employee"]'
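For longer model lists, a small helper can build the JSON array instead of typing it by hand. This is a minimal sketch: models_filter_json is a hypothetical bash function (not part of the CLI), and it assumes model names contain no quotes or backslashes, so it performs no JSON escaping.

```bash
# Hypothetical helper: print a JSON array of the model names given as
# arguments, e.g. for --src.models_filter.
# Assumes names contain no quotes or backslashes (no JSON escaping is done).
models_filter_json() {
  local out="" sep="" model
  for model in "$@"; do
    out+="${sep}\"${model}\""
    sep=","
  done
  printf '[%s]\n' "$out"
}

models_filter_json org_Team org_Employee   # prints ["org_Team","org_Employee"]
```

Usage: pass the result as one quoted argument, e.g. --src.models_filter "$(models_filter_json org_Team org_Employee)".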

Query a V1 graph and transform the results using JSONata

In this example, we query the vcs_Organization table and apply a simple transformation that sets the organization name to "Faros AI" when its uid is "faros-ai".

The query and the JSONata expression are exported as environment variables to keep the sync command readable.

export QUERY='query MyQuery {
  vcs {
    organizations {
      nodes {
        id
        uid
        name
        htmlUrl
        type {
          category
          detail
        }
        source
        createdAt
        metadata {
          refreshedAt
        }
      }
    }
  }
}'

export JSONATA_EXPR="(
    data.vcs.organizations.nodes.(
        {
            'vcs_Organization': {
                'id': id,
                'uid': uid,
                'name': uid = 'faros-ai' ? 'Faros AI' : name,
                'htmlUrl': htmlUrl,
                'type': type,
                'source': source
            }
        }
    )
)"

./airbyte-local.sh \
  --src 'farosai/airbyte-faros-graphql-source' \
  --src.api_url $FAROS_API_URL \
  --src.api_key $FAROS_API_KEY \
  --src.graph 'your-source-graph' \
  --src.graphql_api 'v1' \
  --src.result_model 'Nested' \
  --src.query "$QUERY" \
  --check-connection \
  --dst 'farosai/airbyte-faros-destination' \
  --dst.edition_configs '{"edition":"cloud", "graph":"your-destination-graph", "graphql_api": "v1"}' \
  --dst.edition_configs.api_url $FAROS_API_URL \
  --dst.edition_configs.api_key $FAROS_API_KEY \
  --dst.jsonata_mode 'OVERRIDE' \
  --dst.jsonata_expression "$JSONATA_EXPR" \
  --dst.jsonata_destination_models '["vcs_Organization"]'
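Values such as $QUERY and $JSONATA_EXPR contain spaces and line breaks, so they should be wrapped in double quotes when passed to airbyte-local.sh; an unquoted expansion is split by the shell into several separate arguments. The sketch below makes this visible with count_args, a hypothetical function that only prints how many arguments it received, and a shortened expression used for illustration only.

```bash
# Hypothetical helper: print the number of arguments received.
count_args() { echo "$#"; }

# Shortened JSONata expression containing spaces (illustration only).
JSONATA_EXPR='( data.vcs.organizations.nodes )'

count_args --dst.jsonata_expression $JSONATA_EXPR    # unquoted: prints 4
count_args --dst.jsonata_expression "$JSONATA_EXPR"  # quoted: prints 2
```

Only the quoted form hands the whole expression to the option as a single value.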

The JSONata expression can also produce model operation records. For example, the expression below can be used to delete organizations with name "Delete me, INC":

export JSONATA_EXPR="(
    data.vcs.organizations.nodes[name = 'Delete me, INC'].(
        {
            'vcs_Organization__Deletion': {
                'where': {'id': id}
            }
        }
    )
)"

Another use case is filtering out unwanted records. The expression below drops tms_TaskAssignment records whose assignee has a null uid (by returning [] for them) and returns all other records unchanged. In this case, the wildcard "*" can be used in --dst.jsonata_destination_models to mean "all models".

export JSONATA_EXPR="data.tms_TaskAssignment.assignee.uid = null ? [] : data"

./airbyte-local.sh \
  --src 'farosai/airbyte-faros-graphql-source' \
  --src.api_url $FAROS_API_URL \
  --src.api_key $FAROS_API_KEY \
  --src.graph 'your-source-graph' \
  --src.graphql_api 'v1' \
  --src.result_model 'Flat' \
  --check-connection \
  --dst 'farosai/airbyte-faros-destination' \
  --dst.edition_configs '{"edition":"cloud", "graph":"your-destination-graph", "graphql_api": "v2"}' \
  --dst.edition_configs.api_url $FAROS_API_URL \
  --dst.edition_configs.api_key $FAROS_API_KEY \
  --dst.jsonata_mode 'OVERRIDE' \
  --dst.jsonata_expression "$JSONATA_EXPR" \
  --dst.jsonata_destination_models '["*"]'

Query a V2 graph using a V1 query

Export the gzip-compressed, base64-encoded V1 schema into $V1_SCHEMA for ease of use, and the V1 query into $QUERY.

export V1_SCHEMA=$(gzip -c v1_schema.gql | base64 | tr -d '\n')

export QUERY='query MyQuery {
  vcs {
    users {
      nodes {
        id
        name
        email
        htmlUrl
      }
    }
  }
}'

./airbyte-local.sh \
  --src 'farosai/airbyte-faros-graphql-source' \
  --src.api_url $FAROS_API_URL \
  --src.api_key $FAROS_API_KEY \
  --src.graph 'your-source-graph' \
  --src.graphql_api 'v2'  \
  --src.result_model 'Nested' \
  --src.query "$QUERY" \
  --src.adapt_v1_query 'true' \
  --src.legacy_v1_schema "$V1_SCHEMA" \
  --src-only
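Before running the source, it can help to check that the schema value decodes back to the original file. This is a minimal sketch with two hypothetical bash functions (not part of the CLI): encode_v1_schema applies gzip, then base64, and strips the line breaks that base64 inserts so the value stays on one line; decode_v1_schema reverses it. It assumes GNU coreutils; some older macOS versions spell the decode flag base64 -D.

```bash
# Hypothetical helper: gzip + base64 a schema file, output on a single line.
encode_v1_schema() {
  gzip -c "$1" | base64 | tr -d '\n'
}

# Hypothetical helper: decode a value produced by encode_v1_schema.
# GNU coreutils flag; some older macOS versions use base64 -D instead.
decode_v1_schema() {
  printf '%s' "$1" | base64 -d | gunzip -c
}
```

Usage: decode_v1_schema "$V1_SCHEMA" | diff - v1_schema.gql prints nothing and exits with status 0 when the value round-trips.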

Using buckets to speed up the copy

You can split the read into N buckets (1 by default) and run N source/destination pairs concurrently to speed up the copy. Make sure each bucket uses a different state file path (--state).

Note: Concurrent writes are not supported for V1 graphs. When writing to a V1 graph, use buckets only to split the read, and run the writes sequentially.

Example for bucket 1 of 3 buckets total:

./airbyte-local.sh \
  --src 'farosai/airbyte-faros-graphql-source' \
  --src.api_url $FAROS_API_URL \
  --src.api_key $FAROS_API_KEY \
  --src.graph 'your-source-graph' \
  --src.graphql_api 'v2' \
  --src.result_model 'Flat' \
  --src.bucket_id 1 \
  --src.bucket_total 3 \
  --check-connection \
  --dst 'farosai/airbyte-faros-destination' \
  --dst.edition_configs '{"edition":"cloud", "graph":"your-destination-graph", "graphql_api": "v2"}' \
  --dst.edition_configs.api_url $FAROS_API_URL \
  --dst.edition_configs.api_key $FAROS_API_KEY \
  --state bucket-1-of-3.state.json
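The command above covers bucket 1 only. A minimal sketch for launching every bucket at once, assuming bash and V2 source and destination graphs (concurrent writes are not supported for V1 graphs): the hypothetical run_all_buckets function starts one sync per bucket in the background, gives each its own bucket_id and state file, and waits for all of them. AIRBYTE_LOCAL is an assumption of this sketch, not a CLI option; it defaults to ./airbyte-local.sh and only exists so another script can be substituted for a dry run.

```bash
# Hypothetical helper: run one airbyte-local.sh sync per bucket in parallel.
# AIRBYTE_LOCAL (sketch-only variable) defaults to ./airbyte-local.sh.
run_all_buckets() {
  local total="$1"
  local cli="${AIRBYTE_LOCAL:-./airbyte-local.sh}"
  local pids=() pid i status=0

  for ((i = 1; i <= total; i++)); do
    # Each bucket reads its share of the graph and keeps its own state file.
    "$cli" \
      --src 'farosai/airbyte-faros-graphql-source' \
      --src.api_url "$FAROS_API_URL" \
      --src.api_key "$FAROS_API_KEY" \
      --src.graph 'your-source-graph' \
      --src.graphql_api 'v2' \
      --src.result_model 'Flat' \
      --src.bucket_id "$i" \
      --src.bucket_total "$total" \
      --check-connection \
      --dst 'farosai/airbyte-faros-destination' \
      --dst.edition_configs '{"edition":"cloud", "graph":"your-destination-graph", "graphql_api": "v2"}' \
      --dst.edition_configs.api_url "$FAROS_API_URL" \
      --dst.edition_configs.api_key "$FAROS_API_KEY" \
      --state "bucket-$i-of-$total.state.json" &
    pids+=("$!")
  done

  # Wait for every bucket; return non-zero if any of them failed.
  for pid in "${pids[@]}"; do
    wait "$pid" || status=1
  done
  return "$status"
}
```

Usage: run_all_buckets 3 starts buckets 1, 2 and 3 with state files bucket-1-of-3.state.json through bucket-3-of-3.state.json. Waiting on each PID individually, rather than calling a bare wait, is what lets a failed bucket surface in the return status.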

Replacing the origin of the copied records

By default, the destination preserves the origin of the copied records. That is, if the records emitted by the Faros GraphQL source have an origin field, its value is kept as is when copied to the destination graph.

This behavior can be turned off like so:

--dst.accept_input_records_origin false

In this case, the copied records use an origin generated by the Airbyte local CLI or, if provided, the user-provided origin passed via --dst.origin.

When accept_input_records_origin is enabled, it is also possible to replace certain origins and keep others untouched. For example:

--dst.replace_origin_map '"{ \"originA\": \"originB\" }"'

In this case, records emitted by the source with origin originA will be copied with origin originB to the destination graph. All other origins are kept as is.
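Hand-escaping this value gets error-prone with several mappings. A minimal sketch of a hypothetical bash helper (not part of the CLI) that builds the value from "from to" pairs in the same double-encoded form as the example in this section (a JSON object wrapped in a JSON string). It assumes the option accepts several entries in one object and that origin names contain no quotes or backslashes.

```bash
# Hypothetical helper: build a --dst.replace_origin_map value from pairs,
# e.g. replace_origin_map_arg originA originB originC originD.
# Assumes origin names contain no quotes or backslashes.
replace_origin_map_arg() {
  local body="" sep=""
  while [ "$#" -ge 2 ]; do
    # Each entry is written as \"from\": \"to\" inside the outer JSON string.
    body+="${sep}\\\"$1\\\": \\\"$2\\\""
    sep=", "
    shift 2
  done
  printf '"{ %s }"\n' "$body"
}

replace_origin_map_arg originA originB   # prints "{ \"originA\": \"originB\" }"
```

Usage: --dst.replace_origin_map "$(replace_origin_map_arg originA originB)" passes the same value as the single-quoted example in this section.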