Writing a DBus Service

For those who don’t know DBus is a great tool for sharing information between processes inside a Unix system (local only AFAIK). DBus allows the creation of simple processes that communicate with others, so it makes an ideal solution for creating daemons that need to interact with others.

Ok here is a small server:

#! /bin/python

# based on http://dbus.freedesktop.org/doc/dbus-python/doc/tutorial.html

import dbus, dbus.service

BUS_NAME="com.example.DBusService"

class DBusService(dbus.service.Object):
    def __init__(self, object_path=None, bus=None):
        self.bus = bus or dbus.SessionBus()
        self.object_path = object_path or "/dbusservice" 
        self.name = dbus.service.BusName(BUS_NAME, self.bus)
        DBusService.instance = self
        dbus.service.Object.__init__(self, self.bus, self.object_path)
        print "service ready"

    @dbus.service.method(dbus_interface=BUS_NAME,
                         in_signature='v', out_signature='s')
    def toString(self, variant):
        return str(variant)
    
    @dbus.service.method(dbus_interface=BUS_NAME,
                         in_signature='', out_signature='s',
                         sender_keyword='sender')
    def SayHello(self, sender=None):
        return 'Hello, %s!' % sender
    
    @dbus.service.method(dbus_interface=BUS_NAME,
                         in_signature='s', out_signature='')
    def SendAMessage(self, text):
        self.YouHaveAMessage(text)
    
    @dbus.service.signal(dbus_interface=BUS_NAME,
                         signature='s')
    def YouHaveAMessage(self, text):
        print "YouHaveAMessage %s" % text
    
if __name__ == '__main__':
    import gobject
    from dbus.mainloop.glib import DBusGMainLoop
    
    loop = gobject.MainLoop()
    DBusGMainLoop(set_as_default=True)
    
    manager = DBusService()
    loop.run()

And here is a small client that will show how to interface with the previous server:

#! /bin/python

if __name__ == '__main__':
    import dbus, gobject
    
    def sendAMessage():
        print "sending a message"
        iface.SendAMessage("Hello You")
        return False
    
    def callback(*args, **kwargs):
        loop.quit()
        print "callback", args, kwargs
    
    bus = dbus.SessionBus()
    
    proxy = bus.get_object('com.example.DBusService', '/dbusservice')
    iface = dbus.Interface(proxy, dbus_interface='com.example.DBusService')
    
    print iface.toString(1234)
    
    print iface.SayHello()
    
    print "sending a message"
    iface.SendAMessage("hello you")
    
    loop = gobject.MainLoop()
    iface.connect_to_signal("YouHaveAMessage", callback)
    gobject.timeout_add(10, sendAMessage)
    loop.run()
Advertisements
This entry was posted in dbus, python. Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s