Components

The compoments module gathers a couple of common components that are of use for both agents and clients.


class clacks.common.components.amqp_proxy.AMQPEventConsumer(url, domain='org.clacks', xquery='.', callback=None)

The AMQPEventConsumer can be used to subscribe for events and process them thru a callback. The subscription is done thru XQuery, the callback can be a python method.

Example listening for an event called AsteriskNotification:

>>> from clacks.common.components import AMQPEventConsumer
>>> from lxml import etree
>>>
>>> # Event callback
>>> def process(data):
...     print(etree.tostring(data, pretty_print=True))
>>>
>>> # Create event consumer
>>> consumer = AMQPEventConsumer("amqps://admin:secret@localhost/org.clacks",
...             xquery="""
...                 declare namespace f='http://www.gonicus.de/Events';
...                 let $e := ./f:Event
...                 return $e/f:AsteriskNotification
...             """,
...             callback=process)

The consumer will start right away, listening for your events.

Parameter Description
url URL used to connect to the AMQP service broker
domain If the domain is not already encoded in the URL, it can be specified here.
xquery XQuery string to query for events.
callback Python method to be called if the event happened.

Note

The AMQP URL consists of these parts:

(amqp|amqps)://user:password@host:port/domain
class clacks.common.components.amqp_proxy.AMQPServiceProxy(serviceURL, serviceAddress=None, serviceName=None, conn=None, worker=None, methods=None)

The AMQPServiceProxy provides a simple way to use clacks RPC services from various clients. Using the proxy object, you can directly call methods without the need to know where it actually gets executed:

>>> from clacks.common.components import AMQPServiceProxy
>>> proxy = AMQPServiceProxy('amqp://admin:secret@localhost/org.clacks')
>>> proxy.getMethods()

This will return a dictionary describing the available methods.

Parameter Description
serviceURL URL used to connect to the AMQP service broker
serviceAddress Address string describing the target queue to bind to, must be skipped if no special queue is needed
serviceName internal
conn internal
worker internal
methods internal

The AMQPService proxy creates a temporary AMQP reply to queue, which is used for command results.

close()

Close the AMQP connection established by the proxy.

class clacks.common.components.amqp_proxy.AMQPStandaloneWorker(connection, s_address=None, r_address=None, workers=0, callback=None)

AMQP standalone worker container. This object creates a number of worker threads for the defined sender and receiver addresses. It registers receiver callbacks for incoming packets.

Parameter Description
connection qpid.messaging.Connection
s_address Address used to create a sender instance
r_address Address used to create a receiver instance
workers Number of worker threads
callback method to be called on incoming messages
join()

Join the worker threads.


class clacks.common.components.amqp.AMQPHandler

This class handles the AMQP connection, incoming and outgoing connections.

checkAuth(user, password)

This function checks a username / password combination using the AMQP service’ SASL configuration.

Parameter Description
user Username
password Password

Return: Bool, success or failure

getConnection()

Returns an AMQP connection handle for further usage.

Return: qpid.messaging.Connection

sendEvent(data, user=None)

Send and validate an event thru AMQP service.

Parameter Description
data XML string or etree object representing the event.

Return: Bool, success or failure

start()

Enable AMQP queueing. This method puts up the event processor and sets it to “active”.

class clacks.common.components.amqp.AMQPProcessor(ssn, callback)

AMQP worker thread. This objects get instantiated by the AMQPWorker class. It is responsible for receiving the messages and calling the callback function.

Parameter Description
ssn AMQP session
callback method to be called when receiving AMQP messages
class clacks.common.components.amqp.AMQPWorker(env, connection, s_address=None, r_address=None, workers=0, callback=None)

AMQP worker container. This object creates a number of worker threads for the defined sender and receiver addresses. It registers receiver callbacks for incoming packets.

Parameter Description
env clacks.common.env.Environment object
connection qpid.messaging.Connection object
s_address address used to create a sender instance
r_address address used to create a receiver instance
workers number of worker threads
callback method to be called on incoming messages

clacks.common.components.cache.cache

Caching decorator basically taken from the Python Wiki.

>>> @cache(ttl=60)
>>> def fibonacci(n):
...    "Return the nth fibonacci number."
...    if n in (0, 1):
...       return n
...    return fibonacci(n-1) + fibonacci(n-2)
>>>
>>> fibonacci(12)
Parameter Description
ttl time to cache results in seconds

clacks.common.components.command.Command(**d_kwargs)

This is the Command decorator. It adds properties based on its parameters to the function attributes:

>>> @Command(needsQueue= False, type= NORMAL)
>>> def hello():
...
Parameter Description
needsQueue indicates if the decorated function needs a queue parameter
needsUser indicates if the decorated function needs a user parameter
type describes the function type

Function types can be:

  • NORMAL (default)

    The decorated function will be called as if it is local. Which node will answer this request is not important.

  • FIRSTRESULT

    Some functionality may be distributed on several nodes with several information. FIRSTRESULT iterates thru all nodes which provide the decorated function and return on first success.

  • CUMULATIVE

    Some functionality may be distributed on several nodes with several information. CUMULATIVE iterates thru all nodes which provide the decorated function and returns the combined result.

exception clacks.common.components.command.CommandInvalid

Exception which is raised when the command is not valid.

