Encrypting with Megolm
Room events are encrypted using Megolm sessions. We have two types of Megolm sessions: one for encrypting events, which we refer to as an outbound session; and one for decrypting events (including events we sent ourselves), which we refer to as an inbound session. We will create classes to manage these sessions. Let us start with the outbound session.
src/matrixlib/megolm.py:
# {{copyright}}
"""Megolm-related functionality"""
import asyncio
from base64 import b64decode, b64encode
import json
import logging
import os
import time
import typing
import sys
import vodozemac
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from .client import Client
from . import client
from . import devices
from . import error
from . import events
from . import olm
from . import pubsub
from . import rooms
from . import schema
MEGOLM_ALGORITHM = "m.megolm.v1.aes-sha2"
{{megolm module classes}}
class OutboundMegolmSession:
"""Manages a Megolm session for encrypting"""
{{OutboundMegolmSession class methods}}
Each Megolm session is tied to a room, and we will need the room ID when we encrypt a message, so we will pass it into our initialization function. Part of the behaviour of the Megolm session will be determined by the current room state, so we will also need a room state tracker. We also need a device tracker so that we can get the recipient devices, and a device keys manager so that we can get our own device keys.
We will allow for the possibility of creating a brand new session or loading an
existing one from the storage, for example, after restarting the client. We
will do this by optionally specifying the ID of the Megolm session to the
initialization function, and loading the session (along with associated data)
from storage. As with the vodozemac Account, we will store the vodozemac
Megolm session encrypted in storage, so we will allow passing in a key. For
convenience, if no key is passed in, we use the same key as the key used in the
device keys manager.
We will be storing some data alongside the Megolm session. We will create the initial values for that data when we discuss the individual items. Some of the data will be encrypted, which we will do using AES-GCM and the same key as the pickle key for vodozemac, so we create an object that will allow us to do that.
As well, we will create a lock to ensure that we don’t have conflicts when manipulating our session data.
def __init__(
self,
c: Client,
room_id: str,
room_state_tracker: rooms.RoomStateTracker,
device_tracker: devices.DeviceTracker,
device_keys_manager: devices.DeviceKeysManager,
key: typing.Optional[bytes] = None,
session_id: typing.Optional[str] = None,
):
"""
Arguments:
``c``:
the client object
``room_id``:
the ID of the room that the session belongs to
``room_state_tracker``:
a ``RoomStateTracker`` object
``device_tracker``:
a ``DeviceTracker`` object
``device_keys_manager``:
a ``DeviceKeysManager`` object
``key``:
a 32-byte binary used to encrypt the objects in storage
``session_id``:
if given, will load a session from storage. If omitted a new session
will be created
"""
self.client = c
self.room_id = room_id
self.room_state_tracker = room_state_tracker
self.device_tracker = device_tracker
self.device_keys_manager = device_keys_manager
self.key = key if key else device_keys_manager.key
if session_id:
self.session_data = c.storage[
f"outbound_megolm_session.{room_id}.{session_id}"
]
self.session = vodozemac.GroupSession.from_pickle(
self.session_data["pickle"], self.key
)
else:
self.session = vodozemac.GroupSession()
self.session_data = {
"pickle": self.session.pickle(self.key),
{{OutboundMegolmSession session data initialization}}
}
self._store_session_data()
self.aesgcm = AESGCM(self.key)
self.lock = asyncio.Lock()
def _store_session_data(self, pickle=False) -> None:
if pickle:
self.session_data["pickle"] = self.session.pickle(self.key)
name = f"outbound_megolm_session.{self.room_id}.{self.session.session_id}"
self.client.storage[name] = self.session_data
Each session has an ID that allows clients to refer to it. We allow the client to access this.
@property
def session_id(self) -> str:
"""The ID of the Megolm session"""
return self.session.session_id
Getting the session key for sending to recipients
When we encrypt a message, we will need to send the current state of the Megolm
session (referred to as the “session key”) to all the recipient devices that
have not already received this Megolm session. We create a function that will
return a list of devices that we need to send the Megolm session to, along with
the data to send to them, in the form of the contents of an m.room_key
event, to be sent to those devices. The
m.room_key event should be sent to the devices encrypted. We will discuss
how this is done in a later section. FIXME: link For now, we will assume that
the event is magically securely teleported to the recipient.
async def get_session_key_for_sending(self) -> list[typing.Tuple[dict, dict]]:
"""Get the devices that we need to send the Megolm session, along with the
data to send.
Returns a list of tuples. The first item in the tuple is the device key
to send to, and the second item in the tuple is the contents of an
``m.room_key`` event to send to the device. The event should be sent
encrypted.
"""
async with self.lock:
{{check if Megolm session is expired}}
{{get Megolm session key}}
{{get devices to send Megolm session to}}
A single Megolm session can be used to encrypt multiple messages, but there are
limits to this. After a while, the Megolm session should be rotated (that is,
replaced with a new session). This is to ensure that if an attacker somehow
obtains a Megolm session, their ability to use it will be limited. This
function will raise an exception if the Megolm session should be rotated. The
expiry parameters are set by the m.room.encryption state event, and Megolm
sessions can expire based on time (the rotation_period_ms property, which
defaults to one week if not set), or based on the number of messages that
it has encrypted (the rotation_period_msgs property, which defaults to 100
messages).
To check whether the session is expired based on the number of messages
encrypted, we can use the session’s own message_index property, which gives
the number of messages that have been encrypted. To check whether the session
is expired based on time, we will need to store the time at which we created
the session. Note that the message_index starts counting from zero, and we
call get_session_key_for_sending before we encrypt, so we raise the exception
if the message_index is greater or equal to rotation_period_msgs, rather
than strictly greater than.
Todo
allow the application to specify maximums for rotation period parameters
"creation_time": time.time_ns() // 1_000_000,  # milliseconds since the epoch
encryption_state = self.room_state_tracker.get_state(
    self.room_id, "m.room.encryption"
)
if encryption_state is None:
    raise RuntimeError("Room is not encrypted")
encryption_state = typing.cast(events.StateEvent, encryption_state)
rotation_period_ms = encryption_state.content.get(
    "rotation_period_ms", 7 * 24 * 60 * 60 * 1000
)
rotation_period_msgs = encryption_state.content.get(
    "rotation_period_msgs", 100
)
# time.time_ns() returns nanoseconds, so we convert to milliseconds to match
# both rotation_period_ms and the stored creation_time
if (
    rotation_period_msgs <= self.session.message_index
    or time.time_ns() // 1_000_000
    > self.session_data["creation_time"] + rotation_period_ms
):
    raise SessionExpiredException()
class SessionExpiredException(Exception):
"""Indicates that the session has expired"""
pass
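The expiry rule can also be exercised on its own, outside the session class. The following is a minimal sketch rather than part of the library: `session_needs_rotation` and its parameters are hypothetical names, and all times are assumed to be in milliseconds since the epoch.

```python
import time
import typing

ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000
DEFAULT_ROTATION_MSGS = 100


def session_needs_rotation(
    encryption_content: dict,
    message_index: int,
    creation_time_ms: int,
    now_ms: typing.Optional[int] = None,
) -> bool:
    """Return True if a session should be replaced before its next use.

    Hypothetical helper: ``encryption_content`` is the content of the
    ``m.room.encryption`` event, and all times are in milliseconds.
    """
    if now_ms is None:
        # time.time_ns() is in nanoseconds; convert to milliseconds
        now_ms = time.time_ns() // 1_000_000
    period_ms = encryption_content.get("rotation_period_ms", ONE_WEEK_MS)
    period_msgs = encryption_content.get(
        "rotation_period_msgs", DEFAULT_ROTATION_MSGS
    )
    # message_index counts the messages already encrypted, and we check
    # before encrypting the next one, so the comparison is >= rather than >
    return (
        message_index >= period_msgs
        or now_ms > creation_time_ms + period_ms
    )
```

Passing `now_ms` explicitly makes the time-based branch easy to test without sleeping.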
We construct the contents of an m.room_key event that includes the session
key from the vodozemac object, along with associated information. This will
form part of our return value, along with the devices that we are sending this
to.
room_key = {
"algorithm": MEGOLM_ALGORITHM,
"room_id": self.room_id,
"session_id": self.session.session_id,
"session_key": self.session.session_key,
}
Due to the nature of a cryptographic ratchet, we only need to send the session
key to devices that have not already received it. Devices that have already
received it will be able to decrypt new messages that are encrypted with it,
while devices that receive the current iteration of the session key will not be
able to decrypt old messages. To avoid sending the session key unnecessarily,
we will keep track of the devices that we have already sent the session to. We
will track this as a dict mapping user ID to device ID to a dict indicating
the status of the device. We will also use this to provide some measure of
fault tolerance. The device status will have the following properties:
status indicates the status of our attempt to share the session key with this device. It can be either sent, indicating that the key was, as far as we can tell, successfully sent to the device; or pending, indicating that we do not know if the key was successfully sent. A status of pending could be because we have not yet tried to send the key, or because we encountered some sort of error when trying to send it.
session_key is the session key to be sent to the device. This property is only present if status is pending. It allows us to retry sending the key to the device if we received an error after our first attempt. Since the session key allows encrypted messages to be read, it is encrypted so that it is not stored in the clear.
Tradeoff
Storing the session_key property will take up more space in the storage. If
there are many recipient devices, this can add up. Rather than storing the
session key, we can re-create the session key each time, but since the session
may get ratcheted to a higher index, this means that the recipient may not be
able to decrypt some messages.
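To see why a recipient that only receives a later ratchet state cannot read earlier messages, consider a deliberately simplified hash ratchet. This is a toy for illustration only: it is not the Megolm ratchet, which is more elaborate, and `ToyRatchet` is a hypothetical name.

```python
import hashlib


class ToyRatchet:
    """A one-way hash ratchet: it can move forward but never backward."""

    def __init__(self, state: bytes, index: int = 0):
        self.state = state
        self.index = index

    def advance_to(self, index: int) -> "ToyRatchet":
        """Return a copy of this ratchet advanced to ``index``."""
        if index < self.index:
            # reversing would require inverting the hash function
            raise ValueError("cannot ratchet backward")
        state = self.state
        for _ in range(index - self.index):
            state = hashlib.sha256(state).digest()
        return ToyRatchet(state, index)

    def message_key(self) -> bytes:
        """Derive the key for the message at the current index."""
        return hashlib.sha256(b"message key" + self.state).digest()


sender = ToyRatchet(b"\x00" * 32)
early_share = sender.advance_to(0)  # shared before any message was sent
late_share = sender.advance_to(3)  # shared after three messages were sent
```

A device holding `early_share` can advance to index 3 and derive the same message key as a device holding `late_share`, but `late_share.advance_to(1)` raises an error. This is the situation described above: a key re-created at a higher index leaves the recipient unable to decrypt the messages before that index.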
Initially, this map of devices that we have sent the key to will be the empty
dict.
"sent_to_devices": {},
To determine the devices that we need to send to, we first get the room
membership of users. The users who are allowed to read the message are
indicated by the m.room.history_visibility state event. If it is set to
world_readable, shared, or invited, or if it is unset, users who are
invited are allowed to read the messages, so we will send the session to any
users who are joined or invited (users whose current m.room.member event has
a membership of join or invite). If it is set to joined, then only
users who are joined to the room are allowed to read the messages, so we only
send the session to users who are joined.
Once we know which users to share the session with, we query the device tracker to find the device keys for the devices in the room.
history_visibility = self.room_state_tracker.get_state(
self.room_id,
"m.room.history_visibility",
)
if (
history_visibility
and history_visibility.content.get("history_visibility") == "joined"
):
allowed_membership = ["join"]
else:
allowed_membership = ["join", "invite"]
member_events = self.room_state_tracker.get_all_state_for_type(
self.room_id,
"m.room.member",
)
members = [
user_id
for user_id, event in member_events.items()
if event.content.get("membership") in allowed_membership
]
recipient_keys = await self.device_tracker.get_device_keys(members)
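The membership rule can be checked in isolation with plain dicts. This is a minimal sketch under stated assumptions: `allowed_memberships` and `select_recipients` are hypothetical helpers, and `member_contents` maps each user ID to the content of that user's current m.room.member event.

```python
import typing


def allowed_memberships(history_visibility: typing.Optional[str]) -> list[str]:
    """Return the membership values whose users may receive the session."""
    if history_visibility == "joined":
        # only users who have actually joined may read the messages
        return ["join"]
    # world_readable, shared, invited, or unset: invited users may read too
    return ["join", "invite"]


def select_recipients(
    member_contents: dict[str, dict],
    history_visibility: typing.Optional[str],
) -> list[str]:
    """Return the sorted user IDs that should receive the session."""
    allowed = allowed_memberships(history_visibility)
    return sorted(
        user_id
        for user_id, content in member_contents.items()
        if content.get("membership") in allowed
    )


members = {
    "@alice:example.org": {"membership": "join"},
    "@bob:example.org": {"membership": "invite"},
    "@carol:example.org": {"membership": "leave"},
}
```

With these members, a `shared` or unset visibility selects Alice and Bob, while `joined` selects only Alice.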
We remove our own device, since we do not need to send the session to
ourselves. We will need to create an inbound Megolm session to decrypt our
own messages, but we will do that directly from our outbound Megolm session,
rather than sending ourselves an m.room_key message.
own_key_info = recipient_keys.get(typing.cast(str, self.client.user_id))
if own_key_info:
own_devices = own_key_info.device_keys
own_devices.pop(typing.cast(str, self.client.device_id), None)
if own_devices == {}:
del recipient_keys[typing.cast(str, self.client.user_id)]
Todo
also need to provide app a way to filter recipient devices (via a callback), e.g. so it can block recipients, only send to verified devices, etc.
If we have already sent the session to a device that is no longer in the room, then we cannot use the session any more, as that device will be able to decrypt new messages encrypted using the session. So we check if any devices that we’ve sent the session to are not present in the set of recipient devices. Note that we need to check devices rather than users. Even if the user owning the device is still in the room, we need to treat any removed devices as potentially being compromised.
Note that for this part, we count devices that were marked as pending as if
the session was sent to them: even though the application may not yet have
succeeded in sending the session to those devices, it may send it later. So
for the purposes of ensuring the messages stay secret among the group members,
we will treat them as if they have already received the session.
for user_id, device_info in self.session_data["sent_to_devices"].items():
if user_id not in recipient_keys:
raise SessionExpiredException()
recipient_device_keys = recipient_keys.get(
user_id, devices.UserDeviceKeysResult({})
).device_keys
for device_id in device_info.keys():
if device_id not in recipient_device_keys:
raise SessionExpiredException()
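The same check can be written as a pure function over plain data, which makes the rule easier to test. This is a sketch only: `has_removed_device` is a hypothetical helper, and `recipient_devices` is assumed to map each user ID to the set of that user's current device IDs.

```python
def has_removed_device(
    sent_to_devices: dict[str, dict[str, dict]],
    recipient_devices: dict[str, set[str]],
) -> bool:
    """Return True if the session was shared with a device that is gone.

    Devices marked as pending count the same as devices marked as sent.
    """
    for user_id, device_info in sent_to_devices.items():
        if user_id not in recipient_devices:
            # the user is no longer a recipient at all
            return True
        for device_id in device_info:
            if device_id not in recipient_devices[user_id]:
                # the user remains, but this device has been removed
                return True
    return False
```

Note that a newly added device does not trigger rotation: it only means that there is one more device to send the session key to.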
We can now find any devices that are in the room, but that we have not sent the session to yet: these are the devices that we will need to send the session key to. We will also mark those devices as pending.
For each device, if they were already marked as pending, we will return the session key that should have previously been sent to it. Otherwise, we send the latest session key, which we got above.
devices_to_send_to = []
for user_id, key_info in recipient_keys.items():
sent_to_devices = self.session_data["sent_to_devices"].setdefault(
user_id, {}
)
for device_id, device_key in key_info.device_keys.items():
sent_info = sent_to_devices.get(device_id, {})
status = sent_info.get("status")
if status == "pending":
[nonce, encrypted_key] = sent_info["session_key"]
devices_to_send_to.append(
(
device_key,
{
"algorithm": MEGOLM_ALGORITHM,
"room_id": self.room_id,
"session_id": self.session.session_id,
"session_key": self.aesgcm.decrypt(
b64decode(nonce), b64decode(encrypted_key), None
).decode(),
},
)
)
elif status != "sent":
devices_to_send_to.append(
(
device_key,
room_key,
)
)
nonce = os.urandom(12)
sent_to_devices[device_id] = {
"status": "pending",
"session_key": [
b64encode(nonce).decode(),
b64encode(
self.aesgcm.encrypt(
nonce, room_key["session_key"].encode(), None
)
).decode(),
],
}
# Note: if storing things takes a long time, we could check if we actually made
# any changes before saving the session data
self._store_session_data()
return devices_to_send_to
The application now has messages that it can send to recipient devices. After the application has sent these to the recipients, we need to mark them as being sent, so we create a function to do so.
def mark_as_sent(self, device_keys: typing.Iterable[dict]) -> None:
"""Indicate that the session has been sent to the given devices.
Arguments:
``device_keys``:
an iterable of device keys
"""
for device_key in device_keys:
if "user_id" in device_key and "device_id" in device_key:
sent_to_devices = self.session_data["sent_to_devices"].setdefault(
device_key["user_id"], {}
)
sent_to_devices[device_key["device_id"]] = {"status": "sent"}
self._store_session_data()
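The life cycle of a device's status (unknown, then pending, then sent) can be seen more easily in a stripped-down, in-memory model. This is a sketch under simplifying assumptions: `ShareTracker` is a hypothetical class, devices are identified by ID rather than by their device keys, and the pending session key is kept in the clear, whereas the real code above encrypts it before storing it.

```python
class ShareTracker:
    """Tracks which devices have been sent a session key."""

    def __init__(self) -> None:
        # user ID -> device ID -> {"status": ..., "session_key": ...}
        self.sent_to_devices: dict[str, dict[str, dict]] = {}

    def devices_needing_key(
        self, recipients: dict[str, list[str]], current_key: str
    ) -> list[tuple[str, str, str]]:
        """Return (user ID, device ID, key) for each device to send to."""
        to_send = []
        for user_id, device_ids in recipients.items():
            per_user = self.sent_to_devices.setdefault(user_id, {})
            for device_id in device_ids:
                info = per_user.get(device_id, {})
                status = info.get("status")
                if status == "pending":
                    # retry with the key we originally intended to send
                    to_send.append((user_id, device_id, info["session_key"]))
                elif status != "sent":
                    # new device: remember the key in case we need to retry
                    per_user[device_id] = {
                        "status": "pending",
                        "session_key": current_key,
                    }
                    to_send.append((user_id, device_id, current_key))
        return to_send

    def mark_as_sent(self, user_id: str, device_id: str) -> None:
        """Record a successful send; the stored key is dropped."""
        self.sent_to_devices.setdefault(user_id, {})[device_id] = {
            "status": "sent"
        }
```

A device that stays pending keeps being offered the key that was current when it was first seen, even if the session has since been ratcheted, which is what lets it decrypt the messages sent in between.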
Tests
tests/test_megolm.py:
# {{copyright}}
import asyncio
import aioresponses
import json
import pytest
import re
import typing
from unittest.mock import Mock
import unittest.mock as mock
import urllib
import vodozemac
from matrixlib import client
from matrixlib.client import Client
from matrixlib import devices
from matrixlib import events
from matrixlib import error
from matrixlib import megolm
from matrixlib import olm
from matrixlib import rooms
{{megolm test constants}}
{{megolm test utility functions}}
{{test megolm}}
We will use several pieces of data in more than one test, so we define constants to avoid repeating the data.
ROOM_ENCRYPTION_EVENT = {
"room_id": "!room_id",
"type": "m.room.encryption",
"state_key": "",
"sender": "@alice:example.org",
"event_id": "$encryption_event",
"origin_server_ts": 1234567890123,
"content": {
"algorithm": megolm.MEGOLM_ALGORITHM,
},
}
ALICE_ROOM_MEMBERSHIP = {
"room_id": "!room_id",
"type": "m.room.member",
"state_key": "@alice:example.org",
"sender": "@alice:example.org",
"event_id": "$alice_event",
"origin_server_ts": 1234567890123,
"content": {"membership": "join"},
}
BOB_ROOM_MEMBERSHIP = {
"room_id": "!room_id",
"type": "m.room.member",
"state_key": "@bob:example.org",
"sender": "@bob:example.org",
"event_id": "$bob_event",
"origin_server_ts": 1234567890123,
"content": {"membership": "join"},
}
ALICE_DEVICE_KEY = {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "ABCDEFG",
"keys": {
"curve25519:ABCDEFG": "some+key",
},
"user_id": "@alice:example.org",
}
BOB_DEVICE_KEY = {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "HIJKLMN",
"keys": {
"curve25519:HIJKLMN": "some+other+key",
},
"user_id": "@bob:example.org",
}
BOB_DEVICE_KEY2 = {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "OPQRSTU",
"keys": {
"curve25519:OPQRSTU": "yet+another+key",
},
"user_id": "@bob:example.org",
}
Todo
add test for history_visibility
We test that we can generate a session key to send to another device. Until we get to implementing encryption and decryption, we will not be able to test that the session key works correctly, but we can test that we get the devices in the room (including when devices are added and leave), and that we can mark devices as having had the key sent to them.
@pytest.mark.asyncio
async def test_megolm_get_session_key(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
"device_tracker.cache.@bob:example.org": {
"device_keys": {
"HIJKLMN": BOB_DEVICE_KEY,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{test megolm get session key}}
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
[
(bob_device_key, bob_room_key)
] = await outbound_session.get_session_key_for_sending()
assert bob_device_key == {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "HIJKLMN",
"keys": {
"curve25519:HIJKLMN": "some+other+key",
},
"user_id": "@bob:example.org",
}
[
(bob_device_key_again, bob_room_key_again)
] = await outbound_session.get_session_key_for_sending()
assert bob_device_key_again == bob_device_key
assert bob_room_key_again == bob_room_key
outbound_session.mark_as_sent([bob_device_key])
assert await outbound_session.get_session_key_for_sending() == []
We can also test that the outbound session detects when it needs to be rotated. First we test that it detects that it needs to be rotated based on time. We do this by setting the rotation period to 5ms, creating a session, waiting for 10ms, and then trying to get the session key.
@pytest.mark.asyncio
async def test_megolm_get_session_key_rotation_by_time(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": {
"room_id": "!room_id",
"type": "m.room.encryption",
"state_key": "",
"sender": "@alice:example.org",
"event_id": "$encryption_event",
"origin_server_ts": 1234567890123,
"content": {
"algorithm": megolm.MEGOLM_ALGORITHM,
"rotation_period_ms": 5,
},
},
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{test megolm get session key rotation by time}}
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
await asyncio.sleep(0.01) # sleep for 10ms to make sure session has expired
with pytest.raises(megolm.SessionExpiredException):
await outbound_session.get_session_key_for_sending()
Next we test that it detects that it needs to rotate when a user or device leaves. In this situation, Bob starts with two devices. When one device logs out, the session needs to be rotated. We then create a new session. When Bob leaves the room completely, the new session will also need to be rotated.
@pytest.mark.asyncio
async def test_megolm_get_session_key_rotation_by_membership(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
"device_tracker.cache.@bob:example.org": {
"device_keys": {
"HIJKLMN": BOB_DEVICE_KEY,
"OPQRSTU": BOB_DEVICE_KEY2,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{test megolm get session key rotation by membership}}
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
await outbound_session.get_session_key_for_sending()
# simulate Bob logging out a device
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/query",
status=200,
body=json.dumps(
{
"device_keys": {
"@bob:example.org": {
"HIJKLMN": BOB_DEVICE_KEY,
},
},
}
),
headers={
"Content-Type": "application/json",
},
)
await c.publisher.publish(client.DeviceChanges(["@bob:example.org"], []))
with pytest.raises(megolm.SessionExpiredException):
await outbound_session.get_session_key_for_sending()
outbound_session2 = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
await outbound_session2.get_session_key_for_sending()
# simulate Bob leaving
await c.publisher.publish(
client.RoomTimelineUpdates(
"!room_id",
[
events.StateEvent(
room_id="!room_id",
type="m.room.member",
state_key="@bob:example.org",
sender="@bob:example.org",
content={
"membership": "leave",
},
event_id="$bob_leave_event",
origin_server_ts=1234567890123,
)
],
False,
"",
)
)
with pytest.raises(megolm.SessionExpiredException):
await outbound_session2.get_session_key_for_sending()
Encrypting
We can now create a method to encrypt an
event using our Megolm session. To
do this, we construct a dict containing the room ID, event type, and event
contents; serialize it as JSON; encrypt it; and include the resulting
ciphertext in a new object that includes other data. This object will be the
event contents for an m.room.encrypted event that we send to the room.
Note that the sender_key and device_id properties in the m.room.encrypted
event are deprecated: we include them in events that we send, for compatibility
with older clients, but we should tolerate receiving events that do not have
them.
def encrypt(self, event_type: str, content: dict) -> dict:
"""Encrypt an event
Arguments:
``event_type``:
the type of the event (e.g. ``m.room.message``)
``content``:
the event ``content``
Returns the ``content`` of a ``m.room.encrypted`` event
"""
plaintext = json.dumps(
{
"room_id": self.room_id,
"type": event_type,
"content": content,
}
)
ciphertext = self.session.encrypt(plaintext)
self._store_session_data(True)
return {
"algorithm": MEGOLM_ALGORITHM,
"sender_key": self.device_keys_manager.identity_key,
"device_id": self.client.device_id,
"session_id": self.session.session_id,
"ciphertext": ciphertext,
}
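To make the layering of the encrypted event easier to see, here is a sketch that separates building the payload from the encryption itself. Everything here is an assumption for illustration: `build_encrypted_content` is a hypothetical function, the deprecated sender_key and device_id properties are left out for brevity, and `fake_encrypt` is a placeholder that merely base64-encodes its input, which is not encryption.

```python
import base64
import json
import typing


def build_encrypted_content(
    room_id: str,
    event_type: str,
    content: dict,
    session_id: str,
    encrypt: typing.Callable[[str], str],
) -> dict:
    """Wrap an event in the content of an ``m.room.encrypted`` event."""
    # the room ID is part of the plaintext so that the recipient can check
    # that the event was not moved to a different room
    plaintext = json.dumps(
        {"room_id": room_id, "type": event_type, "content": content}
    )
    return {
        "algorithm": "m.megolm.v1.aes-sha2",
        "session_id": session_id,
        "ciphertext": encrypt(plaintext),
    }


def fake_encrypt(plaintext: str) -> str:
    """Placeholder only: base64 is an encoding, NOT encryption."""
    return base64.b64encode(plaintext.encode()).decode()


wrapped = build_encrypted_content(
    "!room_id", "m.room.message", {"body": "hi"}, "a_session_id", fake_encrypt
)
```

Only the algorithm, session ID, and ciphertext are visible in `wrapped`; the event type and contents are recoverable only from the ciphertext.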
Tests
Now that we can encrypt, we can test that we detect that the session needs rotating based on the number of messages that it has encrypted. We set it to require rotation after two messages, ensure that we can encrypt two messages, and ensure that we get an error when we try to get the session key for the third encryption.
@pytest.mark.asyncio
async def test_megolm_get_session_key_rotation_by_number(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": {
"room_id": "!room_id",
"type": "m.room.encryption",
"state_key": "",
"sender": "@alice:example.org",
"event_id": "$encryption_event",
"origin_server_ts": 1234567890123,
"content": {
"algorithm": megolm.MEGOLM_ALGORITHM,
"rotation_period_msgs": 2,
},
},
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{test megolm get session key rotation by number}}
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
await outbound_session.get_session_key_for_sending()
outbound_session.encrypt("m.room.message", {"body": "one"})
await outbound_session.get_session_key_for_sending()
outbound_session.encrypt("m.room.message", {"body": "two"})
with pytest.raises(megolm.SessionExpiredException):
await outbound_session.get_session_key_for_sending()
Decrypting
To decrypt using Megolm, we create a class from which we can instantiate an object using the session key that we got above.
class InboundMegolmSession:
"""Manages a Megolm session for decrypting"""
{{InboundMegolmSession member variables}}
{{InboundMegolmSession class methods}}
There are several ways that we can construct an inbound Megolm session, which
can take different arguments. To avoid confusion, we will avoid using the
initializer function, and create instances of this class using from_* class
methods.
def __init__(self):
"""Do not use initializer function. Use the ``from_*`` methods instead"""
raise RuntimeError("Use the from_* methods instead") # pragma: no cover
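For readers who have not seen this pattern before, here is a small, unrelated example of a class that can only be constructed through named class methods. `Temperature` is a hypothetical class used purely to show the mechanics: `cls.__new__(cls)` allocates an instance without calling `__init__`, so each class method is free to fill in the attributes in its own way.

```python
class Temperature:
    """A value that can only be built through the from_* class methods."""

    # declared here so that type checkers know about the attribute
    celsius: float

    def __init__(self):
        raise RuntimeError("Use the from_* methods instead")

    @classmethod
    def from_celsius(cls, value: float) -> "Temperature":
        obj = cls.__new__(cls)  # allocate without calling __init__
        obj.celsius = value
        return obj

    @classmethod
    def from_fahrenheit(cls, value: float) -> "Temperature":
        obj = cls.__new__(cls)
        obj.celsius = (value - 32) * 5 / 9
        return obj
```

Each constructor name states what its arguments mean, which is the source of confusion that a single overloaded initializer would otherwise create.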
One way to create an inbound Megolm session is directly from an outbound Megolm session. This allows us to decrypt our own messages. Note: this should be done before any events are encrypted with the outbound session, otherwise we will not be able to decrypt all messages.
Like outbound sessions, inbound sessions are stored by room ID and session ID; because we may receive sessions from other users, the storage key also includes the sender’s user ID. Along with the session, we store the sender’s identity key, as well as whether the session is authenticated, that is, whether we trust that the session comes from the given identity key. In this case, since we are the source of the session and we trust ourselves not to lie to ourselves, we mark it as authenticated.
@classmethod
def from_outbound_session(
cls,
c: Client,
outbound: OutboundMegolmSession,
key: typing.Optional[bytes] = None,
) -> "InboundMegolmSession":
"""Create an ``InboundMegolmSession`` from an ``OutboundMegolmSession``
Arguments:
``c``:
the client object
``outbound``:
the ``OutboundMegolmSession`` to use
``key``:
a 32-byte binary used to encrypt the objects in storage. If not
specified, uses the same key as used by ``outbound``
"""
obj = cls.__new__(cls)
obj.client = c
obj.user_id = typing.cast(str, c.user_id)
obj.room_id = outbound.room_id
obj.key = key if key else outbound.key
obj.session = vodozemac.InboundGroupSession(outbound.session.session_key)
obj.session_data = {
"pickle": obj.session.pickle(obj.key),
"sender_key": outbound.device_keys_manager.identity_key,
"authenticated": True,
"event_ids": {},
}
obj._store_session_data()
return obj
def _store_session_data(self, pickle=False) -> None:
if pickle:
self.session_data["pickle"] = self.session.pickle(self.key)
name = f"inbound_megolm_session.{self.room_id}.{self.user_id}.{self.session.session_id}"
self.client.storage[name] = self.session_data
Since we aren’t using the initializer function, we need to declare our member variables so that the type checker knows about them.
client: Client
user_id: str
room_id: str
key: bytes
session: vodozemac.InboundGroupSession
session_data: dict[str, typing.Any]
We also expose the data about the session so that the application can make use of it. This allows the application to determine whether to trust messages decrypted using the session, depending on the application’s definition of “trust”. We will explore this concept later on. FIXME: link to section.
@property
def sender_key(self) -> str:
"""The identity key of the Megolm session's sender"""
return self.session_data["sender_key"]
@property
def authenticated(self) -> bool:
"""Whether we know that the Megolm session comes from the associated ``sender_key``"""
return self.session_data["authenticated"]
Another way to get an inbound session is via an m.room_key event. In this
case, we need to include the sender’s identity key, which we will get from the
Olm session that we received the event from. (We will see how this happens
when we discuss Olm.) As well, Olm is authenticated, so we set the
authenticated flag to True.
@classmethod
def from_room_key(
cls,
c: Client,
user_id: str,
sender_key: str,
room_key_content: dict,
key: bytes,
) -> "InboundMegolmSession":
"""Create an ``InboundMegolmSession`` from an ``m.room_key`` event
Arguments:
``c``:
the client object
``user_id``:
the user ID of the sender of the ``m.room_key`` event
``sender_key``:
the identity key of the sender of the ``m.room_key`` event
``room_key_content``:
the ``content`` of the ``m.room_key`` event
``key``:
a 32-byte binary used to encrypt the objects in storage
"""
obj = cls.__new__(cls)
if room_key_content["algorithm"] != MEGOLM_ALGORITHM:
raise RuntimeError("Invalid algorithm")
obj.client = c
obj.user_id = user_id
obj.room_id = room_key_content["room_id"]
obj.key = key
obj.session = vodozemac.InboundGroupSession(room_key_content["session_key"])
if obj.session.session_id != room_key_content["session_id"]:
raise RuntimeError("Mismatched session ID")
obj.session_data = {
"pickle": obj.session.pickle(obj.key),
"sender_key": sender_key,
"authenticated": True,
"event_ids": {},
}
obj._store_session_data()
return obj
Todo
if we already have this session in storage, we should only keep the “best” one
We can also load an inbound session from storage. In this case, we will return
None if we cannot find the given session.
@classmethod
def from_storage(
    cls,
    c: Client,
    room_id: str,
    user_id: str,
    session_id: str,
    key: bytes,
) -> typing.Optional["InboundMegolmSession"]:
    """Load a session from storage

    Arguments:

    ``c``:
      the client object
    ``room_id``:
      the ID of the room that the session belongs to
    ``user_id``:
      the ID of the user that the session belongs to
    ``session_id``:
      the ID of the Megolm session
    ``key``:
      a 32-byte binary used to encrypt the objects in storage
    """
    obj = cls.__new__(cls)

    obj.client = c
    obj.room_id = room_id
    obj.user_id = user_id
    obj.key = key

    name = f"inbound_megolm_session.{room_id}.{user_id}.{session_id}"
    obj.session_data = c.storage.get(name)
    if obj.session_data is None:
        return None

    obj.session = vodozemac.InboundGroupSession.from_pickle(
        obj.session_data["pickle"],
        obj.key,
    )

    return obj
There is a fourth way that we can get an inbound session: via a key export. We will discuss this in a later section. FIXME: link to section
Now that we have an inbound Megolm session, we can use it to decrypt an encrypted event. We first check the event contents to make sure that it is in the expected format and that it is a Megolm-encrypted message. We then use the session to decrypt the ciphertext, and then check that the decrypted contents are in the expected format, and belong to the same room as the Megolm session.
You will see in the code that we also check for replay attacks. This will be explained below; you can ignore it for now.
def decrypt(self, event: events.RoomEvent) -> dict:
    """Decrypt an ``m.room.encrypted`` event encrypted with Megolm

    Arguments:

    ``event``:
      the encrypted event

    Returns the decrypted event, which will be a dict that should have ``type``
    (the decrypted event type), ``content`` (the event content), and
    ``room_id`` (the ID of the room the event was sent to) properties.
    """
    # check cleartext
    schema.ensure_valid(
        event.content,
        {
            "algorithm": str,
            "sender_key": schema.Optional(str),
            "device_id": schema.Optional(str),
            "session_id": str,
            "ciphertext": str,
        },
    )
    if event.content["algorithm"] != MEGOLM_ALGORITHM:
        raise RuntimeError("Invalid algorithm")

    # decrypt ciphertext
    decrypted = self.session.decrypt(event.content["ciphertext"])
    plaintext = json.loads(decrypted.plaintext)

    {{detect replay attacks}}

    self._store_session_data(True)

    # check plaintext
    schema.ensure_valid(
        plaintext,
        {
            "type": str,
            "content": dict,
            "room_id": str,
        },
    )
    if plaintext["room_id"] != self.room_id:
        raise RuntimeError("Mismatched room ID")

    return plaintext
Tradeoff
In this implementation, we store the vodozemac inbound session every time we
decrypt a message (via the call to _store_session_data). This is not strictly
necessary, unlike for outbound sessions, where we must save every time or risk
encrypting multiple messages at the same ratchet index. However, vodozemac
caches the ratchet state, so saving the session makes decrypting the message at
the next ratchet index faster. This approach has some potential issues.
First, if accessing the storage is async (which it likely will be in
most implementations), then the decrypt function will need to be async.
Second, if saving the session takes a long time, then we may lose any benefit
we got from caching.
There are several solutions to this. The simplest are either to ignore these problems, as they are not critical, or to not re-save the session at all. Another solution (which only deals with the second problem and not the first) is to save less often, perhaps only every 5 or 10 decryptions. A more complicated solution would be to create a separate task for saving the inbound sessions: something that runs in a separate thread, process, or coroutine, depending on your language’s concurrency functionality. When we decrypt a message, we tell the task to store the session. The task can then schedule the storage as needed. For example, it can rate-limit the storage based on time: if we decrypt several messages in the same session in a short amount of time, it can skip some and store only the latest version of the session.
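The time-based rate limiting described above could be sketched with asyncio roughly as follows. This is a minimal sketch under stated assumptions rather than part of matrixlib: `DebouncedSaver`, `request_save`, and `flush` are hypothetical names, and `save` stands for whatever coroutine writes the latest session state to storage.

```python
import asyncio
import typing


class DebouncedSaver:
    """Merge frequent save requests into at most one save per interval"""

    def __init__(
        self,
        save: typing.Callable[[], typing.Awaitable[None]],
        interval: float,
    ):
        self._save = save
        self._interval = interval
        self._dirty = False
        self._task: typing.Optional[asyncio.Task] = None

    def request_save(self) -> None:
        """Ask for a save; must be called while the event loop is running"""
        self._dirty = True
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        # keep looping if another request arrives while we are saving, so
        # that the latest state always ends up in storage
        while self._dirty:
            await asyncio.sleep(self._interval)
            self._dirty = False
            await self._save()

    async def flush(self) -> None:
        """Wait for any pending save to finish, e.g. before shutting down"""
        if self._task is not None:
            await self._task
```

With something along these lines, decrypt could call `request_save()` instead of storing the session directly, which would also let decrypt itself remain synchronous. The application would need to call `flush()` before exiting so that the last requested save is not lost.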
Tests
Now that we can decrypt, we can test more functionality of the outbound session as well.
For this set of tests, we will have 3 users: Alice, Bob, and Carol. Alice will encrypt messages, and we will check which messages Bob and Carol can decrypt.
@pytest.mark.asyncio
async def test_megolm_decryption(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
"device_tracker.cache.@bob:example.org": {
"device_keys": {
"HIJKLMN": BOB_DEVICE_KEY,
},
},
"device_tracker.cache.@carol:example.org": {
"device_keys": {
"OPQRSTU": {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "OPQRSTU",
"keys": {
"curve25519:OPQRSTU": "yet+another+key",
},
"user_id": "@carol:example.org",
},
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@carol:example.org",
"device_id": "OPQRSTU",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as carol:
{{megolm decryption test}}
First, Alice and Bob are in the room. We test that Alice can encrypt a message with the outbound session, that she can decrypt it using an inbound session created directly from the outbound session, and that Bob can decrypt it using an inbound session created from a room key event.
room_state_tracker = rooms.RoomStateTracker(alice)
device_tracker = devices.DeviceTracker(alice)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(alice, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
alice,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
[
(bob_device_key, bob_room_key)
] = await outbound_session.get_session_key_for_sending()
alice_inbound_session = (
megolm.InboundMegolmSession.from_outbound_session(
alice,
outbound_session,
b"\x00" * 32,
)
)
assert alice_inbound_session.authenticated
assert (
alice_inbound_session.sender_key == device_keys_manager.identity_key
)
bob_inbound_session = megolm.InboundMegolmSession.from_room_key(
bob,
"@alice:example.org",
device_keys_manager.identity_key,
bob_room_key,
b"\x00" * 32,
)
assert bob_inbound_session.authenticated
assert (
bob_inbound_session.sender_key == device_keys_manager.identity_key
)
outbound_session.mark_as_sent([bob_device_key])
encrypted_content1 = outbound_session.encrypt(
"m.room.message",
{"body": "Hello World!", "msgtype": "m.text"},
)
encrypted1 = events.RoomEvent(
sender="@alice:example.org",
event_id="$event1",
type="m.room.encrypted",
room_id="!room_id",
content=encrypted_content1,
origin_server_ts=1234567890000,
)
assert alice_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
assert bob_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
We can also test that Bob can decrypt an event using an inbound session loaded from storage.
bob_loaded_inbound_session = megolm.InboundMegolmSession.from_storage(
bob,
"!room_id",
"@alice:example.org",
encrypted_content1["session_id"],
b"\x00" * 32,
)
assert bob_loaded_inbound_session.authenticated
assert (
bob_loaded_inbound_session.sender_key
== device_keys_manager.identity_key
)
assert bob_loaded_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
We now test that Alice can load the outbound session from storage and encrypt a new message. Carol joins the room, so the encryption key will be shared with her, and we can test that Carol cannot decrypt the previously-sent message, but all three can decrypt the new message.
loaded_outbound_session = megolm.OutboundMegolmSession(
alice,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
outbound_session.session_id,
)
await alice.publisher.publish(
client.RoomTimelineUpdates(
"!room_id",
[
events.StateEvent(
room_id="!room_id",
type="m.room.member",
state_key="@carol:example.org",
sender="@carol:example.org",
content={
"membership": "join",
},
event_id="$carol_event",
origin_server_ts=1234567890123,
)
],
False,
"",
)
)
[
(carol_device_key, carol_room_key)
] = await loaded_outbound_session.get_session_key_for_sending()
carol_inbound_session = megolm.InboundMegolmSession.from_room_key(
carol,
"@alice:example.org",
device_keys_manager.identity_key,
carol_room_key,
b"\x00" * 32,
)
loaded_outbound_session.mark_as_sent([carol_device_key])
encrypted_content2 = loaded_outbound_session.encrypt(
"m.room.message",
{"body": "Bonjour!", "msgtype": "m.text"},
)
encrypted2 = events.RoomEvent(
sender="@alice:example.org",
event_id="$event2",
type="m.room.encrypted",
room_id="!room_id",
content=encrypted_content2,
origin_server_ts=1234567890000,
)
assert alice_inbound_session.decrypt(encrypted2) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Bonjour!", "msgtype": "m.text"},
}
assert bob_inbound_session.decrypt(encrypted2) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Bonjour!", "msgtype": "m.text"},
}
assert carol_inbound_session.decrypt(encrypted2) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Bonjour!", "msgtype": "m.text"},
}
with pytest.raises(vodozemac.MegolmDecryptionException):
carol_inbound_session.decrypt(encrypted1)
Detecting replay attacks¶
When we decrypt events, we must be careful to guard against replay attacks. A replay attack is an attack in which an attacker obtains a previously-sent ciphertext and replays it. Since the message was actually sent by the sender, recipients treat it as authentic, even though the context of the message may be different. The attacker does not need to encrypt a new message; they simply take an existing message and re-send it.
The way that we will detect replay attacks is by recording the event ID of the event decrypted by each ratchet index for a given Megolm session. In Megolm, each ratchet index is used to encrypt a single event, so if we see a ratchet index being used to encrypt multiple events, then we know that a replay attack has occurred (or the sender is buggy).
if decrypted.message_index in self.session_data["event_ids"]:
    if (
        self.session_data["event_ids"][decrypted.message_index]
        != event.event_id
    ):
        raise RuntimeError("Replay attack detected")
else:
    self.session_data["event_ids"][decrypted.message_index] = event.event_id
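The same bookkeeping can be seen in isolation in the following sketch, which uses a plain dict in place of the session data. The function name `check_replay` and the `seen` dict are introduced here only for illustration.

```python
import typing


def check_replay(
    seen: typing.Dict[int, str], message_index: int, event_id: str
) -> None:
    """Record which event used a ratchet index, raising on a conflict"""
    # setdefault stores event_id the first time an index is seen and
    # otherwise returns the event ID that was recorded earlier
    known = seen.setdefault(message_index, event_id)
    if known != event_id:
        raise RuntimeError("Replay attack detected")


seen: typing.Dict[int, str] = {}
check_replay(seen, 0, "$event1")  # first decryption: the event ID is recorded
check_replay(seen, 0, "$event1")  # re-decrypting the same event is allowed
```

Calling `check_replay(seen, 0, "$event2")` at this point would raise, since ratchet index 0 has already been recorded against a different event.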
Note that we do not need to guard against replay attacks in Olm because with Olm, we can only decrypt each event once — after decryption, we ratchet the Olm session forwards so that it can no longer be used to re-decrypt the event. Since events that we encrypt with Olm are keys, which we store after decryption, we do not have a need to re-decrypt them. However, with Megolm, we allow re-decryption in order to allow users to re-read old messages.
Tests
To test this, we encrypt an event and try to decrypt it three times. The first two times, we give it the same event ID and ensure that it decrypts correctly both times. The third time, we give it a different event ID and ensure that we get an error.
@pytest.mark.asyncio
async def test_replay_detection(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
"device_tracker.cache.@bob:example.org": {
"device_keys": {
"HIJKLMN": BOB_DEVICE_KEY,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
{{replay detection test}}
room_state_tracker = rooms.RoomStateTracker(alice)
device_tracker = devices.DeviceTracker(alice)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(alice, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
alice,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
[
(bob_device_key, bob_room_key)
] = await outbound_session.get_session_key_for_sending()
bob_inbound_session = megolm.InboundMegolmSession.from_room_key(
bob,
"@alice:example.org",
device_keys_manager.identity_key,
bob_room_key,
b"\x00" * 32,
)
outbound_session.mark_as_sent([bob_device_key])
encrypted_content1 = outbound_session.encrypt(
"m.room.message",
{"body": "Hello World!", "msgtype": "m.text"},
)
encrypted1 = events.RoomEvent(
sender="@alice:example.org",
event_id="$event1",
type="m.room.encrypted",
room_id="!room_id",
content=encrypted_content1,
origin_server_ts=1234567890000,
)
assert bob_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
assert bob_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
encrypted2 = events.RoomEvent(
sender="@alice:example.org",
event_id="$event2",
type="m.room.encrypted",
room_id="!room_id",
content=encrypted_content1,
origin_server_ts=1234567890123,
)
with pytest.raises(RuntimeError):
bob_inbound_session.decrypt(encrypted2)
Todo
explain what checks we need to do
sender key matches sending user