added dbus code - still need integration

This commit is contained in:
Fredrick W. Warren 2025-01-12 20:34:45 -07:00
parent 818ce4a897
commit 2ff964b1ac

View File

@ -6,11 +6,21 @@ Copyright (C) 2006 Mario Frasca
Licensed under the GNU General Public License.
"""
import asyncio
import signal
import logging
from dbus_next.aio import MessageBus
from dbus_next import Message
from wmdocklib import wmoo as wmoo
line_height = 9
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Application(wmoo.Application):
def __init__(self, *args, **kwargs):
@ -277,9 +287,57 @@ patterns = [
# Define the signal handler outside of any function
def on_received_im_msg(message):
if message.member == 'ReceivedImMsg':
print("ReceivedImMsg signal received with arguments:")
for arg in message.body:
print(f" - {arg}")
async def run_dbus():
"""
Sets up the DBus connection, adds the signal handler, and listens for signals.
"""
# Connect to the session bus
bus = await MessageBus().connect()
# Define the service and object path
service_name = 'im.pidgin.purple.PurpleService'
object_path = '/im/pidgin/purple/PurpleObject'
interface_name = 'im.pidgin.purple.PurpleInterface'
# Add the signal handler to the bus
bus.add_message_handler(on_received_im_msg)
# Construct the match rule for the signal
match_rule = (
f"type='signal',"
f"interface='{interface_name}',"
f"member='ReceivedImMsg'"
)
# Create the AddMatch message
add_match_message = Message(
destination='org.freedesktop.DBus',
path='/org/freedesktop/DBus',
interface='org.freedesktop.DBus',
member='AddMatch',
signature='s',
body=[match_rule]
)
# Send the AddMatch message to subscribe to the signal
await bus.call(add_match_message)
# Inform the user that the program is listening for signals
print("Listening for ReceivedImMsg signals...")
# Keep the program running to listen for signals
await asyncio.Future() # Run forever
def main():
async def run_application():
app = Application(font_name='5x8',
margin = 3,
bg=0,
@ -292,7 +350,52 @@ def main():
app.draw_background()
app.draw_all_text()
app.addCallback(app.toggle_backlight, 'buttonrelease', area=(2,2,62,62))
app.run()
await app.run()
async def main():
# Connect to the session bus
bus = await MessageBus().connect()
# Define the service and object path
service_name = 'im.pidgin.purple.PurpleService'
object_path = '/im/pidgin/purple/PurpleObject'
interface_name = 'im.pidgin.purple.PurpleInterface'
# Add the signal handler to the bus
bus.add_message_handler(on_received_im_msg)
# Construct the match rule for the signal
match_rule = (
f"type='signal',"
f"interface='{interface_name}',"
f"member='ReceivedImMsg'"
)
# Create the AddMatch message
add_match_message = Message(
destination='org.freedesktop.DBus',
path='/org/freedesktop/DBus',
interface='org.freedesktop.DBus',
member='AddMatch',
signature='s',
body=[match_rule]
)
# Send the AddMatch message to subscribe to the signal
await bus.call(add_match_message)
# Inform the user that the program is listening for signals
print("Listening for ReceivedImMsg signals...")
# Keep the program running to listen for signals
# await run_application()
await asyncio.Future()
if __name__ == '__main__':
main()
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Program terminated by user.")