exception clacks.common.components.command.CommandNotAuthorized

Exception which is raised when the call was not authorized.


class clacks.common.components.dbus_runner.DBusRunner

The DBusRunner module acts as a singleton for the DBUS system bus. Interested instances can obtain the system bus from the runner.

static get_instance()

Singleton to return a DBusRunner object.

Return: clacks.common.dbus_runner.DBusRunner

get_system_bus()

Return the current DBUS system bus.

Return: DBusRunner bus object

is_active()

Return the current DBUS system bus.

Return: Bool value

start()

Start the gobject.MainLoop() to establish DBUS communications.

stop()

Stop the gobject.MainLoop() to shut down DBUS communications.


class clacks.common.components.jsonrpc_proxy.JSONServiceProxy(serviceURL=None, serviceName=None, opener=None, mode='POST')

The JSONServiceProxy provides a simple way to use clacks RPC services from various clients. Using the proxy object, you can directly call methods without the need to know where it actually gets executed.

Example:

>>> proxy = JSONServiceProxy('https://localhost')
>>> proxy.login("admin", "secret")
>>> proxy.getMethods()
...
>>> proxy.logout()

This will return a dictionary describing the available methods.

Parameter Description
serviceURL URL used to connect to the HTTP service
serviceName internal
opener internal
mode Use POST or GET for communication

The URL format is:

(http|https)://user:password@host:port/rpc

Note

The HTTP service is operated by a clacks-agent instance. This means that you don’t have load balancing and failover out of the box. If you need these features, you should use clacks.common.components.amqp_proxy.AMQPServiceProxy instead.


class clacks.common.components.plugin.Plugin

The Plugin object is just a marker in the moment: it lacks special code. While being a marker, there’s a mandatory class member that must be maintained by users: _target_

The _target_ class member determines what queue the plugin will be registered on. The target queue is assembled by the clacks domain and the _target_. Plugins that do not offer common functionality should register on something else than core.

In this example, we create a sample plugin which is listening on <domain>.sample (domain is org.clacks if you didn’t change the configuration), which makes it possible that only agents having this module installed get related messages:

>>> from clacks.common import Environment
>>> from clacks.common.components import Command, Plugin
>>>
>>> class SampleModule(Plugin):
...     _target_ = 'sample'
...
...     @Command(__help__=N_("Return a pre-defined message to the caller"))
...     def hello(self, name="unknown"):
...         return _("Hello %s!") % name

class clacks.common.components.registry.PluginRegistry(component='agent.module')

Plugin registry class. The registry holds plugin instances and provides overall functionality like “serve” and “shutdown”.

Parameter Description
component What setuptools entrypoint to use when looking for clacks.common.components.plugin.Plugin.
static getInstance(name)

Return an instance of a registered class.

Parameter Description
name name of the class to get instance of
>>> from clacks.common.components import PluginRegistry
>>> cr = PluginRegistry.getInstance("CommandRegistry")
static shutdown()

Call handlers stop() methods in order to grant a clean shutdown.


class clacks.common.components.objects.ObjectRegistry

Object registry class. The registry holds object instances that are currently in use by clients. Objects can be either registered manually using:

>>> from clacks.common.components import ObjectRegistry
>>> ObjectRegistry.register("the.unique.object.oid", ObjectToRegister)

The preferred way to register objects is to use the setuptools section `[object]`:

[object]
the.unique.object.oid = full.path.to.the:ObjectToRegister

In this case, all objects are registered after the agent is fired up automatically.

static getInstance()

Act as a singleton and return an instance of ObjectRegistry.

static register(oid, obj)

Register the given object at the provided OID.


class clacks.common.components.zeroconf_client.ZeroconfClient(regtypes, timeout=2.0, callback=None, domain='local', direct=False)

The ZeroconfClient class helps with browsing for announced services. It creates a separate thread and needs the registrated service to look for as a parameter.

Usage example:

>>> import time
>>> import ZerconfClient
>>>
>>> # This is the function called on changes
>>> def callback(sdRef, flags, interfaceIndex, errorCode, fullname,
...                         hosttarget, port, txtRecord):
...   print('Resolved service:')
...   print('  fullname   =', fullname)
...   print('  hosttarget =', hosttarget)
...   print('  TXT        =', txtRecord)
...   print('  port       =', port)
>>>
>>> # Get instance and tell client to start
>>> z= ZeroconfClient(['_amqps._tcp'], callback=callback)
>>> z.start()
>>>
>>> # Do some sleep until someone presses Ctrl+C
>>> try:
>>>     while True:
>>>         time.sleep(1)
>>> except KeyboardInterrupt:
>>>     # Shutdown client
>>>     z.stop()
>>>     exit()
Parameter Description
regtypes The service list to watch out for - i.e. _amqps._tcp
timeout The timeout in seconds
callback Method to call when we’ve received something
domain optional DNS domain to discover
direct Do not use python DBUS, but the avahi-browse binary

class clacks.common.components.zeroconf.ZeroconfService(name, port, stype='', domain='', host='', text='')

Module to publish our services with zeroconf using avahi.

Parameter Description
name service description
port port which is used by the service
stype service type (i.e. _http._tcp)
domain service type
host hostname to identify where the service runs
text additional descriptive text
publish()

Start publishing the service.

unpublish()

Stop publishing the service.