Announcing the Python SDK for CloudQuery Plugin Development
August 17, 2023
We're excited to announce the first release of a Python SDK for CloudQuery plugin development! This SDK provides a high-level toolkit for developing CloudQuery plugins in Python.
Background
CloudQuery is designed with a pluggable architecture and uses Apache Arrow (opens in a new tab) over gRPC (opens in a new tab) for communication between plugins. Source and destination plugins are independent of one another, and this architecture allows plugins to be written in different languages but still communicate with one another. Until now, we've only provided an SDK for writing plugins in Go, but we're excited to announce that we now have an SDK for writing plugins in Python as well.
Features
The Python SDK provides a number of features to make plugin development easier, and make plugins written in Python performant.
Plugin Server
The most basic functionality provided by the Python SDK is to start a gRPC plugin server that supports all the flags expected by the CloudQuery CLI. This allows you to write a plugin in Python and run it using the same command line interface as any other plugin.
The following example shows how to create a plugin server that runs a plugin called MyPlugin
:
import sys
from cloudquery.sdk import serve
from plugin import MyPlugin
def main():
p = MyPlugin()
serve.PluginCommand(p).run(sys.argv[1:])
if __name__ == "__main__":
main()
Here MyPlugin
is a class that extends cloudquery.sdk.plugin.Plugin
. We'll look at the Plugin
class in more detail next.
Plugin Class
A CloudQuery Python source plugin, like MyPlugin
above, should subclass cloudquery.sdk.plugin.Plugin
and needs to implement three methods: init
, get_tables
and sync
. The init
method is called when the plugin is started, and is where you can do any initialization work. The get_tables
method should return a list of tables that the plugin supports. The sync
method is called when a table needs to be synced, and is where the SDK scheduler can be used to manage the syncing of all the supported tables.
Multi-threaded Scheduler
The scheduler's main responsibilities are to manage concurrent execution of requests and the order in which tables are synced to avoid dependency issues. It also place limits on the number of concurrent requests and memory usage.
To invoke the scheduler, the sync
method of a plugin should pass a list of its tables and options to the scheduler. The scheduler will take care of the rest. Here is an example from the Typeform plugin (opens in a new tab):
from typing import Generator
from cloudquery.sdk import message
from cloudquery.sdk import plugin
from cloudquery.sdk.scheduler import TableResolver
# ...
class TypeformPlugin(plugin.Plugin):
# ...
def sync(
self, options: plugin.SyncOptions
) -> Generator[message.SyncMessage, None, None]:
resolvers: list[TableResolver] = []
for table in self.get_tables(
plugin.TableOptions(
tables=options.tables,
skip_tables=options.skip_tables,
skip_dependent_tables=options.skip_dependent_tables,
)
):
resolvers.append(table.resolver)
return self._scheduler.sync(
self._client, resolvers, options.deterministic_cq_id
)
Apache Arrow-based Type System with Custom Types
Table columns are defined using the Apache Arrow type system, a powerful and flexible way to define data types. CloudQuery destinations support almost all Arrow types, and the Python SDK provides support for a few additional types, such as UUID, IP address and JSON. For example, here is the definition of the typeform_forms
table from the Typeform plugin, that uses string
, timestamp
and JSON
types:
import pyarrow as pa
from cloudquery.sdk.schema import Column
from cloudquery.sdk.schema import Table
from cloudquery.sdk.types import JSONType
from plugin.tables.form_responses import FormResponses
class Forms(Table):
def __init__(self) -> None:
super().__init__(
name="typeform_forms",
title="Typeform Forms",
columns=[
Column("id", pa.string(), primary_key=True),
Column("created_at", pa.timestamp(unit="s")),
Column("last_updated_at", pa.timestamp(unit="s")),
Column("self", JSONType()),
Column("type", pa.string()),
Column("settings", JSONType()),
Column("theme", JSONType()),
Column("title", pa.string()),
Column("_links", JSONType()),
],
relations=[FormResponses()],
)
(Browse the full code on GitHub (opens in a new tab))
OpenAPI Transformer
The Python SDK introduces a transformer that can convert fields from an OpenAPI specification into CloudQuery-compatible table schemas. This can greatly reduce the manual work needed to develop plugins for APIs that have an OpenAPI spec. Check out the Square plugin source code (opens in a new tab) for a great example of this in action.
Docker for Cross-Platform Distribution
To support cross-platform packaging of Python plugins (and other languages in the future) in v3.12.0
we introduced a new docker
registry type to the CloudQuery CLI. Where Go-based plugins are downloaded as binaries from GitHub releases, Python plugins are downloaded as Docker images from the specified Docker registry. This allows CloudQuery to support multiple platforms, and also makes it easier to distribute plugins that have dependencies on external libraries.
Getting Started
If you're keen to dive right in and develop a source plugin in Python, check out the Python Plugin Development Guide and sample plugins on GitHub:
- Python SDK on PyPI (opens in a new tab)
- Square plugin (opens in a new tab)
- Typeform plugin (opens in a new tab)
We will also be adding more documentation and examples in the coming weeks, so stay tuned!
Future Work
Work is already underway to add SDKs for more languages. We won't spoil the surprise here, but we're excited to share more details soon. Be sure to follow us on Twitter (opens in a new tab) or subscribe to our newsletter below 👇 to get the latest updates.
The first release of the Python SDK only officially supports source plugins. Writing a destination plugin in Python is possible using the low-level gRPC APIs, but is not yet officially supported by the Python SDK.
Feedback
We'd love to hear your feedback on the Python SDK. If you have any questions, comments, or suggestions, please feel free to reach out to us on Discord (opens in a new tab) or GitHub (opens in a new tab).