Announcing the Python SDK for CloudQuery Plugin Development

August 17, 2023

Herman Schaaf (Twitter: hermanschaaf)

We're excited to announce the first release of a Python SDK for CloudQuery plugin development! This SDK provides a high-level toolkit for developing CloudQuery plugins in Python.

Background

CloudQuery is designed with a pluggable architecture and uses Apache Arrow over gRPC for communication between plugins. Source and destination plugins are independent of one another, so plugins can be written in different languages and still communicate with one another. Until now, we've only provided an SDK for writing plugins in Go; with this release, plugins can be written in Python as well.

[Figure: CloudQuery high-level architecture]

Features

The Python SDK provides a number of features that make plugin development easier and help plugins written in Python perform well.

Plugin Server

The most basic functionality provided by the Python SDK is to start a gRPC plugin server that supports all the flags expected by the CloudQuery CLI. This allows you to write a plugin in Python and run it using the same command line interface as any other plugin.

The following example shows how to create a plugin server that runs a plugin called MyPlugin:

import sys
from cloudquery.sdk import serve

from plugin import MyPlugin


def main():
    p = MyPlugin()
    serve.PluginCommand(p).run(sys.argv[1:])


if __name__ == "__main__":
    main()

Here MyPlugin is a class that extends cloudquery.sdk.plugin.Plugin. We'll look at the Plugin class in more detail next.

Plugin Class

A CloudQuery Python source plugin, like MyPlugin above, should subclass cloudquery.sdk.plugin.Plugin and needs to implement three methods: init, get_tables and sync. The init method is called when the plugin is started, and is where you can do any initialization work. The get_tables method should return a list of tables that the plugin supports. The sync method is called when a table needs to be synced, and is where the SDK scheduler can be used to manage the syncing of all the supported tables.
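The three-method contract described above can be illustrated without installing the SDK. The following is a minimal, self-contained sketch under stated assumptions: `PluginBase`, `ExampleTable`, and `ExamplePlugin` are hypothetical stand-ins, not the SDK's real classes, and the real `cloudquery.sdk.plugin.Plugin` methods take richer arguments (such as the `SyncOptions` seen in the Typeform example later in this post) than the simplified signatures used here.

```python
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Generator, List


@dataclass
class ExampleTable:
    """Hypothetical stand-in for an SDK table: a name plus some rows."""
    name: str
    rows: List[Dict[str, Any]]


class PluginBase(ABC):
    """Hypothetical stand-in mirroring the three methods a source plugin implements."""

    @abstractmethod
    def init(self, spec: bytes) -> None: ...

    @abstractmethod
    def get_tables(self) -> List[ExampleTable]: ...

    @abstractmethod
    def sync(self) -> Generator[Dict[str, Any], None, None]: ...


class ExamplePlugin(PluginBase):
    def __init__(self) -> None:
        self._spec: Dict[str, Any] = {}
        self._tables: List[ExampleTable] = []

    def init(self, spec: bytes) -> None:
        # Called once at startup: parse configuration and prepare tables.
        self._spec = json.loads(spec)
        prefix = self._spec.get("prefix", "example")
        self._tables = [
            ExampleTable(f"{prefix}_items", [{"id": "1"}, {"id": "2"}]),
        ]

    def get_tables(self) -> List[ExampleTable]:
        # Report which tables this plugin supports.
        return list(self._tables)

    def sync(self) -> Generator[Dict[str, Any], None, None]:
        # Emit one message per row; a real plugin would delegate to the scheduler.
        for table in self.get_tables():
            for row in table.rows:
                yield {"table": table.name, "record": row}


def run_example() -> List[Dict[str, Any]]:
    plugin = ExamplePlugin()
    plugin.init(b'{"prefix": "demo"}')
    return list(plugin.sync())
```

Calling `run_example()` initializes the stand-in plugin from a small JSON spec and collects the messages that `sync` yields. Writing `sync` as a generator mirrors the `Generator[...]` return type in the Typeform example, so rows can be streamed as they are produced rather than built up in one large list.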

Multi-threaded Scheduler

The scheduler's main responsibilities are to manage the concurrent execution of requests and the order in which tables are synced, so that dependent tables are resolved after the tables they depend on. It also places limits on the number of concurrent requests and on memory usage.

To invoke the scheduler, the sync method of a plugin should pass its table resolvers and sync options to the scheduler, which takes care of the rest. Here is an example from the Typeform plugin:

from typing import Generator

from cloudquery.sdk import message
from cloudquery.sdk import plugin
from cloudquery.sdk.scheduler import TableResolver

# ...

class TypeformPlugin(plugin.Plugin):
    # ...
    def sync(
        self, options: plugin.SyncOptions
    ) -> Generator[message.SyncMessage, None, None]:
        resolvers: list[TableResolver] = []
        for table in self.get_tables(
            plugin.TableOptions(
                tables=options.tables,
                skip_tables=options.skip_tables,
                skip_dependent_tables=options.skip_dependent_tables,
            )
        ):
            resolvers.append(table.resolver)
        return self._scheduler.sync(
            self._client, resolvers, options.deterministic_cq_id
        )
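To make the scheduler's two main jobs more concrete (ordering parent tables before their children, and capping concurrency), here is a simplified sketch using only the standard library. It is not the SDK's scheduler: the `Resolver` dataclass, `sync_all`, and the level-by-level strategy are assumptions chosen for brevity, and memory limits are not modeled at all.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple


@dataclass
class Resolver:
    """Hypothetical resolver: fetches rows for a table, optionally per parent row."""
    table: str
    fetch: Callable[[Optional[dict]], List[dict]]
    children: List["Resolver"] = field(default_factory=list)


def sync_all(resolvers: List[Resolver], max_workers: int = 4) -> Dict[str, List[dict]]:
    """Resolve tables level by level so parents always finish before children."""
    results: Dict[str, List[dict]] = {}
    # Each work item pairs a resolver with the parent row it depends on
    # (None for top-level tables).
    level: List[Tuple[Resolver, Optional[dict]]] = [(r, None) for r in resolvers]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while level:
            # The pool size caps how many fetches run at the same time.
            fetched = list(pool.map(lambda item: item[0].fetch(item[1]), level))
            next_level: List[Tuple[Resolver, Optional[dict]]] = []
            for (resolver, _parent), rows in zip(level, fetched):
                results.setdefault(resolver.table, []).extend(rows)
                # Every parent row becomes the input for each child resolver.
                for child in resolver.children:
                    next_level.extend((child, row) for row in rows)
            level = next_level
    return results


def fetch_forms(_parent: Optional[dict]) -> List[dict]:
    # Made-up data standing in for an API call.
    return [{"id": "f1"}, {"id": "f2"}]


def fetch_responses(parent: Optional[dict]) -> List[dict]:
    # Child rows are derived from the parent row they belong to.
    return [{"form_id": parent["id"], "answer": "yes"}]


def run_example() -> Dict[str, List[dict]]:
    responses = Resolver("form_responses", fetch_responses)
    forms = Resolver("forms", fetch_forms, children=[responses])
    return sync_all([forms], max_workers=2)
```

Processing one level at a time keeps the sketch free of nested waits on the same thread pool, at the cost of some parallelism: a child fetch cannot start until every table at the parent level has finished.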

Apache Arrow-based Type System with Custom Types

Table columns are defined using the Apache Arrow type system, a powerful and flexible way to define data types. CloudQuery destinations support almost all Arrow types, and the Python SDK provides support for a few additional types, such as UUID, IP address and JSON. For example, here is the definition of the typeform_forms table from the Typeform plugin, which uses string, timestamp and JSON types:

import pyarrow as pa
from cloudquery.sdk.schema import Column
from cloudquery.sdk.schema import Table
from cloudquery.sdk.types import JSONType

from plugin.tables.form_responses import FormResponses

class Forms(Table):
    def __init__(self) -> None:
        super().__init__(
            name="typeform_forms",
            title="Typeform Forms",
            columns=[
                Column("id", pa.string(), primary_key=True),
                Column("created_at", pa.timestamp(unit="s")),
                Column("last_updated_at", pa.timestamp(unit="s")),
                Column("self", JSONType()),
                Column("type", pa.string()),
                Column("settings", JSONType()),
                Column("theme", JSONType()),
                Column("title", pa.string()),
                Column("_links", JSONType()),
            ],
            relations=[FormResponses()],
        )

(Browse the full code on GitHub)

OpenAPI Transformer

The Python SDK introduces a transformer that can convert fields from an OpenAPI specification into CloudQuery-compatible table schemas. This can greatly reduce the manual work needed to develop plugins for APIs that have an OpenAPI spec. Check out the Square plugin source code for a great example of this in action.
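The general idea of deriving columns from an OpenAPI schema can be sketched in a few lines. This is an illustration only and does not use the SDK's transformer: the function name `openapi_to_columns`, the type-name strings in `_TYPE_MAP`, and the `FORM_SCHEMA` sample are all assumptions made for this example.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical mapping from OpenAPI primitive types to column type names.
_TYPE_MAP = {
    "string": "string",
    "integer": "int64",
    "number": "float64",
    "boolean": "bool",
}


def openapi_to_columns(schema: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Turn the properties of an OpenAPI object schema into (name, type) pairs."""
    columns: List[Tuple[str, str]] = []
    for name, prop in schema.get("properties", {}).items():
        if prop.get("type") == "string" and prop.get("format") == "date-time":
            col_type = "timestamp"
        else:
            # Objects, arrays, and anything unrecognized fall back to JSON.
            col_type = _TYPE_MAP.get(prop.get("type"), "json")
        columns.append((name, col_type))
    return columns


# Made-up schema fragment, loosely shaped like a form resource.
FORM_SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "created_at": {"type": "string", "format": "date-time"},
        "response_count": {"type": "integer"},
        "settings": {"type": "object"},
    },
}
```

Calling `openapi_to_columns(FORM_SCHEMA)` yields one (name, type) pair per property, in the order the properties are declared. Falling back to a JSON column for nested objects and arrays keeps the mapping total, so an unfamiliar field type never blocks schema generation.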

Docker for Cross-Platform Distribution

To support cross-platform packaging of Python plugins (and other languages in the future), we introduced a new docker registry type to the CloudQuery CLI in v3.12.0. Whereas Go-based plugins are downloaded as binaries from GitHub releases, Python plugins are downloaded as Docker images from the specified Docker registry. This allows CloudQuery to support multiple platforms, and also makes it easier to distribute plugins that depend on external libraries.

Getting Started

If you're keen to dive right in and develop a source plugin in Python, check out the Python Plugin Development Guide and sample plugins on GitHub.

We will also be adding more documentation and examples in the coming weeks, so stay tuned!

Future Work

Work is already underway to add SDKs for more languages. We won't spoil the surprise here, but we're excited to share more details soon. Be sure to follow us on Twitter or subscribe to our newsletter below 👇 to get the latest updates.

The first release of the Python SDK only officially supports source plugins. Writing a destination plugin in Python is possible using the low-level gRPC APIs, but is not yet officially supported by the Python SDK.

Feedback

We'd love to hear your feedback on the Python SDK. If you have any questions, comments, or suggestions, please feel free to reach out to us on Discord or GitHub.